We were only cleaning up the basebackups and forgot about the WAL files. With this commit, we start cleaning up the WAL files, too.
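Because the expiration rule set_lifecycle_policy installs uses an empty filter, it applies to every object in the bucket rather than only the basebackups_005/ prefix, so the WAL segments wal-g uploads now expire as well. A minimal sketch of how the installed rule could be inspected, assuming an AWS-backed timeline pt (the variable and the wal_005/ prefix convention are illustrative, not part of this file):

  pt = PostgresTimeline.first
  pt.set_lifecycle_policy

  # The rule has no prefix filter, so it also covers wal-g's wal_005/ objects,
  # not just basebackups_005/.
  rule = pt.blob_storage_client.get_bucket_lifecycle_configuration(bucket: pt.ubid).rules.first
  rule.id              # => "DeleteOldBackups"
  rule.expiration.days # => 8, i.e. BACKUP_BUCKET_EXPIRATION_DAYS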
183 lines · 5.8 KiB · Ruby
# frozen_string_literal: true

require_relative "../../model"

require "aws-sdk-s3"

class PostgresTimeline < Sequel::Model
  one_to_one :strand, key: :id
  one_to_one :parent, key: :parent_id, class: self
  one_to_one :leader, class: :PostgresServer, key: :timeline_id, conditions: {timeline_access: "push"}
  many_to_one :location

  plugin ResourceMethods, encrypted_columns: :secret_key
  plugin SemaphoreMethods, :destroy

  BACKUP_BUCKET_EXPIRATION_DAYS = 8

  def bucket_name
    ubid
  end

  def generate_walg_config
    <<-WALG_CONF
WALG_S3_PREFIX=s3://#{ubid}
AWS_ENDPOINT=#{blob_storage_endpoint}
AWS_ACCESS_KEY_ID=#{access_key}
AWS_SECRET_ACCESS_KEY=#{secret_key}
AWS_REGION=#{aws? ? location.name : "us-east-1"}
AWS_S3_FORCE_PATH_STYLE=true
PGHOST=/var/run/postgresql
    WALG_CONF
  end

  # A backup is needed when the previous attempt failed or never started, or
  # when the last successful backup was started more than a day ago.
  def need_backup?
    return false if blob_storage.nil?
    return false if leader.nil?

    status = leader.vm.sshable.cmd("common/bin/daemonizer --check take_postgres_backup")
    return true if ["Failed", "NotStarted"].include?(status)
    return true if status == "Succeeded" && (latest_backup_started_at.nil? || latest_backup_started_at < Time.now - 60 * 60 * 24)

    false
  end

  def backups
    return [] if blob_storage.nil?

    begin
      list_objects("basebackups_005/")
        .select { it.key.end_with?("backup_stop_sentinel.json") }
    rescue => ex
      recoverable_errors = ["The AWS Access Key Id you provided does not exist in our records.", "The specified bucket does not exist", "AccessDenied", "No route to host", "Connection refused"]
      Clog.emit("Backup fetch exception") { Util.exception_to_hash(ex) }
      return [] if recoverable_errors.any? { ex.message.include?(it) }
      raise
    end
  end

  def latest_backup_label_before_target(target:)
    backup = backups.sort_by(&:last_modified).reverse.find { it.last_modified < target }
    fail "BUG: no backup found" unless backup
    backup.key.delete_prefix("basebackups_005/").delete_suffix("_backup_stop_sentinel.json")
  end

  def earliest_restore_time
    # Check if we have a cached earliest backup time; if not, calculate it.
    # The cached time is valid if it's within BACKUP_BUCKET_EXPIRATION_DAYS.
    time_limit = Time.now - BACKUP_BUCKET_EXPIRATION_DAYS * 24 * 60 * 60

    if cached_earliest_backup_at.nil? || cached_earliest_backup_at <= time_limit
      earliest_backup = backups
        .select { |b| b.last_modified > time_limit }
        .map(&:last_modified).min

      update(cached_earliest_backup_at: earliest_backup)
    end

    if cached_earliest_backup_at
      cached_earliest_backup_at + 5 * 60
    end
  end

  def latest_restore_time
    Time.now
  end

  def aws?
    location&.aws?
  end

  S3BlobStorage = Struct.new(:url)

  def blob_storage
    @blob_storage ||= MinioCluster[blob_storage_id] || (aws? ? S3BlobStorage.new("https://s3.#{location.name}.amazonaws.com") : nil)
  end

  def blob_storage_endpoint
    @blob_storage_endpoint ||= blob_storage.url || blob_storage.ip4_urls.sample
  end

  def blob_storage_client
    @blob_storage_client ||= aws? ? Aws::S3::Client.new(
      region: location.name,
      access_key_id: access_key,
      secret_access_key: secret_key,
      endpoint: blob_storage_endpoint,
      force_path_style: true
    ) : Minio::Client.new(
      endpoint: blob_storage_endpoint,
      access_key: access_key,
      secret_key: secret_key,
      ssl_ca_data: blob_storage.root_certs
    )
  end

  def blob_storage_policy
    {Version: "2012-10-17", Statement: [{Effect: "Allow", Action: ["s3:*"], Resource: ["arn:aws:s3:::#{ubid}*"]}]}
  end

  def list_objects(prefix)
    aws? ?
      aws_list_objects(prefix)
      : blob_storage_client.list_objects(ubid, prefix)
  end

  def aws_list_objects(prefix)
    response = blob_storage_client.list_objects_v2(bucket: ubid, prefix: prefix)
    objects = response.contents
    # list_objects_v2 returns at most 1000 keys per call; follow the
    # continuation token until the listing is complete.
    while response.is_truncated
      response = blob_storage_client.list_objects_v2(bucket: ubid, prefix: prefix, continuation_token: response.next_continuation_token)
      objects.concat(response.contents)
    end
    objects
  end

  def create_bucket
    aws? ? aws_create_bucket : blob_storage_client.create_bucket(ubid)
  end

  def aws_create_bucket
    # us-east-1 is the default region and S3 rejects it as an explicit
    # LocationConstraint, so omit the bucket configuration there.
    location_constraint = (location.name == "us-east-1") ? nil : {location_constraint: location.name}
    blob_storage_client.create_bucket(bucket: ubid, create_bucket_configuration: location_constraint)
  end

  def set_lifecycle_policy
    # The expiration rule uses an empty filter, so it applies to every object
    # in the bucket: WAL files are cleaned up along with the basebackups.
    aws? ?
      blob_storage_client.put_bucket_lifecycle_configuration({
        bucket: ubid,
        lifecycle_configuration: {
          rules: [
            {
              id: "DeleteOldBackups",
              status: "Enabled",
              expiration: {
                days: BACKUP_BUCKET_EXPIRATION_DAYS
              },
              filter: {}
            }
          ]
        }
      })
      : blob_storage_client.set_lifecycle_policy(ubid, ubid, BACKUP_BUCKET_EXPIRATION_DAYS)
  end
end

# Table: postgres_timeline
# Columns:
#  id                        | uuid                     | PRIMARY KEY
#  created_at                | timestamp with time zone | NOT NULL DEFAULT now()
#  updated_at                | timestamp with time zone | NOT NULL DEFAULT now()
#  parent_id                 | uuid                     |
#  access_key                | text                     |
#  secret_key                | text                     |
#  latest_backup_started_at  | timestamp with time zone |
#  blob_storage_id           | uuid                     |
#  location_id               | uuid                     |
#  cached_earliest_backup_at | timestamp with time zone |
# Indexes:
#  postgres_timeline_pkey | PRIMARY KEY btree (id)
# Foreign key constraints:
#  postgres_timeline_location_id_fkey | (location_id) REFERENCES location(id)
# Referenced By:
#  postgres_server | postgres_server_timeline_id_fkey | (timeline_id) REFERENCES postgres_timeline(id)