ubicloud/spec/respirate_smoke_test.rb
Jeremy Evans 09cc4626cb Make partitioned respirate automatically rebalance based on number of workers
Instead of having a fixed partition of the uuid space, workers now
only have a fixed partition number (based on ARGV[0], DYNO on
Heroku, or PS on Foreman).  If a partition number is given, the
dispatcher creates a repartition thread.  This thread uses LISTEN
to receive messages from other respirate processes. When it starts,
and every 2 seconds after, it uses NOTIFY to notify other listening
processes of its existence. If more than 4 seconds have passed
since it was last notified that a given respirate process exists, it
assumes that respirate process has exited.
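
The liveness bookkeeping above can be modeled in a few lines. This is a minimal sketch, not the dispatcher's code: the class name `PeerTracker` and its methods are hypothetical, LISTEN/NOTIFY itself is not modeled, and times are passed in explicitly as seconds so the behavior is easy to check.

```ruby
# Hypothetical model of the repartition thread's peer bookkeeping.
# Only the "seen within the last 4 seconds" rule from the text is modeled.
class PeerTracker
  STALE_AFTER = 4 # seconds without a notification before a peer is presumed gone

  def initialize
    @last_seen = {} # partition number => time of its most recent notification
  end

  # Record that a notification arrived from the process with this partition number.
  def notified(partition, now)
    @last_seen[partition] = now
  end

  # Forget peers not heard from for more than STALE_AFTER seconds and
  # return the partition numbers that were removed.
  def remove_stale(now)
    stale = @last_seen.select { |_, seen| now - seen > STALE_AFTER }.keys
    stale.each { |partition| @last_seen.delete(partition) }
    stale
  end

  # Highest partition number still believed to be alive (nil if none).
  def highest
    @last_seen.keys.max
  end
end
```

For example, if partitions 1 to 3 are seen at time 0 and only 1 and 2 are seen again at time 3, then `remove_stale(5.0)` drops partition 3 and `highest` becomes 2.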

When it receives a notification that a respirate process has been
added with a higher number than the number of partitions it is
currently configured for, it repartitions the uuid space it monitors
(decreasing the size), and updates the number of current partitions.

If it removes stale respirate processes, and the highest-numbered
remaining process is lower than the number of partitions it is
currently configured for, it repartitions the uuid space it monitors
(increasing the size), and updates the number of current partitions.
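
To make "decreasing/increasing the size" concrete, here is a sketch that splits the 128-bit uuid space into `n` equal numeric ranges and returns the bounds for 1-based partition `k`. The equal-split scheme and the helper names `uuid_partition` and `format_uuid` are assumptions for illustration; the dispatcher's actual boundary computation is not shown here and may differ.

```ruby
# Hypothetical: split the 128-bit uuid space into n equal ranges and return
# [start, end) bounds of 1-based partition k as uuid strings. The last
# partition has no upper bound (nil), so it also absorbs any remainder.
def uuid_partition(k, n)
  raise ArgumentError, "partition #{k} out of 1..#{n}" unless (1..n).cover?(k)

  size = 2**128 / n
  start = (k - 1) * size
  finish = (k == n) ? nil : k * size
  [format_uuid(start), finish && format_uuid(finish)]
end

# Render a 128-bit integer in the usual 8-4-4-4-12 uuid layout.
def format_uuid(int)
  hex = int.to_s(16).rjust(32, "0")
  [hex[0, 8], hex[8, 4], hex[12, 4], hex[16, 4], hex[20, 12]].join("-")
end
```

With 4 partitions, partition 1 covers `00000000-...` up to `40000000-...`; when the count grows to 6, each range shrinks to a sixth of the space, and shrinking the count widens them again.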

Let's say you are running 4 respirate workers: respirate.{1,2,3,4}.
These will quickly divide the uuid space into 4 partitions.  If you
add respirate.5 and respirate.6, the existing processes will quickly
repartition to use 6 partitions.  If you remove respirate.6, the
remaining processes will repartition to use 5 partitions within 4-6
seconds.  If you then remove respirate.4, they will not repartition,
since respirate.5 is still running.  This is not designed to
repartition if respirate processes crash, only if the number of
respirate processes increases or decreases, with the assumption
that any increase adds processes numbered after the highest current
process, and any decrease removes processes starting with the
highest number.
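
The two repartitioning rules can be replayed against the respirate.{1..6} scenario above. This is a sketch with hypothetical function names, tracking only the partition count, not how the dispatcher stores it.

```ruby
# Hypothetical sketch of the two rules; names are illustrative only.

# A notification from a higher-numbered process grows the partition count.
def count_after_notify(current, notified_number)
  notified_number > current ? notified_number : current
end

# After stale processes are dropped, shrink to the highest survivor
# if it is below the current count; otherwise keep the current count.
def count_after_stale_removal(current, alive_numbers)
  highest = alive_numbers.max
  highest && highest < current ? highest : current
end

# Replay: start with respirate.{1,2,3,4}, then add 5 and 6.
alive = [1, 2, 3, 4]
count = 4
[5, 6].each do |n|
  alive << n
  count = count_after_notify(count, n)
end
# count is now 6

alive.delete(6)
count = count_after_stale_removal(count, alive) # shrinks to 5

alive.delete(4)
count = count_after_stale_removal(count, alive) # stays 5, respirate.5 is alive
```

Because only the highest surviving number matters, removing a middle process (respirate.4) leaves the count unchanged, which is why crashed low-numbered processes are left to the old strand scan.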

While this does not repartition for crashed respirate processes,
strands for those processes are still handled by the old strand scan.

This moves the uuid partitioning logic from respirate to dispatcher,
since it is no longer static. Similarly, creating the prepared statement
for the scan query is moved to a separate method, since it is no
longer only called by initialize.  This may appear to be thread unsafe,
but it is safe because of how Sequel works: before every prepared
statement execution, Sequel checks whether the SQL of the statement
already prepared on the connection matches the current SQL for the
prepared statement on the Database object.  If it doesn't match, it
DEALLOCATEs the existing prepared statement on the connection, and then
PREPAREs a new statement using the current SQL (in this case, the SQL
for the new partition).
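
The per-connection check described above can be illustrated with a toy model. This is not Sequel's implementation: `ToyConnection`, `execute_prepared`, and the statement name `:scan` are hypothetical, and SQL strings are placeholders. It only shows why swapping the Database-level SQL is enough for each connection to pick up the new partition on its next execution.

```ruby
# Toy model (not Sequel's code) of re-preparing a statement when the
# Database-level SQL no longer matches what this connection prepared.
class ToyConnection
  attr_reader :log

  def initialize
    @prepared = {} # statement name => SQL currently prepared on this connection
    @log = []      # commands this connection would have sent
  end

  def execute_prepared(name, current_sql)
    # Stale statement from an earlier partitioning: drop it first.
    if @prepared.key?(name) && @prepared[name] != current_sql
      @log << "DEALLOCATE #{name}"
      @prepared.delete(name)
    end
    # Prepare lazily, using whatever SQL is current right now.
    unless @prepared.key?(name)
      @log << "PREPARE #{name} AS #{current_sql}"
      @prepared[name] = current_sql
    end
    @log << "EXECUTE #{name}"
  end
end

conn = ToyConnection.new
conn.execute_prepared(:scan, "v1") # prepares, then executes
conn.execute_prepared(:scan, "v1") # SQL unchanged: executes only
conn.execute_prepared(:scan, "v2") # SQL changed: deallocates, re-prepares, executes
```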

Change the smoke test, when running fewer processes than partitions
(such as the 3/4 test), to skip the lower partition numbers instead
of the higher partition numbers.  Otherwise, due to the rebalancing,
instead of testing the behavior with crashed respirate processes, you
end up only testing with a lower total number of processes.
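
The numbering used by the smoke test's spawn loop can be isolated as below. `partition_numbers` is a hypothetical helper that mirrors the `num_partitions - it` expression in the test file: processes are numbered from the top down, so the highest partition number is always running and the others keep the full partition count.

```ruby
# Mirrors the smoke test's spawn loop: process i (0-based) is given partition
# number num_partitions - i, so it is the lowest numbers that go unused.
def partition_numbers(num_processes, num_partitions)
  Array.new(num_processes) { |i| num_partitions - i }
end

# 3/4 test: partitions 4, 3, 2 run; partition 1 behaves like a crashed process.
p partition_numbers(3, 4)
```

Numbering from the bottom instead ([1, 2, 3]) would let the running processes rebalance down to 3 partitions, so the test would no longer exercise the crashed-process path.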

To monitor the partitioning, this emits logs in 2 additional cases:

* When initially partitioning or repartitioning, so you can see what
  partition each respirate process is monitoring.

* If an unexpected notification is received while listening. This
  should never happen, but we do not want to crash the repartition
  thread if it does.
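
The "log, don't crash" handling for unexpected notifications could look like the sketch below. The payload format (a decimal partition number) and the name `handle_notification` are assumptions, not taken from the dispatcher; the point is only that a malformed payload is logged and ignored instead of raising inside the listening thread.

```ruby
# Hypothetical handler for a LISTEN payload; the payload format (a decimal
# partition number) is an assumption for illustration.
def handle_notification(payload, log = method(:warn))
  # Base 10 keeps "010" from being parsed as octal; exception: false returns
  # nil instead of raising on malformed input.
  number = Integer(payload.to_s, 10, exception: false)
  return number if number && number >= 1

  # Unexpected: record it and carry on, so the repartition thread survives.
  log.call("unexpected notification payload: #{payload.inspect}")
  nil
end
```

Calling `handle_notification("5")` returns 5, while `handle_notification("garbage")` writes a warning and returns nil.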
2025-06-14 01:46:43 +09:00

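# frozen_string_literal: true

ENV["RACK_ENV"] = "test"

# Arguments: [num_partitions [num_processes]]
# num_processes defaults to num_partitions. With no arguments, a single
# unpartitioned respirate process is used.
num_processes = if (arg = ARGV.shift)
  num_partitions = Integer(arg).clamp(1, nil)
  partitioned = true if num_partitions > 1
  if (arg = ARGV.shift)
    Integer(arg).clamp(1, nil)
  else
    num_partitions
  end
else
  1
end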
require_relative "../loader"

num_strands = 1000
seconds_allowed = 60

# Remember pre-existing strands, so that on exit only the strands created by
# this test (and their semaphores) are deleted.
keep_strand_ids = Strand.select_map(&:id)
at_exit do
  delete_strand_ds = Strand
    .exclude(id: keep_strand_ids)
    .select(:id)
  Semaphore
    .where(strand_id: delete_strand_ds)
    .delete(force: true)
  delete_strand_ds.delete(force: true)
end
# With CONSISTENT set, make Prog::Test's calls to rand return fixed values.
if ENV["CONSISTENT"]
  class Prog::Test
    def rand(x = nil)
      case x
      when Range
        20
      when Integer
        10
      else
        0.5
      end
    end
  end
end
# Use Vm uuids, because they are random, while Strand uuids are timestamp based
# and will always be in the first partition.
strands = Array.new(num_strands) { Strand.create(prog: "Test", label: "smoke_test_3", id: Vm.generate_uuid) }
ds = Strand.where(id: strands.map(&:id))

# Collect up to 4096 bytes of combined stdout/stderr from the respirate processes.
r, w = IO.pipe
output = +""
Thread.new do
  output << r.read(4096).to_s
end
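time = Time.now
# Assign partition numbers from the top down (num_partitions, num_partitions - 1,
# ...), so that with fewer processes than partitions, the lowest partition numbers
# have no process.
respirate_pids = Array.new(num_processes) do
  respirate_args = [(num_partitions - it).to_s] if partitioned
  Process.spawn("bin/respirate", *respirate_args, :in => :close, [:out, :err] => w)
end
respirate_pids.compact!
w.close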
finished_ds = ds.where(label: "smoke_test_0")
deadline = Time.now + seconds_allowed
print(partitioned ? "#{num_processes}/#{num_partitions} partitioned: " : "#{num_processes} unpartitioned: ")
until (count = finished_ds.count) == num_strands || Time.now > deadline
  print count, " "
  sleep 1
end
printf("%0.3f seconds ", Time.now - time)
Process.kill(:TERM, *respirate_pids)
# Wait up to 3 seconds for the respirate processes to exit after SIGTERM.
reap_queue = Queue.new
Thread.new do
  respirate_pids.each { Process.waitpid(it) }
  reap_queue.push(true)
end
reap_queue.pop(timeout: 3)

# puts "output:"
# puts output
finished_count = finished_ds.count
unless finished_count == num_strands
  puts
  raise "Only #{finished_count}/#{num_strands} strands finished processing within #{seconds_allowed} seconds"
end

# The combined output must be exactly num_strands * 3 characters, containing
# num_strands each of "1", "2", and "3".
unless output.length == num_strands * 3
  puts
  puts output
  raise "unexpected output length: #{output.length}"
end
(1..3).each do |n|
  unless output.count(n.to_s) == num_strands
    puts
    raise "Not all strands output expected information"
  end
end
puts "passed!"