mirror of
https://github.com/ubicloud/ubicloud.git
synced 2025-10-06 23:01:56 +08:00
The Converge prog is now also responsible for matching the current Postgres version to the desired version. If there is a mismatch (current < desired), the Converge prog is launched. Roughly, the Converge prog does the following: 1. Provisions new servers. In case of upgrades, it provisions at most one new standby, and only if no existing standby is suitable for the upgrade. 2. Waits for the required servers to be ready. 3. Waits for the maintenance window to start. 4. Fences the primary server and launches pg_upgrade. 5. If the upgrade is successful, replaces the current primary with the candidate standby. If the upgrade fails, it deletes the candidate standby and unfences the primary to bring the database back. During the upgrade, health checking is effectively disabled, because auto-recovery conflicts with the several restarts of various versions on the candidate.
347 lines
18 KiB
Ruby
347 lines
18 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require_relative "../../model/spec_helper"
|
|
|
|
RSpec.describe Prog::Postgres::ConvergePostgresResource do
|
|
# Prog under test, built on an unsaved strand with a fixed id.
subject(:nx) do
  strand = Strand.new(id: "8148ebdf-66b8-8ed0-9c2f-8cfe93f5aa77")
  described_class.new(strand)
end

# Verified double for the resource being converged: two bare server doubles,
# a timeline, and a location that reports aws? as false.
let(:postgres_resource) do
  server_doubles = Array.new(2) { instance_double(PostgresServer) }

  instance_double(
    PostgresResource,
    id: "8148ebdf-66b8-8ed0-9c2f-8cfe93f5aa77",
    servers: server_doubles,
    timeline: instance_double(PostgresTimeline, id: "timeline-id"),
    location: instance_double(Location, aws?: false)
  )
end

# Every example works against the double instead of loading a real resource.
before { allow(nx).to receive(:postgres_resource).and_return(postgres_resource) }
|
|
|
|
describe "#start" do
  it "registers a deadline" do
    two_hours = 2 * 60 * 60
    expect(nx).to receive(:register_deadline).with("prune_servers", two_hours)

    expect { nx.start }.to hop("provision_servers")
  end
