ubicloud/spec/model/postgres/postgres_server_spec.rb
shikharbhardwaj a0b83b3ee6 Add Upgrade steps to Prog::Postgres::ConvergePostgresResource
The Converge prog is now also responsible for matching the current
Postgres version to the desired version. If there is a mismatch (current
< desired), the Converge prog is launched.

Roughly, the Converge prog does the following:
1. Provisions new servers. In case of upgrades, it provisions at most
   one new standby, and only if no existing standby is suitable for
   the upgrade.
2. Waits for the required servers to be ready.
3. Waits for the maintenance window to start.
4. Fences the primary server and launches pg_upgrade.
5. If the upgrade is successful, replaces the current primary with the
   candidate standby.

In case the upgrade fails, we delete the candidate standby and unfence the
primary to bring the database back. During the upgrade, health checking
is effectively disabled, because auto-recovery conflicts with the
several restarts of different Postgres versions on the candidate.
2025-10-07 14:30:39 +02:00

471 lines
23 KiB
Ruby

# frozen_string_literal: true
require_relative "../spec_helper"
RSpec.describe PostgresServer do
# Server under test; the id is pinned to a fixed value for every example.
subject(:postgres_server) do
  described_class.new { |server| server.id = "c068cac7-ed45-82db-bf38-a003582b36ee" }
end
# Verified double standing in for the PostgresResource that owns the server
# under test. Individual examples override single attributes as needed.
let(:resource) do
  instance_double(
    PostgresResource,
    # Identity and placement
    identity: "pgubid.postgres.ubicloud.com",
    location_id: "12b8ad7f-8178-4d27-a61d-e6dc396d69cc",
    representative_server: postgres_server,
    # Topology and version
    ha_type: PostgresResource::HaType::NONE,
    target_version: "16",
    # No user-supplied configuration
    user_config: {},
    pgbouncer_user_config: {}
  )
end
# Verified double for the VM backing the server: a non-AWS machine with
# 4 vCPUs, 8 GiB of memory, one private subnet and one NIC.
let(:vm) do
  subnet = instance_double(
    PrivateSubnet,
    net4: NetAddr::IPv4Net.parse("172.0.0.0/26"),
    net6: NetAddr::IPv6Net.parse("fdfa:b5aa:14a3:4a3d::/64")
  )
  nic = instance_double(Nic, private_ipv4: NetAddr::IPv4Net.parse("10.70.205.205/32"))
  non_aws_location = instance_double(Location, aws?: false)

  instance_double(
    Vm,
    sshable: instance_double(Sshable),
    vcpus: 4,
    memory_gib: 8,
    ephemeral_net4: "1.2.3.4",
    ip6: "fdfa:b5aa:14a3:4a3d::2",
    private_subnets: [subnet],
    nics: [nic],
    private_ipv4: NetAddr::IPv4Net.parse("10.70.205.205/32").network,
    location: non_aws_location
  )
end
# Attach the resource and VM doubles to the server under test and make it
# report Postgres version "16" in every example.
before do
  allow(postgres_server).to receive(:resource).and_return(resource)
  allow(postgres_server).to receive(:vm).and_return(vm)
  allow(postgres_server).to receive(:version).and_return("16")
