Unfortunately, GitHub doesn't provide an API endpoint that returns all workflow jobs for a repository. Instead, we fetch all queued workflow runs for the repository and then fetch the workflow jobs of each run. respirate has a 2-minute limit for each run; if a run exceeds this limit, respirate considers it stuck and terminates itself. We hit this in production when a single iteration had to poll more than 200 workflow runs, which took longer than 2 minutes and crashed respirate several times. The tricky part is that, because runners are job/run agnostic, we sum up the queued jobs per label and compare each total with the repository's existing idle runners; if there are fewer runners than queued jobs, we provision extra ones. Because we now cap polling at the first 200 runs per iteration, the label totals may be undercounted; the existing runner count will then likely be higher, and we won't provision extra runners. However, this is a rare case: polling every 5 minutes is only a nice-to-have fallback for missed webhooks, so this is acceptable. The number of queued runs also drops as their jobs are assigned to runners, so it shouldn't stay high for long.
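
To make that flow concrete, here is a simplified sketch of one polling cycle, not the implementation itself: `client`, `repo_name`, `repository`, and `provision_runner` are stand-in names, and rate-limit handling and logging are omitted. The full prog follows below.

# Simplified sketch of one polling cycle; see check_queued_jobs in the prog
# below for the real version.
queued_runs = client.repository_workflow_runs(repo_name, {status: "queued"})[:workflow_runs]

# Count queued jobs per runner label, looking only at the first 200 runs so a
# single iteration stays well under the 2-minute limit.
queued_labels = Hash.new(0)
queued_runs.first(200).each do |run|
  jobs = client.workflow_run_attempt_jobs(repo_name, run[:id], run[:run_attempt])[:jobs]
  jobs.each do |job|
    next unless job[:status] == "queued" && (label = job[:labels].find { Github.runner_labels.key?(_1) })
    queued_labels[label] += 1
  end
end

# Provision a runner for every queued job that doesn't already have an idle
# runner (one without an assigned workflow job) waiting for it.
queued_labels.each do |label, queued_count|
  idle_count = repository.runners_dataset.where(label: label, workflow_job: nil).count
  (queued_count - idle_count).times { provision_runner(label) } if queued_count > idle_count
end
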
# frozen_string_literal: true

require_relative "../../lib/util"

class Prog::Github::GithubRepositoryNexus < Prog::Base
  subject_is :github_repository

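  # Upsert the repository record (keyed by installation and name) and create
  # its strand idempotently; repeated assembles only refresh last_job_at and,
  # if given, the default branch.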
  def self.assemble(installation, name, default_branch)
    DB.transaction do
      repository = GithubRepository.new_with_id(installation_id: installation.id, name: name)
      repository.skip_auto_validations(:unique) do
        updates = {last_job_at: Time.now}
        updates[:default_branch] = default_branch if default_branch
        repository.insert_conflict(target: [:installation_id, :name], update: updates).save_changes
      end
      Strand.new(prog: "Github::GithubRepositoryNexus", label: "wait") { _1.id = repository.id }
        .insert_conflict(target: :id).save_changes
    end
  end

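  # Octokit client authenticated as this repository's GitHub App installation,
  # memoized with auto-pagination enabled.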
  def client
    @client ||= Github.installation_client(github_repository.installation.installation_id).tap { _1.auto_paginate = true }
  end

  # We dynamically adjust the polling interval based on the remaining rate
  # limit. It's 5 minutes by default, but it can be increased if the rate limit
  # is low.
  def polling_interval
    @polling_interval ||= 5 * 60
  end

  def check_queued_jobs
    unless github_repository.installation.project.active?
      @polling_interval = 24 * 60 * 60
      return
    end
    queued_runs = client.repository_workflow_runs(github_repository.name, {status: "queued"})[:workflow_runs]
    Clog.emit("polled queued runs") { {polled_queued_runs: {repository_name: github_repository.name, count: queued_runs.count}} }

    # We check the rate limit after the first API call to avoid unnecessary API
    # calls to fetch only the rate limit. Every response includes the rate limit
    # information in the headers.
    remaining_quota = client.rate_limit.remaining / client.rate_limit.limit.to_f
    if remaining_quota < 0.1
      Clog.emit("low remaining quota") { {low_remaining_quota: {repository_name: github_repository.name, limit: client.rate_limit.limit, remaining: client.rate_limit.remaining}} }
      @polling_interval = (client.rate_limit.resets_at - Time.now).to_i
      return
    end

    # Count queued jobs per runner label. Only the first 200 queued runs are
    # polled per iteration so a single run stays under the 2-minute limit.
    queued_labels = Hash.new(0)
    queued_runs.first(200).each do |run|
      jobs = client.workflow_run_attempt_jobs(github_repository.name, run[:id], run[:run_attempt])[:jobs]

      jobs.each do |job|
        next if job[:status] != "queued"
        next unless (label = job[:labels].find { Github.runner_labels.key?(_1) })
        queued_labels[label] += 1
      end
    end

    queued_labels.each do |label, count|
      idle_runner_count = github_repository.runners_dataset.where(label: label, workflow_job: nil).count
      # The calculation of the required_runner_count isn't atomic because it
      # requires multiple API calls and database queries. However, it will
      # eventually settle on the correct value. If we create more runners than
      # necessary, the excess will be recycled after 5 minutes at no extra cost
      # to the customer. If fewer runners are created than needed, the system
      # will generate more in the next cycle.
      next if (required_runner_count = count - idle_runner_count) && required_runner_count <= 0

      Clog.emit("extra runner needed") { {needed_extra_runner: {repository_name: github_repository.name, label: label, count: required_runner_count}} }

      required_runner_count.times do
        Prog::Vm::GithubRunner.assemble(
          github_repository.installation,
          repository_name: github_repository.name,
          label: label
        )
      end
    end

    @polling_interval = (remaining_quota < 0.5) ? 15 * 60 : 5 * 60
  end

  def cleanup_cache
    # Destroy cache entries that haven't been accessed in the last 7 days, or
    # that were created more than 7 days ago and never accessed.
    seven_days_ago = Time.now - 7 * 24 * 60 * 60
    github_repository.cache_entries_dataset
      .where { (last_accessed_at < seven_days_ago) | ((last_accessed_at =~ nil) & (created_at < seven_days_ago)) }
      .limit(200)
      .destroy

    # Destroy cache entries that were created more than 30 minutes ago but
    # still haven't been committed. We chose 30 minutes because, in our
    # performance tests, uploading 10GB of data (the maximum size for a single
    # cache entry) takes at most ~8 minutes; a ~2x buffer is added to be on
    # the safe side.
    github_repository.cache_entries_dataset
      .where { created_at < Time.now - 30 * 60 }
      .where { committed_at =~ nil }
      .limit(200)
      .destroy

    # Destroy the oldest cache entries if the total usage exceeds the limit.
    dataset = github_repository.cache_entries_dataset.exclude(size: nil)
    total_usage = dataset.sum(:size).to_i
    storage_limit = github_repository.installation.project.effective_quota_value("GithubRunnerCacheStorage") * 1024 * 1024 * 1024
    if total_usage > storage_limit
      dataset.order(:created_at).limit(200).each do |oldest_entry|
        break if total_usage <= storage_limit
        oldest_entry.destroy
        total_usage -= oldest_entry.size
      end
    end

    if github_repository.cache_entries.empty?
      Clog.emit("Deleting empty bucket and tokens") { {deleting_empty_bucket: {repository_name: github_repository.name}} }
      github_repository.destroy_blob_storage
    end
  end

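  # Hop to destroy as soon as the destroy semaphore is set, unless the strand
  # is already running the destroy label.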
  def before_run
    when_destroy_set? do
      if strand.label != "destroy"
        register_deadline(nil, 5 * 60)
        hop_destroy
      end
    end
  end

  label def wait
    cleanup_cache if github_repository.access_key
    # If the repository hasn't seen a job in the last 6 hours, skip polling and
    # check again in 15 minutes.
    nap 15 * 60 if Time.now - github_repository.last_job_at > 6 * 60 * 60

    begin
      check_queued_jobs if Config.enable_github_workflow_poller
    rescue Octokit::NotFound
      Clog.emit("not found repository") { {not_found_repository: {repository_name: github_repository.name}} }
      # The repository no longer exists on GitHub; destroy it once it has no
      # runners left.
      if github_repository.runners.count == 0
        github_repository.incr_destroy
        nap 0
      end
    end

    # check_queued_jobs may have changed the default polling interval based on
    # the remaining rate limit.
    nap polling_interval
  end

  label def destroy
    decr_destroy

    unless github_repository.runners.empty?
      Clog.emit("Cannot destroy repository with active runners") { {not_destroyed_repository: {repository_name: github_repository.name}} }
      nap 5 * 60
    end

    github_repository.destroy

    pop "github repository destroyed"
  end
end
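
As a usage sketch (the `installation` lookup and the repository name here are hypothetical), callers assemble the nexus whenever they learn about a repository, and the upserts in assemble make repeated calls for the same installation/name pair safe:

installation = GithubInstallation.first  # hypothetical lookup, for illustration only
Prog::Github::GithubRepositoryNexus.assemble(installation, "acme/widgets", "main")
# Assembling the same repository again doesn't create a duplicate record or
# strand; it only refreshes last_job_at (and the default branch, if given).
Prog::Github::GithubRepositoryNexus.assemble(installation, "acme/widgets", "main")
GithubRepository.where(installation_id: installation.id, name: "acme/widgets").count  # => 1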