This changes the Channel#wait call to a loop that checks that the timeout has not been exceeded. If the timeout has been exceeded, an SshTimeout error (subclass of SshError) is raised. This should catch any shell commands that take too long to execute, as well as any hangs in the SSH connections, which should prevent most current causes of apoptosis. I'm including the timeout in the Clog emit, since it may be useful. This replaces the newly added `sudo timeout 10s` in the VM nexus command with a `timeout: 10` argument. We can consider expanding the use of explicit timeouts to other commands, because the default timeout is fairly long (2-10 seconds less than the apoptosis timeout). This removes the SSH Channel#wait call inside the Channel#exec block. Session#open_channel returns the same channel it yields, and Channel#exec yields the receiver, so previously, we were waiting twice on the same channel. This removes the duplicated wait inside the block, as the wait outside the block will always be called.
96 lines
2.4 KiB
Ruby
96 lines
2.4 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
class Scheduling::Dispatcher
|
|
attr_reader :notifiers, :shutting_down
|
|
|
|
# Builds a dispatcher that tracks no notifier pipes and is not shutting down.
#
# The apoptosis timeout is derived from Strand::LEASE_EXPIRATION, less 29.
def initialize
  lease_expiration = Strand::LEASE_EXPIRATION
  @apoptosis_timeout = lease_expiration - 29

  # Read ends of the per-strand notification pipes currently being tracked.
  @notifiers = []

  # Becomes true once #shutdown has been requested.
  @shutting_down = false
end
|
|
|
|
# Requests a shutdown by raising the shutting_down flag.
#
# This method only sets the flag; the other dispatcher methods consult it.
# Returns true.
def shutdown
  @shutting_down = true
end
|
|
|
|
# Finds strands that are due to run, limited to the database connections
# that are still free.
#
# Returns an empty array when the dispatcher is shutting down, or when
# Config.db_pool, less one connection per tracked notifier and less one more,
# leaves fewer than one connection (that case is also logged). Otherwise
# returns the Strand query for rows whose lease is unset or expired, whose
# schedule is in the past and whose exitval is NULL, ordered by schedule and
# limited to the number of spare connections.
def scan
  return [] if shutting_down

  spare = Config.db_pool - @notifiers.count - 1

  if spare >= 1
    due = Strand.dataset.where(
      Sequel.lit("(lease IS NULL OR lease < now()) AND schedule < now() AND exitval IS NULL")
    )
    return due.order_by(:schedule).limit(spare)
  end

  Clog.emit("Not enough database connections.") do
    {pool: {db_pool: Config.db_pool, active_threads: @notifiers.count}}
  end
  []
end
|
|
|
|
# Runs +strand+ on its own thread, guarded by a watchdog ("apoptosis") thread.
#
# Two pipes are created. The watchdog waits on the read end of the first for
# up to @apoptosis_timeout; if nothing happens in time it calls
# ThreadPrinter.run and then Kernel.exit!. The worker thread closes both write
# ends when it finishes, whether normally or through an exception. That
# releases the watchdog and makes the returned IO readable.
#
# Exceptions raised by the strand (StandardError and subclasses) are logged
# together with every exception in their cause chain; they are not re-raised.
#
# @param strand [#ubid, #run] the strand to execute; #run receives a quarter
#   of Strand::LEASE_EXPIRATION
# @return [IO] read end of the notification pipe
def start_strand(strand)
  strand_ubid = strand.ubid.freeze
  apoptosis_r, apoptosis_w = IO.pipe
  notify_r, notify_w = IO.pipe

  watchdog = Thread.new do
    Thread.current[:created_at] = Time.now
    woken, = IO.select([apoptosis_r], nil, nil, @apoptosis_timeout)

    if woken
      # The worker closed its end in time; release ours and finish quietly.
      woken.first.close
    else
      # Timed out, dump threads and exit
      ThreadPrinter.run
      Kernel.exit!

      # rubocop:disable Lint/UnreachableCode
      # Reachable in test only.
      next
      # rubocop:enable Lint/UnreachableCode
    end
  end
  watchdog.name = "apoptosis:" + strand_ubid

  worker = Thread.new do
    started_at = Time.now
    current = Thread.current
    current[:created_at] = started_at
    current[:apoptosis_at] = started_at + @apoptosis_timeout
    strand.run(Strand::LEASE_EXPIRATION / 4)
  rescue => ex
    Clog.emit("exception terminates thread") { Util.exception_to_hash(ex) }

    # Walk the cause chain so wrapped exceptions are logged as well.
    while (ex = ex.cause)
      Clog.emit("nested exception") { Util.exception_to_hash(ex) }
    end
  ensure
    # Adequate to unblock IO.select.
    apoptosis_w.close
    notify_w.close
  end
  worker.name = strand_ubid

  notify_r
end
|
|
|
|
# Starts a thread for each strand returned by #scan and records the
# notification pipe that #start_strand hands back for it.
#
# Iteration stops as soon as shutting_down becomes true, so no further
# strands are started once a shutdown has been requested.
def start_cohort
  scan.each do |runnable|
    break if shutting_down

    notifier = start_strand(runnable)
    @notifiers.push(notifier)
  end
end
|
|
|
|
# Waits for at least one running strand to finish and stops tracking it.
#
# While shutting down, the number of unfinished strands is logged first, and
# Kernel.exit is called once none are left. With no notifiers to wait on the
# method returns 0 straight away. Otherwise it blocks in IO.select until one
# or more notifier pipes are readable, closes those pipes, removes them from
# @notifiers in place, and returns how many were reaped.
def wait_cohort
  if shutting_down
    Clog.emit("Shutting down.") { {unfinished_strand_count: @notifiers.length} }
    Kernel.exit if @notifiers.empty?
  end

  return 0 if @notifiers.empty?

  finished = IO.select(@notifiers).first
  finished.each { |notifier| notifier.close }
  @notifiers.reject! { |notifier| finished.include?(notifier) }
  finished.count
end
|
|
end
|