end
|
|
|
|
describe "#provision_servers" do
  # Builds a VM double whose NIC reports the given AWS subnet availability zone.
  def aws_vm_in(az)
    nic_aws_resource = instance_double(NicAwsResource, subnet_az: az)
    nic = instance_double(Nic, nic_aws_resource: nic_aws_resource)
    instance_double(Vm, vm_host: instance_double(VmHost), nic: nic)
  end

  # Default scenario: not enough fresh servers, and both existing servers
  # have a VM that is already placed on a host.
  before do
    allow(postgres_resource).to receive(:has_enough_fresh_servers?).and_return(false)

    postgres_resource.servers.zip(["vmh-id-1", "vmh-id-2"]).each do |server, host_id|
      placed_vm = instance_double(Vm, vm_host: instance_double(VmHost, id: host_id))
      allow(server).to receive(:vm).and_return(placed_vm)
    end
  end

  it "hops to wait_servers_to_be_ready if there are enough fresh servers" do
    expect(postgres_resource).to receive(:has_enough_fresh_servers?).and_return(true)

    expect { nx.provision_servers }.to hop("wait_servers_to_be_ready")
  end

  it "does not provision a new server if there is a server that is not assigned to a vm_host" do
    unplaced_vm = instance_double(Vm, vm_host: nil)
    expect(postgres_resource.servers[0]).to receive(:vm).and_return(unplaced_vm)
    expect(Prog::Postgres::PostgresServerNexus).not_to receive(:assemble)

    expect { nx.provision_servers }.to nap
  end

  it "provisions a new server without excluding hosts in development environment" do
    allow(Config).to receive(:development?).and_return(true)
    expect(postgres_resource.location).to receive(:provider)
      .at_least(:once)
      .and_return(HostProvider::HETZNER_PROVIDER_NAME)
    expect(Prog::Postgres::PostgresServerNexus).to receive(:assemble)
      .with(hash_including(exclude_host_ids: []))

    expect { nx.provision_servers }.to nap
  end

  it "provisions a new server but excludes currently used data centers" do
    expect(postgres_resource.location).to receive(:provider)
      .at_least(:once)
      .and_return(HostProvider::HETZNER_PROVIDER_NAME)

    # Override the default VM stubs so each server reports a data center.
    postgres_resource.servers.zip(["dc1", "dc2"]).each do |server, dc_name|
      vm_in_dc = instance_double(Vm, vm_host: instance_double(VmHost, data_center: dc_name))
      allow(server).to receive(:vm).and_return(vm_in_dc)
    end

    hosts_in_used_dcs = ["vmh-id-1", "vmh-id-2"].map { |host_id| instance_double(VmHost, id: host_id) }
    expect(VmHost).to receive(:where).with(data_center: ["dc1", "dc2"]).and_return(hosts_in_used_dcs)

    expect(Prog::Postgres::PostgresServerNexus).to receive(:assemble)
      .with(hash_including(exclude_host_ids: ["vmh-id-1", "vmh-id-2"]))

    expect { nx.provision_servers }.to nap
  end

  it "provisions a new server but excludes currently used az for aws" do
    expect(postgres_resource.location).to receive(:provider)
      .at_least(:once)
      .and_return(HostProvider::AWS_PROVIDER_NAME)
    allow(postgres_resource.servers[0]).to receive(:vm).and_return(aws_vm_in("a"))
    allow(postgres_resource.servers[1]).to receive(:vm).and_return(aws_vm_in("b"))
    expect(Prog::Postgres::PostgresServerNexus).to receive(:assemble)
      .with(hash_including(exclude_availability_zones: ["a", "b"]))
    expect(postgres_resource).to receive(:use_different_az_set?).and_return(true)

    expect { nx.provision_servers }.to nap
  end

  it "provisions a new server in a used az for aws if use_different_az_set? is false" do
    expect(postgres_resource.location).to receive(:provider)
      .at_least(:once)
      .and_return(HostProvider::AWS_PROVIDER_NAME)
    allow(postgres_resource.servers[0]).to receive(:vm).and_return(aws_vm_in("a"))
    allow(postgres_resource.servers[1]).to receive(:vm).and_return(aws_vm_in("b"))
    # The representative server is the one stubbed into az "a".
    expect(postgres_resource).to receive(:representative_server).and_return(postgres_resource.servers[0])
    expect(Prog::Postgres::PostgresServerNexus).to receive(:assemble)
      .with(hash_including(availability_zone: "a"))
    expect(postgres_resource).to receive(:use_different_az_set?).and_return(false)

    expect { nx.provision_servers }.to nap
  end
end
|
|
|
|
describe "#wait_servers_to_be_ready" do
  it "hops to provision_servers if there is not enough fresh servers" do
    expect(postgres_resource).to receive(:has_enough_fresh_servers?).and_return(false)

    expect { nx.wait_servers_to_be_ready }.to hop("provision_servers")
  end

  it "hops to wait_for_maintenance_window if there are enough ready servers" do
    # Fresh and ready checks both pass.
    expect(postgres_resource).to receive(:has_enough_fresh_servers?).and_return(true)
    expect(postgres_resource).to receive(:has_enough_ready_servers?).and_return(true)

    expect { nx.wait_servers_to_be_ready }.to hop("wait_for_maintenance_window")
  end

  it "waits if there are not enough ready servers" do
    # Fresh check passes, ready check does not.
    expect(postgres_resource).to receive(:has_enough_fresh_servers?).and_return(true)
    expect(postgres_resource).to receive(:has_enough_ready_servers?).and_return(false)

    expect { nx.wait_servers_to_be_ready }.to nap
  end
