It's been a while since this has been revised. This revision addresses the following problems. First, it's hard to find the beginning and end of thread dumps, particularly if they are consecutive. Solution: add `--BEGIN...` and `--END...` framing, plus a disambiguating timestamp that matches between the two. The rough inspiration is the MIME format of email. Second, the thread dumps are often triggered by apoptosis, which needs to exit the entire process in a timely manner to maintain mutual exclusion. In doing so, it does collateral damage to other threads that happen to be running in the same process at the time. The thread dump emitted likewise includes every thread, not just the problematic one. So, print the thread age, when available: the oldest threads are the most likely to be implicated. To make the thread age available in the case I care about most, the `created_at` thread-local variable is set in the dispatcher, which runs Strands.
94 lines
2.3 KiB
Ruby
94 lines
2.3 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
# Starts due Strands, each on a thread of its own, and tracks their
# completion through pipes.
#
# Every strand thread is paired with an "apoptosis" watchdog thread: if
# the strand has not finished within @apoptosis_timeout seconds, all
# threads are dumped via ThreadPrinter and the whole process exits.
class Scheduling::Dispatcher
  attr_reader :notifiers, :shutting_down

  def initialize
    @shutting_down = false

    # Read ends of the per-strand notification pipes. A pipe becomes
    # readable (EOF) once its strand thread closes the write end.
    @notifiers = []

    # Watchdog deadline: 29 less than the strand lease expiration.
    @apoptosis_timeout = Strand::LEASE_EXPIRATION - 29
  end

  # Flags the dispatcher as shutting down. From then on scan yields
  # nothing, start_cohort launches nothing further, and wait_cohort
  # exits the process once no strands remain.
  def shutdown
    @shutting_down = true
  end

  # Returns the strands that may be started right now.
  #
  # The result is an empty Array when shutting down or when no database
  # connection is spare. Otherwise it is a Strand dataset, earliest
  # schedule first, limited to the number of spare connections, of
  # strands whose lease is absent or expired, whose schedule has
  # passed, and that have no exitval.
  def scan
    return [] if shutting_down

    # One connection per running strand, plus one held back
    # (presumably for the dispatcher's own queries; confirm).
    spare = Config.db_pool - @notifiers.count - 1

    if spare >= 1
      return Strand.dataset
          .where(Sequel.lit("(lease IS NULL OR lease < now()) AND schedule < now() AND exitval IS NULL"))
          .order_by(:schedule)
          .limit(spare)
    end

    Clog.emit("Not enough database connections.") do
      {pool: {db_pool: Config.db_pool, active_threads: @notifiers.count}}
    end
    []
  end

  # Runs +strand+ on a new thread, guarded by an apoptosis watchdog
  # thread.
  #
  # Returns the read end of a pipe whose write end is closed when the
  # strand thread finishes, whether normally or by exception. The
  # caller is expected to close the returned IO.
  def start_strand(strand)
    strand_ubid = strand.ubid.freeze
    apoptosis_r, apoptosis_w = IO.pipe
    notify_r, notify_w = IO.pipe

    watchdog = Thread.new do
      # Recorded so thread dumps can report this thread's age.
      Thread.current[:created_at] = Time.now

      # Wakes up early only when the strand thread closes apoptosis_w.
      readable, = IO.select([apoptosis_r], nil, nil, @apoptosis_timeout)

      if readable
        readable.first.close
      else
        # Timed out: dump every thread, then exit the process
        # immediately. Control only continues past exit! in tests.
        ThreadPrinter.run
        Kernel.exit!
      end
    end
    watchdog.name = "apoptosis:" + strand_ubid

    runner = Thread.new do
      Thread.current[:created_at] = Time.now
      strand.run Strand::LEASE_EXPIRATION / 4
    rescue => failure
      Clog.emit("exception terminates thread") { Util.exception_to_hash(failure) }

      # Report the whole chain of causes, outermost first.
      nested = failure.cause
      while nested
        Clog.emit("nested exception") { Util.exception_to_hash(nested) }
        nested = nested.cause
      end
    ensure
      # Closing the write ends is adequate to unblock the IO.select
      # calls waiting on the matching read ends.
      apoptosis_w.close
      notify_w.close
    end
    runner.name = strand_ubid

    notify_r
  end

  # Starts a thread for every strand returned by scan, stopping early
  # if a shutdown is requested part-way through.
  def start_cohort
    scan.each do |strand|
      break if shutting_down

      @notifiers.push(start_strand(strand))
    end
  end

  # Blocks until at least one running strand has finished and returns
  # how many finished. Returns 0 immediately when nothing is running.
  #
  # While shutting down, logs the number of unfinished strands and
  # exits the process once none remain.
  def wait_cohort
    if shutting_down
      Clog.emit("Shutting down.") { {unfinished_strand_count: @notifiers.length} }
      Kernel.exit if @notifiers.empty?
    end

    return 0 if @notifiers.empty?

    finished = IO.select(@notifiers).first
    finished.each { |pipe| pipe.close }
    @notifiers.reject! { |pipe| finished.include?(pipe) }
    finished.size
  end
end
|