ubicloud/model/postgres/postgres_server.rb
shikharbhardwaj 56bf890ae2
Implement Prog::Postgres::UpgradePostgresResource
The Upgrade prog is responsible for matching the current Postgres
version to the desired version. If there is a mismatch (current <
desired), the Upgrade prog is launched and takes precedence over
Convergence.

Roughly, the Upgrade prog does the following:
1. Create a new "candidate standby" with the same version as the current
   one and wait for it to catch up.
2. Fence the current primary.
3. Upgrade the candidate standby to the desired version.
4. Switch the candidate to use a new timeline.
5. Take over from the current primary.
6. Prune any older version servers and exit. The convergence prog will
   take care of starting any new standbys needed.

In case anything fails, we delete the candidate standby and unfence the
primary to bring the database back. During the upgrade, health checking
is effectively disabled because auto-recovery conflicts with the
several restarts of various versions on the candidate.
2025-09-15 19:41:58 +02:00

330 lines
11 KiB
Ruby

# frozen_string_literal: true
require "net/ssh"
require "uri"
require_relative "../../model"
class PostgresServer < Sequel::Model
# Associations. The strand is keyed by this server's own id, the VM is looked
# up through the vm_id column, and the LSN monitor row points back here via
# postgres_server_id.
one_to_one :strand, key: :id
many_to_one :resource, class: :PostgresResource, key: :resource_id
many_to_one :timeline, class: :PostgresTimeline, key: :timeline_id
one_to_one :vm, key: :id, primary_key: :vm_id
one_to_one :lsn_monitor, class: :PostgresLsnMonitor, key: :postgres_server_id
# Destroying a server also destroys its associated LSN monitor.
plugin :association_dependencies, lsn_monitor: :destroy
plugin ResourceMethods
# Semaphores that can be set on a server. Methods in this class call helpers
# named after them (incr_checkup, checkup_set?, recycle_set?,
# planned_take_over_set?, ...).
# NOTE(review): presumably those helpers are generated by SemaphoreMethods - confirm.
plugin SemaphoreMethods, :initial_provisioning, :refresh_certificates, :update_superuser_password, :checkup,
:restart, :configure, :fence, :unfence, :planned_take_over, :unplanned_take_over, :configure_metrics,
:destroy, :recycle, :promote, :refresh_walg_credentials
# NOTE(review): check_pulse calls aggregate_readings, which is not defined in
# this file - presumably it comes from HealthMonitorMethods; confirm.
include HealthMonitorMethods
include MetricsTargetMethods
# Builds the configuration payload for this server.
#
# The PostgreSQL settings start from a fixed baseline whose memory-related
# values are derived from the VM's memory, are then adjusted for the
# resource's flavor, and finally gain archiving/replication/recovery settings
# when the timeline has blob storage.
#
# Returns a Hash with the keys :configs, :user_config,
# :pgbouncer_user_config, :private_subnets, :identity, :hosts,
# :pgbouncer_instances and :metrics_config.
def configure_hash
  memory_mib = vm.memory_gib * 1024

  settings = {
    "listen_addresses" => "'*'",
    "max_connections" => "500",
    "superuser_reserved_connections" => "3",
    "shared_buffers" => "#{memory_mib / 4}MB",
    "work_mem" => "#{[vm.memory_gib / 8, 1].max}MB",
    "maintenance_work_mem" => "#{memory_mib / 16}MB",
    "max_parallel_workers" => "4",
    "max_parallel_workers_per_gather" => "2",
    "max_parallel_maintenance_workers" => "2",
    "min_wal_size" => "80MB",
    "max_wal_size" => "5GB",
    "random_page_cost" => "1.1",
    "effective_cache_size" => "#{memory_mib * 3 / 4}MB",
    "effective_io_concurrency" => "200",
    "tcp_keepalives_count" => "4",
    "tcp_keepalives_idle" => "2",
    "tcp_keepalives_interval" => "2",
    "ssl" => "on",
    "ssl_min_protocol_version" => "TLSv1.3",
    "ssl_ca_file" => "'/etc/ssl/certs/ca.crt'",
    "ssl_cert_file" => "'/etc/ssl/certs/server.crt'",
    "ssl_key_file" => "'/etc/ssl/certs/server.key'",
    "log_timezone" => "'UTC'",
    "log_directory" => "'pg_log'",
    "log_filename" => "'postgresql.log'",
    "log_truncate_on_rotation" => "true",
    "logging_collector" => "on",
    "timezone" => "'UTC'",
    "lc_messages" => "'C.UTF-8'",
    "lc_monetary" => "'C.UTF-8'",
    "lc_numeric" => "'C.UTF-8'",
    "lc_time" => "'C.UTF-8'",
    "shared_preload_libraries" => "'pg_cron,pg_stat_statements'",
    "cron.use_background_workers" => "on"
  }

  # Flavors replace the preload list; Lantern also configures external
  # indexing under both the lantern.* and hnsw.* prefixes.
  flavor = resource.flavor
  if flavor == PostgresResource::Flavor::PARADEDB
    settings["shared_preload_libraries"] = "'pg_cron,pg_stat_statements,pg_analytics,pg_search'"
  elsif flavor == PostgresResource::Flavor::LANTERN
    settings["shared_preload_libraries"] = "'pg_cron,pg_stat_statements,lantern_extras'"
    %w[lantern hnsw].each do |prefix|
      settings["#{prefix}.external_index_host"] = "'external-indexing.cloud.lantern.dev'"
      settings["#{prefix}.external_index_port"] = "443"
      settings["#{prefix}.external_index_secure"] = "true"
    end
  end

  if timeline.blob_storage
    settings.merge!(
      archive_mode: "on",
      archive_timeout: "60",
      archive_command: "'/usr/bin/wal-g wal-push %p --config /etc/postgresql/wal-g.env'"
    )

    # A synchronous primary only lists standbys whose synchronization status
    # is "ready"; with none of those the setting is left out entirely.
    if primary? && resource.ha_type == PostgresResource::HaType::SYNC
      ready_standbys = resource.servers.select do |server|
        server.standby? && server.synchronization_status == "ready"
      end
      unless ready_standbys.empty?
        standby_names = ready_standbys.map(&:ubid).join(",")
        settings[:synchronous_standby_names] = "'ANY 1 (#{standby_names})'"
      end
    end

    settings[:primary_conninfo] = "'#{resource.replication_connection_string(application_name: ubid)}'" if standby?
    settings[:recovery_target_time] = "'#{resource.restore_target}'" if doing_pitr?
    if standby? || doing_pitr?
      settings[:restore_command] = "'/usr/bin/wal-g wal-fetch %f %p --config /etc/postgresql/wal-g.env'"
    end

    if timeline.aws?
      settings.merge!(
        log_line_prefix: "'%m [%p:%l] (%x,%v): host=%r,db=%d,user=%u,app=%a,client=%h '",
        log_connections: "on",
        log_disconnections: "on"
      )
    end
  end

  {
    configs: settings,
    user_config: resource.user_config,
    pgbouncer_user_config: resource.pgbouncer_user_config,
    private_subnets: vm.private_subnets.map { |subnet| {net4: subnet.net4.to_s, net6: subnet.net6.to_s} },
    identity: resource.identity,
    hosts: "#{resource.representative_server.vm.private_ipv4} #{resource.identity}",
    # Half the vCPU count rounded up, kept between 1 and 8.
    pgbouncer_instances: (vm.vcpus / 2.0).ceil.clamp(1, 8),
    metrics_config: metrics_config
  }
