We have some worker strands that process tasks on a schedule, similar to cron jobs. We create them when starting respirate if they haven't been created yet. We can use a list and an import to avoid duplicating code.
#!/usr/bin/env ruby
# frozen_string_literal: true

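# Partition number for this respirate process: taken from the first command
# line argument, or derived from the process name under Heroku/Foreman
# (e.g. "respirate.2"). When set, it must be between 1 and 256.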
partition_number = ARGV[0]
partition_number ||= if (match = /respirate\.(\d+)\z/.match(ENV["DYNO"] || ENV["PS"]))
  match[1] # Heroku/Foreman
end

if partition_number
  partition_number = Integer(partition_number)
  raise "invalid partition_number: #{partition_number}" if partition_number < 1 || partition_number > 256
end

require_relative "../loader"

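# The dispatcher runs strands for this process (scoped to the partition when
# one is given); INT and TERM request a graceful shutdown of the loop below.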
d = Scheduling::Dispatcher.new(partition_number:)
Signal.trap("INT") { d.shutdown }
Signal.trap("TERM") { d.shutdown }

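# Scheduled, cron-like worker strands, created at startup if they do not
# already exist. Optional workers are nil unless the related feature is
# configured, and are dropped by compact! below.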
workers = [
  {ubid: "stresolvee4block0dnsnamesz", prog: "ResolveGloballyBlockedDnsnames"},
  {ubid: "stcheckzvsagezalertszzzzza", prog: "CheckUsageAlerts"},
  {ubid: "stexp1repr0ject1nv1tat10na", prog: "ExpireProjectInvitations"},
  {ubid: "st10g0vmh0st0vt111zat10nzz", prog: "LogVmHostUtilizations"},
  Config.heartbeat_url ? {ubid: "stheartbeatheartbheartheaz", prog: "Heartbeat"} : nil,
  Config.github_app_id ? {ubid: "stredelivergith0bfail0reaz", prog: "RedeliverGithubFailures", stack: [{last_check_at: Time.now}]} : nil
]
workers.compact!

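# insert_conflict makes the import idempotent: rows whose id already exists
# are skipped, so restarting respirate does not recreate or duplicate these
# strands.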
Strand.dataset.insert_conflict.import(
  [:id, :prog, :label, :stack],
  workers.map { [UBID.parse(it[:ubid]).to_uuid, it[:prog], "wait", (it[:stack] || [{}]).to_json] }
)

SSH_SESSION_LOCK_NAME = "respirate"

clover_freeze

if partition_number
  # Start with a random offset, so that multiple respirate processes are unlikely
  # to run the old strand scan at the same time.
  next_old_strand_scan = Time.now + (5 * rand)
end
no_old_strands = true

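# Main dispatch loop: hold a single database connection for its duration and
# keep running cohorts of runnable strands until shutdown is requested.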
DB.synchronize do
  until d.shutting_down
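    # start_cohort runs a batch of runnable strands; a truthy return value
    # means no strands were found.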
    no_strands = d.start_cohort

    break if d.shutting_down

    if next_old_strand_scan&.<(Time.now)
      # Check every 5 seconds for strands still not leased by another respirate
      # process. However, if we find old strands, do not wait 5 seconds, because
      # if we are processing strands from another partition, we should ensure
      # they are all processed.
      if (no_old_strands = d.start_cohort(d.scan_old))
        next_old_strand_scan = Time.now + 5
      end
    end

    if no_strands && no_old_strands && !d.shutting_down
      sleep_duration_sec = d.sleep_duration
      # Only sleep if not shutting down and there were no strands in the previous scan.
      # Note that this results in up to a 1 second delay to pick up new strands,
      # but that is an acceptable tradeoff, because we do not want to busy loop the
      # database (potentially, LISTEN/NOTIFY could be used to reduce this latency).
      sleep sleep_duration_sec
      Clog.emit("respirate finished sleep") { {sleep_duration_sec:} }
    end
  end
end

Clog.emit("Shutting down.") { {unfinished_strand_count: d.num_current_strands} }
|
|
|
|
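# Assume failure; the cleanup thread below flips this to success only if it
# finishes within the 2 second join timeout.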
exit_status = 1

# Wait up to 2 seconds for all strand threads to exit.
# We cannot wait very long, as every second we wait is potentially an
# additional second that no respirate process is processing new strands.
Thread.new do
  d.shutdown_and_cleanup_threads
  exit_status = 0
end.join(2)

exit exit_status