end
|
|
|
|
describe "#recycle_representative_server" do
  it "waits until there is a representative server to act on it" do
    expect(postgres_resource).to receive(:representative_server).and_return(nil)

    expect { nx.recycle_representative_server }.to nap
  end

  it "hops to prune_servers if the representative server does not need recycling" do
    healthy_representative = instance_double(PostgresServer, needs_recycling?: false)
    expect(postgres_resource).to receive(:representative_server)
      .at_least(:once)
      .and_return(healthy_representative)
    expect(postgres_resource).to receive(:ongoing_failover?).and_return(false)

    expect { nx.recycle_representative_server }.to hop("prune_servers")
  end

  it "hops to provision_servers if there are not enough ready servers" do
    stale_representative = instance_double(PostgresServer, needs_recycling?: true)
    expect(postgres_resource).to receive(:representative_server)
      .at_least(:once)
      .and_return(stale_representative)
    expect(postgres_resource).to receive(:ongoing_failover?).and_return(false)
    expect(postgres_resource).to receive(:has_enough_ready_servers?).and_return(false)

    expect { nx.recycle_representative_server }.to hop("provision_servers")
  end

  it "triggers failover directly when called" do
    stale_representative = instance_double(PostgresServer, needs_recycling?: true)
    expect(postgres_resource).to receive(:representative_server)
      .at_least(:once)
      .and_return(stale_representative)
    expect(postgres_resource).to receive(:ongoing_failover?).and_return(false)
    expect(postgres_resource).to receive(:has_enough_ready_servers?).and_return(true)
    # Looked up through the resource double, which returns the stubbed representative.
    expect(postgres_resource.representative_server).to receive(:trigger_failover)

    expect { nx.recycle_representative_server }.to nap(60)
  end
end
|
|
|
|
describe "#wait_for_maintenance_window" do
  it "hops to recycle_representative_server if in maintenance window and not upgrading" do
    expect(postgres_resource).to receive(:in_maintenance_window?).and_return(true)
    expect(postgres_resource).to receive(:needs_upgrade?).and_return(false)

    expect { nx.wait_for_maintenance_window }.to hop("recycle_representative_server")
  end

  it "fences primary and hops to wait_fence_primary if in maintenance window and upgrading" do
    current_primary = instance_double(PostgresServer)

    expect(postgres_resource).to receive(:in_maintenance_window?).and_return(true)
    expect(postgres_resource).to receive(:needs_upgrade?).and_return(true)
    expect(postgres_resource).to receive(:representative_server).and_return(current_primary)
    expect(current_primary).to receive(:incr_fence)

    expect { nx.wait_for_maintenance_window }.to hop("wait_fence_primary")
  end

  it "waits if not in maintenance window" do
    ten_minutes = 10 * 60
    expect(postgres_resource).to receive(:in_maintenance_window?).and_return(false)

    expect { nx.wait_for_maintenance_window }.to nap(ten_minutes)
  end
end
|
|
|
|
describe "#wait_fence_primary" do
  # Expects one representative_server lookup and answers with a primary
  # whose strand carries the given label.
  def expect_primary_with_label(label)
    primary = instance_double(PostgresServer, strand: instance_double(Strand, label: label))
    expect(postgres_resource).to receive(:representative_server).and_return(primary)
  end

  it "hops to upgrade_standby when primary is fenced" do
    expect_primary_with_label("wait_fence")

    expect { nx.wait_fence_primary }.to hop("upgrade_standby")
  end

  it "waits when primary is not yet fenced" do
    expect_primary_with_label("wait")

    expect { nx.wait_fence_primary }.to nap(5)
  end
end
|
|
|
|
describe "#upgrade_standby" do
  # SSH handle of the candidate's VM; the daemonizer calls are expected on it.
  let(:sshable) { instance_double(Sshable) }
  let(:candidate) { instance_double(PostgresServer, vm: instance_double(Vm, sshable: sshable)) }

  before { allow(nx).to receive(:upgrade_candidate).and_return(candidate) }

  it "hops to update_metadata when upgrade succeeds" do
    expect(sshable).to receive(:d_check).with("upgrade_postgres").and_return("Succeeded")
    expect(sshable).to receive(:d_clean).with("upgrade_postgres")

    expect { nx.upgrade_standby }.to hop("update_metadata")
  end

  it "hops to upgrade_failed when upgrade fails" do
    expect(sshable).to receive(:d_check).with("upgrade_postgres").and_return("Failed")

    expect { nx.upgrade_standby }.to hop("upgrade_failed")
  end

  it "starts upgrade when not started" do
    expect(sshable).to receive(:d_check).with("upgrade_postgres").and_return("NotStarted")
    expect(sshable).to receive(:d_run)
      .with("upgrade_postgres", "sudo", "postgres/bin/upgrade", anything)
    expect(postgres_resource).to receive(:version).and_return("17")

    expect { nx.upgrade_standby }.to nap(5)
  end

  it "naps if status of the upgrade is unknown" do
    expect(sshable).to receive(:d_check).with("upgrade_postgres").and_return("Unknown")

    expect { nx.upgrade_standby }.to nap(5)
  end
