ubicloud/scheduling/dispatcher.rb
Daniel Farina fce1ce0eb4 Improve thread dump reporting
It's been a while since thread dump reporting was revised.  This
revision addresses the following problems:

It's hard to find the beginning and end of thread dumps, particularly
if they are consecutive.  Solution: add `--BEGIN...` and `--END...`
framing, plus a disambiguating timestamp that matches between the two.
The rough inspiration is MIME format of email.
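
The framing described above could look roughly like the following sketch.
The helper name `framed_thread_dump` and the exact marker text are
assumptions made for illustration; the commit message elides the real
marker text, and this is not the actual `ThreadPrinter` code.

```ruby
# Hypothetical helper: frames a dump of all live threads with BEGIN/END
# markers that carry the same timestamp, so consecutive dumps can be
# told apart. The marker wording is an assumption.
def framed_thread_dump(now: Time.now)
  stamp = now.utc.strftime("%Y-%m-%dT%H:%M:%S.%6NZ")
  out = +""
  out << "--BEGIN THREAD DUMP, #{stamp}\n"
  Thread.list.each do |thread|
    out << "Thread: #{thread.inspect}\n"
    # backtrace is nil for a dead thread, so fall back to an empty list.
    out << (thread.backtrace || []).join("\n") << "\n"
  end
  out << "--END THREAD DUMP, #{stamp}\n"
  out
end

puts framed_thread_dump
```

Because both markers repeat one timestamp, a reader can match each
BEGIN line to its END line even when several dumps are adjacent in the
log.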

The thread dumps are often triggered by apoptosis, which needs to exit
the entire process in a timely manner to maintain mutual exclusion.
In doing so, it does collateral damage to other threads that happen to
be running in the same process at the time.

The thread dump emitted likewise includes every thread, not just the
problematic one.  So, print the thread age, when available.  The
oldest are most likely to be implicated.

To make the thread age available in the case I care about most, the
`created_at` thread-local variable is set in the dispatcher, which runs
Strands.
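
As a minimal sketch of how a printer might use that variable, the
following reads `:created_at` from each thread and reports its age.
The helper name `thread_age` and the output format are assumptions for
illustration, not the actual `ThreadPrinter` implementation.

```ruby
# Hypothetical helper: reports a thread's age from its :created_at
# thread-local, or "unknown" when the variable was never set.
def thread_age(thread, now: Time.now)
  created_at = thread[:created_at]
  return "unknown" unless created_at
  format("%.3fs", now - created_at)
end

ready = Queue.new
worker = Thread.new do
  Thread.current[:created_at] = Time.now
  ready << true # signal that :created_at has been set
  sleep
end
ready.pop

puts "worker age: #{thread_age(worker)}"
puts "main age:   #{thread_age(Thread.main)}"

# Oldest first; threads without :created_at sort last.
oldest_first = Thread.list.sort_by { |t| t[:created_at] || Time.now }
puts oldest_first.map(&:inspect)

worker.kill
```

Sorting by age puts the threads most likely to be implicated at the top
of the report.
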
2025-01-25 15:44:27 -08:00

# frozen_string_literal: true

class Scheduling::Dispatcher
  attr_reader :notifiers, :shutting_down

  def initialize
    @apoptosis_timeout = Strand::LEASE_EXPIRATION - 29
    @notifiers = []
    @shutting_down = false
  end

  def shutdown
    @shutting_down = true
  end

  def scan
    return [] if shutting_down

    idle_connections = Config.db_pool - @notifiers.count - 1
    if idle_connections < 1
      Clog.emit("Not enough database connections.") do
        {pool: {db_pool: Config.db_pool, active_threads: @notifiers.count}}
      end
      return []
    end

    Strand.dataset.where(
      Sequel.lit("(lease IS NULL OR lease < now()) AND schedule < now() AND exitval IS NULL")
    ).order_by(:schedule).limit(idle_connections)
  end

  def start_strand(strand)
    strand_ubid = strand.ubid.freeze
    apoptosis_r, apoptosis_w = IO.pipe
    notify_r, notify_w = IO.pipe

    Thread.new do
      Thread.current[:created_at] = Time.now
      ready, _, _ = IO.select([apoptosis_r], nil, nil, @apoptosis_timeout)

      if ready.nil?
        # Timed out, dump threads and exit
        ThreadPrinter.run
        Kernel.exit!

        # rubocop:disable Lint/UnreachableCode
        # Reachable in test only.
        next
        # rubocop:enable Lint/UnreachableCode
      end

      ready.first.close
    end.tap { _1.name = "apoptosis:" + strand_ubid }

    Thread.new do
      Thread.current[:created_at] = Time.now
      strand.run Strand::LEASE_EXPIRATION / 4
    rescue => ex
      Clog.emit("exception terminates thread") { Util.exception_to_hash(ex) }

      loop do
        ex = ex.cause
        break unless ex

        Clog.emit("nested exception") { Util.exception_to_hash(ex) }
      end
    ensure
      # Adequate to unblock IO.select.
      apoptosis_w.close
      notify_w.close
    end.tap { _1.name = strand_ubid }

    notify_r
  end

  def start_cohort
    scan.each do |strand|
      break if shutting_down
      @notifiers << start_strand(strand)
    end
  end

  def wait_cohort
    if shutting_down
      Clog.emit("Shutting down.") { {unfinished_strand_count: @notifiers.length} }
      Kernel.exit if @notifiers.empty?
    end

    return 0 if @notifiers.empty?

    ready, _, _ = IO.select(@notifiers)
    ready.each(&:close)
    @notifiers.delete_if { ready.include?(_1) }
    ready.count
  end
end
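
The apoptosis mechanism in `start_strand` pairs each worker thread with
a watchdog thread that waits on a pipe with a timeout. The standalone
sketch below isolates that pattern. The helper name `run_with_watchdog`
is an assumption for illustration. Unlike the dispatcher, which calls
`ThreadPrinter.run` and `Kernel.exit!` on timeout, this version only
returns a symbol so it can be run safely.

```ruby
# Hypothetical helper: runs the block in a worker thread while a
# watchdog waits on a pipe. Returns :finished if the worker closed the
# pipe before the timeout, :timed_out otherwise.
def run_with_watchdog(timeout)
  reader, writer = IO.pipe

  watchdog = Thread.new do
    # IO.select returns nil when the timeout elapses with nothing readable.
    ready, = IO.select([reader], nil, nil, timeout)
    reader.close
    ready ? :finished : :timed_out
  end

  worker = Thread.new do
    yield
  ensure
    # Closing the write end delivers EOF, which makes the read end
    # readable and unblocks IO.select in the watchdog.
    writer.close
  end

  outcome = watchdog.value
  # The dispatcher exits the whole process here; the sketch just stops
  # the worker.
  worker.kill if outcome == :timed_out
  worker.join
  outcome
end

puts run_with_watchdog(5) { :quick_work }
puts run_with_watchdog(0.05) { sleep }
```

Closing the pipe in `ensure` means the watchdog is released whether the
worker returns normally or raises, which mirrors the `apoptosis_w.close`
call in the dispatcher.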