Files
ubicloud/scheduling/dispatcher.rb
Benjamin Satzger 761f89565e Graceful shutdown of respirate process
When the Heroku dyno manager restarts a dyno, it sends SIGTERM. It then
waits up to 30 seconds for a graceful shutdown before sending SIGKILL.

We now trap SIGTERM in the dispatcher to trigger graceful shutdown,
during which:
- no new strands are started
- running strands are allowed to finish
- once no strands are running anymore, the process exits

Without this graceful shutdown, strands running when SIGTERM is
received aren't given an opportunity to release their lease. These
strands are then locked for up to two minutes, until the lease expires.
2024-04-12 11:46:26 +02:00

92 lines
2.2 KiB
Ruby

# frozen_string_literal: true
# Finds strands that are due to run and runs each one on its own thread,
# paired with an "apoptosis" watchdog thread that terminates the whole process
# if the strand thread has not finished within @apoptosis_timeout seconds.
#
# Presumably driven by a loop that alternates #start_cohort and #wait_cohort.
# After #shutdown, no further strands are started, and #wait_cohort exits the
# process as soon as every running strand has finished.
class Scheduling::Dispatcher
  attr_reader :notifiers, :shutting_down

  def initialize
    # Watchdog timeout, in seconds (it is passed to IO.select): 29 short of
    # Strand::LEASE_EXPIRATION. NOTE(review): the reason for the specific
    # 29-second margin is not visible here — confirm intent.
    @apoptosis_timeout = Strand::LEASE_EXPIRATION - 29
    # Read ends of the per-strand notify pipes, one per running strand.
    @notifiers = []
    @shutting_down = false
  end

  # Requests a graceful shutdown. It only sets a flag, which keeps it safe to
  # call from a signal handler (presumably the process's SIGTERM trap).
  def shutdown
    @shutting_down = true
  end

  # Returns the strands that may be started right now. The result is an empty
  # array while shutting down or when no database connection is spare.
  # Otherwise it is a dataset of strands that:
  # - have no lease, or a lapsed one;
  # - are scheduled in the past;
  # - have not exited.
  # The dataset is ordered by schedule and capped at the number of spare
  # connections.
  def scan
    return [] if shutting_down

    # One connection per running strand, plus one held back (presumably for
    # the dispatcher's own queries).
    idle_connections = Config.db_pool - @notifiers.count - 1
    if idle_connections < 1
      Clog.emit("Not enough database connections.") do
        {pool: {db_pool: Config.db_pool, active_threads: @notifiers.count}}
      end
      return []
    end

    runnable = Sequel.lit("(lease IS NULL OR lease < now()) AND schedule < now() AND exitval IS NULL")
    Strand.dataset.where(runnable).order_by(:schedule).limit(idle_connections)
  end

  # Runs +strand+ on a new thread and returns the read end of a pipe. That
  # read end becomes readable (EOF) once the strand thread finishes.
  #
  # A separate watchdog thread waits on a second pipe for the same event. If
  # the event does not arrive within @apoptosis_timeout seconds, the watchdog
  # dumps all threads via ThreadPrinter and hard-exits with Kernel.exit!.
  def start_strand(strand)
    strand_ubid = strand.ubid.freeze
    apoptosis_r, apoptosis_w = IO.pipe
    notify_r, notify_w = IO.pipe

    watchdog = Thread.new do
      readable, = IO.select([apoptosis_r], nil, nil, @apoptosis_timeout)
      if readable
        readable.first.close
      else
        # Timed out: dump threads and exit immediately (Kernel.exit! skips
        # at_exit handlers). Kernel.exit! only returns when stubbed, as in
        # tests; in that case this thread simply ends.
        ThreadPrinter.run
        Kernel.exit!
      end
    end
    watchdog.name = "apoptosis:" + strand_ubid

    worker = Thread.new do
      strand.run Strand::LEASE_EXPIRATION / 4
    rescue => ex
      # Log the terminating exception, then every exception in its cause chain.
      Clog.emit("exception terminates thread") { Util.exception_to_hash(ex) }
      while (ex = ex.cause)
        Clog.emit("nested exception") { Util.exception_to_hash(ex) }
      end
    ensure
      # Closing both write ends makes the read ends hit EOF. That is enough to
      # wake both the watchdog's and #wait_cohort's IO.select.
      apoptosis_w.close
      notify_w.close
    end
    worker.name = strand_ubid

    notify_r
  end

  # Starts a thread for every strand returned by #scan. Stops early if a
  # shutdown is requested part-way through.
  def start_cohort
    scan.each do |candidate|
      break if shutting_down

      @notifiers << start_strand(candidate)
    end
  end

  # Blocks until at least one running strand finishes. It then closes and
  # drops the finished notifiers and returns how many finished. Returns 0
  # immediately when nothing is running.
  #
  # While shutting down, it logs the number of unfinished strands and exits
  # the process once none remain.
  def wait_cohort
    if shutting_down
      Clog.emit("Shutting down.") { {unfinished_strand_count: @notifiers.length} }
      Kernel.exit if @notifiers.empty?
    end
    return 0 if @notifiers.empty?

    finished = IO.select(@notifiers).first
    finished.each(&:close)
    @notifiers.delete_if { |notifier| finished.include?(notifier) }
    finished.size
  end
end