end
# Examples for the :configs section of PostgresServer#configure_hash.
describe "#configure" do
  # Baseline: a non-AWS timeline with blob storage present, a server that is
  # not a read replica, and the standard flavor.
  before do
    default_timeline = instance_double(PostgresTimeline, blob_storage: "dummy-blob-storage", aws?: false)
    allow(postgres_server).to receive_messages(timeline: default_timeline, read_replica?: false)
    allow(resource).to receive(:flavor).and_return(PostgresResource::Flavor::STANDARD)
  end

  it "does not set archival related configs if blob storage is not configured" do
    timeline_without_storage = instance_double(PostgresTimeline, blob_storage: nil)
    expect(postgres_server).to receive(:timeline).and_return(timeline_without_storage)

    configs = postgres_server.configure_hash[:configs]
    expect(configs).not_to include(
      :archive_mode, :archive_timeout, :archive_command, :synchronous_standby_names,
      :primary_conninfo, :recovery_target_time, :restore_command
    )
  end

  it "sets configs that are specific to primary" do
    postgres_server.timeline_access = "push"

    configs = postgres_server.configure_hash[:configs]
    expect(configs).to include(:archive_mode, :archive_timeout, :archive_command)
  end

  it "sets synchronous_standby_names for sync replication mode" do
    lagging_standby = instance_double(described_class, ubid: "pgubidstandby1", standby?: true, synchronization_status: "catching_up")
    ready_standby = instance_double(described_class, ubid: "pgubidstandby2", standby?: true, synchronization_status: "ready")

    postgres_server.timeline_access = "push"
    expect(resource).to receive(:ha_type).and_return(PostgresResource::HaType::SYNC)
    expect(resource).to receive(:servers).and_return([postgres_server, lagging_standby, ready_standby])

    # Only the standby whose synchronization_status is "ready" is expected in the list.
    configs = postgres_server.configure_hash[:configs]
    expect(configs).to include(synchronous_standby_names: "'ANY 1 (pgubidstandby2)'")
  end

  it "sets synchronous_standby_names as empty if there is no caught up standby" do
    first_lagging = instance_double(described_class, ubid: "pgubidstandby1", standby?: true, synchronization_status: "catching_up")
    second_lagging = instance_double(described_class, ubid: "pgubidstandby2", standby?: true, synchronization_status: "catching_up")

    postgres_server.timeline_access = "push"
    expect(resource).to receive(:ha_type).and_return(PostgresResource::HaType::SYNC)
    expect(resource).to receive(:servers).and_return([postgres_server, first_lagging, second_lagging])

    configs = postgres_server.configure_hash[:configs]
    expect(configs).not_to include(:synchronous_standby_names)
  end

  it "sets configs that are specific to standby" do
    postgres_server.timeline_access = "fetch"
    expect(postgres_server).to receive(:doing_pitr?).and_return(false).at_least(:once)
    expect(resource).to receive(:replication_connection_string)

    configs = postgres_server.configure_hash[:configs]
    expect(configs).to include(:primary_conninfo, :restore_command)
  end

  it "sets configs that are specific to restoring servers" do
    postgres_server.timeline_access = "fetch"
    expect(resource).to receive(:restore_target)

    configs = postgres_server.configure_hash[:configs]
    expect(configs).to include(:recovery_target_time, :restore_command)
  end

  it "puts pg_analytics to shared_preload_libraries for ParadeDB" do
    postgres_server.timeline_access = "push"
    expect(resource).to receive(:flavor).and_return(PostgresResource::Flavor::PARADEDB)

    configs = postgres_server.configure_hash[:configs]
    expect(configs).to include("shared_preload_libraries" => "'pg_cron,pg_stat_statements,pg_analytics,pg_search'")
  end

  it "puts lantern_extras to shared_preload_libraries for Lantern" do
    postgres_server.timeline_access = "push"
    expect(resource).to receive(:flavor).and_return(PostgresResource::Flavor::LANTERN).at_least(:once)

    configs = postgres_server.configure_hash[:configs]
    expect(configs).to include("shared_preload_libraries" => "'pg_cron,pg_stat_statements,lantern_extras'")
  end

  it "puts extra logging options for AWS" do
    # Flip the baseline timeline double to report AWS for this example.
    expect(postgres_server.timeline).to receive(:aws?).and_return(true)
    postgres_server.timeline_access = "push"

    configs = postgres_server.configure_hash[:configs]
    expect(configs).to include(:log_line_prefix, :log_connections, :log_disconnections)
  end
