ubicloud/prog/postgres/postgres_server_nexus.rb
Enes Cakir 6944422dff Nap 6 hours while waiting for the semaphore to increase

When we increase a semaphore, the strand is already scheduled to run
immediately.[^1]

If a label is just waiting for a semaphore to increase, there's no
need for short naps.

Most of our wait labels are just waiting for a semaphore to increase,
so I extended their naps to 6 hours.

This will help decrease the load on respirate in production.

[^1]: 28dacb968b/model/semaphore.rb (L10)
2025-03-24 09:10:18 +03:00
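
A minimal sketch of the mechanic the commit relies on (hypothetical, condensed from model/semaphore.rb): incrementing a semaphore reschedules the owning strand in the same transaction, so a label that only waits on semaphores loses nothing by napping for hours.

# Sketch only, assuming a Sequel-backed Strand model with a `schedule`
# column; not the actual Ubicloud implementation.
class Semaphore < Sequel::Model
  def self.incr(strand_id, name)
    DB.transaction do
      # Wake the target strand immediately, cutting any long nap short.
      Strand.where(id: strand_id).update(schedule: Sequel::CURRENT_TIMESTAMP)
      # Record the semaphore so the strand's when_*_set? hooks fire.
      create(strand_id: strand_id, name: name)
    end
  end
end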


# frozen_string_literal: true

require "forwardable"

require_relative "../../lib/util"

class Prog::Postgres::PostgresServerNexus < Prog::Base
  subject_is :postgres_server

  extend Forwardable
  def_delegators :postgres_server, :vm
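
  # Creates everything a new postgres server needs inside one transaction:
  # the backing VM (a 30 GiB boot volume plus a data volume sized for the
  # resource), the PostgresServer record, and a strand starting at "start".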
  def self.assemble(resource_id:, timeline_id:, timeline_access:, representative_at: nil, exclude_host_ids: [])
    DB.transaction do
      ubid = PostgresServer.generate_ubid

      postgres_resource = PostgresResource[resource_id]
      boot_image = case postgres_resource.flavor
      when PostgresResource::Flavor::STANDARD then "postgres#{postgres_resource.version}-ubuntu-2204"
      when PostgresResource::Flavor::PARADEDB then "postgres#{postgres_resource.version}-paradedb-ubuntu-2204"
      when PostgresResource::Flavor::LANTERN then "postgres#{postgres_resource.version}-lantern-ubuntu-2204"
      else raise "Unknown PostgreSQL flavor: #{postgres_resource.flavor}"
      end

      vm_st = Prog::Vm::Nexus.assemble_with_sshable(
        Config.postgres_service_project_id,
        sshable_unix_user: "ubi",
        location_id: postgres_resource.location_id,
        name: ubid.to_s,
        size: postgres_resource.target_vm_size,
        storage_volumes: [
          {encrypted: true, size_gib: 30},
          {encrypted: true, size_gib: postgres_resource.target_storage_size_gib}
        ],
        boot_image: boot_image,
        private_subnet_id: postgres_resource.private_subnet_id,
        enable_ip4: true,
        exclude_host_ids: exclude_host_ids
      )

      synchronization_status = representative_at ? "ready" : "catching_up"
      postgres_server = PostgresServer.create(
        resource_id: resource_id,
        timeline_id: timeline_id,
        timeline_access: timeline_access,
        representative_at: representative_at,
        synchronization_status: synchronization_status,
        vm_id: vm_st.id
      ) { _1.id = ubid.to_uuid }

      Strand.create(prog: "Postgres::PostgresServerNexus", label: "start") { _1.id = postgres_server.id }
    end
  end
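
  # Honors a pending destroy semaphore before any label runs. The destroy is
  # deferred only while this server is taking over the primary role, unless
  # the whole resource is itself being destroyed.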
  def before_run
    when_destroy_set? do
      should_destroy = if ["destroy", nil].include?(postgres_server.resource&.strand&.label)
        true
      else
        !(@snap.set?(:take_over) || ["prepare_for_take_over", "taking_over"].include?(strand.label))
      end

      if should_destroy
        if strand.label != "destroy"
          hop_destroy
        elsif strand.stack.count > 1
          pop "operation is cancelled due to the destruction of the postgres server"
        end
      else
        Clog.emit("Postgres server deletion is cancelled, because it is in the process of taking over the primary role")
        decr_destroy
      end
    end
  end

  label def start
    nap 5 unless vm.strand.label == "wait"
    postgres_server.incr_initial_provisioning
    hop_bootstrap_rhizome
  end

  label def bootstrap_rhizome
    if postgres_server.primary?
      register_deadline("wait", 10 * 60)
    else
      register_deadline("wait", 120 * 60)
    end

    bud Prog::BootstrapRhizome, {"target_folder" => "postgres", "subject_id" => vm.id, "user" => "ubi"}
    hop_wait_bootstrap_rhizome
  end

  label def wait_bootstrap_rhizome
    reap
    hop_mount_data_disk if leaf?
    donate
  end
  label def mount_data_disk
    case vm.sshable.cmd("common/bin/daemonizer --check format_disk")
    when "Succeeded"
      vm.sshable.cmd("sudo mkdir -p /dat")
      device_path = vm.vm_storage_volumes.find { _1.boot == false }.device_path.shellescape

      vm.sshable.cmd("sudo common/bin/add_to_fstab #{device_path} /dat ext4 defaults 0 0")
      vm.sshable.cmd("sudo mount #{device_path} /dat")
      hop_configure_walg_credentials
    when "Failed", "NotStarted"
      device_path = vm.vm_storage_volumes.find { _1.boot == false }.device_path.shellescape
      vm.sshable.cmd("common/bin/daemonizer 'sudo mkfs --type ext4 #{device_path}' format_disk")
    end

    nap 5
  end

  label def configure_walg_credentials
    refresh_walg_credentials
    hop_initialize_empty_database if postgres_server.primary?
    hop_initialize_database_from_backup
  end

  label def initialize_empty_database
    case vm.sshable.cmd("common/bin/daemonizer --check initialize_empty_database")
    when "Succeeded"
      hop_refresh_certificates
    when "Failed", "NotStarted"
      vm.sshable.cmd("common/bin/daemonizer 'sudo postgres/bin/initialize-empty-database #{postgres_server.resource.version}' initialize_empty_database")
    end

    nap 5
  end
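
  # Standbys restore from the latest backup; point-in-time restores pick the
  # newest backup taken before the restore target.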
  label def initialize_database_from_backup
    case vm.sshable.cmd("common/bin/daemonizer --check initialize_database_from_backup")
    when "Succeeded"
      hop_refresh_certificates
    when "Failed", "NotStarted"
      backup_label = if postgres_server.standby?
        "LATEST"
      else
        postgres_server.timeline.latest_backup_label_before_target(target: postgres_server.resource.restore_target)
      end

      vm.sshable.cmd("common/bin/daemonizer 'sudo postgres/bin/initialize-database-from-backup #{postgres_server.resource.version} #{backup_label}' initialize_database_from_backup")
    end

    nap 5
  end
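
  # Installs the CA bundle and server certificate, readable only by the
  # cert_readers group, then reloads postgres and pgbouncer to pick them up.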
  label def refresh_certificates
    decr_refresh_certificates

    nap 5 if postgres_server.resource.server_cert.nil?

    ca_bundle = postgres_server.resource.ca_certificates
    vm.sshable.cmd("sudo tee /etc/ssl/certs/ca.crt > /dev/null", stdin: ca_bundle)
    vm.sshable.cmd("sudo tee /etc/ssl/certs/server.crt > /dev/null", stdin: postgres_server.resource.server_cert)
    vm.sshable.cmd("sudo tee /etc/ssl/certs/server.key > /dev/null", stdin: postgres_server.resource.server_cert_key)
    vm.sshable.cmd("sudo chgrp cert_readers /etc/ssl/certs/ca.crt && sudo chmod 640 /etc/ssl/certs/ca.crt")
    vm.sshable.cmd("sudo chgrp cert_readers /etc/ssl/certs/server.crt && sudo chmod 640 /etc/ssl/certs/server.crt")
    vm.sshable.cmd("sudo chgrp cert_readers /etc/ssl/certs/server.key && sudo chmod 640 /etc/ssl/certs/server.key")

    # MinIO cluster certificate rotation timelines are similar to postgres
    # servers' timelines. So we refresh the wal-g credentials, which use MinIO
    # certificates, whenever we refresh the certificates of the postgres server.
    refresh_walg_credentials

    when_initial_provisioning_set? do
      hop_configure_prometheus
    end

    vm.sshable.cmd("sudo -u postgres pg_ctlcluster #{postgres_server.resource.version} main reload")
    vm.sshable.cmd("sudo systemctl reload pgbouncer")

    hop_wait
  end
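
  # Writes the prometheus web and scrape configs (node and postgres
  # exporters) and forwards metrics to any configured remote-write
  # destinations.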
  label def configure_prometheus
    web_config = <<CONFIG
tls_server_config:
  cert_file: /etc/ssl/certs/server.crt
  key_file: /etc/ssl/certs/server.key
CONFIG
    vm.sshable.cmd("sudo -u prometheus tee /home/prometheus/web-config.yml > /dev/null", stdin: web_config)

    metric_destinations = postgres_server.resource.metric_destinations.map {
      <<METRIC_DESTINATION
  - url: '#{_1.url}'
    basic_auth:
      username: '#{_1.username}'
      password: '#{_1.password}'
METRIC_DESTINATION
    }.prepend("remote_write:").join("\n")

    prometheus_config = <<CONFIG
global:
  scrape_interval: 10s
  external_labels:
    ubicloud_resource_id: #{postgres_server.resource.ubid}
    ubicloud_resource_role: #{(postgres_server.id == postgres_server.resource.representative_server.id) ? "primary" : "standby"}
scrape_configs:
  - job_name: node
    static_configs:
      - targets: ['localhost:9100']
        labels:
          instance: '#{postgres_server.ubid}'
  - job_name: postgres
    static_configs:
      - targets: ['localhost:9187']
        labels:
          instance: '#{postgres_server.ubid}'
#{metric_destinations}
CONFIG
    vm.sshable.cmd("sudo -u prometheus tee /home/prometheus/prometheus.yml > /dev/null", stdin: prometheus_config)

    when_initial_provisioning_set? do
      vm.sshable.cmd("sudo systemctl enable --now postgres_exporter")
      vm.sshable.cmd("sudo systemctl enable --now node_exporter")
      vm.sshable.cmd("sudo systemctl enable --now prometheus")
      hop_configure
    end

    vm.sshable.cmd("sudo systemctl reload postgres_exporter || sudo systemctl restart postgres_exporter")
    vm.sshable.cmd("sudo systemctl reload node_exporter || sudo systemctl restart node_exporter")
    vm.sshable.cmd("sudo systemctl reload prometheus || sudo systemctl restart prometheus")

    hop_wait
  end

  label def configure
    case vm.sshable.cmd("common/bin/daemonizer --check configure_postgres")
    when "Succeeded"
      vm.sshable.cmd("common/bin/daemonizer --clean configure_postgres")

      when_initial_provisioning_set? do
        hop_update_superuser_password if postgres_server.primary?
        hop_wait_catch_up if postgres_server.standby?
        hop_wait_recovery_completion
      end

      hop_wait_catch_up if postgres_server.standby? && postgres_server.synchronization_status != "ready"
      hop_wait
    when "Failed", "NotStarted"
      configure_hash = postgres_server.configure_hash
      vm.sshable.cmd("common/bin/daemonizer 'sudo postgres/bin/configure #{postgres_server.resource.version}' configure_postgres", stdin: JSON.generate(configure_hash))
    end

    nap 5
  end
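
  # SCRAM-encrypts the new superuser password on the control plane side, so
  # only the already-hashed value travels in the ALTER ROLE statement, with
  # statement logging disabled for good measure.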
  label def update_superuser_password
    decr_update_superuser_password

    encrypted_password = DB.synchronize do |conn|
      # This uses PostgreSQL's PQencryptPasswordConn function, but it needs a
      # connection, because the encryption is done by PostgreSQL, not by the
      # control plane. We use our own control plane database to do the
      # encryption.
      conn.encrypt_password(postgres_server.resource.superuser_password, "postgres", "scram-sha-256")
    end

    commands = <<SQL
BEGIN;
SET LOCAL log_statement = 'none';
ALTER ROLE postgres WITH PASSWORD #{DB.literal(encrypted_password)};
COMMIT;
SQL
    postgres_server.run_query(commands)

    when_initial_provisioning_set? do
      if retval&.dig("msg") == "postgres server is restarted"
        hop_run_post_installation_script if postgres_server.primary? && postgres_server.resource.flavor != PostgresResource::Flavor::STANDARD
        hop_wait
      end

      push self.class, frame, "restart"
    end

    hop_wait
  end

  label def run_post_installation_script
    command = <<~COMMAND
      set -ueo pipefail
      [[ -f /etc/postgresql-partners/post-installation-script ]] || { echo "Post-installation script not found. Exiting..."; exit 0; }
      sudo cp /etc/postgresql-partners/post-installation-script postgres/bin/post-installation-script
      sudo chown ubi:ubi postgres/bin/post-installation-script
      sudo chmod +x postgres/bin/post-installation-script
      postgres/bin/post-installation-script
    COMMAND
    vm.sshable.cmd(command)

    hop_wait
  end
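
  # A standby counts as caught up once its replication lag, measured on the
  # representative server, drops below 80 MB (about 5 WAL segments).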
  label def wait_catch_up
    query = "SELECT pg_current_wal_lsn() - replay_lsn FROM pg_stat_replication WHERE application_name = '#{postgres_server.ubid}'"
    lag = postgres_server.resource.representative_server.run_query(query).chomp

    nap 30 if lag.empty? || lag.to_i > 80 * 1024 * 1024 # 80 MB or ~5 WAL files

    postgres_server.update(synchronization_status: "ready")
    postgres_server.resource.representative_server.incr_configure

    hop_wait_synchronization if postgres_server.resource.ha_type == PostgresResource::HaType::SYNC
    hop_wait
  end

  label def wait_synchronization
    query = "SELECT sync_state FROM pg_stat_replication WHERE application_name = '#{postgres_server.ubid}'"
    sync_state = postgres_server.resource.representative_server.run_query(query).chomp
    hop_wait if ["quorum", "sync"].include?(sync_state)
    nap 30
  end
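
  # Waits for point-in-time recovery to finish; once WAL replay is done, the
  # server forks a fresh timeline off its parent and starts pushing to it.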
  label def wait_recovery_completion
    is_in_recovery = begin
      postgres_server.run_query("SELECT pg_is_in_recovery()").chomp == "t"
    rescue => ex
      raise ex unless ex.stderr.include?("Consistent recovery state has not been yet reached.")
      nap 5
    end

    if is_in_recovery
      is_wal_replay_paused = postgres_server.run_query("SELECT pg_get_wal_replay_pause_state()").chomp == "paused"
      if is_wal_replay_paused
        postgres_server.run_query("SELECT pg_wal_replay_resume()")
        is_in_recovery = false
      end
    end

    if !is_in_recovery
      timeline_id = Prog::Postgres::PostgresTimelineNexus.assemble(location_id: postgres_server.resource.location_id, parent_id: postgres_server.timeline.id).id
      postgres_server.timeline_id = timeline_id
      postgres_server.timeline_access = "push"
      postgres_server.save_changes

      refresh_walg_credentials

      hop_configure
    end

    nap 5
  end
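
  # Steady state. Each branch below is gated by a semaphore, and incrementing
  # a semaphore reschedules the strand immediately, so the 6-hour nap adds no
  # wake-up latency.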
  label def wait
    decr_initial_provisioning

    when_take_over_set? do
      hop_prepare_for_take_over
    end

    when_refresh_certificates_set? do
      hop_refresh_certificates
    end

    when_update_superuser_password_set? do
      hop_update_superuser_password
    end

    when_checkup_set? do
      hop_unavailable if !available?
      decr_checkup
    end

    when_configure_prometheus_set? do
      decr_configure_prometheus
      hop_configure_prometheus
    end

    when_configure_set? do
      decr_configure
      hop_configure
    end

    when_restart_set? do
      push self.class, frame, "restart"
    end

    nap 6 * 60 * 60
  end
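
  # Tries failover first; otherwise restarts postgres via a child strand and
  # keeps checking until the server responds again.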
  label def unavailable
    register_deadline("wait", 10 * 60)

    nap 0 if postgres_server.trigger_failover

    reap
    nap 5 unless strand.children.select { _1.prog == "Postgres::PostgresServerNexus" && _1.label == "restart" }.empty?

    if available?
      decr_checkup
      hop_wait
    end

    bud self.class, frame, :restart
    nap 5
  end
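
  # Asks the old representative server to destroy itself and waits until it
  # is gone before promoting this one.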
  label def prepare_for_take_over
    decr_take_over

    hop_taking_over if postgres_server.resource.representative_server.nil?

    postgres_server.resource.representative_server.incr_destroy
    nap 5
  end

  label def taking_over
    case vm.sshable.cmd("common/bin/daemonizer --check promote_postgres")
    when "Succeeded"
      postgres_server.update(timeline_access: "push", representative_at: Time.now)
      postgres_server.resource.incr_refresh_dns_record
      postgres_server.resource.servers.each(&:incr_configure)
      postgres_server.resource.servers.reject(&:primary?).each { _1.update(synchronization_status: "catching_up") }
      postgres_server.incr_restart
      hop_configure
    when "Failed", "NotStarted"
      vm.sshable.cmd("common/bin/daemonizer 'sudo pg_ctlcluster #{postgres_server.resource.version} main promote' promote_postgres")
      nap 0
    end

    nap 5
  end

  label def destroy
    decr_destroy

    strand.children.each { _1.destroy }
    vm.incr_destroy
    postgres_server.destroy

    pop "postgres server is deleted"
  end

  label def restart
    decr_restart
    vm.sshable.cmd("sudo postgres/bin/restart #{postgres_server.resource.version}")
    vm.sshable.cmd("sudo systemctl restart pgbouncer")
    pop "postgres server is restarted"
  end

  def refresh_walg_credentials
    return if postgres_server.timeline.blob_storage.nil?

    walg_config = postgres_server.timeline.generate_walg_config
    vm.sshable.cmd("sudo -u postgres tee /etc/postgresql/wal-g.env > /dev/null", stdin: walg_config)
    vm.sshable.cmd("sudo tee /usr/lib/ssl/certs/blob_storage_ca.crt > /dev/null", stdin: postgres_server.timeline.blob_storage.root_certs)
  end
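
  # Health check over SSH: a trivial query must succeed, but a server that is
  # replaying WAL after a crash ("redo in progress" in its log) is not
  # declared unavailable.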
  def available?
    vm.sshable.invalidate_cache_entry

    begin
      postgres_server.run_query("SELECT 1")
      return true
    rescue
    end

    # Do not declare unavailability if Postgres is in crash recovery
    begin
      return true if vm.sshable.cmd("sudo tail -n 5 /dat/#{postgres_server.resource.version}/data/pg_log/postgresql.log").include?("redo in progress")
    rescue
    end

    false
  end
end