ubicloud/spec/lib/repartitioner_spec.rb
Jeremy Evans 016918170d Fix repartitioner invalid partition spec nondeterminism
This attempts to fix the following error seen in CI:

```
  1) Repartitioner#listen emits and otherwise ignores invalid partition numbers
     Failure/Error: expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff")
       expected: "00000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff"
            got: "00000000-0000-0000-0000-000000000000"..."80000000-0000-0000-0000-000000000000"
```

I don't see how it is possible for this error to occur unless
something else is notifying on the same channel, and we are using
separate channels per spec.  However, since this doesn't affect
what we are actually testing for in this spec, avoid the
issue by not listening until after checking the initial strand
range. Also, set max_partition to 1 in the spec, so even if there
was an repartitioner notifying for partition number 2, it would
still be considered invalid and fail.
2025-09-18 02:29:12 +09:00

174 lines
5.8 KiB
Ruby

# frozen_string_literal: true
require_relative "../spec_helper"
RSpec.describe Repartitioner do
let(:channel) { :"monitor_#{object_id}" }
def repartitioner(**)
described_class.new(partition_number: 1, channel:, max_partition: 8, listen_timeout: 1, recheck_seconds: 18, stale_seconds: 40, **)
end
describe ".new" do
it "repartitions when initializing" do
expect(Clog).to receive(:emit).with("#{channel} repartitioning").and_call_original
mp = repartitioner
expect(mp.repartitioned).to be true
expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff")
end
it "assumes given partition is last partition" do
expect(Clog).to receive(:emit).with("#{channel} repartitioning").and_call_original
expect(repartitioner(partition_number: 2).strand_id_range).to eq("80000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff")
end
end
describe "#notify" do
it "uses NOTIFY to notify listeners on given channel" do
q = Queue.new
th = Thread.new do
payload = nil
DB.listen(channel, after_listen: proc { q.push nil }, timeout: 1) do |_, _, pl|
payload = pl
end
payload
end
q.pop(timeout: 1)
Thread.new { repartitioner(channel:).notify }.join(1)
expect(th.value).to eq "1"
end
end
describe "#listen" do
after do
@mp.shutdown!
@th.join(1)
expect(@th.alive?).to be false
end
it "repartitions when it receives a notification about a new partition" do
@mp = mp = repartitioner(listen_timeout: 0.01, recheck_seconds: 2)
q = Queue.new
mp.define_singleton_method(:notify) do
super()
q.push nil
end
mp.define_singleton_method(:repartition) do |n|
super(n)
q.push nil if n == 2
end
@th = Thread.new { mp.listen }
q.pop(timeout: 1)
expect(mp).to receive(:repartition).with(2).and_call_original
expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff")
Thread.new { repartitioner(partition_number: 2).notify }.join(1)
q.pop(timeout: 1)
expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000"..."80000000-0000-0000-0000-000000000000")
end
it "repartitions when an existing partition goes stale" do
@mp = mp = repartitioner(listen_timeout: 0.01, recheck_seconds: 0.01)
notify_q = Queue.new
repartition_3q = Queue.new
repartition_2q = Queue.new
notified = false
mp.define_singleton_method(:notify) do
super()
unless notified
notified = true
notify_q.push true
end
end
mp.define_singleton_method(:repartition) do |n|
super(n)
case n
when 3
repartition_3q.push true
when 2
repartition_2q.push true
end
end
@th = Thread.new { mp.listen }
expect(notify_q.pop(timeout: 1)).to be true
expect(mp).to receive(:repartition).with(3).and_call_original
expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff")
t = Thread.new do
repartitioner(partition_number: 3).notify
true
end
t.join(1)
expect(t.value).to be true
expect(repartition_3q.pop(timeout: 1)).to be true
expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000"..."55555555-0000-0000-0000-000000000000")
t = Thread.new do
repartitioner(partition_number: 2).notify
true
end
t.join(1)
expect(t.value).to be true
expect(mp).to receive(:repartition).with(2).and_call_original
mp.instance_variable_get(:@partition_times)[3] = Time.now - 60
expect(repartition_2q.pop(timeout: 1)).to be true
expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000"..."80000000-0000-0000-0000-000000000000")
end
it "emits and otherwise ignores invalid partition numbers" do
@mp = mp = repartitioner(listen_timeout: 0.01, max_partition: 1)
q = Queue.new
listen_q = Queue.new
expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff")
expect(mp).not_to receive(:repartition)
mp.define_singleton_method(:notify) do
super()
listen_q.push true
end
@th = Thread.new { mp.listen }
expect(listen_q.pop(timeout: 1)).to be true
expect(Clog).to receive(:emit).at_least(:once).and_wrap_original do |m, msg, &blk|
m.call(msg, &blk)
if msg == "invalid #{channel} repartition notification"
q.push true
end
end
th = Thread.new do
repartitioner(partition_number: 1000).notify
true
end
th.join(1)
expect(th.value).to be true
expect(q.pop(timeout: 1)).to be true
expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff")
end
it "stops listen loop if notification is received after shutting down" do
@mp = mp = repartitioner
q = Queue.new
mp.define_singleton_method(:notify) do
super()
q.push nil
end
mp.define_singleton_method(:listen) do
super()
q.push nil
end
@th = Thread.new { mp.listen }
q.pop(timeout: 1)
expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff")
mp.shutdown!
expect(mp).not_to receive(:repartition)
Thread.new { repartitioner(partition_number: 2).notify }.join(1)
q.pop(timeout: 1)
expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff")
end
end
end