end
# Examples for PostgresServer#trigger_failover: it must return false (and log)
# when failover cannot start, and true once a standby has been signalled.
describe "#trigger_failover" do
  it "logs error when server is not primary" do
    expect(postgres_server).to receive(:representative_at).and_return(nil)
    expect(Clog).to receive(:emit).with("Cannot trigger failover on a non-representative server")

    outcome = postgres_server.trigger_failover(mode: "planned")
    expect(outcome).to be false
  end

  it "logs error when no suitable standby found" do
    expect(postgres_server).to receive(:representative_at).and_return(Time.now)
    expect(postgres_server).to receive(:failover_target).and_return(nil)
    expect(Clog).to receive(:emit).with("No suitable standby found for failover")

    outcome = postgres_server.trigger_failover(mode: "planned")
    expect(outcome).to be false
  end

  it "returns true only when failover is successfully triggered" do
    target = instance_double(described_class)
    expect(postgres_server).to receive(:representative_at).and_return(Time.now)
    expect(postgres_server).to receive(:failover_target).and_return(target)
    # The chosen standby must be told to take over.
    expect(target).to receive(:incr_planned_take_over)

    outcome = postgres_server.trigger_failover(mode: "planned")
    expect(outcome).to be true
  end
end
# The server's read_replica? answer is expected to follow its resource's answer.
it "#read_replica?" do
  owner = postgres_server.resource

  expect(owner).to receive(:read_replica?).and_return(true)
  expect(postgres_server).to be_read_replica

  expect(owner).to receive(:read_replica?).and_return(false)
  expect(postgres_server).not_to be_read_replica
end
# Examples for PostgresServer#failover_target when the server is not a read
# replica. Fixes typos in two example descriptions ("is unknown", "too high").
describe "#failover_target" do
  # Baseline: a representative server with three standbys. The first is still
  # in "wait_catch_up" and has no current_lsn stubbed; the other two are in
  # "wait" at LSNs "1/5" and "1/10".
  before do
    postgres_server.representative_at = Time.now
    allow(postgres_server).to receive(:read_replica?).and_return(false)

    catching_up = instance_double(
      described_class,
      ubid: "pgubidstandby1", representative_at: nil,
      strand: instance_double(Strand, label: "wait_catch_up"),
      needs_recycling?: false, read_replica?: false
    )
    behind = instance_double(
      described_class,
      ubid: "pgubidstandby2", representative_at: nil, current_lsn: "1/5",
      strand: instance_double(Strand, label: "wait"),
      needs_recycling?: false, read_replica?: false
    )
    ahead = instance_double(
      described_class,
      ubid: "pgubidstandby3", representative_at: nil, current_lsn: "1/10",
      strand: instance_double(Strand, label: "wait"),
      needs_recycling?: false, read_replica?: false
    )
    allow(resource).to receive(:servers).and_return([postgres_server, catching_up, behind, ahead])
  end

  it "returns nil if there is no standby" do
    expect(resource).to receive(:servers).and_return([postgres_server]).at_least(:once)
    expect(postgres_server.failover_target).to be_nil
  end

  it "returns nil if there is no fresh standby" do
    expect(postgres_server).to receive(:representative_at).and_return(Time.now)

    # A real (non-double) standby whose VM size differs from the resource's target size.
    standby_server = described_class.new { |server| server.id = "c068cac7-ed45-82db-bf38-a003582b36ef" }
    standby_vm = instance_double(Vm, display_size: "standard-4", sshable: instance_double(Sshable))
    expect(standby_server).to receive(:resource).and_return(resource)
    expect(standby_server).to receive(:representative_at).and_return(nil).at_least(:once)
    expect(standby_server).to receive(:strand).and_return(instance_double(Strand, label: "wait"))
    expect(standby_server).to receive(:vm).and_return(standby_vm)

    expect(resource).to receive(:servers).and_return([postgres_server, standby_server]).at_least(:once)
    expect(resource).to receive(:target_vm_size).and_return("standard-2")
    expect(postgres_server.failover_target).to be_nil
  end

  it "returns the standby with highest lsn in sync replication" do
    expect(postgres_server).to receive(:representative_at).and_return(Time.now)
    expect(resource).to receive(:ha_type).and_return(PostgresResource::HaType::SYNC)
    expect(postgres_server.failover_target.ubid).to eq("pgubidstandby3")
  end

  it "returns nil if last_known_lsn is unknown for async replication" do
    monitor = instance_double(PostgresLsnMonitor, last_known_lsn: nil)
    expect(resource).to receive(:ha_type).and_return(PostgresResource::HaType::ASYNC)
    expect(postgres_server).to receive(:lsn_monitor).and_return(monitor)
    expect(postgres_server.failover_target).to be_nil
  end

  it "returns nil if lsn difference is too high for async replication" do
    # Last known primary LSN "2/0" versus the best standby at "1/10".
    monitor = instance_double(PostgresLsnMonitor, last_known_lsn: "2/0")
    expect(resource).to receive(:ha_type).and_return(PostgresResource::HaType::ASYNC)
    expect(postgres_server).to receive(:lsn_monitor).and_return(monitor).twice
    expect(postgres_server.failover_target).to be_nil
  end

  it "returns the standby with highest lsn if lsn difference is not high in async replication" do
    # Last known primary LSN "1/11" versus the best standby at "1/10".
    monitor = instance_double(PostgresLsnMonitor, last_known_lsn: "1/11")
    expect(resource).to receive(:ha_type).and_return(PostgresResource::HaType::ASYNC)
    expect(postgres_server).to receive(:lsn_monitor).and_return(monitor).twice
    expect(postgres_server.failover_target.ubid).to eq("pgubidstandby3")
  end
