Mirror of https://github.com/ubicloud/ubicloud.git, synced 2025-10-07 07:11:58 +08:00
The Upgrade prog is responsible for bringing the current Postgres version up to the desired version. If there is a mismatch (current < desired), the Upgrade prog is launched and takes precedence over Convergence. Roughly, the Upgrade prog does the following:

1. Create a new "candidate standby" on the same version as the current primary and wait for it to catch up.
2. Fence the current primary.
3. Upgrade the candidate standby to the desired version.
4. Switch the candidate to a new timeline.
5. Take over from the current primary.
6. Prune any older-version servers and exit. The Convergence prog will take care of starting any new standbys that are needed.

If anything fails, we delete the candidate standby and unfence the primary to bring the database back. During the upgrade, health checking is effectively disabled, since auto-recovery would conflict with the candidate being restarted several times on different versions.
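A minimal sketch of that order in plain Ruby. Everything here is hypothetical and simplified (Server, upgrade!, and the fields are illustrative stand-ins, not the actual prog or model API); it only shows the step sequence and the rollback path:

Server = Struct.new(:role, :version, :fenced, :timeline)

def upgrade!(primary, desired_version)
  # 1. Candidate standby on the *current* version; the real flow waits for it to catch up here.
  candidate = Server.new("standby", primary.version, false, primary.timeline)
  # 2. Fence the current primary so no further writes land on it.
  primary.fenced = true
  # 3. Upgrade the candidate to the desired version (pg_upgrade in the real flow).
  candidate.version = desired_version
  # 4. Switch the candidate to a new timeline so its WAL archive does not collide with the old one.
  candidate.timeline = "#{primary.timeline}-next"
  # 5. The candidate takes over from the current primary.
  candidate.role = "primary"
  # 6. Older-version servers are pruned; Convergence starts any new standbys later.
  candidate
rescue => ex
  # Rollback described above: delete the candidate and unfence the primary.
  primary.fenced = false
  raise ex
end

old_primary = Server.new("primary", 16, false, "timeline-1")
p upgrade!(old_primary, 17).to_h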
330 lines · 11 KiB · Ruby
# frozen_string_literal: true

require "net/ssh"
require "uri"
require_relative "../../model"

class PostgresServer < Sequel::Model
  one_to_one :strand, key: :id
  many_to_one :resource, class: :PostgresResource, key: :resource_id
  many_to_one :timeline, class: :PostgresTimeline, key: :timeline_id
  one_to_one :vm, key: :id, primary_key: :vm_id
  one_to_one :lsn_monitor, class: :PostgresLsnMonitor, key: :postgres_server_id

  plugin :association_dependencies, lsn_monitor: :destroy

  plugin ResourceMethods
  plugin SemaphoreMethods, :initial_provisioning, :refresh_certificates, :update_superuser_password, :checkup,
    :restart, :configure, :fence, :unfence, :planned_take_over, :unplanned_take_over, :configure_metrics,
    :destroy, :recycle, :promote, :refresh_walg_credentials
  include HealthMonitorMethods
  include MetricsTargetMethods

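  # Builds the PostgreSQL configuration for this server: base settings scaled to the
  # VM's memory, flavor-specific shared_preload_libraries, WAL archive/restore settings
  # when blob storage is configured, plus related metadata (private subnets, hosts,
  # pgbouncer instance count, metrics config).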
  def configure_hash
    configs = {
      "listen_addresses" => "'*'",
      "max_connections" => "500",
      "superuser_reserved_connections" => "3",
      "shared_buffers" => "#{vm.memory_gib * 1024 / 4}MB", # 25% of RAM
      "work_mem" => "#{[vm.memory_gib / 8, 1].max}MB",
      "maintenance_work_mem" => "#{vm.memory_gib * 1024 / 16}MB",
      "max_parallel_workers" => "4",
      "max_parallel_workers_per_gather" => "2",
      "max_parallel_maintenance_workers" => "2",
      "min_wal_size" => "80MB",
      "max_wal_size" => "5GB",
      "random_page_cost" => "1.1",
      "effective_cache_size" => "#{vm.memory_gib * 1024 * 3 / 4}MB", # 75% of RAM
      "effective_io_concurrency" => "200",
      "tcp_keepalives_count" => "4",
      "tcp_keepalives_idle" => "2",
      "tcp_keepalives_interval" => "2",
      "ssl" => "on",
      "ssl_min_protocol_version" => "TLSv1.3",
      "ssl_ca_file" => "'/etc/ssl/certs/ca.crt'",
      "ssl_cert_file" => "'/etc/ssl/certs/server.crt'",
      "ssl_key_file" => "'/etc/ssl/certs/server.key'",
      "log_timezone" => "'UTC'",
      "log_directory" => "'pg_log'",
      "log_filename" => "'postgresql.log'",
      "log_truncate_on_rotation" => "true",
      "logging_collector" => "on",
      "timezone" => "'UTC'",
      "lc_messages" => "'C.UTF-8'",
      "lc_monetary" => "'C.UTF-8'",
      "lc_numeric" => "'C.UTF-8'",
      "lc_time" => "'C.UTF-8'",
      "shared_preload_libraries" => "'pg_cron,pg_stat_statements'",
      "cron.use_background_workers" => "on"
    }

    if resource.flavor == PostgresResource::Flavor::PARADEDB
      configs["shared_preload_libraries"] = "'pg_cron,pg_stat_statements,pg_analytics,pg_search'"
    elsif resource.flavor == PostgresResource::Flavor::LANTERN
      configs["shared_preload_libraries"] = "'pg_cron,pg_stat_statements,lantern_extras'"
      configs["lantern.external_index_host"] = "'external-indexing.cloud.lantern.dev'"
      configs["lantern.external_index_port"] = "443"
      configs["lantern.external_index_secure"] = "true"
      configs["hnsw.external_index_host"] = "'external-indexing.cloud.lantern.dev'"
      configs["hnsw.external_index_port"] = "443"
      configs["hnsw.external_index_secure"] = "true"
    end

    if timeline.blob_storage
      configs[:archive_mode] = "on"
      configs[:archive_timeout] = "60"
      configs[:archive_command] = "'/usr/bin/wal-g wal-push %p --config /etc/postgresql/wal-g.env'"

      if primary?
        if resource.ha_type == PostgresResource::HaType::SYNC
          caught_up_standbys = resource.servers.select { it.standby? && it.synchronization_status == "ready" }
          configs[:synchronous_standby_names] = "'ANY 1 (#{caught_up_standbys.map(&:ubid).join(",")})'" unless caught_up_standbys.empty?
        end
      end

      if standby?
        configs[:primary_conninfo] = "'#{resource.replication_connection_string(application_name: ubid)}'"
      end

      if doing_pitr?
        configs[:recovery_target_time] = "'#{resource.restore_target}'"
      end

      if standby? || doing_pitr?
        configs[:restore_command] = "'/usr/bin/wal-g wal-fetch %f %p --config /etc/postgresql/wal-g.env'"
      end

      if timeline.aws?
        configs[:log_line_prefix] = "'%m [%p:%l] (%x,%v): host=%r,db=%d,user=%u,app=%a,client=%h '"
        configs[:log_connections] = "on"
        configs[:log_disconnections] = "on"
      end
    end

    {
      configs: configs,
      user_config: resource.user_config,
      pgbouncer_user_config: resource.pgbouncer_user_config,
      private_subnets: vm.private_subnets.map {
        {
          net4: it.net4.to_s,
          net6: it.net6.to_s
        }
      },
      identity: resource.identity,
      hosts: "#{resource.representative_server.vm.private_ipv4} #{resource.identity}",
      pgbouncer_instances: (vm.vcpus / 2.0).ceil.clamp(1, 8),
      metrics_config: metrics_config
    }
  end

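  # Initiates a failover. Only the representative server may trigger one, and only if
  # failover_target finds a suitable standby; the chosen standby is signalled via its
  # planned/unplanned take_over semaphore.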
  def trigger_failover(mode:)
    unless representative_at
      Clog.emit("Cannot trigger failover on a non-representative server") { {ubid: ubid} }
      return false
    end

    unless (standby = failover_target)
      Clog.emit("No suitable standby found for failover") { {ubid: ubid} }
      return false
    end

    standby.send(:"incr_#{mode}_take_over")
    true
  end

  def primary?
    timeline_access == "push"
  end

  def standby?
    timeline_access == "fetch" && !doing_pitr?
  end

  def doing_pitr?
    !resource.representative_server.primary?
  end

  def read_replica?
    resource.read_replica?
  end

  def storage_size_gib
    vm.vm_storage_volumes_dataset.reject(&:boot).sum(&:size_gib)
  end

  def needs_recycling?
    recycle_set? || vm.display_size != resource.target_vm_size || storage_size_gib != resource.target_storage_size_gib
  end

  def lsn_caught_up
    parent_server = if read_replica?
      resource.parent&.representative_server
    else
      resource.representative_server
    end

    # Considered caught up when the lag behind the parent is under 80 MB (~5 WAL files).
    lsn_diff(parent_server&.current_lsn || current_lsn, current_lsn) < 80 * 1024 * 1024
  end

  def current_lsn
    run_query("SELECT #{lsn_function}").chomp
  end

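  # Picks the most caught-up standby that is idle in the "wait" state and not due for
  # recycling. For ASYNC HA, gives up if the candidate lags the last LSN recorded by
  # the health monitor by more than 80 MB.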
  def failover_target
    target = resource.servers
      .reject { it.representative_at }
      .select { it.strand.label == "wait" && !it.needs_recycling? }
      .map { {server: it, lsn: it.current_lsn} }
      .max_by { lsn2int(it[:lsn]) }

    return nil if target.nil?

    if resource.ha_type == PostgresResource::HaType::ASYNC
      return nil if lsn_monitor.last_known_lsn.nil?
      return nil if lsn_diff(lsn_monitor.last_known_lsn, target[:lsn]) > 80 * 1024 * 1024 # 80 MB or ~5 WAL files
    end

    target[:server]
  end

  def lsn_function
    if primary?
      "pg_current_wal_lsn()"
    elsif standby?
      "pg_last_wal_receive_lsn()"
    else
      "pg_last_wal_replay_lsn()"
    end
  end

  def init_health_monitor_session
    FileUtils.rm_rf(health_monitor_socket_path)
    FileUtils.mkdir_p(health_monitor_socket_path)

    ssh_session = vm.sshable.start_fresh_session
    ssh_session.forward.local_socket(File.join(health_monitor_socket_path, ".s.PGSQL.5432"), "/var/run/postgresql/.s.PGSQL.5432")
    {
      ssh_session: ssh_session,
      db_connection: nil
    }
  end

  def init_metrics_export_session
    ssh_session = vm.sshable.start_fresh_session
    {
      ssh_session: ssh_session
    }
  end

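  # Health monitor callback: reads the current LSN over the forwarded socket, persists
  # it to PostgresLsnMonitor about once every 12 healthy readings, and raises the
  # checkup semaphore after sustained "down" readings unless an upgrade is pending
  # (the upgrade restarts servers across versions, so auto-recovery would interfere).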
  def check_pulse(session:, previous_pulse:)
    reading = begin
      session[:db_connection] ||= Sequel.connect(adapter: "postgres", host: health_monitor_socket_path, user: "postgres", connect_timeout: 4, keep_reference: false)
      last_known_lsn = session[:db_connection]["SELECT #{lsn_function} AS lsn"].first[:lsn]
      "up"
    rescue
      "down"
    end
    pulse = aggregate_readings(previous_pulse: previous_pulse, reading: reading, data: {last_known_lsn: last_known_lsn})

    DB.transaction do
      if pulse[:reading] == "up" && pulse[:reading_rpt] % 12 == 1
        begin
          PostgresLsnMonitor.new(last_known_lsn: last_known_lsn) { it.postgres_server_id = id }
            .insert_conflict(
              target: :postgres_server_id,
              update: {last_known_lsn: last_known_lsn}
            ).save_changes
        rescue Sequel::Error => ex
          Clog.emit("Failed to update PostgresLsnMonitor") { {lsn_update_error: {ubid: ubid, last_known_lsn: last_known_lsn, exception: Util.exception_to_hash(ex)}} }
        end
      end

      if pulse[:reading] == "down" && pulse[:reading_rpt] > 5 && Time.now - pulse[:reading_chg] > 30 && !reload.checkup_set? && !resource.needs_upgrade?
        incr_checkup
      end
    end

    pulse
  end

  def needs_event_loop_for_pulse_check?
    true
  end

  def health_monitor_socket_path
    @health_monitor_socket_path ||= File.join(Dir.pwd, "var", "health_monitor_sockets", "pg_#{vm.ip6}")
  end

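  # Converts a textual LSN such as "16/B374D848" to an integer by zero-padding the two
  # slash-separated hex halves to 8 digits and reading them as the high and low 32 bits,
  # so lsn_diff below returns a byte distance.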
  def lsn2int(lsn)
    lsn.split("/").map { it.rjust(8, "0") }.join.hex
  end

  def lsn_diff(lsn1, lsn2)
    lsn2int(lsn1) - lsn2int(lsn2)
  end

  def run_query(query)
    vm.sshable.cmd("PGOPTIONS='-c statement_timeout=60s' psql -U postgres -t --csv -v 'ON_ERROR_STOP=1'", stdin: query).chomp
  end

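  # Prometheus federation settings for this server; per-table timeseries
  # (pg_stat_user_tables_*, pg_statio_user_tables_*) are excluded from the federate query.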
  def metrics_config
    ignored_timeseries_patterns = [
      "pg_stat_user_tables_.*",
      "pg_statio_user_tables_.*"
    ]
    exclude_pattern = ignored_timeseries_patterns.join("|")
    query_params = {
      "match[]": "{__name__!~'#{exclude_pattern}'}"
    }
    query_str = URI.encode_www_form(query_params)

    {
      endpoints: [
        "https://localhost:9090/federate?#{query_str}"
      ],
      max_file_retention: 120,
      interval: "15s",
      additional_labels: {},
      metrics_dir: "/home/ubi/postgres/metrics",
      project_id: Config.postgres_service_project_id
    }
  end

  def storage_device_paths
    if vm.location.aws?
      # On AWS, pick the largest block device to use as the data disk,
      # since the device path detected by the VmStorageVolume is not always
      # correct.
      storage_device_count = vm.vm_storage_volumes.count { it.boot == false }
      vm.sshable.cmd("lsblk -b -d -o NAME,SIZE | sort -n -k2 | tail -n#{storage_device_count} | awk '{print \"/dev/\"$1}'").strip.split
    else
      [vm.vm_storage_volumes.find { it.boot == false }.device_path.shellescape]
    end
  end

  def taking_over?
    unplanned_take_over_set? || planned_take_over_set? || FAILOVER_LABELS.include?(strand.label)
  end

  FAILOVER_LABELS = ["prepare_for_unplanned_take_over", "prepare_for_planned_take_over", "wait_fencing_of_old_primary", "taking_over"].freeze
end

# Table: postgres_server
# Columns:
#  id                     | uuid                     | PRIMARY KEY
#  created_at             | timestamp with time zone | NOT NULL DEFAULT now()
#  updated_at             | timestamp with time zone | NOT NULL DEFAULT now()
#  resource_id            | uuid                     | NOT NULL
#  vm_id                  | uuid                     |
#  timeline_id            | uuid                     | NOT NULL
#  timeline_access        | timeline_access          | NOT NULL DEFAULT 'push'::timeline_access
#  representative_at      | timestamp with time zone |
#  synchronization_status | synchronization_status   | NOT NULL DEFAULT 'ready'::synchronization_status
#  version                | postgres_version         |
# Indexes:
#  postgres_server_pkey1             | PRIMARY KEY btree (id)
#  postgres_server_resource_id_index | UNIQUE btree (resource_id) WHERE representative_at IS NOT NULL
# Foreign key constraints:
#  postgres_server_timeline_id_fkey | (timeline_id) REFERENCES postgres_timeline(id)
#  postgres_server_vm_id_fkey       | (vm_id) REFERENCES vm(id)