Files
ubicloud/bin/respirate
Jeremy Evans 48582ef4bc Add max partition check when starting respirate
Similar to recent change for monitor.
2025-08-07 03:03:12 +09:00

93 lines
3.6 KiB
Ruby
Executable File

#!/usr/bin/env ruby
# frozen_string_literal: true
# Converts a raw partition number into an Integer and validates its range.
#
# raw:: a String holding a decimal number, or nil when no partition was given.
#
# Returns nil when +raw+ is nil, otherwise the parsed Integer.
#
# Raises ArgumentError when +raw+ is not a decimal integer, and RuntimeError
# when the parsed value is outside 1..256.
def parse_partition_number(raw)
  return unless raw

  # Force base 10. Without an explicit base, Integer() honours radix prefixes,
  # so "010" would silently select partition 8, "0x10" partition 16, and "08"
  # would be rejected as an invalid octal literal.
  number = Integer(raw, 10)
  raise "invalid partition_number: #{number}" if number < 1 || number > 256

  number
end

# The partition number comes from the first command line argument, falling
# back to the numeric suffix of the DYNO/PS environment variable.
partition_number = ARGV[0]
partition_number ||= if (match = /respirate\.(\d+)\z/.match(ENV["DYNO"] || ENV["PS"]))
  match[1] # Heroku/Foreman
end
partition_number = parse_partition_number(partition_number)
# Everything below uses constants that are not defined in this file
# (presumably they are made available by the loader required here).
require_relative "../loader"

# The dispatcher is given the partition number, which is nil when none was
# supplied on the command line or through the environment.
d = Scheduling::Dispatcher.new(partition_number: partition_number)

# Both INT and TERM ask the dispatcher to shut down.
%w[INT TERM].each do |signal_name|
  Signal.trap(signal_name) { d.shutdown }
end
# Inserts a strand row with a fixed id, starting at the "wait" label. Each id
# below is the UUID form of the UBID noted beside it, so the same row is
# always targeted. The insert goes through insert_conflict (presumably so an
# already existing row is left alone -- confirm against the Sequel docs).
seed_strand = lambda do |id, prog, **extra_columns|
  Strand.dataset.insert_conflict.insert(id:, prog:, label: "wait", **extra_columns)
end

# Only seeded when a heartbeat URL is configured.
if Config.heartbeat_url
  puts "Starting heartbeat prog"
  # UBID "stheartbeatheartbheartheaz"
  seed_strand.call("8b958d2d-cad4-5f3a-5634-b8b958d45caf", "Heartbeat")
end

# Only seeded when a GitHub app id is configured.
if Config.github_app_id
  # UBID "stredelivergith0bfail0reaz"
  seed_strand.call(
    "c39ae087-6ec4-033a-d440-b7a821061caf",
    "RedeliverGithubFailures",
    stack: [{last_check_at: Time.now}].to_json
  )
end

# Strands that are seeded unconditionally, in this order.
[
  # UBID "stresolvee4block0dnsnamesz"
  ["c3b200ed-ce22-c33a-0326-06d735551d9f", "ResolveGloballyBlockedDnsnames"],
  # UBID "stcheckzvsagezalertszzzzza"
  ["645cc9ff-7954-1f3a-fa82-ec6b3ffffff5", "CheckUsageAlerts"],
  # UBID "stexp1repr0ject1nv1tat10na"
  ["776c1c3a-d804-9f3a-6683-5d874ad04155", "ExpireProjectInvitations"],
  # UBID "st10g0vmh0st0vt111zat10nzz"
  ["08200dd2-20ce-833a-de82-10fd5a082bff", "LogVmHostUtilizations"]
].each do |id, prog|
  seed_strand.call(id, prog)
end
clover_freeze

# Old strand scans only happen when a partition number was given. The first
# scan is scheduled at a random offset within 5 seconds, so that multiple
# respirate processes are unlikely to run the old strand scan at the same time.
next_old_strand_scan = (Time.now + (5 * rand) if partition_number)

# Stays true for the whole run when old strand scans are disabled.
no_old_strands = true

DB.synchronize do
  until d.shutting_down
    no_strands = d.start_cohort
    break if d.shutting_down

    if next_old_strand_scan && next_old_strand_scan < Time.now
      # Look every 5 seconds for strands still not leased by another respirate
      # process. When old strands are found, the next scan is not pushed back
      # by 5 seconds: if we are processing strands from another partition, we
      # should ensure they are all processed.
      no_old_strands = d.start_cohort(d.scan_old)
      next_old_strand_scan = Time.now + 5 if no_old_strands
    end

    # Only sleep if not shutting down and the previous scans found no strands.
    next unless no_strands && no_old_strands && !d.shutting_down

    # Sleeping delays picking up new strands by up to the sleep duration
    # (about a second, per the original note), but that is an acceptable
    # tradeoff, because we do not want to busy loop the database
    # (potentially, LISTEN/NOTIFY could be used to reduce this latency).
    sleep_duration_sec = d.sleep_duration
    sleep sleep_duration_sec
    Clog.emit("respirate finished sleep") { {sleep_duration_sec:} }
  end
end
Clog.emit("Shutting down.") { {unfinished_strand_count: d.num_current_strands} }

# Set to true only once the cleanup below has run to completion.
finished_cleanly = false

cleanup_thread = Thread.new do
  d.shutdown_and_cleanup_threads
  finished_cleanly = true
end

# Give the strand threads at most 2 seconds to exit. We cannot wait very long,
# as every second we wait is potentially an additional second that no
# respirate process is processing new strands.
cleanup_thread.join(2)

# Exit 0 if cleanup completed, 1 if it had not finished when the wait ended.
exit(finished_cleanly ? 0 : 1)