end
# Examples for PostgresServer#failover_target when the server and its peers
# are read replicas.
describe "#failover_target read_replica" do
  # Baseline: a representative read replica with three peer replicas. The
  # first is in "wait_catch_up" with no current_lsn stubbed; the others are in
  # "wait" at LSNs "1/5" and "1/10".
  before do
    expect(postgres_server).to receive(:representative_at).and_return(Time.now)
    allow(postgres_server).to receive(:read_replica?).and_return(true)

    catching_up = instance_double(
      described_class,
      ubid: "pgubidstandby1", representative_at: nil,
      strand: instance_double(Strand, label: "wait_catch_up"),
      needs_recycling?: false, read_replica?: true
    )
    behind = instance_double(
      described_class,
      ubid: "pgubidstandby2", representative_at: nil, current_lsn: "1/5",
      strand: instance_double(Strand, label: "wait"),
      needs_recycling?: false, read_replica?: true
    )
    ahead = instance_double(
      described_class,
      ubid: "pgubidstandby3", representative_at: nil, current_lsn: "1/10",
      strand: instance_double(Strand, label: "wait"),
      needs_recycling?: false, read_replica?: true
    )
    allow(resource).to receive(:servers).and_return([postgres_server, catching_up, behind, ahead])
  end

  it "returns nil if there is no replica to failover" do
    expect(resource).to receive(:servers).and_return([postgres_server]).at_least(:once)
    expect(postgres_server.failover_target).to be_nil
  end

  it "returns nil if there is no fresh read_replica" do
    # A real (non-double) replica whose VM size differs from the resource's target size.
    replica_server = described_class.new { |server| server.id = "c068cac7-ed45-82db-bf38-a003582b36ef" }
    replica_vm = instance_double(Vm, display_size: "standard-4")
    expect(replica_server).to receive(:resource).and_return(resource)
    expect(replica_server).to receive(:strand).and_return(instance_double(Strand, label: "wait"))
    expect(replica_server).to receive(:vm).and_return(replica_vm)

    expect(resource).to receive(:servers).and_return([postgres_server, replica_server]).at_least(:once)
    expect(resource).to receive(:target_vm_size).and_return("standard-2")
    expect(postgres_server.failover_target).to be_nil
  end

  it "returns the replica with highest lsn" do
    chosen = postgres_server.failover_target
    expect(chosen.ubid).to eq("pgubidstandby3")
  end
