ubicloud/model/postgres/postgres_timeline.rb
Furkan Sahin 69d9f17f81 Use AWS S3 SDK client for bucket operations
We used to use our MinIO client to interact with AWS S3 as well. However,
that caused a bunch of problems because the client was written specifically
to talk to our own MinIO clusters. Now that we are introducing multiple
regions, we either have to dive deeper and refactor the client or simply
introduce the AWS SDK. I chose the latter, considering it is much cheaper
engineering-wise.
2025-06-24 19:42:51 +03:00


# frozen_string_literal: true

require_relative "../../model"
require "aws-sdk-s3"

class PostgresTimeline < Sequel::Model
  one_to_one :strand, key: :id
  one_to_one :parent, key: :parent_id, class: self
  one_to_one :leader, class: :PostgresServer, key: :timeline_id, conditions: {timeline_access: "push"}
  many_to_one :location

  plugin ResourceMethods
  include SemaphoreMethods
  semaphore :destroy

  plugin :column_encryption do |enc|
    enc.column :secret_key
  end

  BACKUP_BUCKET_EXPIRATION_DAYS = 8

  def bucket_name
    ubid
  end
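
  # Renders the env-style wal-g configuration for this timeline's bucket:
  # S3 prefix, credentials, endpoint, and region. Non-AWS (MinIO-backed)
  # locations fall back to the placeholder region "us-east-1".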
  def generate_walg_config
    <<-WALG_CONF
WALG_S3_PREFIX=s3://#{ubid}
AWS_ENDPOINT=#{blob_storage_endpoint}
AWS_ACCESS_KEY_ID=#{access_key}
AWS_SECRET_ACCESS_KEY=#{secret_key}
AWS_REGION=#{aws? ? location.name : "us-east-1"}
AWS_S3_FORCE_PATH_STYLE=true
PGHOST=/var/run/postgresql
    WALG_CONF
  end
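
  # A fresh backup is needed when the take_postgres_backup job on the leader
  # failed or never started, or when it succeeded and the last backup was
  # started more than a day ago.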
  def need_backup?
    return false if blob_storage.nil?
    return false if leader.nil?

    status = leader.vm.sshable.cmd("common/bin/daemonizer --check take_postgres_backup")
    return true if ["Failed", "NotStarted"].include?(status)
    return true if status == "Succeeded" && (latest_backup_started_at.nil? || latest_backup_started_at < Time.now - 60 * 60 * 24)

    false
  end
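
  # Lists completed base backups by looking for wal-g's backup_stop_sentinel.json
  # markers under basebackups_005/. Known credential and connectivity errors are
  # logged and treated as "no backups yet" rather than raised.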
  def backups
    return [] if blob_storage.nil?

    begin
      list_objects("basebackups_005/")
        .select { it.key.end_with?("backup_stop_sentinel.json") }
    rescue => ex
      recoverable_errors = ["The Access Key Id you provided does not exist in our records.", "AccessDenied", "No route to host", "Connection refused"]
      Clog.emit("Backup fetch exception") { Util.exception_to_hash(ex) }
      return [] if recoverable_errors.any? { ex.message.include?(it) }
      raise
    end
  end
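
  # Returns the wal-g backup label of the most recent base backup that
  # completed before the given restore target time.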
  def latest_backup_label_before_target(target:)
    backup = backups.sort_by(&:last_modified).reverse.find { it.last_modified < target }
    fail "BUG: no backup found" unless backup
    backup.key.delete_prefix("basebackups_005/").delete_suffix("_backup_stop_sentinel.json")
  end
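
  # Earliest usable point-in-time restore target: the oldest base backup still
  # within the bucket's expiration window, plus a five-minute margin. The value
  # is cached to avoid listing the bucket on every call.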
  def earliest_restore_time
    # Check if we have a cached earliest backup time; if not, calculate it.
    # The cached time is only considered valid if it is within BACKUP_BUCKET_EXPIRATION_DAYS.
    time_limit = Time.now - BACKUP_BUCKET_EXPIRATION_DAYS * 24 * 60 * 60
    if cached_earliest_backup_at.nil? || cached_earliest_backup_at <= time_limit
      earliest_backup = backups
        .select { |b| b.last_modified > time_limit }
        .map(&:last_modified).min
      update(cached_earliest_backup_at: earliest_backup)
    end

    if cached_earliest_backup_at
      cached_earliest_backup_at + 5 * 60
    end
  end

  def latest_restore_time
    Time.now
  end

  def aws?
    location&.aws?
  end
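
  # blob_storage is the attached MinioCluster when one exists; AWS locations
  # without a MinIO cluster get a minimal struct carrying only the regional
  # S3 endpoint URL.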
  S3BlobStorage = Struct.new(:url)

  def blob_storage
    @blob_storage ||= MinioCluster[blob_storage_id] || (aws? ? S3BlobStorage.new("https://s3.#{location.name}.amazonaws.com") : nil)
  end

  def blob_storage_endpoint
    @blob_storage_endpoint ||= blob_storage.url || blob_storage.ip4_urls.sample
  end
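
  # AWS regions use the official S3 SDK client; everywhere else we keep using
  # the in-house MinIO client. Both are built from the timeline's stored
  # credentials and blob storage endpoint.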
  def blob_storage_client
    @blob_storage_client ||= aws? ? Aws::S3::Client.new(
      region: location.name,
      access_key_id: access_key,
      secret_access_key: secret_key,
      endpoint: blob_storage_endpoint,
      force_path_style: true
    ) : Minio::Client.new(
      endpoint: blob_storage_endpoint,
      access_key: access_key,
      secret_key: secret_key,
      ssl_ca_data: blob_storage.root_certs
    )
  end
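
  # S3-style policy document granting full access to this timeline's bucket
  # and the objects under it.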
  def blob_storage_policy
    {Version: "2012-10-17", Statement: [{Effect: "Allow", Action: ["s3:*"], Resource: ["arn:aws:s3:::#{ubid}*"]}]}
  end
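
  # Lists objects under the given prefix; both clients return objects that
  # expose #key and #last_modified, which is all the callers above rely on.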
  def list_objects(prefix)
    aws? ?
      blob_storage_client.list_objects_v2(bucket: ubid, prefix: prefix).contents
      : blob_storage_client.list_objects(ubid, prefix)
  end
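
  # Creates the timeline's backup bucket. On AWS the region is passed as a
  # location constraint at bucket creation time.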
  def create_bucket
    aws? ?
      blob_storage_client.create_bucket({
        bucket: ubid,
        create_bucket_configuration: {
          location_constraint: location.name
        }
      })
      : blob_storage_client.create_bucket(ubid)
  end
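
  # Expires objects under basebackups_005/ after BACKUP_BUCKET_EXPIRATION_DAYS
  # on AWS; the MinIO client call applies the same expiration window.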
  def set_lifecycle_policy
    aws? ?
      blob_storage_client.put_bucket_lifecycle_configuration({
        bucket: ubid,
        lifecycle_configuration: {
          rules: [
            {
              id: "DeleteOldBackups",
              prefix: "basebackups_005/",
              status: "Enabled",
              expiration: {
                days: BACKUP_BUCKET_EXPIRATION_DAYS
              }
            }
          ]
        }
      })
      : blob_storage_client.set_lifecycle_policy(ubid, ubid, BACKUP_BUCKET_EXPIRATION_DAYS)
  end
end

# Table: postgres_timeline
# Columns:
# id | uuid | PRIMARY KEY
# created_at | timestamp with time zone | NOT NULL DEFAULT now()
# updated_at | timestamp with time zone | NOT NULL DEFAULT now()
# parent_id | uuid |
# access_key | text |
# secret_key | text |
# latest_backup_started_at | timestamp with time zone |
# blob_storage_id | uuid |
# location_id | uuid |
# cached_earliest_backup_at | timestamp with time zone |
# Indexes:
# postgres_timeline_pkey | PRIMARY KEY btree (id)
# Foreign key constraints:
# postgres_timeline_location_id_fkey | (location_id) REFERENCES location(id)
# Referenced By:
# postgres_server | postgres_server_timeline_id_fkey | (timeline_id) REFERENCES postgres_timeline(id)