ubicloud/scheduling/dispatcher.rb
Enes Cakir beeb7a0d32 Increase the strand's try count by 2 if it causes apoptosis
If a strand can't finish its job in the allowed time, the respirate
process exits and all of its threads are terminated.

Currently, we run 3 respirate dynos on Heroku. When they crash, Heroku
restarts them automatically. However, if they crash multiple times in a
very short period, Heroku delays the subsequent restarts.

When a strand consistently causes apoptosis, it affects all respirate
dynos and crashes them.

If a strand causes apoptosis, we should delay its subsequent iterations
to avoid crashing all dynos. Because the process exits on apoptosis, the
strand's try count is currently not incremented.

As a first option, we can schedule it for 30 minutes later to allow time
for recovery. However, if the strand is performing a critical job, such
as failover, we may not want to delay it that much if apoptosis is
intermittent.

Instead, we can increase its try count, so our scheduler delays it if it
consistently causes apoptosis.
2025-03-19 13:28:34 +03:00
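
The reasoning above can be illustrated with a minimal sketch. It assumes the scheduler delays a strand by an exponential backoff on its try count; the `backoff_delay` helper, its base, and its 600-second cap are hypothetical and are not taken from this file. Only the `+ 2` increment comes from the commit.

```ruby
# Hypothetical backoff: the delay doubles with each try and is capped.
# The real scheduler's formula is not shown in this file.
def backoff_delay(try, base: 1, max_delay: 600)
  [base * (2**try), max_delay].min
end

# On apoptosis the process exits before the usual failure path runs,
# so the watchdog bumps the counter itself (by 2, per the commit).
def try_after_apoptosis(try)
  try + 2
end

tries = 0
3.times do
  tries = try_after_apoptosis(tries)
  puts "try=#{tries} delay=#{backoff_delay(tries)}s"
end
```

Under these assumptions, an intermittent apoptosis costs only a short delay, while a strand that triggers it repeatedly is pushed back further each time.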

95 lines
2.3 KiB
Ruby

# frozen_string_literal: true

class Scheduling::Dispatcher
  attr_reader :notifiers, :shutting_down

  def initialize
    @apoptosis_timeout = Strand::LEASE_EXPIRATION - 29
    @notifiers = []
    @shutting_down = false
  end

  def shutdown
    @shutting_down = true
  end

  def scan
    return [] if shutting_down
    idle_connections = Config.db_pool - @notifiers.count - 1
    if idle_connections < 1
      Clog.emit("Not enough database connections.") do
        {pool: {db_pool: Config.db_pool, active_threads: @notifiers.count}}
      end
      return []
    end

    Strand.dataset.where(
      Sequel.lit("(lease IS NULL OR lease < now()) AND schedule < now() AND exitval IS NULL")
    ).order_by(:schedule).limit(idle_connections)
  end

  def start_strand(strand)
    strand_ubid = strand.ubid.freeze
    apoptosis_r, apoptosis_w = IO.pipe
    notify_r, notify_w = IO.pipe

    Thread.new do
      Thread.current[:created_at] = Time.now
      ready, _, _ = IO.select([apoptosis_r], nil, nil, @apoptosis_timeout)

      if ready.nil?
        # Timed out, dump threads and exit
        ThreadPrinter.run
        strand.this.update(try: Sequel[:try] + 2)
        Kernel.exit!

        # rubocop:disable Lint/UnreachableCode
        # Reachable in test only.
        next
        # rubocop:enable Lint/UnreachableCode
      end

      ready.first.close
    end.tap { _1.name = "apoptosis:" + strand_ubid }

    Thread.new do
      Thread.current[:created_at] = Time.now
      strand.run Strand::LEASE_EXPIRATION / 4
    rescue => ex
      Clog.emit("exception terminates thread") { Util.exception_to_hash(ex) }

      loop do
        ex = ex.cause
        break unless ex
        Clog.emit("nested exception") { Util.exception_to_hash(ex) }
      end
    ensure
      # Adequate to unblock IO.select.
      apoptosis_w.close
      notify_w.close
    end.tap { _1.name = strand_ubid }

    notify_r
  end

  def start_cohort
    scan.each do |strand|
      break if shutting_down
      @notifiers << start_strand(strand)
    end
  end

  def wait_cohort
    if shutting_down
      Clog.emit("Shutting down.") { {unfinished_strand_count: @notifiers.length} }
      Kernel.exit if @notifiers.empty?
    end

    return 0 if @notifiers.empty?
    ready, _, _ = IO.select(@notifiers)
    ready.each(&:close)
    @notifiers.delete_if { ready.include?(_1) }
    ready.count
  end
end
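
The pipe-based watchdog in `start_strand` can be tried in isolation. The following is a minimal sketch, not the dispatcher's code: `run_with_watchdog` is a hypothetical helper that returns a symbol where the dispatcher would dump threads and call `Kernel.exit!`. It relies only on the behavior that closing the write end of a pipe makes the read end readable (EOF), which unblocks `IO.select` before its timeout.

```ruby
# Hypothetical helper: runs the given block in a worker thread and reports
# whether it finished before the watchdog's timeout (in seconds).
def run_with_watchdog(timeout)
  reader, writer = IO.pipe

  watchdog = Thread.new do
    # Returns nil on timeout, or the readable IOs once the writer is closed.
    ready, _, _ = IO.select([reader], nil, nil, timeout)
    ready.nil? ? :timed_out : :finished
  end

  worker = Thread.new do
    yield
  ensure
    # Closing the write end signals EOF and unblocks IO.select.
    writer.close
  end

  outcome = watchdog.value
  worker.join
  reader.close
  outcome
end

puts run_with_watchdog(1.0) { :quick_job }   # prints "finished"
puts run_with_watchdog(0.05) { sleep 0.3 }   # prints "timed_out"
```

The same mechanism backs `wait_cohort`: each strand thread closes its `notify_w` in `ensure`, so `IO.select(@notifiers)` returns the read ends of the strands that have finished.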