ubicloud/model/load_balancer_vm_port.rb
Daniel Farina 9772ffd8d0 Add support for stale connection retry on connection reset by peer
My change for aa2e0c0749, with the fix
in 8df8d6c7ac, appears effective, but
only handles system call failures on write: sometimes the program only
receives notification of the connection being closed upon a read, like
this:

    /app/vendor/bundle/ruby/3.4.0/gems/net-ssh-7.3.0/lib/net/ssh/buffered_io.rb:64:in 'BasicSocket#recv'
    /app/vendor/bundle/ruby/3.4.0/gems/net-ssh-7.3.0/lib/net/ssh/buffered_io.rb:64:in 'Net::SSH::BufferedIo#fill'
    /app/vendor/bundle/ruby/3.4.0/gems/net-ssh-7.3.0/lib/net/ssh/connection/session.rb:275:in 'block in Net::SSH::Connection::Session#ev_do_handle_events'

The exception is an instance of `Errno::ECONNRESET` with a message
like `Connection reset by peer - recvfrom(2)`.

So, handle it in much the same way, and refactor the tests so they
exercise both exceptions.
2025-07-31 13:04:39 -07:00
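
The refactored tests aren't shown on this page. As a rough sketch, the
spec could be parameterized over both failure modes, along these lines
(hypothetical: `lb_vm_port`, the doubles, and an IPv4-enabled load
balancer are assumed fixtures, not names from the real suite):

    [
      IOError.new("closed stream"),         # raised when writing on a dead connection
      Errno::ECONNRESET.new("recvfrom(2)")  # message becomes "Connection reset by peer - recvfrom(2)"
    ].each do |ex|
      it "rebuilds the session and retries once on #{ex.class}" do
        stale = double("stale ssh session")
        fresh = double("fresh ssh session")
        # last_pulse is older than the 8-second staleness floor in check_probe.
        session = {ssh_session: stale, last_pulse: Time.now - 9}

        expect(stale).to receive(:exec!).and_raise(ex)
        expect(lb_vm_port).to receive(:init_health_monitor_session).and_return(ssh_session: fresh)
        expect(fresh).to receive(:exec!).and_return("200")

        expect(lb_vm_port.check_probe(session, :ipv4)).to eq "up"
      end
    end

Either exception then takes the same reconnect-and-retry path through
check_probe below.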

# frozen_string_literal: true

require_relative "../model"

class LoadBalancerVmPort < Sequel::Model
  many_to_one :load_balancer_vm, class: :LoadBalancersVms, key: :load_balancer_vm_id
  many_to_one :load_balancer_port

  plugin ResourceMethods

  include HealthMonitorMethods

  def load_balancer
    load_balancer_port.load_balancer
  end

  def vm
    load_balancer_vm.vm
  end

  def init_health_monitor_session
    {
      ssh_session: vm.vm_host.sshable.start_fresh_session
    }
  end
  def health_check(session:)
    [
      check_probe(session, :ipv4),
      check_probe(session, :ipv6)
    ]
  end
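  # How these fit together (HealthMonitorMethods itself is not shown
  # here): the monitor presumably calls init_health_monitor_session once
  # to open an SSH session to the VM host, then passes the same session
  # hash to check_pulse on every tick, so the connection is reused
  # across probes until it goes stale.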
  def check_probe(session, type)
    if type == :ipv4
      return "up" unless load_balancer.ipv4_enabled?
    elsif type == :ipv6
      return "up" unless load_balancer.ipv6_enabled?
    else
      raise "Invalid type: #{type}"
    end

    # Retry at most once on a connection that went stale while idle.
    stale_retry = false
    begin
      ((session[:ssh_session].exec!(health_check_cmd(type)).strip == "200") ? "up" : "down").tap { session[:last_pulse] = Time.now }
    rescue => e
      # "Staleness" of last_pulse should be somewhat less than
      # sshd_config ClientAlive setting.
      if !stale_retry &&
          (
            # Seen when sending on a broken connection.
            (e.is_a?(IOError) && e.message == "closed stream") ||
            # Seen when receiving on a broken connection.
            (e.is_a?(Errno::ECONNRESET) && e.message.start_with?("Connection reset by peer"))
          ) &&
          session[:last_pulse]&.<(Time.now - 8)
        stale_retry = true
        session.merge!(init_health_monitor_session)
        retry
      end

      Clog.emit("Exception in LoadBalancerVmPort #{ubid}") { Util.exception_to_hash(e) }
      "down"
    end
  end
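  # The retry above is deliberately one-shot: stale_retry prevents
  # looping when the host is genuinely unreachable, and the 8-second
  # floor on last_pulse means only sessions that broke while sitting
  # idle (e.g. reaped by sshd's ClientAlive policy) are rebuilt; a
  # connection that fails immediately after a successful pulse still
  # reports "down".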
  def health_check_cmd(type)
    address = (type == :ipv4) ? vm.private_ipv4 : vm.ephemeral_net6.nth(2)
    if load_balancer.health_check_protocol == "tcp"
      "sudo ip netns exec #{vm.inhost_name} nc -z -w #{load_balancer.health_check_timeout} #{address} #{load_balancer_port.dst_port} >/dev/null 2>&1 && echo 200 || echo 400"
    else
      "sudo ip netns exec #{vm.inhost_name} curl --insecure --resolve #{load_balancer.hostname}:#{load_balancer_port.dst_port}:#{(address.version == 6) ? "[#{address}]" : address} --max-time #{load_balancer.health_check_timeout} --silent --output /dev/null --write-out '%{http_code}' #{load_balancer.health_check_protocol}://#{load_balancer.hostname}:#{load_balancer_port.dst_port}#{load_balancer.health_check_endpoint}"
    end
  end
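  # For illustration only (the VM name and address here are made up),
  # the TCP probe expands to roughly:
  #
  #   sudo ip netns exec vm4xq7mk nc -z -w 5 10.23.0.2 8080 >/dev/null 2>&1 && echo 200 || echo 400
  #
  # and the HTTP(S) probe has curl print just the status code, so either
  # way check_probe can compare the stripped output against "200".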
  def check_pulse(session:, previous_pulse:)
    reading_ipv4, reading_ipv6 = health_check(session:)
    reading = (reading_ipv4 == "up" && reading_ipv6 == "up") ? "up" : "down"
    pulse = aggregate_readings(previous_pulse:, reading:, data: {ipv4: reading_ipv4, ipv6: reading_ipv6})

    time_passed_health_check_interval = Time.now - pulse[:reading_chg] > load_balancer.health_check_interval

    # Flip state only once the new reading has both repeated past the
    # configured threshold and held for at least the check interval, and
    # only when no load balancer update is already pending.
    if state == "up" && pulse[:reading] == "down" && pulse[:reading_rpt] > load_balancer.health_check_down_threshold && time_passed_health_check_interval && !load_balancer.reload.update_load_balancer_set?
      update(state: "down")
      load_balancer.incr_update_load_balancer
    end

    if state == "down" && pulse[:reading] == "up" && pulse[:reading_rpt] > load_balancer.health_check_up_threshold && time_passed_health_check_interval && !load_balancer.reload.update_load_balancer_set?
      update(state: "up")
      load_balancer.incr_update_load_balancer
    end

    pulse
  end
end
# Table: load_balancer_vm_port
# Columns:
#  id                    | uuid                     | PRIMARY KEY
#  load_balancer_vm_id   | uuid                     | NOT NULL
#  load_balancer_port_id | uuid                     | NOT NULL
#  state                 | lb_node_state            | NOT NULL DEFAULT 'down'::lb_node_state
#  last_checked_at       | timestamp with time zone | NOT NULL DEFAULT CURRENT_TIMESTAMP
# Indexes:
#  load_balancer_vm_port_pkey | PRIMARY KEY btree (id)
#  lb_vm_port_unique_index    | UNIQUE btree (load_balancer_port_id, load_balancer_vm_id)
# Foreign key constraints:
#  load_balancer_vm_port_load_balancer_port_id_fkey | (load_balancer_port_id) REFERENCES load_balancer_port(id)
#  load_balancer_vm_port_load_balancer_vm_id_fkey   | (load_balancer_vm_id) REFERENCES load_balancers_vms(id) ON DELETE CASCADE