end
# Signals the standby chosen by failover_target to take over from this server.
#
# mode: interpolated into the incrementer that is invoked on the standby
#       (incr_<mode>_take_over); the semaphores declared on this model are
#       planned_take_over and unplanned_take_over.
#
# Returns true when a standby was signalled. Returns false, after logging the
# reason, when this server is not the representative one or when no standby
# qualifies.
def trigger_failover(mode:)
  refusal =
    if !representative_at
      "Cannot trigger failover on a non-representative server"
    elsif !(candidate = failover_target)
      "No suitable standby found for failover"
    end

  if refusal
    Clog.emit(refusal) { {ubid: ubid} }
    return false
  end

  candidate.send(:"incr_#{mode}_take_over")
  true
end
# True when this server's timeline access is "push".
def primary? = timeline_access == "push"
# True when this server's timeline access is "fetch" and doing_pitr? is false.
def standby?
  return false unless timeline_access == "fetch"

  !doing_pitr?
end
# True when the resource's representative server does not report primary?.
def doing_pitr?
  representative = resource.representative_server
  !representative.primary?
end
# Delegates to the resource: true when the owning resource is a read replica.
def read_replica? = resource.read_replica?
# Total size in GiB of the VM's storage volumes, leaving out any volume whose
# boot flag is truthy.
def storage_size_gib
  data_volumes = vm.vm_storage_volumes_dataset.reject { |volume| volume.boot }
  data_volumes.sum { |volume| volume.size_gib }