end
# Examples for PostgresServer#storage_size_gib. The second example's
# description now matches its assertion: the result is zero, not nil.
describe "storage_size_gib" do
  it "returns the storage size in GiB" do
    data_volume = instance_double(VmStorageVolume, boot: false, size_gib: 64)
    volume_dataset = instance_double(Sequel::Dataset)
    # The dataset's #reject is stubbed to yield the single non-boot volume.
    expect(volume_dataset).to receive(:reject).and_return([data_volume])
    expect(vm).to receive(:vm_storage_volumes_dataset).and_return(volume_dataset)

    expect(postgres_server.storage_size_gib).to eq(64)
  end

  it "returns zero if there is no storage volume" do
    volume_dataset = instance_double(Sequel::Dataset)
    # Nothing is left after the dataset's #reject, so no size can be read.
    expect(volume_dataset).to receive(:reject).and_return([])
    expect(vm).to receive(:vm_storage_volumes_dataset).and_return(volume_dataset)

    expect(postgres_server.storage_size_gib).to be_zero
  end
end
# Examples for PostgresServer#lsn_caught_up. The falsey example used to reuse
# the description "diff is less than 80MB"; it actually covers the case where
# the server is far behind, so it now says "more than 80MB".
describe "lsn_caught_up" do
  # Parent resource whose representative server reports LSN "F/F".
  let(:parent_resource) do
    parent_primary = instance_double(described_class, current_lsn: "F/F")
    instance_double(PostgresResource, representative_server: parent_primary)
  end

  before do
    allow(resource).to receive(:parent).and_return(parent_resource)
  end

  it "returns true if the diff is less than 80MB" do
    expect(postgres_server).to receive(:read_replica?).and_return(true)
    # Replay LSN equals the parent's "F/F".
    expect(postgres_server).to receive(:run_query).with("SELECT pg_last_wal_replay_lsn()").and_return("F/F")
    expect(postgres_server.lsn_caught_up).to be_truthy
  end

  it "returns true if read replica and the parent representative server is nil" do
    expect(postgres_server).to receive(:read_replica?).and_return(true)
    expect(parent_resource).to receive(:representative_server).and_return(nil)
    # The server's own replay LSN is expected to be queried twice in this case.
    expect(postgres_server).to receive(:run_query).with("SELECT pg_last_wal_replay_lsn()").and_return("F/F").twice
    expect(postgres_server.lsn_caught_up).to be_truthy
  end

  it "returns true if read replica and the parent is nil" do
    expect(postgres_server).to receive(:read_replica?).and_return(true)
    expect(postgres_server.resource).to receive(:parent).and_return(nil)
    # The server's own replay LSN is expected to be queried twice in this case.
    expect(postgres_server).to receive(:run_query).with("SELECT pg_last_wal_replay_lsn()").and_return("F/F").twice
    expect(postgres_server.lsn_caught_up).to be_truthy
  end

  it "returns false if the diff is more than 80MB" do
    expect(postgres_server).to receive(:read_replica?).and_return(true)
    # Replay LSN "1/00000000" is far behind the parent's "F/F".
    expect(postgres_server).to receive(:run_query).with("SELECT pg_last_wal_replay_lsn()").and_return("1/00000000")
    expect(postgres_server.lsn_caught_up).to be_falsey
  end

  it "returns true if the diff is less than 80MB for not read replica and uses the main representative server" do
    expect(postgres_server).to receive(:read_replica?).and_return(false)
    expect(resource).to receive(:representative_server).and_return(postgres_server)
    expect(postgres_server).to receive(:run_query).with("SELECT pg_last_wal_replay_lsn()").and_return("F/F", "F/F")
    expect(postgres_server.lsn_caught_up).to be_truthy
  end
