# frozen_string_literal: true

require "net/ssh"
require "uri"
require_relative "../../model"

class PostgresServer < Sequel::Model
  one_to_one :strand, key: :id
  many_to_one :resource, class: :PostgresResource, key: :resource_id
  many_to_one :timeline, class: :PostgresTimeline, key: :timeline_id
  one_to_one :vm, key: :id, primary_key: :vm_id
  one_to_one :lsn_monitor, class: :PostgresLsnMonitor, key: :postgres_server_id

  plugin :association_dependencies, lsn_monitor: :destroy

  plugin ResourceMethods
  plugin SemaphoreMethods, :initial_provisioning, :refresh_certificates, :update_superuser_password, :checkup,
    :restart, :configure, :fence, :planned_take_over, :unplanned_take_over, :configure_metrics,
    :destroy, :recycle, :promote, :refresh_walg_credentials
  include HealthMonitorMethods
  include MetricsTargetMethods

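  # Hash of PostgreSQL settings and related configuration for this server.
  # Memory-dependent settings are derived from the VM size (e.g. shared_buffers
  # is 25% of RAM and effective_cache_size is 75%), and role-specific settings
  # (archiving, replication, recovery) are added based on the server's state.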
  def configure_hash
    configs = {
      "listen_addresses" => "'*'",
      "max_connections" => "500",
      "superuser_reserved_connections" => "3",
      "shared_buffers" => "#{vm.memory_gib * 1024 / 4}MB",
      "work_mem" => "#{[vm.memory_gib / 8, 1].max}MB",
      "maintenance_work_mem" => "#{vm.memory_gib * 1024 / 16}MB",
      "max_parallel_workers" => "4",
      "max_parallel_workers_per_gather" => "2",
      "max_parallel_maintenance_workers" => "2",
      "min_wal_size" => "80MB",
      "max_wal_size" => "5GB",
      "random_page_cost" => "1.1",
      "effective_cache_size" => "#{vm.memory_gib * 1024 * 3 / 4}MB",
      "effective_io_concurrency" => "200",
      "tcp_keepalives_count" => "4",
      "tcp_keepalives_idle" => "2",
      "tcp_keepalives_interval" => "2",
      "ssl" => "on",
      "ssl_min_protocol_version" => "TLSv1.3",
      "ssl_ca_file" => "'/etc/ssl/certs/ca.crt'",
      "ssl_cert_file" => "'/etc/ssl/certs/server.crt'",
      "ssl_key_file" => "'/etc/ssl/certs/server.key'",
      "log_timezone" => "'UTC'",
      "log_directory" => "'pg_log'",
      "log_filename" => "'postgresql.log'",
      "log_truncate_on_rotation" => "true",
      "logging_collector" => "on",
      "timezone" => "'UTC'",
      "lc_messages" => "'C.UTF-8'",
      "lc_monetary" => "'C.UTF-8'",
      "lc_numeric" => "'C.UTF-8'",
      "lc_time" => "'C.UTF-8'",
      "shared_preload_libraries" => "'pg_cron,pg_stat_statements'",
      "cron.use_background_workers" => "on"
    }

    if resource.flavor == PostgresResource::Flavor::PARADEDB
      configs["shared_preload_libraries"] = "'pg_cron,pg_stat_statements,pg_analytics,pg_search'"
    elsif resource.flavor == PostgresResource::Flavor::LANTERN
      configs["shared_preload_libraries"] = "'pg_cron,pg_stat_statements,lantern_extras'"
      configs["lantern.external_index_host"] = "'external-indexing.cloud.lantern.dev'"
      configs["lantern.external_index_port"] = "443"
      configs["lantern.external_index_secure"] = "true"
      configs["hnsw.external_index_host"] = "'external-indexing.cloud.lantern.dev'"
      configs["hnsw.external_index_port"] = "443"
      configs["hnsw.external_index_secure"] = "true"
    end

    if timeline.blob_storage
      configs[:archive_mode] = "on"
      configs[:archive_timeout] = "60"
      configs[:archive_command] = "'/usr/bin/wal-g wal-push %p --config /etc/postgresql/wal-g.env'"

      if primary?
        if resource.ha_type == PostgresResource::HaType::SYNC
          caught_up_standbys = resource.servers.select { it.standby? && it.synchronization_status == "ready" }
          configs[:synchronous_standby_names] = "'ANY 1 (#{caught_up_standbys.map(&:ubid).join(",")})'" unless caught_up_standbys.empty?
        end
      end

      if standby?
        configs[:primary_conninfo] = "'#{resource.replication_connection_string(application_name: ubid)}'"
      end

      if doing_pitr?
        configs[:recovery_target_time] = "'#{resource.restore_target}'"
      end

      if standby? || doing_pitr?
        configs[:restore_command] = "'/usr/bin/wal-g wal-fetch %f %p --config /etc/postgresql/wal-g.env'"
      end

      if timeline.aws?
        configs[:log_line_prefix] = "'%m [%p:%l] (%x,%v): host=%r,db=%d,user=%u,app=%a,client=%h '"
        configs[:log_connections] = "on"
        configs[:log_disconnections] = "on"
      end
    end

    {
      configs: configs,
      user_config: resource.user_config,
      pgbouncer_user_config: resource.pgbouncer_user_config,
      private_subnets: vm.private_subnets.map {
        {
          net4: it.net4.to_s,
          net6: it.net6.to_s
        }
      },
      identity: resource.identity,
      hosts: "#{resource.representative_server.vm.private_ipv4} #{resource.identity}",
      pgbouncer_instances: (vm.vcpus / 2.0).ceil.clamp(1, 8),
      metrics_config: metrics_config
    }
  end

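  # Signals the chosen standby to take over, either as a planned or an
  # unplanned takeover. Only the representative server can initiate this.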
  def trigger_failover(mode:)
    unless representative_at
      Clog.emit("Cannot trigger failover on a non-representative server") { {ubid: ubid} }
      return false
    end

    unless (standby = failover_target)
      Clog.emit("No suitable standby found for failover") { {ubid: ubid} }
      return false
    end

    standby.send(:"incr_#{mode}_take_over")
    true
  end

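  # The server's role follows its timeline access mode: the primary pushes WAL
  # to the timeline, while standbys and servers doing point-in-time recovery
  # fetch from it.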
  def primary?
    timeline_access == "push"
  end

  def standby?
    timeline_access == "fetch" && !doing_pitr?
  end

  def doing_pitr?
    !resource.representative_server.primary?
  end

  def read_replica?
    resource.read_replica?
  end

  def storage_size_gib
    vm.vm_storage_volumes_dataset.reject(&:boot).sum(&:size_gib)
  end

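  # The server must be replaced when it is explicitly flagged for recycling or
  # when its VM size or data disk size no longer matches the resource's target.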
  def needs_recycling?
    recycle_set? || vm.display_size != resource.target_vm_size || storage_size_gib != resource.target_storage_size_gib
  end

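  # A server is considered caught up when it is within 80 MB of the server it
  # replicates from (the parent's representative for read replicas, otherwise
  # this resource's representative server).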
  def lsn_caught_up
    parent_server = if read_replica?
      resource.parent&.representative_server
    else
      resource.representative_server
    end

    lsn_diff(parent_server&.current_lsn || current_lsn, current_lsn) < 80 * 1024 * 1024
  end

  def current_lsn
    run_query("SELECT #{lsn_function}").chomp
  end

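  # Picks the best takeover candidate: a non-representative server in the
  # "wait" state that does not need recycling, preferring the highest LSN.
  # With async replication the candidate is rejected if it lags the last LSN
  # recorded by the LSN monitor by more than 80 MB.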
  def failover_target
    target = resource.servers
      .reject { it.representative_at }
      .select { it.strand.label == "wait" && !it.needs_recycling? }
      .map { {server: it, lsn: it.current_lsn} }
      .max_by { lsn2int(it[:lsn]) }

    return nil if target.nil?

    if resource.ha_type == PostgresResource::HaType::ASYNC
      return nil if lsn_monitor.last_known_lsn.nil?
      return nil if lsn_diff(lsn_monitor.last_known_lsn, target[:lsn]) > 80 * 1024 * 1024 # 80 MB or ~5 WAL files
    end

    target[:server]
  end

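  # The LSN-reporting function appropriate for this server's role: current
  # write position on the primary, last received position on a standby, and
  # last replayed position during point-in-time recovery.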
  def lsn_function
    if primary?
      "pg_current_wal_lsn()"
    elsif standby?
      "pg_last_wal_receive_lsn()"
    else
      "pg_last_wal_replay_lsn()"
    end
  end

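  # Forwards the remote PostgreSQL unix socket to a local path over SSH so the
  # health monitor can connect to the database as if it were local.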
  def init_health_monitor_session
    FileUtils.rm_rf(health_monitor_socket_path)
    FileUtils.mkdir_p(health_monitor_socket_path)

    ssh_session = vm.sshable.start_fresh_session
    ssh_session.forward.local_socket(File.join(health_monitor_socket_path, ".s.PGSQL.5432"), "/var/run/postgresql/.s.PGSQL.5432")
    {
      ssh_session: ssh_session,
      db_connection: nil
    }
  end

  def init_metrics_export_session
    ssh_session = vm.sshable.start_fresh_session
    {
      ssh_session: ssh_session
    }
  end

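  # Probes the database through the forwarded socket. Successful readings
  # record the current LSN (persisted roughly once every 12 readings), and a
  # sustained "down" state raises the checkup semaphore.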
  def check_pulse(session:, previous_pulse:)
    reading = begin
      session[:db_connection] ||= Sequel.connect(adapter: "postgres", host: health_monitor_socket_path, user: "postgres", connect_timeout: 4, keep_reference: false)
      last_known_lsn = session[:db_connection]["SELECT #{lsn_function} AS lsn"].first[:lsn]
      "up"
    rescue
      "down"
    end
    pulse = aggregate_readings(previous_pulse: previous_pulse, reading: reading, data: {last_known_lsn: last_known_lsn})

    DB.transaction do
      if pulse[:reading] == "up" && pulse[:reading_rpt] % 12 == 1
        begin
          PostgresLsnMonitor.new(last_known_lsn: last_known_lsn) { it.postgres_server_id = id }
            .insert_conflict(
              target: :postgres_server_id,
              update: {last_known_lsn: last_known_lsn}
            ).save_changes
        rescue Sequel::Error => ex
          Clog.emit("Failed to update PostgresLsnMonitor") { {lsn_update_error: {ubid: ubid, last_known_lsn: last_known_lsn, exception: Util.exception_to_hash(ex)}} }
        end
      end

      if pulse[:reading] == "down" && pulse[:reading_rpt] > 5 && Time.now - pulse[:reading_chg] > 30 && !reload.checkup_set?
        incr_checkup
      end
    end

    pulse
  end

  def needs_event_loop_for_pulse_check?
    true
  end

  def health_monitor_socket_path
    @health_monitor_socket_path ||= File.join(Dir.pwd, "var", "health_monitor_sockets", "pg_#{vm.ip6}")
  end

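  # An LSN is printed as two hex numbers separated by "/" (e.g. "16/B374D848").
  # Zero-pad each half to 8 hex digits and read the concatenation as a single
  # integer so LSNs can be compared and subtracted.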
  def lsn2int(lsn)
    lsn.split("/").map { it.rjust(8, "0") }.join.hex
  end

  def lsn_diff(lsn1, lsn2)
    lsn2int(lsn1) - lsn2int(lsn2)
  end

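  # Runs a query over SSH using psql with a 60 second statement timeout,
  # returning the result as headerless CSV.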
  def run_query(query)
    vm.sshable.cmd("PGOPTIONS='-c statement_timeout=60s' psql -U postgres -t --csv -v 'ON_ERROR_STOP=1'", stdin: query).chomp
  end

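  # Scrape configuration for the local Prometheus federate endpoint; per-table
  # timeseries are filtered out of the federate query.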
  def metrics_config
    ignored_timeseries_patterns = [
      "pg_stat_user_tables_.*",
      "pg_statio_user_tables_.*"
    ]
    exclude_pattern = ignored_timeseries_patterns.join("|")
    query_params = {
      "match[]": "{__name__!~'#{exclude_pattern}'}"
    }
    query_str = URI.encode_www_form(query_params)

    {
      endpoints: [
        "https://localhost:9090/federate?#{query_str}"
      ],
      max_file_retention: 120,
      interval: "15s",
      additional_labels: {},
      metrics_dir: "/home/ubi/postgres/metrics",
      project_id: Config.postgres_service_project_id
    }
  end

  def storage_device_paths
    if vm.location.aws?
      # On AWS, pick the largest block devices to use as the data disks,
      # since the device path detected by the VmStorageVolume is not always
      # correct.
      storage_device_count = vm.vm_storage_volumes.count { it.boot == false }
      vm.sshable.cmd("lsblk -b -d -o NAME,SIZE | sort -n -k2 | tail -n#{storage_device_count} | awk '{print \"/dev/\"$1}'").strip.split
    else
      [vm.vm_storage_volumes.find { it.boot == false }.device_path.shellescape]
    end
  end

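  # True while a takeover involving this server is pending or in progress,
  # either via the takeover semaphores or the failover strand labels below.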
  def taking_over?
    unplanned_take_over_set? || planned_take_over_set? || FAILOVER_LABELS.include?(strand.label)
  end

  FAILOVER_LABELS = ["prepare_for_unplanned_take_over", "prepare_for_planned_take_over", "wait_fencing_of_old_primary", "taking_over"].freeze
end

# Table: postgres_server
# Columns:
#  id                     | uuid                     | PRIMARY KEY
#  created_at             | timestamp with time zone | NOT NULL DEFAULT now()
#  updated_at             | timestamp with time zone | NOT NULL DEFAULT now()
#  resource_id            | uuid                     | NOT NULL
#  vm_id                  | uuid                     |
#  timeline_id            | uuid                     | NOT NULL
#  timeline_access        | timeline_access          | NOT NULL DEFAULT 'push'::timeline_access
#  representative_at      | timestamp with time zone |
#  synchronization_status | synchronization_status   | NOT NULL DEFAULT 'ready'::synchronization_status
# Indexes:
#  postgres_server_pkey1             | PRIMARY KEY btree (id)
#  postgres_server_resource_id_index | UNIQUE btree (resource_id) WHERE representative_at IS NOT NULL
# Foreign key constraints:
#  postgres_server_timeline_id_fkey | (timeline_id) REFERENCES postgres_timeline(id)
#  postgres_server_vm_id_fkey       | (vm_id) REFERENCES vm(id)