Files
ubicloud/bin/respirate
Enes Cakir 23e958d182 Create worker strands from a list
We have some worker strands that process tasks on a schedule, similar to
cron jobs. We create them when starting respirate if they haven't
been created yet.

We can use a list and import to avoid duplicating code.
2025-06-19 18:49:26 +03:00

73 lines
2.5 KiB
Ruby
Executable File

#!/usr/bin/env ruby
# frozen_string_literal: true

# Resolve which partition this respirate process serves. Priority order:
# explicit CLI argument, then Heroku's DYNO env var (e.g. "respirate.3"),
# then Foreman's PS env var (e.g. "respirate.3"). Stays nil when none of
# those are present, meaning an unpartitioned process.
partition_number = ARGV[0]
if partition_number.nil?
  if (dyno = ENV["DYNO"])
    # Heroku: DYNO looks like "respirate.3"; take the part after the dot.
    partition_number = dyno.split(".", 2)[1]
  elsif (m = /respirate\.(\d+)\z/.match(ENV["PS"]))
    # Foreman: PS looks like "respirate.3"; capture the trailing digits.
    partition_number = m[1]
  end
end
if partition_number
  # Integer() raises on non-numeric input; partitions are 1-based.
  partition_number = Integer(partition_number)
  raise "invalid partition_number: #{partition_number}" if partition_number < 1
end
# Load the application environment (models such as Strand, plus Config,
# Clog, DB, UBID, and clover_freeze used below).
require_relative "../loader"
# Dispatcher that leases and runs strands; partition_number: nil means
# this process handles strands from any partition.
d = Scheduling::Dispatcher.new(partition_number:)
# On TERM, request a graceful shutdown; the main loop below polls
# d.shutting_down and drains before exiting.
Signal.trap("TERM") { d.shutdown }
# Seed the schedule-driven worker strands (cron-like jobs) so they exist
# before dispatching begins. insert_conflict makes the import idempotent,
# so re-running respirate never duplicates or resets existing workers.
workers = [
  {ubid: "stresolvee4block0dnsnamesz", prog: "ResolveGloballyBlockedDnsnames"},
  {ubid: "stcheckzvsagezalertszzzzza", prog: "CheckUsageAlerts"},
  {ubid: "stexp1repr0ject1nv1tat10na", prog: "ExpireProjectInvitations"},
  {ubid: "st10g0vmh0st0vt111zat10nzz", prog: "LogVmHostUtilizations"}
]
# Optional workers, only when the relevant feature is configured.
workers << {ubid: "stheartbeatheartbheartheaz", prog: "Heartbeat"} if Config.heartbeat_url
workers << {ubid: "stredelivergith0bfail0reaz", prog: "RedeliverGithubFailures", stack: [{last_check_at: Time.now}]} if Config.github_app_id
Strand.dataset.insert_conflict.import(
  [:id, :prog, :label, :stack],
  workers.map do |w|
    # All workers start in the "wait" label; default to an empty stack frame.
    [UBID.parse(w[:ubid]).to_uuid, w[:prog], "wait", (w[:stack] || [{}]).to_json]
  end
)
clover_freeze

# When partitioned, stagger the first scan for old (not-yet-leased) strands
# by a random offset of up to 5 seconds, so that multiple respirate
# processes are unlikely to run that scan simultaneously. Left nil for
# unpartitioned processes, which disables the old-strand scan below.
next_old_strand_scan = Time.now + rand * 5 if partition_number
# Whether the last old-strand scan found nothing; starts true so the first
# iteration may sleep when the regular scan is also empty.
no_old_strands = true
DB.synchronize do
  until d.shutting_down
    # Lease and run a batch of this partition's strands; truthy when none found.
    no_strands = d.start_cohort
    break if d.shutting_down
    # next_old_strand_scan is nil for unpartitioned processes, so &.< makes
    # this whole branch a no-op in that case.
    if next_old_strand_scan&.<(Time.now)
      # Check every 5 seconds for strands still not leased by another respirate
      # process. However, if we find old strands, do not wait 5 seconds, because
      # if we are processing strands from another partition, we should ensure
      # they are all processed.
      if (no_old_strands = d.start_cohort(d.scan_old))
        next_old_strand_scan = Time.now + 5
      end
    end
    if no_strands && no_old_strands && !d.shutting_down
      # Only sleep if not shutting down and there were no strands in previous scan.
      # Note that this results in an up to a 1 second delay to pick up new strands,
      # but that is an acceptable tradeoff, because we do not want to busy loop the
      # database (potentially, LISTEN/NOTIFY could be used to reduce this latency)
      duration_slept = sleep 1
      Clog.emit("respirate finished sleep") { {sleep_duration_sec: duration_slept} }
    end
  end
end
# Strands already leased may still be running; report how many remain.
Clog.emit("Shutting down.") { {unfinished_strand_count: d.num_current_strands} }