Instead of having a fixed partition of the uuid space, workers now only have a fixed partition number (based on ARGV[0], DYNO on Heroku, or PS on Foreman). If a partition number is given, the dispatcher creates a repartition thread. This thread uses LISTEN to receive messages from other respirate processes. When it starts, and every 2 seconds afterward, it uses NOTIFY to notify other listening processes of its existence. If more than 4 seconds have passed since it was notified that a respirate process exists, it assumes that respirate process has exited.

When it receives a notification that a respirate process has been added with a number higher than the number of partitions it is currently configured for, it repartitions the uuid space it monitors (decreasing the partition size) and updates the number of current partitions. If it removes stale respirate processes and the highest remaining process number is lower than the number of partitions it is currently configured for, it repartitions the uuid space it monitors (increasing the partition size) and updates the number of current partitions.

Let's say you are running 4 respirate workers: respirate.{1,2,3,4}. These will quickly divide the uuid space into 4 partitions. If you add respirate.5 and respirate.6, the existing processes will quickly repartition to use 6 partitions. If you remove respirate.6, the existing processes will repartition to use 5 partitions within 4-6 seconds. If you then remove respirate.4, no repartitioning occurs, since respirate.5 is still running.

This is not designed to repartition when respirate processes crash, only when the number of respirate processes increases or decreases, on the assumption that increases will start at the number after the highest current process, and decreases will remove processes starting from the highest number. While this does not repartition for crashed respirate processes, strands for those processes are still handled by the old strand scan.
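The liveness rules above (notify every 2 seconds, consider a peer exited after more than 4 seconds of silence) can be sketched as follows. This is an illustrative model only; `PartitionTracker` and its methods are hypothetical names, not the actual dispatcher API.

```ruby
# Sketch of the stale-process timing rules: each notification records
# when a given partition number was last heard from; pruning drops any
# entry silent for more than 4 seconds and reports the highest partition
# number still considered alive.
class PartitionTracker
  STALE_AFTER = 4 # seconds; peers notify every 2 seconds

  def initialize
    @last_seen = {} # partition number => Time of last notification
  end

  # Record a NOTIFY received from the given respirate process.
  def notified(partition_number, at = Time.now)
    @last_seen[partition_number] = at
  end

  # Drop peers not heard from within STALE_AFTER seconds, and return the
  # highest partition number still alive (nil if none). A caller would
  # repartition if this differs from the configured partition count.
  def prune_stale(now = Time.now)
    @last_seen.delete_if { |_, at| now - at > STALE_AFTER }
    @last_seen.keys.max
  end
end
```

For example, if respirate.1 was last heard from 5 seconds ago and respirate.5 was heard from 2 seconds ago, pruning drops respirate.1 but still reports 5 as the highest live process, so no repartition to a smaller count happens while respirate.5 runs, mirroring the behavior described above.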
This moves the uuid partitioning logic from respirate to the dispatcher, since it is no longer static. Similarly, creating the prepared statement for the scan query is moved to a separate method, since it is no longer only called by initialize. This may appear to be thread unsafe, but the way Sequel works, before every prepared statement execution it checks whether the SQL of the statement already prepared on the connection matches the current SQL for the prepared statement on the Database object; if it doesn't match, it DEALLOCATEs the existing prepared statement on the connection and then PREPAREs a new statement using the current SQL (in this case, the new partition).

This changes the smoke test when running fewer processes than partitions (such as the 3/4 test) to not create processes for the lower partition numbers, instead of not creating processes for the higher partition numbers. Otherwise, due to the rebalancing, instead of testing the behavior with crashed respirate processes, you end up only testing with a lower number of total processes.

To monitor the partitioning, this emits logs in 2 additional cases:

* When initially partitioning or repartitioning, so you can see what partition each respirate process is monitoring.
* If an unexpected notification is received while listening. This should never happen, but we do not want to crash the repartition thread if it does.
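To make the repartitioning concrete, here is a hedged sketch of dividing a uuid space into numbered contiguous ranges. The real dispatcher's partitioning scheme may differ; `UUID_SPACE_SIZE` and `partition_range` are illustrative names, treating uuids as 128-bit integers.

```ruby
# Sketch: divide the 128-bit uuid space into num_partitions contiguous
# integer ranges; partition_number is 1-based.
UUID_SPACE_SIZE = 2**128

def partition_range(partition_number, num_partitions)
  size = UUID_SPACE_SIZE / num_partitions
  first = (partition_number - 1) * size
  # The last partition absorbs any rounding remainder, so the whole
  # space is always covered no matter the partition count.
  last = (partition_number == num_partitions) ? UUID_SPACE_SIZE - 1 : (partition_number * size) - 1
  first..last
end
```

Under this model, going from 4 to 6 partitions shrinks each process's range, and going back from 6 to 5 grows them again, matching the decrease/increase behavior described above.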
#!/usr/bin/env ruby
# frozen_string_literal: true

partition_number = ARGV[0]
partition_number ||= if (dyno = ENV["DYNO"])
  dyno.split(".", 2)[1] # Heroku
elsif (match = /respirate\.(\d+)\z/.match(ENV["PS"]))
  match[1] # Foreman
end

if partition_number
  partition_number = Integer(partition_number)
  raise "invalid partition_number: #{partition_number}" if partition_number < 1
end

require_relative "../loader"

d = Scheduling::Dispatcher.new(partition_number:)
Signal.trap("TERM") { d.shutdown }

if Config.heartbeat_url
  puts "Starting heartbeat prog"
  # We always insert the heartbeat using the same UBID ("stheartbeatheartbheartheaz")
  Strand.dataset.insert_conflict.insert(id: "8b958d2d-cad4-5f3a-5634-b8b958d45caf", prog: "Heartbeat", label: "wait")
end

if Config.github_app_id
  # We always insert this strand using the same UBID ("stredelivergith0bfail0reaz")
  Strand.dataset.insert_conflict.insert(id: "c39ae087-6ec4-033a-d440-b7a821061caf", prog: "RedeliverGithubFailures", label: "wait", stack: [{last_check_at: Time.now}].to_json)
end

# We always insert this strand using the same UBID ("stresolvee4block0dnsnamesz")
Strand.dataset.insert_conflict.insert(id: "c3b200ed-ce22-c33a-0326-06d735551d9f", prog: "ResolveGloballyBlockedDnsnames", label: "wait")

# We always insert this strand using the same UBID ("stcheckzvsagezalertszzzzza")
Strand.dataset.insert_conflict.insert(id: "645cc9ff-7954-1f3a-fa82-ec6b3ffffff5", prog: "CheckUsageAlerts", label: "wait")

# We always insert this strand using the same UBID ("stexp1repr0ject1nv1tat10na")
Strand.dataset.insert_conflict.insert(id: "776c1c3a-d804-9f3a-6683-5d874ad04155", prog: "ExpireProjectInvitations", label: "wait")

# We always insert this strand using the same UBID ("st10g0vmh0st0vt111zat10nzz")
Strand.dataset.insert_conflict.insert(id: "08200dd2-20ce-833a-de82-10fd5a082bff", prog: "LogVmHostUtilizations", label: "wait")

clover_freeze

if partition_number
  # Start with a random offset, so that multiple respirate processes are unlikely
  # to run the old strand scan at the same time.
  next_old_strand_scan = Time.now + (5 * rand)
end
no_old_strands = true

DB.synchronize do
  until d.shutting_down
    no_strands = d.start_cohort

    break if d.shutting_down

    if next_old_strand_scan&.<(Time.now)
      # Check every 5 seconds for strands still not leased by another respirate
      # process. However, if we find old strands, do not wait 5 seconds, because
      # if we are processing strands from another partition, we should ensure
      # they are all processed.
      if (no_old_strands = d.start_cohort(d.scan_old))
        next_old_strand_scan = Time.now + 5
      end
    end

    if no_strands && no_old_strands && !d.shutting_down
      # Only sleep if not shutting down and there were no strands in the previous
      # scan. Note that this results in up to a 1 second delay to pick up new
      # strands, but that is an acceptable tradeoff, because we do not want to
      # busy loop the database (potentially, LISTEN/NOTIFY could be used to
      # reduce this latency).
      duration_slept = sleep 1
      Clog.emit("respirate finished sleep") { {sleep_duration_sec: duration_slept} }
    end
  end
end

Clog.emit("Shutting down.") { {unfinished_strand_count: d.num_current_strands} }