end
# Whether this server should be replaced: the recycle semaphore is set, or
# the VM size or data storage size differs from the resource's target.
def needs_recycling?
  recycle_requested = recycle_set?
  return recycle_requested if recycle_requested

  vm.display_size != resource.target_vm_size ||
    storage_size_gib != resource.target_storage_size_gib
end
# Reports whether this server's LSN is less than 80 MiB behind its upstream.
#
# The upstream is the parent resource's representative server for read
# replicas, and this resource's representative server otherwise. When no
# upstream server is available, the server is compared against its own LSN
# and therefore counts as caught up.
#
# Returns true or false.
def lsn_caught_up
  upstream =
    if read_replica?
      resource.parent&.representative_server
    else
      resource.representative_server
    end

  # current_lsn runs a remote query, so read our own position once and reuse
  # it for the fallback instead of querying twice when there is no upstream.
  upstream_lsn = upstream&.current_lsn
  own_lsn = current_lsn
  lsn_diff(upstream_lsn || own_lsn, own_lsn) < 80 * 1024 * 1024
end
# Queries this server for its current LSN, using the function that
# lsn_function selects, and returns the result without a trailing newline.
def current_lsn
  query = "SELECT #{lsn_function}"
  run_query(query).chomp
end
# Picks the server that should take over from the representative server.
#
# Candidates are the resource's non-representative servers whose strand is at
# the "wait" label and which do not need recycling; the one with the highest
# current LSN wins.
#
# For async HA the winner is only returned when the last LSN recorded by this
# server's LSN monitor is known and the candidate is at most 80 MiB behind it.
#
# Returns the chosen server, or nil when no candidate qualifies.
def failover_target
  best = resource.servers
    .reject { |server| server.representative_at }
    .select { |server| server.strand.label == "wait" && !server.needs_recycling? }
    .map { |server| {server: server, lsn: server.current_lsn} }
    .max_by { |entry| lsn2int(entry[:lsn]) }
  return nil if best.nil?

  if resource.ha_type == PostgresResource::HaType::ASYNC
    # The monitor association is nil until a monitor row exists. Treat that
    # like an unknown LSN rather than raising NoMethodError.
    recorded_lsn = lsn_monitor&.last_known_lsn
    return nil if recorded_lsn.nil?
    return nil if lsn_diff(recorded_lsn, best[:lsn]) > 80 * 1024 * 1024 # 80 MB or ~5 WAL files
  end

  best[:server]
end
# Name of the SQL function used to read this server's LSN: the current WAL
# LSN on a primary, the last received LSN on a standby, and the last replayed
# LSN otherwise.
def lsn_function
  return "pg_current_wal_lsn()" if primary?
  return "pg_last_wal_receive_lsn()" if standby?

  "pg_last_wal_replay_lsn()"
