Files
ubicloud/scheduling/dispatcher.rb
Jeremy Evans e6b7e5e879 Change rubocop TargetRubyVersion to 3.4
Disable Style/RedundantLineContinuation, as it incorrectly removes
line continuations in rhizome/host/lib/vm_setup.rb that are not
redundant.

All code changes are for _1 => it in blocks.
2025-04-26 06:51:19 +09:00

94 lines
2.3 KiB
Ruby

# frozen_string_literal: true
# Picks up runnable strands and runs each one on its own thread.
#
# Every running strand is tracked through a "notifier": the read end of a
# pipe whose write end is closed when the strand's thread finishes, which
# makes the notifier readable for IO.select in #wait_cohort.
class Scheduling::Dispatcher
  attr_reader :notifiers, :shutting_down

  def initialize
    @notifiers = []
    @shutting_down = false
    # Used as the IO.select timeout (seconds) of the per-strand watchdog.
    # NOTE(review): the 29 subtracted from the lease expiration is not
    # explained in this file - presumably a safety margin; confirm.
    @apoptosis_timeout = Strand::LEASE_EXPIRATION - 29
  end

  # Flags the dispatcher as shutting down: #scan then returns nothing,
  # #start_cohort stops launching strands, and #wait_cohort exits the
  # process once no notifiers remain.
  def shutdown
    @shutting_down = true
  end

  # Returns the strands that may be started now.
  #
  # The result is an empty array while shutting down or when fewer than one
  # database connection is available; otherwise it is a dataset of strands
  # whose lease is absent or expired, whose schedule is in the past and
  # which have no exit value, oldest schedule first, capped at the number
  # of available connections.
  def scan
    return [] if shutting_down

    # NOTE(review): one connection is held back on top of one per running
    # strand - presumably for the dispatcher's own queries; confirm.
    available = Config.db_pool - @notifiers.count - 1

    if available >= 1
      due = Strand.dataset.where(
        Sequel.lit("(lease IS NULL OR lease < now()) AND schedule < now() AND exitval IS NULL")
      )
      return due.order_by(:schedule).limit(available)
    end

    Clog.emit("Not enough database connections.") do
      {pool: {db_pool: Config.db_pool, active_threads: @notifiers.count}}
    end
    []
  end

  # Starts two threads for +strand+: a watchdog that terminates the process
  # if the strand has not finished within @apoptosis_timeout seconds, and
  # the worker that actually runs the strand.
  #
  # Returns the read end of the notification pipe; it becomes readable when
  # the worker thread closes the write end on its way out.
  def start_strand(strand)
    strand_ubid = strand.ubid.freeze
    apoptosis_rd, apoptosis_wr = IO.pipe
    notify_rd, notify_wr = IO.pipe

    spawn_apoptosis_watchdog(apoptosis_rd, strand_ubid)
    spawn_strand_runner(strand, strand_ubid, apoptosis_wr, notify_wr)

    notify_rd
  end

  # Starts every strand returned by #scan and records its notifier,
  # stopping early as soon as a shutdown has been requested.
  def start_cohort
    scan.each do |runnable|
      break if shutting_down

      @notifiers.push(start_strand(runnable))
    end
  end

  # Blocks until at least one running strand has finished, closes and
  # forgets the notifiers of the finished strands, and returns how many
  # finished. Returns 0 without blocking when nothing is running.
  #
  # While shutting down, the number of unfinished strands is logged on
  # every call and the process exits once none are left.
  def wait_cohort
    if shutting_down
      Clog.emit("Shutting down.") { {unfinished_strand_count: @notifiers.length} }
      Kernel.exit if @notifiers.empty?
    end

    return 0 if @notifiers.empty?

    finished = IO.select(@notifiers).first
    finished.each(&:close)
    @notifiers.reject! { |notifier| finished.include?(notifier) }
    finished.size
  end

  private

  # Watchdog thread: waits for +apoptosis_rd+ to become readable (the
  # worker closes the write end when it finishes). If that does not happen
  # within @apoptosis_timeout seconds, dumps all threads and exits the
  # process immediately.
  def spawn_apoptosis_watchdog(apoptosis_rd, strand_ubid)
    watchdog = Thread.new do
      Thread.current[:created_at] = Time.now
      readable = IO.select([apoptosis_rd], nil, nil, @apoptosis_timeout)&.first

      unless readable
        # Timed out, dump threads and exit
        ThreadPrinter.run
        Kernel.exit!

        # rubocop:disable Lint/UnreachableCode
        # Reachable in test only.
        next
        # rubocop:enable Lint/UnreachableCode
      end

      readable.first.close
    end
    watchdog.name = "apoptosis:" + strand_ubid
    watchdog
  end

  # Worker thread: runs the strand, logs any exception that escapes it
  # together with its chain of causes, and always closes both pipe write
  # ends so the watchdog and #wait_cohort are released.
  def spawn_strand_runner(strand, strand_ubid, apoptosis_wr, notify_wr)
    runner = Thread.new do
      Thread.current[:created_at] = Time.now
      # NOTE(review): the argument is a quarter of the lease expiration -
      # presumably a time budget for the run; confirm against Strand#run.
      strand.run Strand::LEASE_EXPIRATION / 4
    rescue => error
      Clog.emit("exception terminates thread") { Util.exception_to_hash(error) }
      while (error = error.cause)
        Clog.emit("nested exception") { Util.exception_to_hash(error) }
      end
    ensure
      # Adequate to unblock IO.select.
      apoptosis_wr.close
      notify_wr.close
    end
    runner.name = strand_ubid
    runner
  end
end