Otherwise, this can result in a feedback loop: processing old strands that have a long delay further delays the processing of other old strands.
369 lines
14 KiB
Ruby
369 lines
14 KiB
Ruby
# frozen_string_literal: true

require_relative "../model/spec_helper"

RSpec.describe Scheduling::Dispatcher do
  subject(:di) { described_class.new(pool_size: 1) }

  after do
    # Examples that build a custom dispatcher store it in @di so that it (and not a
    # freshly-instantiated default subject) is the one whose threads get cleaned up.
    (@di || di).shutdown_and_cleanup_threads
    # Reset the current thread's name so it does not leak into later examples.
    Thread.current.name = nil
  end

  describe "#shutdown" do
    it "sets shutting_down flag" do
      expect { di.shutdown }.to change(di, :shutting_down).from(false).to(true)

      # Test idempotent behavior
      2.times { di.shutdown_and_cleanup_threads }
    end
  end

  describe "#num_current_strands" do
    it "returns the number of current strands being handled" do
      expect(di.num_current_strands).to eq 0
      di.instance_variable_get(:@current_strands)["a"] = true
      expect(di.num_current_strands).to eq 1
    end
  end

  describe "#repartition_thread" do
    # Starts a thread that LISTENs on the respirate channel. The thread's value is the
    # payload of the first notification it receives, or nil if none arrives within
    # 5 seconds. This method returns only after the LISTEN has been issued, so a
    # notification sent afterwards (e.g. by a dispatcher created next) cannot be missed.
    def start_respirate_listener
      listening = Queue.new
      listener = Thread.new do
        payload = nil
        DB.listen(:respirate, timeout: 5, after_listen: proc { listening.push(true) }) { |_, _, pl| payload = pl }
        payload
      end
      expect(listening.pop(timeout: 5)).to be true
      listener
    end

    it "emits for invalid notify" do
      listener = start_respirate_listener
      @di = described_class.new(partition_number: 1, listen_timeout: 0.01, pool_size: 1)

      # Wait until dispatcher has started listening and notified. Thread#join returns
      # nil on timeout, so a hung listener fails here instead of blocking in #value.
      expect(listener.join(5)&.value).to eq "1"

      q = Queue.new
      # Forward every argument and any block so the real Clog.emit sees the same call.
      expect(Clog).to receive(:emit).at_least(:once).and_wrap_original do |m, message, *rest, &blk|
        q.push(true) if message == "invalid respirate repartition notification"
        m.call(message, *rest, &blk)
      end
      Thread.new { DB.notify(:respirate, payload: "foo") }.join(5)
      expect(q.pop(timeout: 5)).to be true
    end

    it "repartitions for new processes and stale processes" do
      q = Queue.new
      listener = start_respirate_listener
      di = @di = described_class.new(partition_number: 1, listen_timeout: 0.01, pool_size: 1)

      # Wait until dispatcher has started listening and notified
      expect(listener.join(5)&.value).to eq "1"

      args = []
      expect(di).to receive(:setup_prepared_statements).twice.and_wrap_original do |m, **kw|
        args << kw
        q.push true if args.length == 2
        m.call(**kw)
      end

      q2 = Queue.new
      # separate threads so they are not inside a transaction
      Thread.new do
        DB.notify(:respirate, payload: "2")
        q2.push(true)
      end.join(5)
      Thread.new do
        # Ensure payload 3 notify is after payload 2 notify
        q2.pop(timeout: 5)
        DB.notify(:respirate, payload: "3")
      end.join(5)

      expect(q.pop(timeout: 5)).to be true
      expect(args).to eq([{num_partitions: 2}, {num_partitions: 3}])
      args.clear

      q3 = Queue.new
      expect(di).to receive(:setup_prepared_statements).at_least(:once).and_wrap_original do |m, **kw|
        args << kw
        if kw == {num_partitions: 1}
          # Stop the notifier loop below and signal the example.
          q.push true
          q3.push true
        end
        m.call(**kw)
      end

      # Keep announcing only partition 1 until the dispatcher shrinks back to a single
      # partition, which the example name attributes to processes 2 and 3 going stale.
      notifier = Thread.new do
        until q.pop(timeout: 0.1)
          DB.notify(:respirate, payload: "1")
          sleep(0.005)
        end
      end
      expect(q3.pop(timeout: 5)).to be true
      notifier.join(5)
      expect(args.last).to eq(num_partitions: 1)
    end
  end

  describe "#scan" do
    it "returns empty array if there are no strands ready for running" do
      expect(di.scan).to eq([])
    end

    it "returns array of strands ready for running" do
      Strand.create(prog: "Test", label: "wait_exit")
      st = Strand.first
      expect(di.scan).to eq([])
      st.update(schedule: Time.now + 10)
      expect(di.scan).to eq([])
      st.update(schedule: Time.now - 10)
      expect(di.scan.map(&:id)).to eq([st.id])
    end

    it "does not include strands outside of partition" do
      st = Strand.create(prog: "Test", label: "wait_exit", id: "00000000-0000-0000-0000-000000000000")
      di = @di = described_class.new(partition_number: 2, listen_timeout: 0.01, pool_size: 1)
      di.setup_prepared_statements(num_partitions: 2)
      st.update(schedule: Time.now - 10)
      expect(di.scan.map(&:id)).to eq([])
    end

    it "includes strands inside of partition" do
      st = Strand.create(prog: "Test", label: "wait_exit", id: "00000000-0000-0000-0000-000000000000")
      di = @di = described_class.new(partition_number: 1, listen_timeout: 0.01, pool_size: 1)
      di.setup_prepared_statements(num_partitions: 2)
      st.update(schedule: Time.now - 10)
      expect(di.scan.map(&:id)).to eq([st.id])
    end

    it "returns empty array when shutting down" do
      di.shutdown
      expect(di.scan).to eq([])
    end
  end

  describe "#scan_old" do
    it "returns empty array for non-partitioned dispatcher" do
      expect(di.scan_old).to eq([])
    end

    it "returns empty array when shutting down" do
      Strand.create(prog: "Test", label: "wait_exit", id: "00000000-0000-0000-0000-000000000000")
      di = @di = described_class.new(partition_number: 1, listen_timeout: 0.01, pool_size: 1)
      di.setup_prepared_statements(num_partitions: 2)
      di.shutdown
      expect(di.scan_old).to eq([])
    end

    it "includes strands outside of partition, if they are older than old_strand_delay" do
      st = Strand.create(prog: "Test", label: "wait_exit", id: "00000000-0000-0000-0000-000000000000")
      di = @di = described_class.new(partition_number: 2, listen_timeout: 0.01, pool_size: 1)
      di.setup_prepared_statements(num_partitions: 2)

      # With the initial @current_strand_delay, the "old" cutoff lies between 3s and 7s.
      st.update(schedule: Time.now - 3)
      expect(di.scan_old.map(&:id)).to eq([])
      st.update(schedule: Time.now - 7)
      expect(di.scan_old.map(&:id)).to eq([st.id])

      # Raising the current strand delay to 10s pushes the cutoff to between 16s and 18s.
      di.instance_variable_set(:@current_strand_delay, 10)
      expect(di.scan_old.map(&:id)).to eq([])
      st.update(schedule: Time.now - 16)
      expect(di.scan_old.map(&:id)).to eq([])
      st.update(schedule: Time.now - 18)
      expect(di.scan_old.map(&:id)).to eq([st.id])
    end
  end

  describe "#apoptosis_run" do
    it "does not trigger exit if strand runs on time" do
      expect(ThreadPrinter).not_to receive(:run)
      expect(Kernel).not_to receive(:exit!)
      start_queue = Queue.new
      finish_queue = Queue.new
      start_queue.push(true)
      finish_queue.push(true)
      expect(di.apoptosis_run(0, start_queue, finish_queue)).to be true
    end
  end

  describe "#apoptosis_thread" do
    # Polls until the block returns truthy, failing the example if that takes longer
    # than one second. Uses the monotonic clock so wall-clock adjustments cannot
    # shorten or extend the deadline.
    def wait_for_apoptosis
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 1
      until yield
        raise "no apoptosis within 1 second" if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
        sleep 0.1
      end
    end

    it "triggers thread dumps and exit if the Prog takes too long" do
      exited = false
      expect(ThreadPrinter).to receive(:run)
      expect(Kernel).to receive(:exit!).and_invoke(-> { exited = true })
      di = @di = described_class.new(apoptosis_timeout: 0.05, pool_size: 1)
      start_queue = di.instance_variable_get(:@thread_data).dig(0, :start_queue)
      # Signal a strand start but never signal its finish, so the timeout elapses.
      start_queue.push(true)
      wait_for_apoptosis { exited }
      expect(exited).to be true
    end

    it "triggers thread dumps and exit if there is an exception raised" do
      exited = false
      expect(ThreadPrinter).to receive(:run)
      expect(Kernel).to receive(:exit!).and_invoke(-> { exited = true })
      di = @di = described_class.new(apoptosis_timeout: 0.05, pool_size: 1)
      thread_data = di.instance_variable_get(:@thread_data)
      start_queue = thread_data.dig(0, :start_queue)
      finish_queue = thread_data.dig(0, :finish_queue)
      # Removing #pop makes any attempt to wait on the finish queue raise NoMethodError
      # (presumably the apoptosis thread pops it after a strand starts).
      finish_queue.singleton_class.undef_method(:pop)
      start_queue.push(true)
      wait_for_apoptosis { exited }
      expect(exited).to be true
    end
  end

  describe "#start_cohort" do
    it "accepts an array of strands" do
      expect(di.start_cohort([])).to be true
      expect(di.instance_variable_get(:@strand_queue).pop(timeout: 0)).to be_nil
      expect(di.instance_variable_get(:@current_strands)).to be_empty
    end

    it "returns true if there are no strands" do
      expect(di.start_cohort).to be true
      expect(di.instance_variable_get(:@strand_queue).pop(timeout: 0)).to be_nil
      expect(di.instance_variable_get(:@current_strands)).to be_empty
    end

    it "returns false if the dispatcher is shutting down after the scan" do
      Strand.create(prog: "Test", label: "wait_exit", schedule: Time.now - 10)
      # Let the real scan find the strand, then flip to shutting down before it is queued.
      expect(di).to receive(:scan).and_wrap_original do |original_method|
        res = original_method.call
        di.shutdown
        res
      end
      expect(di.start_cohort).to be false
      expect(di.instance_variable_get(:@strand_queue).pop(timeout: 0)).to be_nil
      expect(di.instance_variable_get(:@current_strands)).to be_empty
    end

    it "returns true if the dispatcher is shutting down and there are no strands" do
      di.shutdown
      expect(di.start_cohort).to be true
      expect(di.instance_variable_get(:@strand_queue).pop(timeout: 0)).to be_nil
      expect(di.instance_variable_get(:@current_strands)).to be_empty
    end

    it "returns false and pushes to strand queue if there are strands" do
      st = Strand.create(prog: "Test", label: "wait_exit", schedule: Time.now - 10)
      old_queue = di.instance_variable_get(:@strand_queue)
      # Swap in a private queue so no worker thread consumes the strand before we inspect it.
      new_queue = di.instance_variable_set(:@strand_queue, Queue.new)
      expect(di.start_cohort).to be false
      expect(new_queue.pop(true).id).to eq st.id
      expect(di.instance_variable_get(:@current_strands)).to eq(st.id => true)

      # Check that we don't retrieve strands currently executing
      di.instance_variable_set(:@strand_queue, old_queue)
      expect(di.start_cohort).to be true
      expect(di.instance_variable_get(:@current_strands)).to eq(st.id => true)
    end
  end

  describe "#metrics_thread" do
    it "emits metrics every 200 queue entries" do
      q = Queue.new
      n = described_class::METRICS_EVERY
      n.times { q.push 1 }
      # nil terminates the metrics thread loop after one full batch.
      q.push nil
      expect(di).to receive(:metrics_hash).with([1] * n, instance_of(Float)).and_return({})
      expect(Clog).to receive(:emit).and_call_original
      di.metrics_thread(q)
    end
  end

  describe "#metrics_hash" do
    it "takes array of Strand::RespirateMetrics and returns hash of metric information" do
      t = Time.now
      rm = Strand::RespirateMetrics
      metrics = [
        Array.new(150) { rm.new(t, t + 1, t + 2, t + 3, true, 0, 0) },
        Array.new(20) { rm.new(t, t + 2, t + 4, t + 7, true, 10, 9) },
        Array.new(20) { rm.new(t, t + 3, t + 8, t + 12, true, 20, 7) },
        Array.new(8) { rm.new(t, t + 5, t + 12, t + 21, false, 30, 5) },
        Array.new(1) { rm.new(t, t + 6, t + 16, t + 29, true, 40, 3, true) },
        Array.new(1) { rm.new(t, t + 7, t + 20, t + 37, false, 50, 1, true) }
      ].flatten

      expect(di.metrics_hash(metrics, 0.5)).to eq({
        available_workers: {average: 1, max: 9, median: 0, p75: 1, p85: 7, p95: 9, p99: 9},
        lease_acquire_percentage: 95.5,
        lease_delay: {average: 1.96, max: 17.0, median: 1.0, p75: 3.0, p85: 4.0, p95: 9.0, p99: 13.0},
        old_strand_percentage: 1.0,
        queue_delay: {average: 1.845, max: 13.0, median: 1.0, p75: 2.0, p85: 5.0, p95: 7.0, p99: 10.0},
        queue_size: {average: 4, max: 50, median: 0, p75: 10, p85: 20, p95: 30, p99: 40},
        scan_delay: {average: 1.515, max: 7.0, median: 1.0, p75: 2.0, p85: 3.0, p95: 5.0, p99: 6.0},
        total_delay: {average: 5.32, max: 37.0, median: 3.0, p75: 7.0, p85: 12.0, p95: 21.0, p99: 29.0},
        strand_count: 200,
        strands_per_second: 400
      })

      # metrics_hash also updates the delays that scan_old uses for its cutoff.
      expect(di.instance_variable_get(:@current_strand_delay)).to eq 12.0
      expect(di.old_strand_delay).to eq 19.4
    end
  end

  describe "#strand_thread" do
    it "runs strands pushed onto queue" do
      Strand.create(prog: "Test", label: "napper", schedule: Time.now - 10)
      st = di.scan.first
      strand_queue = Queue.new
      start_queue = Queue.new
      finish_queue = Queue.new
      current_strands = di.instance_variable_get(:@current_strands)
      current_strands[st.id] = true

      # The cached SSH session raises on close; strand_thread must still clear the cache.
      session = instance_double(Net::SSH::Connection::Session)
      expect(session).to receive(:close).and_raise(RuntimeError)
      Thread.current[:clover_ssh_cache] = {nil => session}

      strand_queue.push(st)
      # nil tells strand_thread to stop after the queued strand.
      strand_queue.push(nil)
      expect(di.strand_thread(strand_queue, start_queue, finish_queue)).to be_nil

      expect(Time.now - st.respirate_metrics.worker_started).to be_within(1).of(0)
      expect(st.respirate_metrics.queue_size).to eq 1
      expect(st.respirate_metrics.available_workers).to eq 0
      expect(start_queue.pop(true)).to eq st.ubid
      expect(start_queue.pop(true)).to be_nil
      expect(finish_queue.pop(true)).to be true
      expect(current_strands).to be_empty
      expect(Thread.current[:clover_ssh_cache]).to be_empty
    end
  end

  describe "#run_strand" do
    it "print exceptions if they are raised" do
      # Build an exception whose #cause is another exception, to exercise nested output.
      ex = begin
        begin
          raise StandardError, "nested test error"
        rescue
          raise StandardError, "outer test error"
        end
      rescue => e
        e
      end

      st = Strand.create(prog: "Test", label: "wait_exit", schedule: Time.now - 10)
      expect(st).to receive(:run).and_raise(ex)

      # Go to the trouble of emitting those exceptions to provoke
      # plausible crashes in serialization.
      expect(Config).to receive(:test?).and_return(false).twice
      expect($stdout).to receive(:write).with(a_string_matching(/outer test error/))
      expect($stdout).to receive(:write).with(a_string_matching(/nested test error/))

      start_queue = Queue.new
      finish_queue = Queue.new
      current_strands = di.instance_variable_get(:@current_strands)
      current_strands[st.id] = true
      expect(di.run_strand(st, start_queue, finish_queue)).to eq ex
      expect(start_queue.pop(true)).to eq st.ubid
      expect(finish_queue.pop(true)).to be true
      expect(current_strands).to be_empty
    end
  end
end