end
# A fresh SSH session must be started and a local socket forward requested on it.
it "initiates a new health monitor session" do
  ssh_session = instance_double(Net::SSH::Connection::Session)
  port_forward = instance_double(Net::SSH::Service::Forward)

  expect(port_forward).to receive(:local_socket)
  expect(ssh_session).to receive(:forward).and_return(port_forward)
  expect(postgres_server.vm.sshable).to receive(:start_fresh_session).and_return(ssh_session)

  postgres_server.init_health_monitor_session
end
# Only a fresh SSH session is expected here; no calls are expected on it.
it "initiates a new metrics export session" do
  ssh_session = instance_double(Net::SSH::Connection::Session)
  expect(postgres_server.vm.sshable).to receive(:start_fresh_session).and_return(ssh_session)

  postgres_server.init_metrics_export_session
end
# With the real DB connection and a server that is neither primary nor
# standby, check_pulse must not request a checkup.
it "checks pulse" do
  monitor_session = {
    ssh_session: instance_double(Net::SSH::Connection::Session),
    db_connection: DB
  }
  # Previous reading: "down", repeated 5 times, last changed 30 seconds ago.
  earlier_pulse = {reading: "down", reading_rpt: 5, reading_chg: Time.now - 30}

  expect(postgres_server).not_to receive(:incr_checkup)
  expect(postgres_server).to receive(:primary?).and_return(false)
  expect(postgres_server).to receive(:standby?).and_return(false)

  postgres_server.check_pulse(session: monitor_session, previous_pulse: earlier_pulse)
end
# A standby whose DB query fails with a connection error, after an already
# "down" previous pulse, must have its checkup semaphore incremented.
it "increments checkup semaphore if pulse is down for a while and the resource is not upgrading" do
  db = instance_double(Sequel::Postgres::Database)
  monitor_session = {
    ssh_session: instance_double(Net::SSH::Connection::Session),
    db_connection: db
  }
  # Previous reading: "down", repeated 5 times, last changed 30 seconds ago.
  earlier_pulse = {reading: "down", reading_rpt: 5, reading_chg: Time.now - 30}

  expect(db).to receive(:[]).and_raise(Sequel::DatabaseConnectionError)
  expect(postgres_server).to receive(:reload).and_return(postgres_server)
  expect(postgres_server).to receive(:incr_checkup)
  expect(postgres_server).to receive(:primary?).and_return(false)
  expect(postgres_server).to receive(:standby?).and_return(true)

  postgres_server.check_pulse(session: monitor_session, previous_pulse: earlier_pulse)
end
# For a primary, the LSN query issued on the DB connection must be the
# pg_current_wal_lsn() one.
it "uses pg_current_wal_lsn to track lsn for primaries" do
  db = instance_double(Sequel::Postgres::Database)
  monitor_session = {
    ssh_session: instance_double(Net::SSH::Connection::Session),
    db_connection: db
  }
  # Previous reading: "down", repeated 5 times, last changed 30 seconds ago.
  earlier_pulse = {reading: "down", reading_rpt: 5, reading_chg: Time.now - 30}

  expect(db).to receive(:[]).with("SELECT pg_current_wal_lsn() AS lsn").and_raise(Sequel::DatabaseConnectionError)
  expect(postgres_server).to receive(:primary?).and_return(true)
  expect(postgres_server).to receive(:reload).and_return(postgres_server)
  expect(postgres_server).to receive(:incr_checkup)

  postgres_server.check_pulse(session: monitor_session, previous_pulse: earlier_pulse)
end
# For a server that is neither primary nor standby, the LSN query issued on
# the DB connection must be the pg_last_wal_replay_lsn() one.
it "uses pg_last_wal_replay_lsn to track lsn for read replicas" do
  db = instance_double(Sequel::Postgres::Database)
  monitor_session = {
    ssh_session: instance_double(Net::SSH::Connection::Session),
    db_connection: db
  }
  # Previous reading: "down", repeated 5 times, last changed 30 seconds ago.
  earlier_pulse = {reading: "down", reading_rpt: 5, reading_chg: Time.now - 30}

  expect(db).to receive(:[]).with("SELECT pg_last_wal_replay_lsn() AS lsn").and_raise(Sequel::DatabaseConnectionError)
  expect(postgres_server).to receive(:primary?).and_return(false)
  expect(postgres_server).to receive(:standby?).and_return(false)
  expect(postgres_server).to receive(:reload).and_return(postgres_server)
  expect(postgres_server).to receive(:incr_checkup)

  postgres_server.check_pulse(session: monitor_session, previous_pulse: earlier_pulse)
