ubicloud/spec/prog/postgres/converge_postgres_resource_spec.rb
shikharbhardwaj 615050d2f8 Add Upgrade steps to Prog::Postgres::ConvergePostgresResource
The Converge prog is now also responsible for matching the current
Postgres version to the desired version. If there is a mismatch (current
< desired), the Converge prog is launched.

Roughly, the Converge prog does the following:
1. Provisions new servers. In case of upgrades, it provisions at most
   one new standby, and only if no existing standby is suitable for
   upgrades.
2. Wait for the required servers to be ready.
3. Wait for the maintenance window to start.
4. Fence the primary server, and launch pg_upgrade.
5. If the upgrade is successful, replace the current primary with the
   candidate standby.

In case the upgrade fails, we delete the candidate standby and unfence the
primary to bring the database back. During the upgrade, health checking
is effectively disabled, as auto-recovery conflicts with the several
restarts of various versions on the candidate.
2025-09-24 01:45:32 +02:00

347 lines
18 KiB
Ruby

# frozen_string_literal: true
require_relative "../../model/spec_helper"
RSpec.describe Prog::Postgres::ConvergePostgresResource do
# Prog under test, built on a bare Strand with a fixed id.
subject(:nx) { described_class.new(Strand.new(id: "8148ebdf-66b8-8ed0-9c2f-8cfe93f5aa77")) }

# Verified double standing in for the resource: two server doubles, a
# timeline and a location that reports aws? as false. Examples stub any
# further messages they need.
let(:postgres_resource) {
  server_doubles = [instance_double(PostgresServer), instance_double(PostgresServer)]
  timeline_double = instance_double(PostgresTimeline, id: "timeline-id")
  location_double = instance_double(Location, aws?: false)

  instance_double(
    PostgresResource,
    id: "8148ebdf-66b8-8ed0-9c2f-8cfe93f5aa77",
    servers: server_doubles,
    timeline: timeline_double,
    location: location_double
  )
}

# Make the prog resolve its resource to the double above.
before do
  allow(nx).to receive(:postgres_resource).and_return(postgres_resource)
end
# #start: a deadline for "prune_servers" is registered before hopping on.
describe "#start" do
  it "registers a deadline" do
    two_hours = 2 * 60 * 60
    expect(nx).to receive(:register_deadline).with("prune_servers", two_hours)

    expect { nx.start }.to hop("provision_servers")
  end
