ubicloud/lib/monitorable_resource.rb
Benjamin Satzger 82b577bb55 Close SSH session on event loop failure
If the event loop for the pulse check fails, close the SSH session
immediately. Otherwise, we run into an issue with Postgres server
pulse checks. These pulse checks rely on forwarding a local socket
through the SSH connection to establish a database connection.
If the SSH connection breaks in certain ways, such as the peer
resets the SSH connection, the pulse check that relies on the database
connection will hang indefinitely. The critical step is calling
`close` on the SSH object. `shutdown!` alone is not sufficient.
2025-10-01 22:28:56 +02:00

76 lines
2.5 KiB
Ruby

# frozen_string_literal: true
class MonitorableResource
attr_reader :deleted, :resource
attr_accessor :monitor_job_started_at, :monitor_job_finished_at
def initialize(resource)
@resource = resource
@session = nil
@pulse = {}
@pulse_check_started_at = Time.now
@pulse_thread = nil
@deleted = false
end
def open_resource_session
return if @session && @pulse[:reading] == "up"
@session = @resource.reload.init_health_monitor_session
rescue Sequel::NoExistingObject
Clog.emit("Resource is deleted.") { {resource_deleted: {ubid: @resource.ubid}} }
@session = nil
@deleted = true
end
def check_pulse
return unless @session
if @resource.needs_event_loop_for_pulse_check?
run_event_loop = true
event_loop_failed = false
pulse_thread = Thread.new do
@session[:ssh_session].loop(0.01) { run_event_loop }
rescue => ex
event_loop_failed = true
Clog.emit("SSH event loop has failed.") { {event_loop_failure: {ubid: @resource.ubid, exception: Util.exception_to_hash(ex)}} }
@session[:ssh_session].shutdown!
begin
@session[:ssh_session].close
rescue
end
end
end
stale_retry = false
@pulse_check_started_at = Time.now
begin
@pulse = @resource.check_pulse(session: @session, previous_pulse: @pulse)
@session[:last_pulse] = Time.now
Clog.emit("Got new pulse.") { {got_pulse: {ubid: @resource.ubid, pulse: @pulse}} } if (rpt = @pulse[:reading_rpt]) && (rpt < 6 || rpt % 5 == 1) || @pulse[:reading] != "up"
rescue => ex
if !stale_retry &&
(
# Seen when sending on a broken connection.
ex.is_a?(IOError) && ex.message == "closed stream" ||
# Seen when receiving on a broken connection.
ex.is_a?(Errno::ECONNRESET) && ex.message.start_with?("Connection reset by peer - recvfrom(2)")
) &&
(@session[:last_pulse].nil? || @session[:last_pulse] < (Time.now - 8))
stale_retry = true
begin
@session[:ssh_session].shutdown!
rescue
end
@session.merge!(@resource.init_health_monitor_session)
retry
end
Clog.emit("Pulse checking has failed.") { {pulse_check_failure: {ubid: @resource.ubid, exception: Util.exception_to_hash(ex)}} }
# TODO: Consider raising the exception here, and let the caller handle it.
end
run_event_loop = false
pulse_thread&.join
@session = nil if event_loop_failed
end
end