end
# Prepares the state used by health checks: recreates the local socket
# directory, opens a fresh SSH session to the VM and forwards the VM's
# PostgreSQL unix socket into that directory.
#
# Returns a Hash with the SSH session under :ssh_session and a nil
# :db_connection.
def init_health_monitor_session
  socket_dir = health_monitor_socket_path
  FileUtils.rm_rf(socket_dir)
  FileUtils.mkdir_p(socket_dir)

  session = vm.sshable.start_fresh_session
  local_socket = File.join(socket_dir, ".s.PGSQL.5432")
  session.forward.local_socket(local_socket, "/var/run/postgresql/.s.PGSQL.5432")

  {ssh_session: session, db_connection: nil}
end
# Opens a fresh SSH session to the VM for metrics export and returns it in a
# Hash under :ssh_session.
def init_metrics_export_session
  {ssh_session: vm.sshable.start_fresh_session}
end
# Takes one health reading for this server and folds it into the running pulse.
#
# session:        Hash of monitor state; :db_connection is created on first use
#                 and cached there for later calls.
# previous_pulse: pulse from the previous check, handed to aggregate_readings.
#
# Returns the pulse produced by aggregate_readings.
def check_pulse(session:, previous_pulse:)
reading = begin
# Connect lazily, using health_monitor_socket_path as the host, then read the
# LSN with the function lsn_function selects for this server.
session[:db_connection] ||= Sequel.connect(adapter: "postgres", host: health_monitor_socket_path, user: "postgres", connect_timeout: 4, keep_reference: false)
last_known_lsn = session[:db_connection]["SELECT #{lsn_function} AS lsn"].first[:lsn]
"up"
rescue
# Any StandardError raised while connecting or querying counts as "down";
# last_known_lsn is nil on this path.
"down"
end
pulse = aggregate_readings(previous_pulse: previous_pulse, reading: reading, data: {last_known_lsn: last_known_lsn})
DB.transaction do
# Record the LSN on "up" readings whose repeat counter is 1, 13, 25, ...
# NOTE(review): presumably reading_rpt counts consecutive identical readings -
# confirm against aggregate_readings.
if pulse[:reading] == "up" && pulse[:reading_rpt] % 12 == 1
begin
# Insert the monitor row for this server, or update last_known_lsn when a row
# with the same postgres_server_id already exists.
PostgresLsnMonitor.new(last_known_lsn: last_known_lsn) { it.postgres_server_id = id }
.insert_conflict(
target: :postgres_server_id,
update: {last_known_lsn: last_known_lsn}
).save_changes
rescue Sequel::Error => ex
# A failed LSN write is reported through Clog.emit rather than re-raised.
Clog.emit("Failed to update PostgresLsnMonitor") { {lsn_update_error: {ubid: ubid, last_known_lsn: last_known_lsn, exception: Util.exception_to_hash(ex)}} }
end
end
# Request a checkup once the reading is "down" with a repeat counter above 5
# and more than 30 seconds have passed since reading_chg, unless a checkup is
# already set (checked on the reloaded record) or the resource needs an upgrade.
if pulse[:reading] == "down" && pulse[:reading_rpt] > 5 && Time.now - pulse[:reading_chg] > 30 && !reload.checkup_set? && !resource.needs_upgrade?
incr_checkup
end
end
pulse
end
# Always true for this model.
def needs_event_loop_for_pulse_check? = true
# Directory, under var/health_monitor_sockets in the current working
# directory, named after the VM's ip6. The path is computed once and then
# memoized.
def health_monitor_socket_path
  return @health_monitor_socket_path if @health_monitor_socket_path

  directory_name = "pg_#{vm.ip6}"
  @health_monitor_socket_path = File.join(Dir.pwd, "var", "health_monitor_sockets", directory_name)
end
# Converts a textual LSN such as "16/B374D848" into an Integer: each
# "/"-separated segment is left-padded with zeros to 8 characters, and the
# concatenation is read as hexadecimal.
def lsn2int(lsn)
  padded_segments = lsn.split("/").map { |segment| segment.rjust(8, "0") }
  padded_segments.join.hex