end
|
|
|
|
describe "#update_metadata" do
  let(:candidate) { instance_double(PostgresServer) }
  # Stand-in for what the timeline assemble call returns; only its id is read.
  let(:new_timeline) { instance_double(Strand, id: "new_timeline_id") }

  before { allow(nx).to receive(:upgrade_candidate).and_return(candidate) }

  it "creates new timeline and updates candidate server metadata" do
    expect(Prog::Postgres::PostgresTimelineNexus).to receive(:assemble)
      .with(location_id: anything)
      .and_return(new_timeline)
    expect(postgres_resource).to receive(:location_id).and_return("location_id")
    expect(postgres_resource).to receive(:version).and_return("17")

    expect(candidate).to receive(:update).with(version: "17", timeline_id: "new_timeline_id")
    # Each of these semaphores must be incremented on the candidate.
    %i[incr_refresh_walg_credentials incr_configure incr_restart incr_unplanned_take_over].each do |semaphore|
      expect(candidate).to receive(semaphore)
    end

    expect { nx.update_metadata }.to hop("wait_takeover")
  end
end
|
|
|
|
describe "#wait_takeover" do
  it "hops to prune_servers when representative server is in wait state" do
    settled_strand = instance_double(Strand, label: "wait")
    settled_server = instance_double(PostgresServer, strand: settled_strand)
    expect(postgres_resource).to receive(:representative_server).and_return(settled_server)

    expect { nx.wait_takeover }.to hop("prune_servers")
  end

  it "waits when representative server is not in wait state" do
    busy_strand = instance_double(Strand, label: "taking_over")
    busy_server = instance_double(PostgresServer, strand: busy_strand)
    expect(postgres_resource).to receive(:representative_server).and_return(busy_server)

    expect { nx.wait_takeover }.to nap(5)
  end

  it "waits when there is no representative server" do
    expect(postgres_resource).to receive(:representative_server).and_return(nil)

    expect { nx.wait_takeover }.to nap(5)
  end
end
|
|
|
|
describe "#upgrade_failed" do
  let(:sshable) { instance_double(Sshable) }
  let(:candidate) { instance_double(PostgresServer, vm: instance_double(Vm, sshable: sshable)) }
  # By default the primary's strand is at the "wait_fence" label.
  let(:primary) { instance_double(PostgresServer, strand: instance_double(Strand, label: "wait_fence")) }
  let(:six_hours) { 6 * 60 * 60 }

  before do
    allow(nx).to receive(:upgrade_candidate).and_return(candidate)
    allow(postgres_resource).to receive(:representative_server).and_return(primary)
  end

  it "logs failure and destroys candidate server" do
    expect(candidate).to receive(:destroy_set?).and_return(false)
    expect(sshable).to receive(:cmd)
      .with("sudo journalctl -u upgrade_postgres")
      .and_return("log line 1\nlog line 2")
    expect(Clog).to receive(:emit).with("Postgres resource upgrade failed").and_yield.twice
    expect(candidate).to receive(:incr_destroy)
    expect(primary).to receive(:incr_unfence)
    expect(postgres_resource).to receive(:id).and_return("resource_id").twice

    expect { nx.upgrade_failed }.to nap(six_hours)
  end

  it "unfences primary if it is fenced" do
    allow(candidate).to receive(:destroy_set?).and_return(false)
    allow(sshable).to receive(:cmd).and_return("")
    allow(Clog).to receive(:emit)
    expect(candidate).to receive(:incr_destroy)
    expect(primary).to receive(:incr_unfence)

    expect { nx.upgrade_failed }.to nap(six_hours)
  end

  it "does not unfence if primary is not fenced" do
    # Override the default label so the primary no longer looks fenced.
    allow(primary).to receive(:strand).and_return(instance_double(Strand, label: "wait"))
    allow(sshable).to receive(:cmd).and_return("")
    allow(candidate).to receive(:destroy_set?).and_return(false)
    allow(Clog).to receive(:emit)
    expect(candidate).to receive(:incr_destroy)
    expect(primary).not_to receive(:incr_unfence)

    expect { nx.upgrade_failed }.to nap(six_hours)
  end

  it "handles case when candidate is nil" do
    allow(nx).to receive(:upgrade_candidate).and_return(nil)
    # Allow but don't expect since logic still runs
    allow(primary).to receive(:incr_unfence)

    expect { nx.upgrade_failed }.to nap(six_hours)
  end

  it "handles case when candidate is not nil but destroy_set? is true" do
    allow(candidate).to receive(:destroy_set?).and_return(true)
    # Same stub as the before hook, restated so the scenario is explicit.
    allow(nx).to receive(:upgrade_candidate).and_return(candidate)
    # Allow but don't expect since logic still runs
    allow(primary).to receive(:incr_unfence)

    expect { nx.upgrade_failed }.to nap(six_hours)
  end
