ubicloud/routes/runtime/github.rb
Burak Velioglu b367244781 Setup blob storage only when adding cache entries
In the current logic, the blob storage bucket is destroyed if
it exists but has no cache entries. Looking at the logs, we
noticed some buckets being repeatedly deleted. This happens
when users only request cache entries without ever adding
new ones.

To fix this, the blob storage bucket is now created only when
adding a new cache entry. Subsequent get requests still work,
since the blob storage already exists once an entry has been
added.
2025-08-20 16:51:46 +03:00
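
A condensed sketch of the resulting routing (illustrative only, using the
helpers from the file below; not the actual diff):

    # Read path: serves existing caches, never creates a bucket.
    r.get "cache" do
      # look up the entry, presign a GET URL
    end

    # Routes that can add entries: the bucket is created lazily on first use.
    r.on "caches" do
      repository.setup_blob_storage unless repository.access_key
      # reserveCache / commitCache / listCache
    end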


# frozen_string_literal: true

class Clover
  hash_branch(:runtime_prefix, "github") do |r|
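    # A valid runtime JWT always maps to a VM; reject VMs that have no GitHub
    # runner or repository attached, since only runners may use this API.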
    if (runner = GithubRunner[vm_id: @vm.id]).nil? || (repository = runner.repository).nil?
      fail CloverError.new(400, "InvalidRequest", "invalid JWT format or claim in Authorization header")
    end

    # getCacheEntry
    r.get "cache" do
      keys, version = typecast_params.nonempty_str!(%w[keys version])
      keys = keys.split(",")
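
      # Only serve committed entries: rows with a NULL committed_at are still
      # being uploaded and must not be returned as hits.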
      dataset = repository.cache_entries_dataset.exclude(committed_at: nil).where(version: version)

      unless repository.installation.project.get_ff_access_all_cache_scopes
        # Clients can send multiple keys, and we look for caches in multiple scopes.
        # We prioritize scope over key, returning the cache for the first matching
        # key in the head branch scope, followed by the first matching key in
        # default branch scope.
        scopes = [runner.workflow_job&.dig("head_branch") || get_scope_from_github(runner, typecast_params.str("runId")), repository.default_branch]
        scopes.compact!
        scopes.uniq!
        dataset = dataset.where(scope: scopes)
          .order(Sequel.case(scopes.map.with_index { |scope, idx| [{scope:}, idx] }.to_h, scopes.length))
      end
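
      # Exact-key matches first, ordered by the caller's key priority
      # (Sequel.case maps each key to its index).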
      entry = dataset
        .where(key: keys)
        .order_append(Sequel.case(keys.map.with_index { |key, idx| [{key:}, idx] }.to_h, keys.length))
        .first

      # GitHub cache supports prefix match if the key doesn't match exactly.
      # From their docs:
      #   When a key doesn't match directly, the action searches for keys
      #   prefixed with the restore key. If there are multiple partial matches
      #   for a restore key, the action returns the most recently created cache.
      #
      # We still prioritize scope over key in this case, and if there are
      # multiple prefix matches for a key, this chooses the most recent.
      entry ||= dataset
        .grep(:key, keys.map { |key| "#{DB.dataset.escape_like(key)}%" })
        .order_append(Sequel.case(keys.map.with_index { |key, idx| [Sequel.like(:key, "#{DB.dataset.escape_like(key)}%"), idx] }.to_h, keys.length), Sequel.desc(:created_at))
        .first
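
      # entry.this scopes the UPDATE to this row; a return value of 0 means the
      # row vanished (e.g. evicted) between the SELECT and this touch, so we
      # treat it as a miss.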
      entry_updated = entry && entry.this.update(last_accessed_at: Sequel::CURRENT_TIMESTAMP, last_accessed_by: runner.id) == 1
      # If no entry was found, or it no longer exists, return 204 to signal a miss to GitHub.
      next 204 unless entry_updated
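
      # Hand the client a short-lived presigned GET (15 minutes) so it fetches
      # the archive straight from blob storage instead of through us.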
      signed_url = repository.url_presigner.presigned_url(:get_object, bucket: repository.bucket_name, key: entry.blob_key, expires_in: 900)

      {
        scope: entry.scope,
        cacheKey: entry.key,
        cacheVersion: entry.version,
        creationTime: entry.created_at,
        archiveLocation: signed_url
      }
    end
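
    # Bucket setup is lazy and happens only under the "caches" routes, which can
    # add entries; the read-only getCacheEntry above never creates a bucket.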
r.on "caches" do
begin
repository.setup_blob_storage unless repository.access_key
rescue Excon::Error::HTTPStatus => ex
Clog.emit("Unable to setup blob storage") { {failed_blob_storage_setup: {ubid: runner.ubid, repository_ubid: repository.ubid, response: ex.response.body}} }
fail CloverError.new(400, "InvalidRequest", "unable to setup blob storage")
end

      r.is do
        # listCache
        r.get do
          unless (key = typecast_params.nonempty_str("key"))
            fail CloverError.new(204, "NotFound", "No cache entry")
          end

          scopes = [runner.workflow_job&.dig("head_branch"), repository.default_branch].compact
          entries = repository.cache_entries_dataset
            .exclude(committed_at: nil)
            .where(key: key, scope: scopes)
            .order(:version).all

          {
            totalCount: entries.count,
            artifactCaches: entries.map do
              {
                scope: it.scope,
                cacheKey: it.key,
                cacheVersion: it.version,
                creationTime: it.created_at
              }
            end
          }
        end

        # reserveCache
        r.post do
          key, version = typecast_params.nonempty_str!(%w[key version])
          size = typecast_params.pos_int("cacheSize")
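          # cacheSize is optional: some clients don't send it, in which case we
          # fall back to the 10GB limit when sizing the upload (see below).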

          unless (scope = runner.workflow_job&.dig("head_branch") || get_scope_from_github(runner, typecast_params.nonempty_str("runId")))
            Clog.emit("The runner does not have a workflow job") { {no_workflow_job: {ubid: runner.ubid, repository_ubid: repository.ubid}} }
            fail CloverError.new(400, "InvalidRequest", "No workflow job data available")
          end

          if size && size > GithubRepository::CACHE_SIZE_LIMIT
            fail CloverError.new(400, "InvalidRequest", "The cache size is over the 10GB limit")
          end

          unless GithubCacheEntry.where(repository_id: runner.repository.id, scope:, key:, version:).empty?
            fail CloverError.new(409, "AlreadyExists", "A cache entry for #{scope} scope already exists with #{key} key and #{version} version.")
          end

          # Need the id for blob_key, but don't save the record yet.
          entry = GithubCacheEntry.new_with_id(repository_id: runner.repository.id, key:, version:, size:, scope:, created_by: runner.id)
          blob_key = entry.blob_key
          bucket = repository.bucket_name
          blob_storage_client = repository.blob_storage_client

          # Token creation on Cloudflare R2 takes time to propagate. Since this is
          # the first time we use the credentials, we wait here for them to
          # propagate. Note that credential propagation only happens while the
          # bucket and token are initially being created, so the retry block is
          # expected to run only while saving the first cache entry for a repository.
          retries = 0
          begin
            upload_id = blob_storage_client.create_multipart_upload(bucket:, key: blob_key).upload_id
          rescue Aws::S3::Errors::Unauthorized, Aws::S3::Errors::InternalError, Aws::S3::Errors::NoSuchBucket => ex
            retries += 1
            if retries < 3
              # :nocov:
              sleep(1) unless Config.test?
              # :nocov:
              retry
            else
              Clog.emit("Could not authorize multipart upload") { {could_not_authorize_multipart_upload: {ubid: runner.ubid, repository_ubid: repository.ubid, exception: Util.exception_to_hash(ex)}} }
              fail CloverError.new(400, "InvalidRequest", "Could not authorize multipart upload")
            end
          end
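
          # Persist the reservation only after R2 accepts the multipart upload; a
          # unique-constraint violation here means a concurrent reserve for the
          # same scope/key/version won the race.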
          begin
            entry.update(upload_id:)
          rescue Sequel::ValidationFailed, Sequel::UniqueConstraintViolation
            fail CloverError.new(409, "AlreadyExists", "A cache entry for #{scope} scope already exists with #{key} key and #{version} version.")
          end

          # If size is not provided, the client didn't let us know the size of the
          # cache. In that case, we use GithubRepository::CACHE_SIZE_LIMIT as the size.
          size ||= GithubRepository::CACHE_SIZE_LIMIT
          max_chunk_size = 32 * 1024 * 1024 # 32MB
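          # One presigned URL per 32MB part, e.g. a 100MB cache gets
          # ceil(100 / 32) = 4 upload-part URLs.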
          presigned_urls = (1..size.fdiv(max_chunk_size).ceil).map do
            repository.url_presigner.presigned_url(:upload_part, bucket: repository.bucket_name, key: entry.blob_key, upload_id: upload_id, part_number: it, expires_in: 900)
          end

          {
            uploadId: upload_id,
            presignedUrls: presigned_urls,
            chunkSize: max_chunk_size
          }
        end
      end

      # commitCache
      r.post "commit" do
        etags = typecast_params.array!(:nonempty_str, "etags")
        upload_id = typecast_params.nonempty_str!("uploadId")
        size = typecast_params.pos_int!("size")

        entry = GithubCacheEntry[repository_id: repository.id, upload_id: upload_id, committed_at: nil]
        fail CloverError.new(204, "NotFound", "No cache entry") if entry.nil? || (entry.size && entry.size != size)
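
        # S3 part numbers are 1-based, hence the `_2 + 1` when pairing each
        # client-supplied ETag with its part index.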
        begin
          repository.blob_storage_client.complete_multipart_upload({
            bucket: repository.bucket_name,
            key: entry.blob_key,
            upload_id: upload_id,
            multipart_upload: {parts: etags.map.with_index { {part_number: _2 + 1, etag: _1} }}
          })
        rescue Aws::S3::Errors::InvalidPart, Aws::S3::Errors::NoSuchUpload => ex
          Clog.emit("could not complete multipart upload") { {failed_multipart_upload: {ubid: runner.ubid, repository_ubid: repository.ubid, exception: Util.exception_to_hash(ex)}} }
          fail CloverError.new(400, "InvalidRequest", "Wrong parameters")
        rescue Aws::S3::Errors::ServiceUnavailable => ex
          Clog.emit("s3 service unavailable") { {failed_multipart_upload: {ubid: runner.ubid, repository_ubid: repository.ubid, exception: Util.exception_to_hash(ex)}} }
          fail CloverError.new(503, "ServiceUnavailable", "Service unavailable")
        end

        updates = {committed_at: Time.now}
        # If the size could not be set at reserveCache time, set it here.
        updates[:size] = size if entry.size.nil?
        entry.update(updates)

        {}
      end
    end
  end
end