end
# #provision_servers: covers the early exit when enough fresh servers exist,
# the wait while a server has no vm_host yet, and the placement arguments
# handed to PostgresServerNexus.assemble for each provider.
describe "#provision_servers" do
  # Vm double whose host double exposes exactly the given attributes.
  def vm_on_host(host_attrs)
    instance_double(Vm, vm_host: instance_double(VmHost, host_attrs))
  end

  # Vm double for the AWS examples: bare host plus a NIC reporting subnet_az.
  def aws_vm(subnet_az)
    aws_nic = instance_double(NicAwsResource, subnet_az: subnet_az)
    instance_double(Vm, vm_host: instance_double(VmHost), nic: instance_double(Nic, nic_aws_resource: aws_nic))
  end

  # Gives each existing server double an AWS vm in the matching zone.
  def place_servers_in_azs(*azs)
    postgres_resource.servers.zip(azs).each do |server, az|
      allow(server).to receive(:vm).and_return(aws_vm(az))
    end
  end

  # The location must be asked for its provider at least once.
  def expect_provider(name)
    expect(postgres_resource.location).to receive(:provider).and_return(name).at_least(:once)
  end

  before do
    allow(postgres_resource).to receive(:has_enough_fresh_servers?).and_return(false)

    first, second = postgres_resource.servers
    allow(first).to receive(:vm).and_return(vm_on_host(id: "vmh-id-1"))
    allow(second).to receive(:vm).and_return(vm_on_host(id: "vmh-id-2"))
  end

  it "hops to wait_servers_to_be_ready if there are enough fresh servers" do
    expect(postgres_resource).to receive(:has_enough_fresh_servers?).and_return(true)

    expect { nx.provision_servers }.to hop("wait_servers_to_be_ready")
  end

  it "does not provision a new server if there is a server that is not assigned to a vm_host" do
    unplaced_vm = instance_double(Vm, vm_host: nil)
    expect(postgres_resource.servers.first).to receive(:vm).and_return(unplaced_vm)
    expect(Prog::Postgres::PostgresServerNexus).not_to receive(:assemble)

    expect { nx.provision_servers }.to nap
  end

  it "provisions a new server without excluding hosts in development environment" do
    allow(Config).to receive(:development?).and_return(true)
    expect_provider(HostProvider::HETZNER_PROVIDER_NAME)
    expect(Prog::Postgres::PostgresServerNexus).to receive(:assemble).with(hash_including(exclude_host_ids: []))

    expect { nx.provision_servers }.to nap
  end

  it "provisions a new server but excludes currently used data centers" do
    expect_provider(HostProvider::HETZNER_PROVIDER_NAME)

    first, second = postgres_resource.servers
    allow(first).to receive(:vm).and_return(vm_on_host(data_center: "dc1"))
    allow(second).to receive(:vm).and_return(vm_on_host(data_center: "dc2"))

    hosts_in_used_dcs = [instance_double(VmHost, id: "vmh-id-1"), instance_double(VmHost, id: "vmh-id-2")]
    expect(VmHost).to receive(:where).with(data_center: ["dc1", "dc2"]).and_return(hosts_in_used_dcs)
    expect(Prog::Postgres::PostgresServerNexus).to receive(:assemble).with(hash_including(exclude_host_ids: ["vmh-id-1", "vmh-id-2"]))

    expect { nx.provision_servers }.to nap
  end

  it "provisions a new server but excludes currently used az for aws" do
    expect_provider(HostProvider::AWS_PROVIDER_NAME)
    place_servers_in_azs("a", "b")
    expect(postgres_resource).to receive(:use_different_az_set?).and_return(true)
    expect(Prog::Postgres::PostgresServerNexus).to receive(:assemble).with(hash_including(exclude_availability_zones: ["a", "b"]))

    expect { nx.provision_servers }.to nap
  end

  it "provisions a new server in a used az for aws if use_different_az_set? is false" do
    expect_provider(HostProvider::AWS_PROVIDER_NAME)
    place_servers_in_azs("a", "b")
    expect(postgres_resource).to receive(:use_different_az_set?).and_return(false)
    expect(postgres_resource).to receive(:representative_server).and_return(postgres_resource.servers.first)
    expect(Prog::Postgres::PostgresServerNexus).to receive(:assemble).with(hash_including(availability_zone: "a"))

    expect { nx.provision_servers }.to nap
  end
end
# #wait_servers_to_be_ready: the outcome depends on the resource's answers
# to has_enough_fresh_servers? and has_enough_ready_servers?.
describe "#wait_servers_to_be_ready" do
  it "hops to provision_servers if there is not enough fresh servers" do
    expect(postgres_resource).to receive_messages(has_enough_fresh_servers?: false)

    expect { nx.wait_servers_to_be_ready }.to hop("provision_servers")
  end

  it "hops to wait_for_maintenance_window if there are enough ready servers" do
    expect(postgres_resource).to receive_messages(
      has_enough_fresh_servers?: true,
      has_enough_ready_servers?: true
    )

    expect { nx.wait_servers_to_be_ready }.to hop("wait_for_maintenance_window")
  end

  it "waits if there are not enough ready servers" do
    expect(postgres_resource).to receive_messages(
      has_enough_fresh_servers?: true,
      has_enough_ready_servers?: false
    )

    expect { nx.wait_servers_to_be_ready }.to nap
  end
