mirror of
https://github.com/ubicloud/ubicloud.git
synced 2025-10-05 06:12:09 +08:00
By setting `SSH_SESSION_LOCK_NAME`, activate the conflict logging for respirate SSH sessions. Keeping this commit separate makes it easy to revert in case there are side effects.
94 lines
3.6 KiB
Ruby
Executable file
94 lines
3.6 KiB
Ruby
Executable file
#!/usr/bin/env ruby
|
|
# frozen_string_literal: true
|
|
|
|
# Determine which partition this respirate process serves. An explicit
# command-line argument takes precedence. Otherwise the partition is read from
# the process name exported by Heroku (DYNO) or Foreman (PS), e.g. "respirate.3".
# If neither source yields a value, partition_number stays nil.
partition_number = ARGV[0] || begin
  process_name = ENV["DYNO"] || ENV["PS"]
  Regexp.last_match(1) if /respirate\.(\d+)\z/ =~ process_name
end

if partition_number
  # Integer() raises on non-numeric input instead of silently yielding 0.
  partition_number = Integer(partition_number)
  raise "invalid partition_number: #{partition_number}" unless (1..256).cover?(partition_number)
end
|
|
|
|
require_relative "../loader"
|
|
|
|
d = Scheduling::Dispatcher.new(partition_number:)

# Both INT (Ctrl-C) and TERM ask the dispatcher to shut down. The main loop
# checks d.shutting_down to stop picking up new work.
%w[INT TERM].each do |signal_name|
  Signal.trap(signal_name) { d.shutdown }
end
|
|
|
|
# Each singleton strand is inserted with a fixed id (its UBID is noted beside
# it). insert_conflict lets this run on every boot without creating duplicate
# rows.
insert_singleton_strand = lambda do |id, prog, **extra_columns|
  Strand.dataset.insert_conflict.insert(id:, prog:, label: "wait", **extra_columns)
end

if Config.heartbeat_url
  puts "Starting heartbeat prog"
  # UBID: stheartbeatheartbheartheaz
  insert_singleton_strand.call("8b958d2d-cad4-5f3a-5634-b8b958d45caf", "Heartbeat")
end

if Config.github_app_id
  # UBID: stredelivergith0bfail0reaz. The initial stack records when the check began.
  insert_singleton_strand.call("c39ae087-6ec4-033a-d440-b7a821061caf", "RedeliverGithubFailures",
    stack: [{last_check_at: Time.now}].to_json)
end

# Strands that are always present, regardless of configuration.
[
  ["c3b200ed-ce22-c33a-0326-06d735551d9f", "ResolveGloballyBlockedDnsnames"], # UBID: stresolvee4block0dnsnamesz
  ["645cc9ff-7954-1f3a-fa82-ec6b3ffffff5", "CheckUsageAlerts"], # UBID: stcheckzvsagezalertszzzzza
  ["776c1c3a-d804-9f3a-6683-5d874ad04155", "ExpireProjectInvitations"], # UBID: stexp1repr0ject1nv1tat10na
  ["08200dd2-20ce-833a-de82-10fd5a082bff", "LogVmHostUtilizations"] # UBID: st10g0vmh0st0vt111zat10nzz
].each { |id, prog| insert_singleton_strand.call(id, prog) }

# Defining this constant activates conflict logging for respirate's SSH
# sessions. It is not read anywhere in this script; it is consumed elsewhere.
SSH_SESSION_LOCK_NAME = "respirate"

# NOTE(review): clover_freeze is defined elsewhere and presumably freezes
# classes/objects. Keep constant definitions above this call; confirm.
clover_freeze
|
|
|
|
# Monotonic seconds for scheduling the periodic old-strand scan. Wall-clock
# time can be stepped (NTP, manual changes), which would stall the scan or
# make it fire in bursts. A monotonic clock only moves forward.
monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

# Only processes with a partition_number run the old strand scan. Start with a
# random offset, so that multiple respirate processes are unlikely to run the
# old strand scan at the same time. nil means "never scan".
next_old_strand_scan = (monotonic_now.call + (5 * rand) if partition_number)
no_old_strands = true

DB.synchronize do
  until d.shutting_down
    no_strands = d.start_cohort

    break if d.shutting_down

    if next_old_strand_scan && next_old_strand_scan < monotonic_now.call
      # Check every 5 seconds for strands still not leased by another respirate
      # process. However, if we find old strands, do not wait 5 seconds, because
      # if we are processing strands from another partition, we should ensure
      # they are all processed.
      if (no_old_strands = d.start_cohort(d.scan_old))
        next_old_strand_scan = monotonic_now.call + 5
      end
    end

    if no_strands && no_old_strands && !d.shutting_down
      sleep_duration_sec = d.sleep_duration
      # Only sleep if not shutting down and the previous scan found no strands.
      # This adds up to sleep_duration_sec of delay before new strands are
      # picked up. That is an acceptable tradeoff, because we do not want to
      # busy-loop the database (LISTEN/NOTIFY could potentially be used to
      # reduce this latency).
      sleep sleep_duration_sec
      Clog.emit("respirate finished sleep") { {sleep_duration_sec:} }
    end
  end
end
|
|
|
|
Clog.emit("Shutting down.") { {unfinished_strand_count: d.num_current_strands} }

# Assume failure. The cleanup thread flips this to success once
# shutdown_and_cleanup_threads returns.
exit_status = 1

# Give strand threads a short, bounded window to exit. Every second spent
# waiting here is potentially a second in which no respirate process is
# processing new strands.
cleanup_deadline_sec = 2
cleanup_thread = Thread.new do
  d.shutdown_and_cleanup_threads
  exit_status = 0
end
cleanup_thread.join(cleanup_deadline_sec)

exit exit_status
|