end
# A Sequel::Error raised while saving the LSN monitor record must be logged
# through Clog rather than propagate out of check_pulse.
it "catches Sequel::Error if updating PostgresLsnMonitor fails" do
  monitor = instance_double(PostgresLsnMonitor, last_known_lsn: "1/5")

  expect(PostgresLsnMonitor).to receive(:new).and_return(monitor)
  expect(monitor).to receive(:insert_conflict).and_return(monitor)
  expect(monitor).to receive(:save_changes).and_raise(Sequel::Error)
  expect(Clog).to receive(:emit).with("Failed to update PostgresLsnMonitor").and_call_original
  expect(postgres_server).to receive(:primary?).and_return(true)

  postgres_server.check_pulse(session: {db_connection: DB}, previous_pulse: {})
end
# run_query must pipe the SQL to psql over SSH and strip the trailing newline.
it "runs query on vm" do
  psql_command = "PGOPTIONS='-c statement_timeout=60s' psql -U postgres -t --csv -v 'ON_ERROR_STOP=1'"
  expect(postgres_server.vm.sshable).to receive(:cmd).with(psql_command, stdin: "SELECT 1").and_return("1\n")

  result = postgres_server.run_query("SELECT 1")
  expect(result).to eq("1")
end
# On AWS the device paths are expected to come from the lsblk pipeline run
# over SSH, one path per output line.
it "returns the right storage_device_paths for AWS" do
  volumes = [
    instance_double(VmStorageVolume, boot: true, device_path: "/dev/vda"),
    instance_double(VmStorageVolume, boot: false, device_path: "/dev/nvme1n1"),
    instance_double(VmStorageVolume, boot: false, device_path: "/dev/nvme2n1")
  ]
  lsblk_command = "lsblk -b -d -o NAME,SIZE | sort -n -k2 | tail -n2 | awk '{print \"/dev/\"$1}'"

  expect(postgres_server.vm.location).to receive(:aws?).and_return(true)
  expect(postgres_server.vm).to receive(:vm_storage_volumes).and_return(volumes)
  expect(postgres_server.vm.sshable).to receive(:cmd).with(lsblk_command).and_return("/dev/nvme1n1\n/dev/nvme2n1\n")

  expect(postgres_server.storage_device_paths).to eq(["/dev/nvme1n1", "/dev/nvme2n1"])
end
# With the default non-AWS location, the path comes from the volume's device_path.
it "returns the right storage_device_paths for Hetzner" do
  data_volume = instance_double(VmStorageVolume, boot: false, device_path: "/dev/vdb")
  expect(postgres_server.vm).to receive(:vm_storage_volumes).and_return([data_volume])

  expect(postgres_server.storage_device_paths).to eq(["/dev/vdb"])
end
# Examples for PostgresServer#taking_over?, driven by the strand label. The
# second description previously read "is not 'wait'" while the example feeds
# the label "wait" and expects false; it now names the label that matters.
describe "#taking_over?" do
  it "returns true if the strand label is 'taking_over'" do
    taking_over_strand = instance_double(Strand, label: "taking_over")
    expect(postgres_server).to receive(:strand).and_return(taking_over_strand)
    expect(postgres_server.taking_over?).to be true
  end

  it "returns false if the strand label is not 'taking_over'" do
    waiting_strand = instance_double(Strand, label: "wait")
    expect(postgres_server).to receive(:strand).and_return(waiting_strand)
    expect(postgres_server.taking_over?).to be false
  end