end
# #recycle_representative_server: naps without a representative server, hops
# to pruning or provisioning depending on the stubbed predicates, and
# otherwise triggers a failover on the representative server.
describe "#recycle_representative_server" do
  it "waits until there is a representative server to act on it" do
    expect(postgres_resource).to receive(:representative_server).and_return(nil)

    expect { nx.recycle_representative_server }.to nap
  end

  it "hops to prune_servers if the representative server does not need recycling" do
    healthy_server = instance_double(PostgresServer, needs_recycling?: false)
    expect(postgres_resource).to receive(:representative_server).at_least(:once).and_return(healthy_server)
    expect(postgres_resource).to receive(:ongoing_failover?).and_return(false)

    expect { nx.recycle_representative_server }.to hop("prune_servers")
  end

  it "hops to provision_servers if there are not enough ready servers" do
    stale_server = instance_double(PostgresServer, needs_recycling?: true)
    expect(postgres_resource).to receive(:representative_server).at_least(:once).and_return(stale_server)
    expect(postgres_resource).to receive(:ongoing_failover?).and_return(false)
    expect(postgres_resource).to receive(:has_enough_ready_servers?).and_return(false)

    expect { nx.recycle_representative_server }.to hop("provision_servers")
  end

  it "triggers failover directly when called" do
    stale_server = instance_double(PostgresServer, needs_recycling?: true)
    expect(postgres_resource).to receive(:representative_server).at_least(:once).and_return(stale_server)
    expect(postgres_resource).to receive(:ongoing_failover?).and_return(false)
    expect(postgres_resource).to receive(:has_enough_ready_servers?).and_return(true)
    expect(stale_server).to receive(:trigger_failover)

    expect { nx.recycle_representative_server }.to nap(60)
  end
end
# #wait_for_maintenance_window: outside the window the prog naps for ten
# minutes; inside it, the next step depends on needs_upgrade?.
describe "#wait_for_maintenance_window" do
  it "hops to recycle_representative_server if in maintenance window and not upgrading" do
    expect(postgres_resource).to receive_messages(in_maintenance_window?: true, needs_upgrade?: false)

    expect { nx.wait_for_maintenance_window }.to hop("recycle_representative_server")
  end

  it "fences primary and hops to wait_fence_primary if in maintenance window and upgrading" do
    current_primary = instance_double(PostgresServer)
    expect(postgres_resource).to receive_messages(in_maintenance_window?: true, needs_upgrade?: true)
    expect(postgres_resource).to receive(:representative_server).and_return(current_primary)
    expect(current_primary).to receive(:incr_fence)

    expect { nx.wait_for_maintenance_window }.to hop("wait_fence_primary")
  end

  it "waits if not in maintenance window" do
    ten_minutes = 10 * 60
    expect(postgres_resource).to receive(:in_maintenance_window?).and_return(false)

    expect { nx.wait_for_maintenance_window }.to nap(ten_minutes)
  end
end
# #wait_fence_primary: proceeds once the primary's strand label is
# "wait_fence"; any other label keeps the prog napping.
describe "#wait_fence_primary" do
  # Expects the resource to hand out a primary whose strand has the given label.
  def expect_primary_labelled(label)
    primary = instance_double(PostgresServer, strand: instance_double(Strand, label: label))
    expect(postgres_resource).to receive(:representative_server).and_return(primary)
  end

  it "hops to upgrade_standby when primary is fenced" do
    expect_primary_labelled("wait_fence")

    expect { nx.wait_fence_primary }.to hop("upgrade_standby")
  end

  it "waits when primary is not yet fenced" do
    expect_primary_labelled("wait")

    expect { nx.wait_fence_primary }.to nap(5)
  end
end
# #upgrade_standby: drives the "upgrade_postgres" daemonized command on the
# candidate's sshable and reacts to each status d_check can report.
describe "#upgrade_standby" do
  let(:sshable) { instance_double(Sshable) }
  let(:candidate) { instance_double(PostgresServer, vm: instance_double(Vm, sshable: sshable)) }

  before do
    allow(nx).to receive(:upgrade_candidate).and_return(candidate)
  end

  it "hops to update_metadata when upgrade succeeds" do
    expect(sshable).to receive(:d_check).with("upgrade_postgres").and_return("Succeeded")
    expect(sshable).to receive(:d_clean).with("upgrade_postgres")

    expect { nx.upgrade_standby }.to hop("update_metadata")
  end

  it "hops to upgrade_failed when upgrade fails" do
    expect(sshable).to receive(:d_check).with("upgrade_postgres").and_return("Failed")

    expect { nx.upgrade_standby }.to hop("upgrade_failed")
  end

  it "starts upgrade when not started" do
    expect(sshable).to receive(:d_check).with("upgrade_postgres").and_return("NotStarted")
    expect(postgres_resource).to receive(:version).and_return("17")
    # The final argument is matched with `anything`; its content is not pinned here.
    expect(sshable).to receive(:d_run).with("upgrade_postgres", "sudo", "postgres/bin/upgrade", anything)

    expect { nx.upgrade_standby }.to nap(5)
  end

  it "naps if status of the upgrade is unknown" do
    expect(sshable).to receive(:d_check).with("upgrade_postgres").and_return("Unknown")

    expect { nx.upgrade_standby }.to nap(5)
  end
