ubicloud/model/postgres/postgres_server.rb
shikharbhardwaj a0b83b3ee6 Add Upgrade steps to Prog::Postgres::ConvergePostgresResource
The Converge prog is now also responsible for matching the current
Postgres version to the desired version. If there is a mismatch (current
< desired), the Converge prog is launched (sketched below).

Roughly, the Converge prog does the following:
1. Provisions new servers. In case of upgrades, it provisions at most
   one new standby, and only if no existing standby is suitable for
   the upgrade.
2. Waits for the required servers to be ready.
3. Waits for the maintenance window to start.
4. Fences the primary server and launches pg_upgrade.
5. If the upgrade is successful, replaces the current primary with the
   candidate standby.

If the upgrade fails, we delete the candidate standby and unfence the
primary to bring the database back. While the upgrade runs, health
checking is effectively disabled, since auto-recovery would conflict
with the repeated restarts of different Postgres versions on the
candidate.
2025-10-07 14:30:39 +02:00
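
A minimal sketch of the version-mismatch trigger described above, assuming a
hypothetical needs_convergence? helper (the real check lives in the resource's
prog, not in this model; version and target_version mirror the accessors used
in needs_recycling? below):

def needs_convergence?(resource)
  # Hypothetical helper, sketch only: a version mismatch on any server
  # relative to the resource's desired version launches the Converge prog.
  resource.servers.any? { |server| server.version != resource.target_version }
end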


# frozen_string_literal: true

require "net/ssh"
require "uri"

require_relative "../../model"

class PostgresServer < Sequel::Model
  one_to_one :strand, key: :id
  many_to_one :resource, class: :PostgresResource, key: :resource_id
  many_to_one :timeline, class: :PostgresTimeline, key: :timeline_id
  one_to_one :vm, key: :id, primary_key: :vm_id
  one_to_one :lsn_monitor, class: :PostgresLsnMonitor, key: :postgres_server_id

  plugin :association_dependencies, lsn_monitor: :destroy
  plugin ResourceMethods
  plugin SemaphoreMethods, :initial_provisioning, :refresh_certificates, :update_superuser_password, :checkup,
    :restart, :configure, :fence, :unfence, :planned_take_over, :unplanned_take_over, :configure_metrics,
    :destroy, :recycle, :promote, :refresh_walg_credentials

  include HealthMonitorMethods
  include MetricsTargetMethods

  def self.victoria_metrics_client
    VictoriaMetricsResource.client_for_project(Config.postgres_service_project_id)
  end
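
  # Assembles the configuration payload for the server: postgresql.conf
  # settings sized from the VM's CPU and memory, TLS and logging defaults,
  # flavor-specific shared_preload_libraries, and WAL archiving/recovery
  # settings when the timeline has blob storage.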
  def configure_hash
    configs = {
      "listen_addresses" => "'*'",
      "max_connections" => "500",
      "superuser_reserved_connections" => "3",
      "shared_buffers" => "#{vm.memory_gib * 1024 / 4}MB",
      "work_mem" => "#{[vm.memory_gib / 8, 1].max}MB",
      "maintenance_work_mem" => "#{vm.memory_gib * 1024 / 16}MB",
      "max_parallel_workers" => "4",
      "max_parallel_workers_per_gather" => "2",
      "max_parallel_maintenance_workers" => "2",
      "min_wal_size" => "80MB",
      "max_wal_size" => "5GB",
      "random_page_cost" => "1.1",
      "effective_cache_size" => "#{vm.memory_gib * 1024 * 3 / 4}MB",
      "effective_io_concurrency" => "200",
      "tcp_keepalives_count" => "4",
      "tcp_keepalives_idle" => "2",
      "tcp_keepalives_interval" => "2",
      "ssl" => "on",
      "ssl_min_protocol_version" => "TLSv1.3",
      "ssl_ca_file" => "'/etc/ssl/certs/ca.crt'",
      "ssl_cert_file" => "'/etc/ssl/certs/server.crt'",
      "ssl_key_file" => "'/etc/ssl/certs/server.key'",
      "log_timezone" => "'UTC'",
      "log_directory" => "'pg_log'",
      "log_filename" => "'postgresql.log'",
      "log_truncate_on_rotation" => "true",
      "logging_collector" => "on",
      "timezone" => "'UTC'",
      "lc_messages" => "'C.UTF-8'",
      "lc_monetary" => "'C.UTF-8'",
      "lc_numeric" => "'C.UTF-8'",
      "lc_time" => "'C.UTF-8'",
      "shared_preload_libraries" => "'pg_cron,pg_stat_statements'",
      "cron.use_background_workers" => "on"
    }

    if resource.flavor == PostgresResource::Flavor::PARADEDB
      configs["shared_preload_libraries"] = "'pg_cron,pg_stat_statements,pg_analytics,pg_search'"
    elsif resource.flavor == PostgresResource::Flavor::LANTERN
      configs["shared_preload_libraries"] = "'pg_cron,pg_stat_statements,lantern_extras'"
      configs["lantern.external_index_host"] = "'external-indexing.cloud.lantern.dev'"
      configs["lantern.external_index_port"] = "443"
      configs["lantern.external_index_secure"] = "true"
      configs["hnsw.external_index_host"] = "'external-indexing.cloud.lantern.dev'"
      configs["hnsw.external_index_port"] = "443"
      configs["hnsw.external_index_secure"] = "true"
    end

    if timeline.blob_storage
      configs[:archive_mode] = "on"
      configs[:archive_timeout] = "60"
      configs[:archive_command] = "'/usr/bin/wal-g wal-push %p --config /etc/postgresql/wal-g.env'"

      if primary?
        if resource.ha_type == PostgresResource::HaType::SYNC
          caught_up_standbys = resource.servers.select { it.standby? && it.synchronization_status == "ready" }
          configs[:synchronous_standby_names] = "'ANY 1 (#{caught_up_standbys.map(&:ubid).join(",")})'" unless caught_up_standbys.empty?
        end
      end

      if standby?
        configs[:primary_conninfo] = "'#{resource.replication_connection_string(application_name: ubid)}'"
      end

      if doing_pitr?
        configs[:recovery_target_time] = "'#{resource.restore_target}'"
      end

      if standby? || doing_pitr?
        configs[:restore_command] = "'/usr/bin/wal-g wal-fetch %f %p --config /etc/postgresql/wal-g.env'"
      end

      if timeline.aws?
        configs[:log_line_prefix] = "'%m [%p:%l] (%x,%v): host=%r,db=%d,user=%u,app=%a,client=%h '"
        configs[:log_connections] = "on"
        configs[:log_disconnections] = "on"
      end
    end

    {
      configs: configs,
      user_config: resource.user_config,
      pgbouncer_user_config: resource.pgbouncer_user_config,
      private_subnets: vm.private_subnets.map {
        {
          net4: it.net4.to_s,
          net6: it.net6.to_s
        }
      },
      identity: resource.identity,
      hosts: "#{resource.representative_server.vm.private_ipv4} #{resource.identity}",
      pgbouncer_instances: (vm.vcpus / 2.0).ceil.clamp(1, 8),
      metrics_config: metrics_config
    }
  end
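
  # Initiates a take over from the representative server to the best
  # available standby. mode selects the semaphore to increment
  # (planned or unplanned).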
  def trigger_failover(mode:)
    unless representative_at
      Clog.emit("Cannot trigger failover on a non-representative server") { {ubid: ubid} }
      return false
    end

    unless (standby = failover_target)
      Clog.emit("No suitable standby found for failover") { {ubid: ubid} }
      return false
    end

    standby.send(:"incr_#{mode}_take_over")
    true
  end
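
  # Role predicates are derived from timeline_access: a primary pushes WAL
  # to its timeline, while standbys and PITR servers fetch it. A server is
  # doing point-in-time recovery when even the resource's representative
  # server is not a primary.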
  def primary?
    timeline_access == "push"
  end

  def standby?
    timeline_access == "fetch" && !doing_pitr?
  end

  def doing_pitr?
    !resource.representative_server.primary?
  end

  def read_replica?
    resource.read_replica?
  end

  def storage_size_gib
    vm.vm_storage_volumes_dataset.reject(&:boot).sum(&:size_gib)
  end
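
  # A server must be recycled (replaced with a new VM) when it is flagged
  # explicitly or when its VM size, storage size, or Postgres version no
  # longer matches the resource's target.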
  def needs_recycling?
    recycle_set? || vm.display_size != resource.target_vm_size || storage_size_gib != resource.target_storage_size_gib || version != resource.target_version
  end
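
  # Considers the server caught up when it is within 80 MiB of WAL of its
  # parent: the parent resource's representative for read replicas, otherwise
  # this resource's representative. Falls back to the server's own LSN
  # (zero lag) when no parent LSN is available.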
  def lsn_caught_up
    parent_server = if read_replica?
      resource.parent&.representative_server
    else
      resource.representative_server
    end

    lsn_diff(parent_server&.current_lsn || current_lsn, current_lsn) < 80 * 1024 * 1024
  end

  def current_lsn
    run_query("SELECT #{lsn_function}").chomp
  end
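
  # Picks the most advanced healthy standby: non-representative, strand in
  # "wait", and not due for recycling. For ASYNC resources the candidate is
  # rejected when the last LSN recorded by the health monitor is more than
  # 80 MiB ahead of it, to bound data loss on failover.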
  def failover_target
    target = resource.servers
      .reject { it.representative_at }
      .select { it.strand.label == "wait" && !it.needs_recycling? }
      .map { {server: it, lsn: it.current_lsn} }
      .max_by { lsn2int(it[:lsn]) }

    return nil if target.nil?

    if resource.ha_type == PostgresResource::HaType::ASYNC
      return nil if lsn_monitor.last_known_lsn.nil?
      return nil if lsn_diff(lsn_monitor.last_known_lsn, target[:lsn]) > 80 * 1024 * 1024 # 80 MB or ~5 WAL files
    end

    target[:server]
  end
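
  # Chooses the LSN probe appropriate for the server's role: the write
  # position on a primary, the receive position on a standby, and the
  # replay position during PITR.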
  def lsn_function
    if primary?
      "pg_current_wal_lsn()"
    elsif standby?
      "pg_last_wal_receive_lsn()"
    else
      "pg_last_wal_replay_lsn()"
    end
  end
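
  # Opens a fresh SSH session and forwards a local unix socket to the
  # server's Postgres socket, so the health monitor can connect as if the
  # database were local. The DB connection itself is established lazily
  # in check_pulse.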
  def init_health_monitor_session
    FileUtils.rm_rf(health_monitor_socket_path)
    FileUtils.mkdir_p(health_monitor_socket_path)

    ssh_session = vm.sshable.start_fresh_session
    ssh_session.forward.local_socket(File.join(health_monitor_socket_path, ".s.PGSQL.5432"), "/var/run/postgresql/.s.PGSQL.5432")
    {
      ssh_session: ssh_session,
      db_connection: nil
    }
  end

  def init_metrics_export_session
    ssh_session = vm.sshable.start_fresh_session
    {
      ssh_session: ssh_session
    }
  end
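
  # Probes the database through the forwarded socket, persists the observed
  # LSN roughly once every 12 consecutive "up" readings, and raises the
  # checkup semaphore once the server has been down for more than 5
  # consecutive readings and 30 seconds.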
  def check_pulse(session:, previous_pulse:)
    reading = begin
      session[:db_connection] ||= Sequel.connect(adapter: "postgres", host: health_monitor_socket_path, user: "postgres", connect_timeout: 4, keep_reference: false)
      last_known_lsn = session[:db_connection]["SELECT #{lsn_function} AS lsn"].first[:lsn]
      "up"
    rescue
      "down"
    end
    pulse = aggregate_readings(previous_pulse: previous_pulse, reading: reading, data: {last_known_lsn: last_known_lsn})

    DB.transaction do
      if pulse[:reading] == "up" && pulse[:reading_rpt] % 12 == 1
        begin
          PostgresLsnMonitor.new(last_known_lsn: last_known_lsn) { it.postgres_server_id = id }
            .insert_conflict(
              target: :postgres_server_id,
              update: {last_known_lsn: last_known_lsn}
            ).save_changes
        rescue Sequel::Error => ex
          Clog.emit("Failed to update PostgresLsnMonitor") { {lsn_update_error: {ubid: ubid, last_known_lsn: last_known_lsn, exception: Util.exception_to_hash(ex)}} }
        end
      end

      if pulse[:reading] == "down" && pulse[:reading_rpt] > 5 && Time.now - pulse[:reading_chg] > 30 && !reload.checkup_set?
        incr_checkup
      end
    end

    pulse
  end

  def needs_event_loop_for_pulse_check?
    true
  end

  def health_monitor_socket_path
    @health_monitor_socket_path ||= File.join(Dir.pwd, "var", "health_monitor_sockets", "pg_#{vm.ip6}")
  end
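
  # Converts an LSN of the form "16/B374D848" into a single integer by
  # zero-padding both hex halves to 8 digits and concatenating them, so
  # LSNs can be compared and subtracted numerically.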
  def lsn2int(lsn)
    lsn.split("/").map { it.rjust(8, "0") }.join.hex
  end

  def lsn_diff(lsn1, lsn2)
    lsn2int(lsn1) - lsn2int(lsn2)
  end
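
  # Runs a query over SSH with psql as the postgres superuser: CSV output,
  # a 60 second statement timeout, and ON_ERROR_STOP so failures surface
  # as command errors.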
  def run_query(query)
    vm.sshable.cmd("PGOPTIONS='-c statement_timeout=60s' psql -U postgres -t --csv -v 'ON_ERROR_STOP=1'", stdin: query).chomp
  end
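
  # Builds the scrape configuration for the server's local federation
  # endpoint; per-table timeseries are excluded to keep cardinality down.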
  def metrics_config
    ignored_timeseries_patterns = [
      "pg_stat_user_tables_.*",
      "pg_statio_user_tables_.*"
    ]
    exclude_pattern = ignored_timeseries_patterns.join("|")
    query_params = {
      "match[]": "{__name__!~'#{exclude_pattern}'}"
    }
    query_str = URI.encode_www_form(query_params)

    {
      endpoints: [
        "https://localhost:9090/federate?#{query_str}"
      ],
      max_file_retention: 120,
      interval: "15s",
      additional_labels: {},
      metrics_dir: "/home/ubi/postgres/metrics",
      project_id: Config.postgres_service_project_id
    }
  end

  def storage_device_paths
    if vm.location.aws?
      # On AWS, pick the largest block devices to use as data disks, since
      # the device paths detected by the VmStorageVolume are not always
      # correct.
      storage_device_count = vm.vm_storage_volumes.count { it.boot == false }
      vm.sshable.cmd("lsblk -b -d -o NAME,SIZE | sort -n -k2 | tail -n#{storage_device_count} | awk '{print \"/dev/\"$1}'").strip.split
    else
      [vm.vm_storage_volumes.find { it.boot == false }.device_path.shellescape]
    end
  end
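
  # True while a failover to this server is requested or in progress,
  # either via semaphore or because the strand is in one of the
  # failover labels.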
  def taking_over?
    unplanned_take_over_set? || planned_take_over_set? || FAILOVER_LABELS.include?(strand.label)
  end
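
  # Creates a new timeline branching from the given parent (the current
  # timeline by default), points this server at it with push access, and
  # refreshes the WAL-G credentials to match.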
  def switch_to_new_timeline(parent_id: timeline.id)
    update(
      timeline_id: Prog::Postgres::PostgresTimelineNexus.assemble(location_id: resource.location_id, parent_id: parent_id).id,
      timeline_access: "push"
    )
    refresh_walg_credentials
  end
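
  # Rewrites /etc/postgresql/wal-g.env from the timeline's current blob
  # storage credentials, and installs the blob storage CA certificate on
  # non-AWS setups. No-op when the timeline has no blob storage.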
  def refresh_walg_credentials
    return if timeline.blob_storage.nil?

    walg_config = timeline.generate_walg_config
    vm.sshable.cmd("sudo -u postgres tee /etc/postgresql/wal-g.env > /dev/null", stdin: walg_config)
    vm.sshable.cmd("sudo tee /usr/lib/ssl/certs/blob_storage_ca.crt > /dev/null", stdin: timeline.blob_storage.root_certs) unless timeline.aws?
  end

  FAILOVER_LABELS = ["prepare_for_unplanned_take_over", "prepare_for_planned_take_over", "wait_fencing_of_old_primary", "taking_over"].freeze
end

# Table: postgres_server
# Columns:
#                      id | uuid                     | PRIMARY KEY
#              created_at | timestamp with time zone | NOT NULL DEFAULT now()
#              updated_at | timestamp with time zone | NOT NULL DEFAULT now()
#             resource_id | uuid                     | NOT NULL
#                   vm_id | uuid                     |
#             timeline_id | uuid                     | NOT NULL
#         timeline_access | timeline_access          | NOT NULL DEFAULT 'push'::timeline_access
#       representative_at | timestamp with time zone |
#  synchronization_status | synchronization_status   | NOT NULL DEFAULT 'ready'::synchronization_status
#                 version | postgres_version         | NOT NULL
# Indexes:
#                postgres_server_pkey1 | PRIMARY KEY btree (id)
#    postgres_server_resource_id_index | UNIQUE btree (resource_id) WHERE representative_at IS NOT NULL
# Foreign key constraints:
#    postgres_server_timeline_id_fkey | (timeline_id) REFERENCES postgres_timeline(id)
#          postgres_server_vm_id_fkey | (vm_id) REFERENCES vm(id)