# frozen_string_literal: true

require "net/ssh"
require "uri"
require_relative "../../model"

class PostgresServer < Sequel::Model
  one_to_one :strand, key: :id
  many_to_one :resource, class: :PostgresResource, key: :resource_id
  many_to_one :timeline, class: :PostgresTimeline, key: :timeline_id
  one_to_one :vm, key: :id, primary_key: :vm_id
  one_to_one :lsn_monitor, class: :PostgresLsnMonitor, key: :postgres_server_id

  plugin :association_dependencies, lsn_monitor: :destroy

  plugin ResourceMethods
  plugin SemaphoreMethods, :initial_provisioning, :refresh_certificates, :update_superuser_password, :checkup,
    :restart, :configure, :fence, :planned_take_over, :unplanned_take_over, :configure_metrics,
    :destroy, :recycle, :promote, :refresh_walg_credentials
  include HealthMonitorMethods
  include MetricsTargetMethods
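
  # Centralized accessor for the VictoriaMetrics client. Callers that need
  # the client should go through this method rather than duplicating the
  # project lookup logic inconsistently in multiple places.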
  def self.victoria_metrics_client
    VictoriaMetricsResource.client_for_project(Config.postgres_service_project_id)
  end
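
  # Builds the configuration hash shipped to the server. Memory settings
  # follow common Postgres sizing heuristics relative to VM RAM:
  # shared_buffers gets 1/4, effective_cache_size 3/4, maintenance_work_mem
  # 1/16, and work_mem 1 MB per 8 GiB (minimum 1 MB). For example, a VM
  # with 16 GiB of memory yields shared_buffers = 4096MB,
  # effective_cache_size = 12288MB, maintenance_work_mem = 1024MB, and
  # work_mem = 2MB.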
  def configure_hash
    configs = {
      "listen_addresses" => "'*'",
      "max_connections" => "500",
      "superuser_reserved_connections" => "3",
      "shared_buffers" => "#{vm.memory_gib * 1024 / 4}MB",
      "work_mem" => "#{[vm.memory_gib / 8, 1].max}MB",
      "maintenance_work_mem" => "#{vm.memory_gib * 1024 / 16}MB",
      "max_parallel_workers" => "4",
      "max_parallel_workers_per_gather" => "2",
      "max_parallel_maintenance_workers" => "2",
      "min_wal_size" => "80MB",
      "max_wal_size" => "5GB",
      "random_page_cost" => "1.1",
      "effective_cache_size" => "#{vm.memory_gib * 1024 * 3 / 4}MB",
      "effective_io_concurrency" => "200",
      "tcp_keepalives_count" => "4",
      "tcp_keepalives_idle" => "2",
      "tcp_keepalives_interval" => "2",
      "ssl" => "on",
      "ssl_min_protocol_version" => "TLSv1.3",
      "ssl_ca_file" => "'/etc/ssl/certs/ca.crt'",
      "ssl_cert_file" => "'/etc/ssl/certs/server.crt'",
      "ssl_key_file" => "'/etc/ssl/certs/server.key'",
      "log_timezone" => "'UTC'",
      "log_directory" => "'pg_log'",
      "log_filename" => "'postgresql.log'",
      "log_truncate_on_rotation" => "true",
      "logging_collector" => "on",
      "timezone" => "'UTC'",
      "lc_messages" => "'C.UTF-8'",
      "lc_monetary" => "'C.UTF-8'",
      "lc_numeric" => "'C.UTF-8'",
      "lc_time" => "'C.UTF-8'",
      "shared_preload_libraries" => "'pg_cron,pg_stat_statements'",
      "cron.use_background_workers" => "on"
    }

    if resource.flavor == PostgresResource::Flavor::PARADEDB
      configs["shared_preload_libraries"] = "'pg_cron,pg_stat_statements,pg_analytics,pg_search'"
    elsif resource.flavor == PostgresResource::Flavor::LANTERN
      configs["shared_preload_libraries"] = "'pg_cron,pg_stat_statements,lantern_extras'"
      configs["lantern.external_index_host"] = "'external-indexing.cloud.lantern.dev'"
      configs["lantern.external_index_port"] = "443"
      configs["lantern.external_index_secure"] = "true"
      configs["hnsw.external_index_host"] = "'external-indexing.cloud.lantern.dev'"
      configs["hnsw.external_index_port"] = "443"
      configs["hnsw.external_index_secure"] = "true"
    end
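
    # When the timeline has blob storage, enable WAL archiving through
    # wal-g. The primary of a SYNC-replicated resource additionally lists
    # the caught-up standbys in synchronous_standby_names, so any one of
    # them must confirm each commit.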
    if timeline.blob_storage
      configs[:archive_mode] = "on"
      configs[:archive_timeout] = "60"
      configs[:archive_command] = "'/usr/bin/wal-g wal-push %p --config /etc/postgresql/wal-g.env'"

      if primary?
        if resource.ha_type == PostgresResource::HaType::SYNC
          caught_up_standbys = resource.servers.select { it.standby? && it.synchronization_status == "ready" }
          configs[:synchronous_standby_names] = "'ANY 1 (#{caught_up_standbys.map(&:ubid).join(",")})'" unless caught_up_standbys.empty?
        end
      end

      if standby?
        configs[:primary_conninfo] = "'#{resource.replication_connection_string(application_name: ubid)}'"
      end

      if doing_pitr?
        configs[:recovery_target_time] = "'#{resource.restore_target}'"
      end

      if standby? || doing_pitr?
        configs[:restore_command] = "'/usr/bin/wal-g wal-fetch %f %p --config /etc/postgresql/wal-g.env'"
      end

      if timeline.aws?
        configs[:log_line_prefix] = "'%m [%p:%l] (%x,%v): host=%r,db=%d,user=%u,app=%a,client=%h '"
        configs[:log_connections] = "on"
        configs[:log_disconnections] = "on"
      end
    end
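
    # The returned hash also sizes pgbouncer: one instance per two vCPUs,
    # clamped to the 1..8 range.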
    {
      configs: configs,
      user_config: resource.user_config,
      pgbouncer_user_config: resource.pgbouncer_user_config,
      private_subnets: vm.private_subnets.map {
        {
          net4: it.net4.to_s,
          net6: it.net6.to_s
        }
      },
      identity: resource.identity,
      hosts: "#{resource.representative_server.vm.private_ipv4} #{resource.identity}",
      pgbouncer_instances: (vm.vcpus / 2.0).ceil.clamp(1, 8),
      metrics_config: metrics_config
    }
  end
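
  # Fails over to the most suitable standby. Only the representative server
  # may initiate a failover; mode is either "planned" or "unplanned" and
  # selects the corresponding take-over semaphore on the standby.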
  def trigger_failover(mode:)
    unless representative_at
      Clog.emit("Cannot trigger failover on a non-representative server") { {ubid: ubid} }
      return false
    end

    unless (standby = failover_target)
      Clog.emit("No suitable standby found for failover") { {ubid: ubid} }
      return false
    end

    standby.send(:"incr_#{mode}_take_over")
    true
  end

  def primary?
    timeline_access == "push"
  end

  def standby?
    timeline_access == "fetch" && !doing_pitr?
  end

  def doing_pitr?
    !resource.representative_server.primary?
  end

  def read_replica?
    resource.read_replica?
  end

  def storage_size_gib
    vm.vm_storage_volumes_dataset.reject(&:boot).sum(&:size_gib)
  end

  def needs_recycling?
    recycle_set? || vm.display_size != resource.target_vm_size || storage_size_gib != resource.target_storage_size_gib
  end
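
  # A server counts as caught up when it is within 80 MiB of its parent's
  # LSN. Read replicas compare against the representative server of the
  # parent resource; other servers compare against their own resource's
  # representative.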
  def lsn_caught_up
    parent_server = if read_replica?
      resource.parent&.representative_server
    else
      resource.representative_server
    end

    lsn_diff(parent_server&.current_lsn || current_lsn, current_lsn) < 80 * 1024 * 1024
  end

  def current_lsn
    run_query("SELECT #{lsn_function}").chomp
  end
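
  # Picks the failover candidate: the non-representative server in the
  # "wait" state with the highest LSN. For ASYNC replication the candidate
  # is rejected when the primary's last known LSN is unknown or the
  # candidate lags it by more than 80 MiB, to bound potential data loss
  # during the take-over.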
  def failover_target
    target = resource.servers
      .reject { it.representative_at }
      .select { it.strand.label == "wait" && !it.needs_recycling? }
      .map { {server: it, lsn: it.current_lsn} }
      .max_by { lsn2int(it[:lsn]) }

    return nil if target.nil?

    if resource.ha_type == PostgresResource::HaType::ASYNC
      return nil if lsn_monitor.last_known_lsn.nil?
      return nil if lsn_diff(lsn_monitor.last_known_lsn, target[:lsn]) > 80 * 1024 * 1024 # 80 MB or ~5 WAL files
    end

    target[:server]
  end
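
  # Returns the Postgres function that reports this server's LSN: the
  # current write position on the primary, the last position received from
  # the primary on a streaming standby, and the last replayed position
  # during point-in-time recovery.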
  def lsn_function
    if primary?
      "pg_current_wal_lsn()"
    elsif standby?
      "pg_last_wal_receive_lsn()"
    else
      "pg_last_wal_replay_lsn()"
    end
  end
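
  # Health checks connect through a local UNIX socket that is forwarded
  # over SSH to the server's /var/run/postgresql socket, so Postgres does
  # not have to be exposed over the network.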
  def init_health_monitor_session
    FileUtils.rm_rf(health_monitor_socket_path)
    FileUtils.mkdir_p(health_monitor_socket_path)

    ssh_session = vm.sshable.start_fresh_session
    ssh_session.forward.local_socket(File.join(health_monitor_socket_path, ".s.PGSQL.5432"), "/var/run/postgresql/.s.PGSQL.5432")
    {
      ssh_session: ssh_session,
      db_connection: nil
    }
  end

  def init_metrics_export_session
    ssh_session = vm.sshable.start_fresh_session
    {
      ssh_session: ssh_session
    }
  end
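
  # Reads the current LSN to classify the server as "up" or "down". While
  # up, the last known LSN is upserted into PostgresLsnMonitor roughly once
  # per twelve readings; after more than five consecutive "down" readings
  # spanning at least 30 seconds, the checkup semaphore is incremented.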
  def check_pulse(session:, previous_pulse:)
    reading = begin
      session[:db_connection] ||= Sequel.connect(adapter: "postgres", host: health_monitor_socket_path, user: "postgres", connect_timeout: 4, keep_reference: false)
      last_known_lsn = session[:db_connection]["SELECT #{lsn_function} AS lsn"].first[:lsn]
      "up"
    rescue
      "down"
    end
    pulse = aggregate_readings(previous_pulse: previous_pulse, reading: reading, data: {last_known_lsn: last_known_lsn})

    DB.transaction do
      if pulse[:reading] == "up" && pulse[:reading_rpt] % 12 == 1
        begin
          PostgresLsnMonitor.new(last_known_lsn: last_known_lsn) { it.postgres_server_id = id }
            .insert_conflict(
              target: :postgres_server_id,
              update: {last_known_lsn: last_known_lsn}
            ).save_changes
        rescue Sequel::Error => ex
          Clog.emit("Failed to update PostgresLsnMonitor") { {lsn_update_error: {ubid: ubid, last_known_lsn: last_known_lsn, exception: Util.exception_to_hash(ex)}} }
        end
      end

      if pulse[:reading] == "down" && pulse[:reading_rpt] > 5 && Time.now - pulse[:reading_chg] > 30 && !reload.checkup_set?
        incr_checkup
      end
    end

    pulse
  end

  def needs_event_loop_for_pulse_check?
    true
  end

  def health_monitor_socket_path
    @health_monitor_socket_path ||= File.join(Dir.pwd, "var", "health_monitor_sockets", "pg_#{vm.ip6}")
  end
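
  # A Postgres LSN is formatted as two 32-bit hexadecimal halves separated
  # by a slash. Zero-padding each half to eight digits and concatenating
  # them yields the 64-bit position as an integer, so LSNs can be compared
  # and subtracted. For example, lsn2int("1/2A") == 0x10000002A.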
  def lsn2int(lsn)
    lsn.split("/").map { it.rjust(8, "0") }.join.hex
  end

  def lsn_diff(lsn1, lsn2)
    lsn2int(lsn1) - lsn2int(lsn2)
  end
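
  # Runs a query through psql on the VM over SSH: tuples-only CSV output,
  # a 60-second statement timeout, and ON_ERROR_STOP so the command fails
  # on the first error.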
  def run_query(query)
    vm.sshable.cmd("PGOPTIONS='-c statement_timeout=60s' psql -U postgres -t --csv -v 'ON_ERROR_STOP=1'", stdin: query).chomp
  end
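
  # Metrics are scraped from the local Prometheus federate endpoint. Noisy
  # per-table timeseries are excluded via a negative __name__ regex to keep
  # the exported set small.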
  def metrics_config
    ignored_timeseries_patterns = [
      "pg_stat_user_tables_.*",
      "pg_statio_user_tables_.*"
    ]
    exclude_pattern = ignored_timeseries_patterns.join("|")
    query_params = {
      "match[]": "{__name__!~'#{exclude_pattern}'}"
    }
    query_str = URI.encode_www_form(query_params)

    {
      endpoints: [
        "https://localhost:9090/federate?#{query_str}"
      ],
      max_file_retention: 120,
      interval: "15s",
      additional_labels: {},
      metrics_dir: "/home/ubi/postgres/metrics",
      project_id: Config.postgres_service_project_id
    }
  end
  def storage_device_paths
    if vm.location.aws?
      # On AWS, pick the largest block devices to use as data disks, since
      # the device path detected by the VmStorageVolume is not always
      # correct.
      storage_device_count = vm.vm_storage_volumes.count { it.boot == false }
      vm.sshable.cmd("lsblk -b -d -o NAME,SIZE | sort -n -k2 | tail -n#{storage_device_count} | awk '{print \"/dev/\"$1}'").strip.split
    else
      [vm.vm_storage_volumes.find { it.boot == false }.device_path.shellescape]
    end
  end

  def taking_over?
    unplanned_take_over_set? || planned_take_over_set? || FAILOVER_LABELS.include?(strand.label)
  end

  FAILOVER_LABELS = ["prepare_for_unplanned_take_over", "prepare_for_planned_take_over", "wait_fencing_of_old_primary", "taking_over"].freeze
end

# Table: postgres_server
# Columns:
#  id                     | uuid                     | PRIMARY KEY
#  created_at             | timestamp with time zone | NOT NULL DEFAULT now()
#  updated_at             | timestamp with time zone | NOT NULL DEFAULT now()
#  resource_id            | uuid                     | NOT NULL
#  vm_id                  | uuid                     |
#  timeline_id            | uuid                     | NOT NULL
#  timeline_access        | timeline_access          | NOT NULL DEFAULT 'push'::timeline_access
#  representative_at      | timestamp with time zone |
#  synchronization_status | synchronization_status   | NOT NULL DEFAULT 'ready'::synchronization_status
# Indexes:
#  postgres_server_pkey1             | PRIMARY KEY btree (id)
#  postgres_server_resource_id_index | UNIQUE btree (resource_id) WHERE representative_at IS NOT NULL
# Foreign key constraints:
#  postgres_server_timeline_id_fkey | (timeline_id) REFERENCES postgres_timeline(id)
#  postgres_server_vm_id_fkey       | (vm_id) REFERENCES vm(id)