end
# #update_metadata: a new timeline is assembled, the candidate is updated to
# the resource's version and the new timeline id, and four semaphores are
# incremented on it before hopping to wait_takeover.
describe "#update_metadata" do
  let(:candidate) { instance_double(PostgresServer) }
  let(:new_timeline) { instance_double(Strand, id: "new_timeline_id") }

  before do
    allow(nx).to receive(:upgrade_candidate).and_return(candidate)
  end

  it "creates new timeline and updates candidate server metadata" do
    expect(postgres_resource).to receive(:location_id).and_return("location_id")
    expect(postgres_resource).to receive(:version).and_return("17")
    expect(Prog::Postgres::PostgresTimelineNexus).to receive(:assemble).with(location_id: anything).and_return(new_timeline)

    expect(candidate).to receive(:update).with(version: "17", timeline_id: "new_timeline_id")
    %i[
      incr_refresh_walg_credentials
      incr_configure
      incr_restart
      incr_unplanned_take_over
    ].each do |semaphore|
      expect(candidate).to receive(semaphore)
    end

    expect { nx.update_metadata }.to hop("wait_takeover")
  end
end
# #wait_takeover: finishes once the representative server's strand label is
# "wait"; otherwise (including when there is no representative server) it
# naps for five seconds.
describe "#wait_takeover" do
  # Expects exactly one lookup of the representative server, returning `server`.
  def expect_representative(server)
    expect(postgres_resource).to receive(:representative_server).and_return(server)
  end

  it "hops to prune_servers when representative server is in wait state" do
    expect_representative(instance_double(PostgresServer, strand: instance_double(Strand, label: "wait")))

    expect { nx.wait_takeover }.to hop("prune_servers")
  end

  it "waits when representative server is not in wait state" do
    expect_representative(instance_double(PostgresServer, strand: instance_double(Strand, label: "taking_over")))

    expect { nx.wait_takeover }.to nap(5)
  end

  it "waits when there is no representative server" do
    expect_representative(nil)

    expect { nx.wait_takeover }.to nap(5)
  end
end
# #upgrade_failed: on failure the candidate's upgrade logs are emitted via
# Clog and the candidate is destroyed (unless it is missing or already marked
# for destruction), a primary whose strand label is "wait_fence" is unfenced,
# and the prog naps for six hours.
describe "#upgrade_failed" do
  let(:candidate) { instance_double(PostgresServer, vm: instance_double(Vm, sshable: instance_double(Sshable))) }
  # By default the primary is fenced (strand label "wait_fence").
  let(:primary) { instance_double(PostgresServer, strand: instance_double(Strand, label: "wait_fence")) }

  before do
    allow(nx).to receive(:upgrade_candidate).and_return(candidate)
    allow(postgres_resource).to receive(:representative_server).and_return(primary)
  end

  it "logs failure and destroys candidate server" do
    expect(candidate).to receive(:destroy_set?).and_return(false)
    expect(candidate.vm.sshable).to receive(:cmd).with("sudo journalctl -u upgrade_postgres").and_return("log line 1\nlog line 2")
    # Two log lines: the emit block is yielded twice and reads the resource id each time.
    expect(Clog).to receive(:emit).with("Postgres resource upgrade failed").and_yield.twice
    expect(candidate).to receive(:incr_destroy)
    expect(primary).to receive(:incr_unfence)
    expect(postgres_resource).to receive(:id).and_return("resource_id").twice
    expect { nx.upgrade_failed }.to nap(6 * 60 * 60)
  end

  it "unfences primary if it is fenced" do
    allow(candidate).to receive(:destroy_set?).and_return(false)
    allow(candidate.vm.sshable).to receive(:cmd).and_return("")
    allow(Clog).to receive(:emit)
    expect(candidate).to receive(:incr_destroy)
    expect(primary).to receive(:incr_unfence)
    expect { nx.upgrade_failed }.to nap(6 * 60 * 60)
  end

  it "does not unfence if primary is not fenced" do
    allow(primary).to receive(:strand).and_return(instance_double(Strand, label: "wait"))
    allow(candidate.vm.sshable).to receive(:cmd).and_return("")
    allow(candidate).to receive(:destroy_set?).and_return(false)
    allow(Clog).to receive(:emit)
    expect(candidate).to receive(:incr_destroy)
    expect(primary).not_to receive(:incr_unfence)
    expect { nx.upgrade_failed }.to nap(6 * 60 * 60)
  end

  it "handles case when candidate is nil" do
    allow(nx).to receive(:upgrade_candidate).and_return(nil)
    # The primary is still fenced here, so the unfence step is asserted rather
    # than merely allowed; a bare stub would pass even if it never happened.
    expect(primary).to receive(:incr_unfence)
    expect { nx.upgrade_failed }.to nap(6 * 60 * 60)
  end

  it "handles case when candidate is not nil but destroy_set? is true" do
    allow(candidate).to receive(:destroy_set?).and_return(true)
    # A candidate already marked for destruction must not be destroyed again.
    expect(candidate).not_to receive(:incr_destroy)
    # The fenced primary is unfenced regardless of the candidate's state.
    expect(primary).to receive(:incr_unfence)
    expect { nx.upgrade_failed }.to nap(6 * 60 * 60)
  end
