mirror of
https://github.com/ubicloud/ubicloud.git
synced 2025-10-05 06:12:09 +08:00
The main fix here is to the synchronization between the threads that notify for partition numbers 2 and 3. Don't send the notify for partition 2 until after the notification for partition 3 has been received and applied by the repartitioner and checked by the spec. While here, use 3 separate queues for synchronization, instead of only 2 queues, to rule out the possibility of unexpected queue reuse causing the problem. Add expectations for each queue pop and thread join, so that if there is a timeout expiration, you can better diagnose the cause of the spec failure. This passed 100 spec runs without failure locally, where the previous code generally didn't get through 30 spec runs without failure locally.
171 lines
5.7 KiB
Ruby
171 lines
5.7 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require_relative "../spec_helper"
|
|
|
|
# Specs for Repartitioner: the partition range computed on construction,
# NOTIFY delivery, and how the listen loop reacts to new, stale, and invalid
# partition notifications as well as to shutdown.
#
# Every cross-thread handshake below pushes +true+ onto a queue dedicated to
# one event and asserts the result of the timed pop. A timed-out pop returns
# nil, so the failing expectation names the exact step that stalled.
RSpec.describe Repartitioner do
  # Channel name built from object_id.
  let(:channel) { :"monitor_#{object_id}" }

  # Range reported while this repartitioner is the only known partition.
  let(:full_range) { "00000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff" }

  # Builds a repartitioner with the spec defaults. Keyword arguments given by
  # the caller come last in the call and therefore override these defaults.
  def repartitioner(**)
    described_class.new(partition_number: 1, channel:, max_partition: 8, listen_timeout: 1, recheck_seconds: 18, stale_seconds: 40, **)
  end

  # Builds a repartitioner with the given overrides on a separate thread and
  # sends its notification from there. The join result is asserted, so a
  # notifier that is still running after one second fails the example here
  # rather than being silently ignored.
  # NOTE(review): presumably a separate thread is used so the NOTIFY is sent
  # on a different database connection than the example's own — confirm.
  def notify_in_thread(**opts)
    notifier = Thread.new do
      repartitioner(**opts).notify
      true
    end
    # Thread#join returns nil on timeout and the thread itself on completion.
    expect(notifier.join(1)).to be(notifier)
    expect(notifier.value).to be true
  end

  describe ".new" do
    it "repartitions when initializing" do
      expect(Clog).to receive(:emit).with("#{channel} repartitioning").and_call_original
      mp = repartitioner
      expect(mp.repartitioned).to be true
      expect(mp.strand_id_range).to eq(full_range)
    end

    it "assumes given partition is last partition" do
      expect(Clog).to receive(:emit).with("#{channel} repartitioning").and_call_original
      expect(repartitioner(partition_number: 2).strand_id_range).to eq("80000000-0000-0000-0000-000000000000".."ffffffff-ffff-ffff-ffff-ffffffffffff")
    end
  end

  describe "#notify" do
    it "uses NOTIFY to notify listeners on given channel" do
      listening_q = Queue.new
      listener = Thread.new do
        payload = nil
        DB.listen(channel, after_listen: proc { listening_q.push true }, timeout: 1) do |_, _, pl|
          payload = pl
        end
        payload
      end

      # Hold the NOTIFY back until the listener has signalled via after_listen.
      expect(listening_q.pop(timeout: 1)).to be true
      notify_in_thread
      expect(listener.value).to eq "1"
    end
  end

  describe "#listen" do
    # Each example stores its repartitioner in @mp and its listen thread in
    # @th; the loop must have exited within a second of shutdown.
    after do
      @mp.shutdown!
      @th.join(1)
      expect(@th.alive?).to be false
    end

    it "repartitions when it receives a notification about a new partition" do
      @mp = mp = repartitioner(listen_timeout: 0.01, recheck_seconds: 2)
      notify_q = Queue.new
      repartition_q = Queue.new
      mp.define_singleton_method(:notify) do
        super()
        notify_q.push true
      end
      mp.define_singleton_method(:repartition) do |n|
        super(n)
        repartition_q.push true if n == 2
      end
      @th = Thread.new { mp.listen }

      # Wait for the listener's own notify before announcing partition 2.
      expect(notify_q.pop(timeout: 1)).to be true
      expect(mp).to receive(:repartition).with(2).and_call_original
      expect(mp.strand_id_range).to eq(full_range)
      notify_in_thread(partition_number: 2)

      expect(repartition_q.pop(timeout: 1)).to be true
      expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000"..."80000000-0000-0000-0000-000000000000")
    end

    it "repartitions when an existing partition goes stale" do
      @mp = mp = repartitioner(listen_timeout: 0.01, recheck_seconds: 0.01)
      notify_q = Queue.new
      repartition_3q = Queue.new
      repartition_2q = Queue.new
      notified = false
      mp.define_singleton_method(:notify) do
        super()
        # Signal only the first notify; later calls must not refill the queue.
        unless notified
          notified = true
          notify_q.push true
        end
      end
      mp.define_singleton_method(:repartition) do |n|
        super(n)

        case n
        when 3
          repartition_3q.push true
        when 2
          repartition_2q.push true
        end
      end
      @th = Thread.new { mp.listen }

      expect(notify_q.pop(timeout: 1)).to be true
      expect(mp).to receive(:repartition).with(3).and_call_original
      expect(mp.strand_id_range).to eq(full_range)
      notify_in_thread(partition_number: 3)

      # Partition 3 must be received, applied, and checked before the notify
      # for partition 2 is sent.
      expect(repartition_3q.pop(timeout: 1)).to be true
      expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000"..."55555555-0000-0000-0000-000000000000")
      notify_in_thread(partition_number: 2)

      expect(mp).to receive(:repartition).with(2).and_call_original
      # Backdate partition 3 by 60 seconds, past the 40-second stale_seconds
      # configured by the repartitioner helper.
      mp.instance_variable_get(:@partition_times)[3] = Time.now - 60
      expect(repartition_2q.pop(timeout: 1)).to be true
      expect(mp.strand_id_range).to eq("00000000-0000-0000-0000-000000000000"..."80000000-0000-0000-0000-000000000000")
    end

    it "emits and otherwise ignores invalid partition numbers" do
      @mp = mp = repartitioner(listen_timeout: 0.01)
      notify_q = Queue.new
      invalid_q = Queue.new
      mp.define_singleton_method(:notify) do
        super()
        notify_q.push true
      end
      @th = Thread.new { mp.listen }

      expect(notify_q.pop(timeout: 1)).to be true
      expect(mp.strand_id_range).to eq(full_range)

      received_invalid = false
      # Let every emit through, and signal once the invalid-notification
      # message has been emitted.
      expect(Clog).to receive(:emit).at_least(:once).and_wrap_original do |m, msg, &blk|
        m.call(msg, &blk)
        if msg == "invalid #{channel} repartition notification"
          received_invalid = true
          invalid_q.push true
        end
      end
      notify_in_thread(partition_number: 1000)

      expect(invalid_q.pop(timeout: 1)).to be true
      expect(mp.strand_id_range).to eq(full_range)
      expect(received_invalid).to be true
    end

    it "stops listen loop if notification is received after shutting down" do
      @mp = mp = repartitioner
      notify_q = Queue.new
      listen_done_q = Queue.new
      mp.define_singleton_method(:notify) do
        super()
        notify_q.push true
      end
      mp.define_singleton_method(:listen) do
        super()
        # Reached only after the listen loop itself has returned.
        listen_done_q.push true
      end
      @th = Thread.new { mp.listen }

      expect(notify_q.pop(timeout: 1)).to be true
      expect(mp.strand_id_range).to eq(full_range)
      mp.shutdown!

      expect(mp).not_to receive(:repartition)
      notify_in_thread(partition_number: 2)
      expect(listen_done_q.pop(timeout: 1)).to be true
      expect(mp.strand_id_range).to eq(full_range)
    end
  end
end
|