end
# Examples for PostgresServer#switch_to_new_timeline: a new timeline is
# assembled, the server is updated to push to it, and WAL-G credentials are
# refreshed.
describe "#switch_to_new_timeline" do
  it "switches to new timeline with current parent" do
    current_timeline = instance_double(PostgresTimeline, id: "12b8ad7f-8178-4d27-a61d-e6dc396d69cc")
    new_timeline = instance_double(PostgresTimeline, id: "1ff21ff9-7534-4d28-820b-1da97199e39e")

    expect(postgres_server).to receive(:timeline).and_return(current_timeline)
    expect(Prog::Postgres::PostgresTimelineNexus).to receive(:assemble).and_return(new_timeline)
    expect(postgres_server).to receive(:update).with(timeline_id: "1ff21ff9-7534-4d28-820b-1da97199e39e", timeline_access: "push")
    expect(postgres_server).to receive(:refresh_walg_credentials)

    expect { postgres_server.switch_to_new_timeline }.not_to raise_error
  end

  it "switches to new timeline without current parent" do
    new_timeline = instance_double(PostgresTimeline, id: "98637404-a37b-4991-a70f-1b7e3ffcbf31")

    expect(Prog::Postgres::PostgresTimelineNexus).to receive(:assemble).and_return(new_timeline)
    expect(postgres_server).to receive(:update).with(timeline_id: "98637404-a37b-4991-a70f-1b7e3ffcbf31", timeline_access: "push")
    expect(postgres_server).to receive(:refresh_walg_credentials)

    # An explicit nil parent_id is passed; no timeline lookup is stubbed here.
    expect { postgres_server.switch_to_new_timeline(parent_id: nil) }.not_to raise_error
  end
end
# Examples for PostgresServer#refresh_walg_credentials: which files are
# written over SSH depending on blob storage presence and AWS vs non-AWS.
describe "#refresh_walg_credentials" do
  it "does nothing if timeline has no blob storage" do
    timeline_without_storage = instance_double(PostgresTimeline, blob_storage: nil)
    expect(postgres_server).to receive(:timeline).and_return(timeline_without_storage)
    expect(vm.sshable).not_to receive(:cmd)

    expect { postgres_server.refresh_walg_credentials }.not_to raise_error
  end

  it "refreshes walg credentials if timeline has blob storage not on aws" do
    storage = instance_double(MinioCluster, root_certs: "root_certs")
    timeline = instance_double(PostgresTimeline, blob_storage: storage, aws?: false)
    expect(postgres_server).to receive(:timeline).at_least(:once).and_return(timeline)
    expect(timeline).to receive(:generate_walg_config).and_return("walg_config")

    # Both the WAL-G env file and the blob storage CA certificate are written.
    expect(vm.sshable).to receive(:cmd).with("sudo -u postgres tee /etc/postgresql/wal-g.env > /dev/null", stdin: "walg_config")
    expect(vm.sshable).to receive(:cmd).with("sudo tee /usr/lib/ssl/certs/blob_storage_ca.crt > /dev/null", stdin: "root_certs")

    expect { postgres_server.refresh_walg_credentials }.not_to raise_error
  end

  it "refreshes walg credentials if timeline has blob storage on aws" do
    storage = instance_double(MinioCluster, root_certs: "root_certs")
    timeline = instance_double(PostgresTimeline, blob_storage: storage, aws?: true)
    expect(postgres_server).to receive(:timeline).at_least(:once).and_return(timeline)
    expect(timeline).to receive(:generate_walg_config).and_return("walg_config")

    # Only the WAL-G env file is written; the CA certificate must not be.
    expect(vm.sshable).to receive(:cmd).with("sudo -u postgres tee /etc/postgresql/wal-g.env > /dev/null", stdin: "walg_config")
    expect(vm.sshable).not_to receive(:cmd).with("sudo tee /usr/lib/ssl/certs/blob_storage_ca.crt > /dev/null", stdin: "root_certs")

    expect { postgres_server.refresh_walg_credentials }.not_to raise_error
  end
end
end