end
# #prune_servers: checks which server doubles receive incr_destroy and which
# receive incr_configure, that billing records are refreshed, and that the
# prog exits.
describe "#prune_servers" do
  # PostgresServer double with the given attributes and a strand in `label`.
  def server_in(label, attrs)
    instance_double(PostgresServer, attrs.merge(strand: instance_double(Strand, label: label)))
  end

  it "destroys extra servers but keeps those that don't need recycling and match current version" do
    representative = server_in("wait", representative_at: "yesterday", needs_recycling?: false, created_at: 1, version: "17")
    needs_recycle = server_in("wait", representative_at: nil, needs_recycling?: true, created_at: 5, version: "17")
    unavailable = server_in("unavailable", representative_at: nil, needs_recycling?: false, created_at: 4, version: "17")
    kept_standby = server_in("wait", representative_at: nil, needs_recycling?: false, created_at: 3, version: "17")
    surplus_standby = server_in("wait", representative_at: nil, needs_recycling?: false, created_at: 2, version: "17")
    all_servers = [representative, needs_recycle, unavailable, kept_standby, surplus_standby]

    expect(postgres_resource).to receive(:servers).and_return(all_servers).at_least(:once)
    expect(postgres_resource).to receive(:version).and_return("17").at_least(:once)
    expect(postgres_resource).to receive(:representative_server).and_return(representative)
    expect(postgres_resource).to receive(:target_standby_count).and_return(1).at_least(:once)

    [needs_recycle, unavailable, surplus_standby].each do |doomed|
      expect(doomed).to receive(:incr_destroy)
    end
    [representative, kept_standby].each do |survivor|
      expect(survivor).to receive(:incr_configure)
    end
    expect(postgres_resource).to receive(:incr_update_billing_records)

    expect { nx.prune_servers }.to exit
  end

  it "destroys servers with older versions" do
    old_server = server_in("wait", version: "16", representative_at: nil, needs_recycling?: false, created_at: 1)
    new_server = server_in("wait", version: "17", representative_at: "yesterday", needs_recycling?: false, created_at: 2)

    expect(postgres_resource).to receive(:servers).and_return([old_server, new_server]).at_least(:once)
    expect(postgres_resource).to receive(:version).and_return("17").at_least(:once)
    expect(old_server).to receive(:incr_destroy)

    # Remaining expectations cover the regular pruning path.
    expect(postgres_resource).to receive(:representative_server).and_return(new_server)
    expect(postgres_resource).to receive(:target_standby_count).and_return(0)
    expect(new_server).to receive(:incr_configure)
    expect(postgres_resource).to receive(:incr_update_billing_records)

    expect { nx.prune_servers }.to exit
  end
end
# #upgrade_candidate: returns whatever the resource reports as its upgrade
# candidate server.
describe "#upgrade_candidate" do
  it "returns the upgrade candidate server" do
    candidate = instance_double(PostgresServer, version: "16")
    expect(postgres_resource).to receive(:upgrade_candidate_server).at_least(:once).and_return(candidate)

    expect(nx.upgrade_candidate).to eq(candidate)
  end
end
end