Files
ubicloud/prog/github/github_repository_nexus.rb
Enes Cakir 890cb4a8e5 Make cache entry clean up more explicit while destroying a repository
We should only call `repository.destroy` from the nexus prog, since it
has a strand. There is no need to keep the destroy logic in the model,
so we move it to the prog, similar to the PG timeline.

We also remove cache entries explicitly. Previously, we expected the
association_dependencies in the model to handle this, but we need to be
sure that all cache entries are cleared before destroying the blob
storage bucket. Making this explicit in the prog makes it clearer.
2025-06-18 10:19:00 +03:00
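
With this change, other code should not call `repository.destroy` directly;
it requests destruction through the strand's destroy semaphore and lets the
prog run the teardown in order. A minimal sketch of the caller side, mirroring
the code in this file:

  # From any other prog (or a console session):
  repository.incr_destroy

  # before_run then hops to the destroy label, which tears down in order:
  # cache entries, blob storage bucket, and finally the repository itself.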

168 lines
6.4 KiB
Ruby

# frozen_string_literal: true

require_relative "../../lib/util"

class Prog::Github::GithubRepositoryNexus < Prog::Base
  subject_is :github_repository

  def self.assemble(installation, name, default_branch)
    DB.transaction do
      repository = GithubRepository.new_with_id(installation_id: installation.id, name: name)
      repository.skip_auto_validations(:unique) do
        updates = {last_job_at: Time.now}
        updates[:default_branch] = default_branch if default_branch
        repository.insert_conflict(target: [:installation_id, :name], update: updates).save_changes
      end

      Strand.new(prog: "Github::GithubRepositoryNexus", label: "wait") { it.id = repository.id }
        .insert_conflict(target: :id).save_changes
    end
  end
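
  # Example usage (the repository name here is made up, for illustration):
  #
  #   Prog::Github::GithubRepositoryNexus.assemble(installation, "acme/widgets", "main")
  #
  # Re-running assemble for an existing (installation, name) pair upserts the
  # row and refreshes last_job_at instead of creating a duplicate.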

  def client
    @client ||= Github.installation_client(github_repository.installation.installation_id).tap { it.auto_paginate = true }
  end

  # We dynamically adjust the polling interval based on the remaining rate
  # limit. It's 5 minutes by default, but it can be increased if the rate limit
  # is low.
  def polling_interval
    @polling_interval ||= 5 * 60
  end
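
  # For reference, the values @polling_interval can take (all set in
  # check_queued_jobs below): 5 minutes by default, 15 minutes when less than
  # half of the rate limit remains, the time until the rate limit resets when
  # less than 10% remains, and 24 hours when the project is inactive.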

  def check_queued_jobs
    unless github_repository.installation.project.active?
      @polling_interval = 24 * 60 * 60
      return
    end

    queued_runs = client.repository_workflow_runs(github_repository.name, {status: "queued"})[:workflow_runs]
    Clog.emit("polled queued runs") { {polled_queued_runs: {repository_name: github_repository.name, count: queued_runs.count}} }

    # We check the rate limit after the first API call to avoid unnecessary API
    # calls to fetch only the rate limit. Every response includes the rate limit
    # information in the headers.
    remaining_quota = client.rate_limit.remaining / client.rate_limit.limit.to_f
    if remaining_quota < 0.1
      Clog.emit("low remaining quota") { {low_remaining_quota: {repository_name: github_repository.name, limit: client.rate_limit.limit, remaining: client.rate_limit.remaining}} }
      @polling_interval = (client.rate_limit.resets_at - Time.now).to_i
      return
    end

    queued_labels = Hash.new(0)
    queued_runs.first(200).each do |run|
      jobs = client.workflow_run_attempt_jobs(github_repository.name, run[:id], run[:run_attempt])[:jobs]
      jobs.each do |job|
        next if job[:status] != "queued"
        next unless (label = job[:labels].find { Github.runner_labels.key?(it) })
        queued_labels[label] += 1
      end
    end

    queued_labels.each do |label, count|
      idle_runner_count = github_repository.runners_dataset.where(label: label, workflow_job: nil).count
      # The calculation of the required_runner_count isn't atomic because it
      # requires multiple API calls and database queries. However, it will
      # eventually settle on the correct value. If we create more runners than
      # necessary, the excess will be recycled after 5 minutes at no extra cost
      # to the customer. If fewer runners are created than needed, the system
      # will generate more in the next cycle.
      next if (required_runner_count = count - idle_runner_count) && required_runner_count <= 0
      Clog.emit("extra runner needed") { {needed_extra_runner: {repository_name: github_repository.name, label: label, count: required_runner_count}} }
      required_runner_count.times do
        Prog::Vm::GithubRunner.assemble(
          github_repository.installation,
          repository_name: github_repository.name,
          label: label
        )
      end
    end

    @polling_interval = (remaining_quota < 0.5) ? 15 * 60 : 5 * 60
  end
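
  # A worked example of the reconciliation above (label name hypothetical):
  # with 3 queued jobs asking for "ubicloud-standard-2" and 1 idle runner
  # already provisioned for that label, required_runner_count is 3 - 1 = 2,
  # so two extra runners are assembled; if idle runners outnumber queued
  # jobs, nothing is created.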

  def cleanup_cache
    # Destroy cache entries that were not accessed in the last 7 days, or that
    # were created more than 7 days ago and have never been accessed.
    seven_days_ago = Time.now - 7 * 24 * 60 * 60
    cond = Sequel.expr { (last_accessed_at < seven_days_ago) | ((last_accessed_at =~ nil) & (created_at < seven_days_ago)) }
    github_repository.cache_entries_dataset
      .where(cond)
      .limit(200)
      .destroy_where(cond)

    # Destroy cache entries that were created more than 30 minutes ago but
    # still haven't been committed. We settled on 30 minutes because, in our
    # performance tests, uploading 10GB of data (the maximum size for a single
    # cache entry) takes ~8 minutes at most. To be on the safe side, a ~2x
    # buffer is added.
    cond = Sequel.expr(committed_at: nil)
    github_repository.cache_entries_dataset
      .where { created_at < Time.now - 30 * 60 }
      .where(cond)
      .limit(200)
      .destroy_where(cond)

    # Destroy the oldest cache entries if the total usage exceeds the limit.
    dataset = github_repository.cache_entries_dataset.exclude(size: nil)
    total_usage = dataset.sum(:size).to_i
    storage_limit = github_repository.installation.project.effective_quota_value("GithubRunnerCacheStorage") * 1024 * 1024 * 1024
    if total_usage > storage_limit
      dataset.order(:created_at).limit(200).all do |oldest_entry|
        oldest_entry.destroy
        total_usage -= oldest_entry.size
        break if total_usage <= storage_limit
      end
    end

    if github_repository.cache_entries.empty?
      Clog.emit("Deleting empty bucket and tokens") { {deleting_empty_bucket: {repository_name: github_repository.name}} }
      github_repository.destroy_blob_storage
    end
  end
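
  # Illustrative arithmetic for the limit check above, assuming an effective
  # quota value of 10 (the 1024**3 multiplier implies the quota is in GiB):
  # storage_limit = 10 * 1024 * 1024 * 1024 = 10,737,418,240 bytes, and the
  # oldest entries are destroyed until total usage drops back under the limit.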

  def before_run
    when_destroy_set? do
      if strand.label != "destroy"
        register_deadline(nil, 5 * 60)
        hop_destroy
      end
    end
  end
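
  # Two behaviors of the wait label worth noting: when the repository hasn't
  # seen a job for more than 6 hours, the prog naps a fixed 15 minutes instead
  # of polling, and a repository that GitHub reports as not found is destroyed
  # only once it has no runners left.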
  label def wait
    cleanup_cache if github_repository.access_key
    nap 15 * 60 if Time.now - github_repository.last_job_at > 6 * 60 * 60

    begin
      check_queued_jobs if Config.enable_github_workflow_poller
    rescue Octokit::NotFound
      Clog.emit("not found repository") { {not_found_repository: {repository_name: github_repository.name}} }
      if github_repository.runners.count == 0
        github_repository.incr_destroy
        nap 0
      end
    end

    # check_queued_jobs may have changed the default polling interval based on
    # the remaining rate limit.
    nap polling_interval
  end

  label def destroy
    decr_destroy

    unless github_repository.runners.empty?
      Clog.emit("Cannot destroy repository with active runners") { {not_destroyed_repository: {repository_name: github_repository.name}} }
      nap 5 * 60
    end

    github_repository.cache_entries_dataset.destroy
    github_repository.destroy_blob_storage if github_repository.access_key
    github_repository.destroy

    pop "github repository destroyed"
  end
end