end
# Distance from lsn2 to lsn1 as an Integer, computed from their lsn2int
# values; negative when lsn1 is behind lsn2.
def lsn_diff(lsn1, lsn2)
  first_position = lsn2int(lsn1)
  second_position = lsn2int(lsn2)
  first_position - second_position
end
# Runs a query on the VM through psql as the postgres user, passing the query
# on stdin with a 60s statement timeout and ON_ERROR_STOP set.
#
# Returns psql's output with one trailing newline removed.
def run_query(query)
  psql_command = "PGOPTIONS='-c statement_timeout=60s' psql -U postgres -t --csv -v 'ON_ERROR_STOP=1'"
  output = vm.sshable.cmd(psql_command, stdin: query)
  output.chomp
end
# Settings for exporting this server's metrics: a single local /federate
# endpoint whose match[] selector excludes the per-table statistics series,
# plus retention, interval, labels, output directory and the project id taken
# from Config.
def metrics_config
  excluded_series = %w[pg_stat_user_tables_.* pg_statio_user_tables_.*].join("|")
  federate_query = URI.encode_www_form({"match[]": "{__name__!~'#{excluded_series}'}"})

  {
    endpoints: ["https://localhost:9090/federate?#{federate_query}"],
    max_file_retention: 120,
    interval: "15s",
    additional_labels: {},
    metrics_dir: "/home/ubi/postgres/metrics",
    project_id: Config.postgres_service_project_id
  }
end
# Device paths of this server's data disks, as an Array of Strings.
#
# Outside AWS this is the shell-escaped device path of the first volume whose
# boot flag is false. On AWS the VM is asked for its block devices instead,
# and the largest ones are returned, as many as there are non-boot volumes.
def storage_device_paths
  unless vm.location.aws?
    data_volume = vm.vm_storage_volumes.find { |volume| volume.boot == false }
    return [data_volume.device_path.shellescape]
  end

  # On AWS, pick the largest block devices to use as the data disks, since
  # the device path detected by the VmStorageVolume is not always correct.
  data_disk_count = vm.vm_storage_volumes.count { |volume| volume.boot == false }
  listing = vm.sshable.cmd("lsblk -b -d -o NAME,SIZE | sort -n -k2 | tail -n#{data_disk_count} | awk '{print \"/dev/\"$1}'")
  listing.strip.split
end
# Whether this server is taking over: one of the take-over semaphores is set,
# or its strand is at one of the FAILOVER_LABELS.
def taking_over?
  take_over_requested = unplanned_take_over_set? || planned_take_over_set?
  take_over_requested || FAILOVER_LABELS.include?(strand.label)
end

# Strand labels that taking_over? treats as an in-progress take-over.
FAILOVER_LABELS = [
  "prepare_for_unplanned_take_over",
  "prepare_for_planned_take_over",
  "wait_fencing_of_old_primary",
  "taking_over"
].freeze
end
# Table: postgres_server
# Columns:
# id | uuid | PRIMARY KEY
# created_at | timestamp with time zone | NOT NULL DEFAULT now()
# updated_at | timestamp with time zone | NOT NULL DEFAULT now()
# resource_id | uuid | NOT NULL
# vm_id | uuid |
# timeline_id | uuid | NOT NULL
# timeline_access | timeline_access | NOT NULL DEFAULT 'push'::timeline_access
# representative_at | timestamp with time zone |
# synchronization_status | synchronization_status | NOT NULL DEFAULT 'ready'::synchronization_status
# version | postgres_version |
# Indexes:
# postgres_server_pkey1 | PRIMARY KEY btree (id)
# postgres_server_resource_id_index | UNIQUE btree (resource_id) WHERE representative_at IS NOT NULL
# Foreign key constraints:
# postgres_server_timeline_id_fkey | (timeline_id) REFERENCES postgres_timeline(id)
# postgres_server_vm_id_fkey | (vm_id) REFERENCES vm(id)