Kernel#sleep returns the number of seconds slept rounded to an integer, which is unhelpful when sleeps are expected to last between 0.2 and 1 second. Return the requested sleep duration in seconds instead, which can be fractional. This allows accurate measurement of how long respirate scan threads actually sleep.
89 lines
3.5 KiB
Ruby
Executable File
89 lines
3.5 KiB
Ruby
Executable File
#!/usr/bin/env ruby
# frozen_string_literal: true

# Partition handled by this respirate process.
# An explicit first command-line argument wins. Otherwise the number is taken
# from the DYNO or PS environment variable when it ends in "respirate.N"
# (Heroku/Foreman process names). With neither, partition_number stays nil.
partition_number = ARGV[0]
partition_number ||= if (match = /respirate\.(\d+)\z/.match(ENV["DYNO"] || ENV["PS"]))
  match[1] # Heroku/Foreman
end

if partition_number
  # Parse with an explicit base 10. Kernel#Integer without a base honors radix
  # prefixes, so "010" would silently become 8 and "08" would raise.
  partition_number = Integer(partition_number, 10)
  raise "invalid partition_number: #{partition_number}" if partition_number < 1
end

# Load the application. Scheduling::Dispatcher, Config, Strand, DB and Clog,
# which this script uses, are not defined in this file.
require_relative "../loader"

# Dispatcher for this process; partition_number is the value parsed above and
# may be nil when no partition was given.
d = Scheduling::Dispatcher.new(partition_number:)

# On SIGTERM, only ask the dispatcher to shut down. The dispatch loop checks
# d.shutting_down and stops once it is set.
Signal.trap("TERM") do
  d.shutdown
end

# Singleton strands ensured at startup. Each uses a fixed id (its UBID is given
# in the trailing comment) and is inserted through insert_conflict, so an
# already-existing row is left alone instead of being duplicated.
if Config.heartbeat_url
  puts "Starting heartbeat prog"
  Strand.dataset.insert_conflict.insert(id: "8b958d2d-cad4-5f3a-5634-b8b958d45caf", prog: "Heartbeat", label: "wait") # stheartbeatheartbheartheaz
end

if Config.github_app_id
  # This prog's initial stack records the current time as last_check_at.
  Strand.dataset.insert_conflict.insert(
    id: "c39ae087-6ec4-033a-d440-b7a821061caf", # stredelivergith0bfail0reaz
    prog: "RedeliverGithubFailures",
    label: "wait",
    stack: [{last_check_at: Time.now}].to_json
  )
end

# Unconditional singletons, inserted in this order.
{
  "c3b200ed-ce22-c33a-0326-06d735551d9f" => "ResolveGloballyBlockedDnsnames", # stresolvee4block0dnsnamesz
  "645cc9ff-7954-1f3a-fa82-ec6b3ffffff5" => "CheckUsageAlerts",               # stcheckzvsagezalertszzzzza
  "776c1c3a-d804-9f3a-6683-5d874ad04155" => "ExpireProjectInvitations",       # stexp1repr0ject1nv1tat10na
  "08200dd2-20ce-833a-de82-10fd5a082bff" => "LogVmHostUtilizations"           # st10g0vmh0st0vt111zat10nzz
}.each do |strand_id, prog_name|
  Strand.dataset.insert_conflict.insert(id: strand_id, prog: prog_name, label: "wait")
end

# Called once all startup inserts are done and before the dispatch loop runs.
# NOTE(review): clover_freeze is defined outside this file — confirm exactly
# what it freezes.
clover_freeze

# Old-strand scan schedule.
# Only processes with a partition_number scan for old strands. Elsewhere,
# next_old_strand_scan stays nil, and the safe-navigation check in the loop
# never treats it as due.
# The first scan happens after a random offset of up to 5 seconds, so multiple
# respirate processes are unlikely to run it at the same time.
# Deadlines use the monotonic clock rather than Time.now. That way a wall-clock
# adjustment (e.g. a backwards NTP step) cannot postpone the scan.
monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

if partition_number
  next_old_strand_scan = monotonic_now.call + (5 * rand)
end

# Result of the most recent old-strand scan (truthy when it started nothing).
# It starts true so that processes which never scan old strands can still sleep.
no_old_strands = true

# NOTE(review): DB.synchronize presumably holds a single connection for the
# whole loop (Sequel Database#synchronize) — confirm.
DB.synchronize do
  until d.shutting_down
    no_strands = d.start_cohort

    break if d.shutting_down

    if next_old_strand_scan&.<(monotonic_now.call)
      # Check every 5 seconds for strands still not leased by another respirate
      # process. However, if we find old strands, do not wait 5 seconds, because
      # if we are processing strands from another partition, we should ensure
      # they are all processed.
      if (no_old_strands = d.start_cohort(d.scan_old))
        next_old_strand_scan = monotonic_now.call + 5
      end
    end

    if no_strands && no_old_strands && !d.shutting_down
      # Only sleep if not shutting down and neither scan found strands.
      # This can delay picking up new strands by up to sleep_duration_sec.
      # That is an acceptable tradeoff, because we do not want to busy loop the
      # database (potentially, LISTEN/NOTIFY could be used to reduce this latency).
      sleep_duration_sec = d.sleep_duration
      sleep sleep_duration_sec
      # Log the requested (possibly fractional) duration rather than
      # Kernel#sleep's return value, which is rounded to whole seconds.
      Clog.emit("respirate finished sleep") { {sleep_duration_sec:} }
    end
  end
end

Clog.emit("Shutting down.") do
  {unfinished_strand_count: d.num_current_strands}
end

# Give strand threads at most 2 seconds to exit. Every second spent waiting is
# potentially a second in which no respirate process is processing new strands.
# The helper thread calls exit 0 once cleanup finishes. If Thread#join times
# out first, execution falls through to exit 1.
cleanup_thread = Thread.new do
  d.shutdown_and_cleanup_threads
  exit 0
end
cleanup_thread.join(2)

exit 1