mirror of
https://github.com/ubicloud/ubicloud.git
synced 2025-10-05 06:12:09 +08:00
If the event loop for the pulse check fails, close the SSH session immediately. Otherwise, we run into an issue with Postgres server pulse checks. These pulse checks rely on forwarding a local socket through the SSH connection to establish a database connection. If the SSH connection breaks in certain ways, such as when the peer resets the SSH connection, the pulse check that relies on the database connection will hang indefinitely. The critical step is calling `close` on the SSH object. `shutdown!` alone is not sufficient.
175 lines
7.9 KiB
Ruby
175 lines
7.9 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require_relative "../model/spec_helper"
|
|
|
|
# Specs for MonitorableResource. Two subjects are used throughout:
# `r_w_event_loop` wraps a PostgresServer and `r_without_event_loop` wraps a
# VmHost; per the examples below, only the former is expected to spawn a
# thread that drives the SSH event loop during a pulse check.
RSpec.describe MonitorableResource do
  let(:postgres_server) { PostgresServer.new { it.id = "c068cac7-ed45-82db-bf38-a003582b36ee" } }
  let(:r_w_event_loop) { described_class.new(postgres_server) }
  let(:vm_host) { VmHost.new { it.id = "46683a25-acb1-4371-afe9-d39f303e44b4" } }
  let(:r_without_event_loop) { described_class.new(vm_host) }

  # Session hash that examples inject into @session. It is lazy and memoized
  # per example, so examples that never reference it do not build the double.
  let(:session) { {ssh_session: instance_double(Net::SSH::Connection::Session)} }

  describe "#open_resource_session" do
    it "returns if session is not nil and pulse reading is up" do
      r_w_event_loop.instance_variable_set(:@session, "not nil")
      r_w_event_loop.instance_variable_set(:@pulse, {reading: "up"})

      # An early return must not touch the underlying resource at all.
      expect(postgres_server).not_to receive(:reload)
      r_w_event_loop.open_resource_session
    end

    it "sets session to resource's init_health_monitor_session" do
      expect(postgres_server).to receive(:reload).and_return(postgres_server)
      expect(postgres_server).to receive(:init_health_monitor_session).and_return("session")
      expect { r_w_event_loop.open_resource_session }.to change { r_w_event_loop.instance_variable_get(:@session) }.from(nil).to("session")
    end

    it "sets deleted to true if resource is deleted" do
      expect(postgres_server).to receive(:reload).and_raise(Sequel::NoExistingObject)
      expect { r_w_event_loop.open_resource_session }.to change(r_w_event_loop, :deleted).from(false).to(true)
    end

    it "raises the exception if it is not Sequel::NoExistingObject" do
      expect(postgres_server).to receive(:reload).and_raise(StandardError)
      expect { r_w_event_loop.open_resource_session }.to raise_error(StandardError)
    end
  end

  describe "#check_pulse" do
    it "does not create thread if session is nil or resource does not need event loop" do
      expect(Thread).not_to receive(:new)

      # session is nil
      r_w_event_loop.check_pulse

      # resource does not need event loop
      r_without_event_loop.instance_variable_set(:@session, "not nil")
      r_without_event_loop.check_pulse
    end

    it "swallows exception and logs it if event loop fails" do
      r_w_event_loop.instance_variable_set(:@session, session)

      # After the loop raises, the SSH session must be both shut down and
      # closed; the example requires the two calls, not just `shutdown!`.
      expect(session[:ssh_session]).to receive(:shutdown!)
      expect(session[:ssh_session]).to receive(:close)
      expect(Thread).to receive(:new).and_call_original
      expect(session[:ssh_session]).to receive(:loop).and_raise(StandardError)
      expect(Clog).to receive(:emit).twice.and_call_original
      r_w_event_loop.check_pulse
    end

    it "swallows exception and logs it if check_pulse fails" do
      r_without_event_loop.instance_variable_set(:@session, session)
      expect(vm_host).to receive(:check_pulse).and_raise(StandardError)
      expect(Clog).to receive(:emit).and_call_original
      expect { r_without_event_loop.check_pulse }.not_to raise_error
    end
  end

  describe "#check_pulse with session and event loop" do
    # Every example here runs with a session installed and requires the SSH
    # event loop to be entered exactly once.
    before do
      r_w_event_loop.instance_variable_set(:@session, session)
      expect(session[:ssh_session]).to receive(:loop)
    end

    it "creates a new thread and runs the event loop" do
      expect(Thread).to receive(:new).and_call_original
      r_w_event_loop.check_pulse
    end

    it "calls check_pulse on resource and sets pulse" do
      expect(postgres_server).to receive(:check_pulse).and_return({reading: "up"})
      expect { r_w_event_loop.check_pulse }.to change { r_w_event_loop.instance_variable_get(:@pulse) }.from({}).to({reading: "up"})
    end

    it "swallows exception and logs it if check_pulse fails" do
      expect(postgres_server).to receive(:check_pulse).and_raise(StandardError)
      expect(Clog).to receive(:emit).and_call_original
      expect { r_w_event_loop.check_pulse }.not_to raise_error
    end

    it "does not log the pulse if reading is up and reading_rpt is not every 5th and reading_rpt is large enough" do
      expect(postgres_server).to receive(:check_pulse).and_return({reading: "up", reading_rpt: 13})
      # A negative expectation fails on the first call, so no response
      # (such as and_call_original) is configured for it.
      expect(Clog).not_to receive(:emit)
      r_w_event_loop.check_pulse
    end

    it "logs the pulse if reading is not up" do
      expect(postgres_server).to receive(:check_pulse).and_return({reading: "down", reading_rpt: 13})
      expect(Clog).to receive(:emit).and_call_original
      r_w_event_loop.check_pulse
    end

    it "logs the pulse if reading is up and reading_rpt is every 5th reading" do
      expect(postgres_server).to receive(:check_pulse).and_return({reading: "up", reading_rpt: 6})
      expect(Clog).to receive(:emit).and_call_original
      r_w_event_loop.check_pulse
    end

    it "logs the pulse if reading is up and reading_rpt is recent enough" do
      expect(postgres_server).to receive(:check_pulse).and_return({reading: "up", reading_rpt: 3})
      expect(Clog).to receive(:emit).and_call_original
      r_w_event_loop.check_pulse
    end
  end

  # The same retry examples are generated once per exception instance below.
  [IOError.new("closed stream"), Errno::ECONNRESET.new("recvfrom(2)")].each do |ex|
    describe "#check_pulse", "stale connection retry behavior with #{ex.class}" do
      before do
        r_w_event_loop.instance_variable_set(:@session, session)
        expect(session[:ssh_session]).to receive(:loop)
      end

      # In the retry examples, two expectations are set on :check_pulse. They
      # are consumed in definition order: the first call raises, the second
      # call gets the later expectation.
      it "retries if the last pulse is not set" do
        expect(postgres_server).to receive(:check_pulse).and_raise(ex)
        second_session = instance_double(Net::SSH::Connection::Session)
        expect(postgres_server).to receive(:init_health_monitor_session).and_return(second_session)
        expect(session[:ssh_session]).to receive(:shutdown!)
        expect(session).to receive(:merge!).with(second_session)
        expect(postgres_server).to receive(:check_pulse).and_return({reading: "up", reading_rpt: 1})
        r_w_event_loop.check_pulse
      end

      it "does not retry if the connection is fresh" do
        # Last pulse only one second ago: no session re-initialization is expected.
        session[:last_pulse] = Time.now - 1
        expect(postgres_server).to receive(:check_pulse).and_raise(ex)
        expect(Clog).to receive(:emit).and_call_original
        expect { r_w_event_loop.check_pulse }.not_to raise_error
      end

      it "is up on a retry on a stale connection that works the second time" do
        session[:last_pulse] = Time.now - 10
        expect(postgres_server).to receive(:check_pulse).and_raise(ex)
        second_session = instance_double(Net::SSH::Connection::Session)
        expect(postgres_server).to receive(:init_health_monitor_session).and_return(second_session)
        expect(session[:ssh_session]).to receive(:shutdown!)
        expect(session).to receive(:merge!).with(second_session)
        expect(postgres_server).to receive(:check_pulse).and_return({reading: "up", reading_rpt: 1})
        r_w_event_loop.check_pulse
      end

      it "is down if consecutive errors are raised even on a stale connection" do
        session[:last_pulse] = Time.now - 10
        expect(postgres_server).to receive(:check_pulse).and_raise(ex)
        second_session = instance_double(Net::SSH::Connection::Session)
        expect(postgres_server).to receive(:init_health_monitor_session).and_return(second_session)
        expect(session[:ssh_session]).to receive(:shutdown!)
        expect(session).to receive(:merge!).with(second_session)
        # The retried call must raise as well ("consecutive errors"); returning
        # the exception object would hand it back as if it were a pulse.
        expect(postgres_server).to receive(:check_pulse).and_raise(ex)
        expect(Clog).to receive(:emit).and_call_original
        expect { r_w_event_loop.check_pulse }.not_to raise_error
      end

      it "is down for a matching exception without a matching message" do
        session[:last_pulse] = Time.now - 10
        # Same exception class, different message: no re-initialization
        # expectations are set, only a single log emission.
        expect(postgres_server).to receive(:check_pulse).and_raise(ex.class.new("something else"))
        expect(Clog).to receive(:emit).and_call_original
        expect { r_w_event_loop.check_pulse }.not_to raise_error
      end
    end
  end
end
|