end
|
|
|
|
describe "#prune_servers" do
  # Builds a server double; the strand label defaults to "wait" and the
  # version to "17", and any other stubs are passed through unchanged.
  def server_double(label: "wait", version: "17", **stubs)
    instance_double(PostgresServer, strand: instance_double(Strand, label: label), version: version, **stubs)
  end

  it "destroys extra servers but keeps those that don't need recycling and match current version" do
    expect(postgres_resource).to receive(:servers).at_least(:once).and_return([
      server_double(representative_at: "yesterday", needs_recycling?: false, created_at: 1),
      server_double(representative_at: nil, needs_recycling?: true, created_at: 5),
      server_double(representative_at: nil, needs_recycling?: false, created_at: 4, label: "unavailable"),
      server_double(representative_at: nil, needs_recycling?: false, created_at: 3),
      server_double(representative_at: nil, needs_recycling?: false, created_at: 2)
    ])
    # Name the doubles by their role in the scenario, in the order stubbed above.
    representative, recycling_standby, unavailable_standby, newer_standby, older_standby = postgres_resource.servers

    expect(postgres_resource).to receive(:version).at_least(:once).and_return("17")
    expect(postgres_resource).to receive(:representative_server).and_return(representative)
    expect(postgres_resource).to receive(:target_standby_count).at_least(:once).and_return(1)

    [recycling_standby, unavailable_standby, older_standby].each do |doomed|
      expect(doomed).to receive(:incr_destroy)
    end

    [representative, newer_standby].each do |survivor|
      expect(survivor).to receive(:incr_configure)
    end
    expect(postgres_resource).to receive(:incr_update_billing_records)

    expect { nx.prune_servers }.to exit
  end

  it "destroys servers with older versions" do
    old_server = server_double(version: "16", representative_at: nil, needs_recycling?: false, created_at: 1)
    new_server = server_double(version: "17", representative_at: "yesterday", needs_recycling?: false, created_at: 2)
    expect(postgres_resource).to receive(:servers).at_least(:once).and_return([old_server, new_server])
    expect(postgres_resource).to receive(:version).at_least(:once).and_return("17")
    expect(old_server).to receive(:incr_destroy)

    # Mock the normal pruning logic
    expect(postgres_resource).to receive(:representative_server).and_return(new_server)
    expect(postgres_resource).to receive(:target_standby_count).and_return(0)
    expect(new_server).to receive(:incr_configure)
    expect(postgres_resource).to receive(:incr_update_billing_records)

    expect { nx.prune_servers }.to exit
  end
end
|
|
|
|
describe "#upgrade_candidate" do
  it "returns the upgrade candidate server" do
    candidate_on_old_version = instance_double(PostgresServer, version: "16")
    expect(postgres_resource).to receive(:upgrade_candidate_server)
      .at_least(:once)
      .and_return(candidate_on_old_version)

    # The prog should hand back exactly what the resource reports.
    expect(nx.upgrade_candidate).to eq(postgres_resource.upgrade_candidate_server)
  end
end
|
|
end
|