Files
ubicloud/bin/respirate
Jeremy Evans 391d38332f Fix logged respirate sleep_duration_sec
Kernel#sleep returns the duration slept rounded to an integer, which
is less helpful when sleeps are going to be between 0.2 and 1.
Return the number of seconds that sleep was requested for instead,
which can be fractional. This should allow for proper calculation of
how long respirate scan threads are actually sleeping.
2025-07-10 04:23:15 +09:00

89 lines
3.5 KiB
Ruby
Executable File

#!/usr/bin/env ruby
# frozen_string_literal: true
# Determine which respirate partition (if any) this process is responsible for.
# An explicit first command-line argument takes precedence; otherwise the number
# is extracted from a process name ending in "respirate.<N>" found in the DYNO
# or PS environment variable. If neither is available, partition_number is nil.
partition_number = ARGV[0]
unless partition_number
  process_name = ENV["DYNO"] || ENV["PS"] # Heroku/Foreman
  partition_number = process_name[/respirate\.(\d+)\z/, 1] if process_name
end

if partition_number
  # Parse strictly as decimal. Without an explicit base, Integer() honors radix
  # prefixes, so "010" would silently become partition 8 and "0x10" partition 16.
  # Non-numeric input still raises ArgumentError.
  partition_number = Integer(partition_number, 10)
  raise "invalid partition_number: #{partition_number}" if partition_number < 1
end
require_relative "../loader"
# Build the dispatcher for this process; partition_number may be nil.
d = Scheduling::Dispatcher.new(partition_number: partition_number)

# On SIGTERM, ask the dispatcher to shut down; the main loop polls
# d.shutting_down and exits once it is set.
Signal.trap("TERM") do
  d.shutdown
end
# Inserts a singleton strand with a fixed id, starting at the "wait" label.
# Any extra columns (e.g. stack:) are passed through to the insert.
# NOTE(review): relies on insert_conflict to make a repeated insert of the same
# id harmless on every startup -- confirm against the Sequel documentation.
ensure_singleton_strand = lambda do |id, prog, **columns|
  Strand.dataset.insert_conflict.insert(id:, prog:, label: "wait", **columns)
end

if Config.heartbeat_url
  puts "Starting heartbeat prog"
  # Fixed UBID: "stheartbeatheartbheartheaz"
  ensure_singleton_strand.call("8b958d2d-cad4-5f3a-5634-b8b958d45caf", "Heartbeat")
end

if Config.github_app_id
  # Fixed UBID: "stredelivergith0bfail0reaz"
  ensure_singleton_strand.call(
    "c39ae087-6ec4-033a-d440-b7a821061caf",
    "RedeliverGithubFailures",
    stack: [{last_check_at: Time.now}].to_json
  )
end

# Fixed UBID: "stresolvee4block0dnsnamesz"
ensure_singleton_strand.call("c3b200ed-ce22-c33a-0326-06d735551d9f", "ResolveGloballyBlockedDnsnames")
# Fixed UBID: "stcheckzvsagezalertszzzzza"
ensure_singleton_strand.call("645cc9ff-7954-1f3a-fa82-ec6b3ffffff5", "CheckUsageAlerts")
# Fixed UBID: "stexp1repr0ject1nv1tat10na"
ensure_singleton_strand.call("776c1c3a-d804-9f3a-6683-5d874ad04155", "ExpireProjectInvitations")
# Fixed UBID: "st10g0vmh0st0vt111zat10nzz"
ensure_singleton_strand.call("08200dd2-20ce-833a-de82-10fd5a082bff", "LogVmHostUtilizations")
# NOTE(review): clover_freeze is defined outside this file (presumably by the
# loader); it is invoked once here, after the singleton strands are inserted.
clover_freeze

# Time of the first old-strand scan. Only partitioned processes perform the
# scan; for them the first scan is delayed by a random 0-5 seconds so that
# several respirate processes are unlikely to scan at the same moment.
# Remains nil when no partition number was given.
next_old_strand_scan = (Time.now + (5 * rand) if partition_number)
# Main loop: keep starting cohorts of strands until the dispatcher reports
# that it is shutting down.
no_old_strands = true
DB.synchronize do
  until d.shutting_down
    no_strands = d.start_cohort
    break if d.shutting_down

    # Roughly every 5 seconds, look for strands that no other respirate
    # process has leased. When such strands are found, the next scan is not
    # postponed: we may be covering for another partition and want to work
    # through all of its strands without waiting.
    if next_old_strand_scan && next_old_strand_scan < Time.now
      no_old_strands = d.start_cohort(d.scan_old)
      next_old_strand_scan = Time.now + 5 if no_old_strands
    end

    # Sleep only when the previous scans found nothing to run and we are not
    # shutting down. This can delay picking up new strands by up to a second,
    # which is an acceptable tradeoff for not busy-looping the database
    # (LISTEN/NOTIFY could potentially be used to reduce this latency).
    next unless no_strands && no_old_strands
    next if d.shutting_down

    # Log the requested duration rather than the return value of sleep,
    # since the requested value can be fractional.
    sleep_duration_sec = d.sleep_duration
    sleep sleep_duration_sec
    Clog.emit("respirate finished sleep") { {sleep_duration_sec:} }
  end
end
Clog.emit("Shutting down.") { {unfinished_strand_count: d.num_current_strands} }

# Give the strand threads at most 2 seconds to finish. Waiting longer is not
# an option: each extra second spent here may be a second in which no
# respirate process is picking up new strands.
cleanup_thread = Thread.new do
  d.shutdown_and_cleanup_threads
  # Cleanup completed within the time limit.
  exit 0
end
cleanup_thread.join(2)

# Reached only when the cleanup thread did not finish (and exit) in time.
exit 1