ubicloud/prog/github/github_repository_nexus.rb
Enes Cakir 23259944f5 Do not poll more than 200 jobs for each iteration
Unfortunately, GitHub doesn't have an API endpoint to get all workflow
jobs for the repository.

Instead, we fetch all queued workflow runs for the repository, then
fetch the workflow jobs of each run.
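
For reference, the two-step fetch looks roughly like this (a minimal
sketch using the same Octokit calls as the code below; the repository
name is a placeholder and client is assumed to be an authenticated
Octokit installation client):

    runs = client.repository_workflow_runs("acme/example", {status: "queued"})[:workflow_runs]
    runs.each do |run|
      jobs = client.workflow_run_attempt_jobs("acme/example", run[:id], run[:run_attempt])[:jobs]
      # inspect each job's status and labels here
    end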

We have a 2-minute limit in respirate for each run. If a run exceeds
this limit, respirate considers it stuck and terminates itself.

We encountered this issue in production when we needed to poll over 200
workflow runs in one iteration, which took more than 2 minutes. As a
result, respirate crashed multiple times.

The tricky part is that, since runners are job/run agnostic, we sum up
all queued labels and compare the totals with the existing runners for
this repository; if there are fewer runners, we provision extra ones.
Since we now limit polling to the first 200 runs per iteration, the
queued-label totals can undercount, so the existing runner count will
likely look sufficient and we won't provision extra ones.
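
A rough sketch of the per-label comparison (idle_runner_count and
provision_runners are hypothetical stand-ins for the database query and
the runner provisioning below):

    queued_labels = {"ubicloud-standard-4" => 6}  # summed from polled jobs
    queued_labels.each do |label, queued|
      extra = queued - idle_runner_count(label)     # hypothetical helper
      # e.g. 6 queued jobs but 8 idle runners => extra <= 0, provision nothing
      provision_runners(label, extra) if extra > 0  # hypothetical helper
    end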

However, this is a rare case: job polling is only a nice-to-have
fallback that runs every 5 minutes for when a webhook delivery is
missed, so this is acceptable.

The number of queued runs drops as their jobs are assigned to runners,
so it shouldn't stay high for long.
2025-03-20 13:30:01 +03:00

# frozen_string_literal: true

require_relative "../../lib/util"

class Prog::Github::GithubRepositoryNexus < Prog::Base
  subject_is :github_repository

  def self.assemble(installation, name, default_branch)
    DB.transaction do
      repository = GithubRepository.new_with_id(installation_id: installation.id, name: name)
      repository.skip_auto_validations(:unique) do
        updates = {last_job_at: Time.now}
        updates[:default_branch] = default_branch if default_branch
        repository.insert_conflict(target: [:installation_id, :name], update: updates).save_changes
      end

      Strand.new(prog: "Github::GithubRepositoryNexus", label: "wait") { _1.id = repository.id }
        .insert_conflict(target: :id).save_changes
    end
  end

  def client
    @client ||= Github.installation_client(github_repository.installation.installation_id).tap { _1.auto_paginate = true }
  end

  # We dynamically adjust the polling interval based on the remaining rate
  # limit. It's 5 minutes by default, but it can be increased if the rate
  # limit is low.
  def polling_interval
    @polling_interval ||= 5 * 60
  end

  def check_queued_jobs
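    # Inactive projects don't need frequent polling; check them once a day.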
    unless github_repository.installation.project.active?
      @polling_interval = 24 * 60 * 60
      return
    end
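
    # GitHub doesn't provide an endpoint that lists all workflow jobs for a
    # repository, so we list the queued workflow runs first and fetch each
    # run's jobs below.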
    queued_runs = client.repository_workflow_runs(github_repository.name, {status: "queued"})[:workflow_runs]
    Clog.emit("polled queued runs") { {polled_queued_runs: {repository_name: github_repository.name, count: queued_runs.count}} }

    # We check the rate limit after the first API call to avoid unnecessary
    # API calls to fetch only the rate limit. Every response includes the rate
    # limit information in the headers.
    remaining_quota = client.rate_limit.remaining / client.rate_limit.limit.to_f
    if remaining_quota < 0.1
      Clog.emit("low remaining quota") { {low_remaining_quota: {repository_name: github_repository.name, limit: client.rate_limit.limit, remaining: client.rate_limit.remaining}} }
      @polling_interval = (client.rate_limit.resets_at - Time.now).to_i
      return
    end

    queued_labels = Hash.new(0)
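
    # Poll at most the first 200 queued runs per iteration so a single run of
    # this prog stays well within respirate's 2-minute limit.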
    queued_runs.first(200).each do |run|
      jobs = client.workflow_run_attempt_jobs(github_repository.name, run[:id], run[:run_attempt])[:jobs]
      jobs.each do |job|
        next if job[:status] != "queued"
        next unless (label = job[:labels].find { Github.runner_labels.key?(_1) })
        queued_labels[label] += 1
      end
    end

    queued_labels.each do |label, count|
      idle_runner_count = github_repository.runners_dataset.where(label: label, workflow_job: nil).count
      # The calculation of the required_runner_count isn't atomic because it
      # requires multiple API calls and database queries. However, it will
      # eventually settle on the correct value. If we create more runners than
      # necessary, the excess will be recycled after 5 minutes at no extra cost
      # to the customer. If fewer runners are created than needed, the system
      # will generate more in the next cycle.
      next if (required_runner_count = count - idle_runner_count) <= 0
      Clog.emit("extra runner needed") { {needed_extra_runner: {repository_name: github_repository.name, label: label, count: required_runner_count}} }
      required_runner_count.times do
        Prog::Vm::GithubRunner.assemble(
          github_repository.installation,
          repository_name: github_repository.name,
          label: label
        )
      end
    end
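
    # Back off to 15-minute polling when less than half of the rate limit
    # quota remains.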
    @polling_interval = (remaining_quota < 0.5) ? 15 * 60 : 5 * 60
  end

  def cleanup_cache
    # Destroy cache entries that haven't been accessed in the last 7 days, or
    # that were created more than 7 days ago and never accessed.
    seven_days_ago = Time.now - 7 * 24 * 60 * 60
    github_repository.cache_entries_dataset
      .where { (last_accessed_at < seven_days_ago) | ((last_accessed_at =~ nil) & (created_at < seven_days_ago)) }
      .limit(200)
      .destroy

    # Destroy cache entries that were created more than 30 minutes ago but
    # still haven't been committed. We settled on 30 minutes because during
    # our performance tests uploading 10GB of data (the maximum size for a
    # single cache entry) took ~8 minutes at most; a ~2x buffer was added to
    # be on the safe side.
    github_repository.cache_entries_dataset
      .where { created_at < Time.now - 30 * 60 }
      .where { committed_at =~ nil }
      .limit(200)
      .destroy

    # Destroy the oldest cache entries if the total usage exceeds the limit.
    dataset = github_repository.cache_entries_dataset.exclude(size: nil)
    total_usage = dataset.sum(:size).to_i
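    # The storage quota is configured in GiB; convert it to bytes before
    # comparing against the summed entry sizes.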
    storage_limit = github_repository.installation.project.effective_quota_value("GithubRunnerCacheStorage") * 1024 * 1024 * 1024
    if total_usage > storage_limit
      dataset.order(:created_at).limit(200).each do |oldest_entry|
        break if total_usage <= storage_limit
        oldest_entry.destroy
        total_usage -= oldest_entry.size
      end
    end

    if github_repository.cache_entries.empty?
      Clog.emit("Deleting empty bucket and tokens") { {deleting_empty_bucket: {repository_name: github_repository.name}} }
      github_repository.destroy_blob_storage
    end
  end

  def before_run
    when_destroy_set? do
      if strand.label != "destroy"
        register_deadline(nil, 5 * 60)
        hop_destroy
      end
    end
  end

  label def wait
    cleanup_cache if github_repository.access_key

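    # Repositories that haven't seen a new job in the last 6 hours are polled
    # only every 15 minutes.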
    nap 15 * 60 if Time.now - github_repository.last_job_at > 6 * 60 * 60

    begin
      check_queued_jobs if Config.enable_github_workflow_poller
    rescue Octokit::NotFound
      Clog.emit("not found repository") { {not_found_repository: {repository_name: github_repository.name}} }
      if github_repository.runners.count == 0
        github_repository.incr_destroy
        nap 0
      end
    end

    # check_queued_jobs may have changed the default polling interval based on
    # the remaining rate limit.
    nap polling_interval
  end

  label def destroy
    decr_destroy

    unless github_repository.runners.empty?
      Clog.emit("Cannot destroy repository with active runners") { {not_destroyed_repository: {repository_name: github_repository.name}} }
      nap 5 * 60
    end

    github_repository.destroy
    pop "github repository destroyed"
  end
end