Files
ubicloud/spec/lib/monitor_runner_spec.rb
2025-08-07 02:13:08 +09:00

253 lines
8.4 KiB
Ruby

# frozen_string_literal: true
require_relative "../spec_helper"
RSpec.describe MonitorRunner do
# Stuck-pulse settings shared by both resource types below.
# NOTE(review): presumably [timeout in seconds, emit message, emit key] -- confirm
# against MonitorResourceType.
stuck_pulse_info = [10, "stuck", :stuck].freeze

# Runner under test, assembled from the collaborators defined below.
let(:monitor_runner) do
  described_class.new(
    monitor_resources: monitor_resources,
    metric_export_resources: metric_export_resources,
    repartitioner: repartitioner,
    **monitor_runner_args
  )
end

# Resource type wrapping MonitorableResource, backed by VmHost records.
let(:monitor_resources) do
  MonitorResourceType.create(MonitorableResource, stuck_pulse_info, 2, [VmHost]) {}
end

# Resource type wrapping MetricsTargetResource, backed by VmHost records.
let(:metric_export_resources) do
  MonitorResourceType.create(MetricsTargetResource, stuck_pulse_info, 2, [VmHost]) {}
end

let(:repartitioner) do
  MonitorRepartitioner.new(2)
end

# Every loop interval defaults to 0.01. The hash is intentionally left mutable:
# nested groups adjust entries in `before` hooks before the runner is built.
let(:monitor_runner_args) do
  %i[scan_every report_every enqueue_every check_stuck_pulses_every].to_h do |interval|
    [interval, 0.01]
  end
end
# Creates an Sshable and then a VmHost that share the given id.
#
# @param id [String] UUID string used for both records
# @return the result of VmHost.create_with_id (the examples use it as the host record)
def vm_host_with_id(id)
  Sshable.create_with_id(id)

  host_attributes = {
    location_id: Location::HETZNER_FSN1_ID,
    allocation_state: "accepting",
    arch: "x64",
    family: "standard",
    total_cores: 48,
    used_cores: 2
  }
  VmHost.create_with_id(id, **host_attributes)
end
# After every example, ask the runner to finish its cleanup.
# NOTE(review): the argument 1 is presumably a timeout in seconds -- confirm
# against MonitorRunner#wait_cleanup!.
after { monitor_runner.wait_cleanup!(1) }
# Verifies that #scan submits exactly one new in-partition host to each
# resource type's submit queue.
describe "#scan" do
  it "scans both resource types and pushes only new resources in partition to respective queues" do
    new_host = vm_host_with_id("90000000-0000-0000-0000-000000000000")
    known_host = vm_host_with_id("a0000000-0000-0000-0000-000000000000")
    # Check that it doesn't pick up resources outside partition
    vm_host_with_id("10000000-0000-0000-0000-000000000000")

    # Swap in private queues so the example can observe what #scan submits.
    original_monitor_queue = monitor_resources.submit_queue
    original_export_queue = metric_export_resources.submit_queue
    monitor_queue = Queue.new
    monitor_resources.submit_queue = monitor_queue
    export_queue = Queue.new
    metric_export_resources.submit_queue = export_queue

    # Check that it doesn't pick up existing resources
    [monitor_resources, metric_export_resources].each do |resource_type|
      resource_type.resources[known_host.id] = true
    end

    monitor_runner.scan

    # Each queue should receive the new host, wrapped in the matching class.
    [
      [monitor_queue, MonitorableResource],
      [export_queue, MetricsTargetResource]
    ].each do |queue, expected_class|
      submitted = queue.pop(timeout: 1)
      expect(submitted).to be_a expected_class
      expect(submitted.resource).to eq new_host
    end

    # Nothing else may have been submitted.
    expect(monitor_queue.pop(timeout: 0)).to be_nil
    expect(export_queue.pop(timeout: 0)).to be_nil
  ensure
    # Put the original queues back regardless of the example's outcome.
    monitor_resources.submit_queue = original_monitor_queue
    metric_export_resources.submit_queue = original_export_queue
  end
end
# Verifies the payload #emit_metrics hands to Clog.emit, first with no
# registered resources and then with resources and queued work present.
describe "#emit_metrics" do
it "emits metrics" do
# Collects the hash produced by the block passed to each "monitor metrics" emit.
q = Queue.new
# Wrap Clog.emit: only "monitor metrics" calls are forwarded to the real
# implementation; any other message is ignored by this wrapper.
expect(Clog).to receive(:emit).at_least(:once).and_wrap_original do |m, a, &b|
if a == "monitor metrics"
m.call(a, &b)
q.push(b.call)
end
end
# First pass: nothing registered, nothing queued.
monitor_runner.emit_metrics
hash = q.pop(timeout: 1)
expect(hash.keys).to eq [:monitor_metrics]
hash = hash[:monitor_metrics]
# These three values are only type/bound checked and are removed before the
# exact comparison below.
# NOTE(review): presumably they depend on thread state at the time of the call -- confirm.
expect(hash.delete(:active_threads_count)).to be_a Integer
expect(hash.delete(:monitor_idle_worker_threads)).to be <= 1
expect(hash.delete(:metric_export_idle_worker_threads)).to be <= 1
expect(hash).to eq({
threads_waiting_for_db_connection: 0,
total_monitor_resources: 0,
total_metric_export_resources: 0,
monitor_submit_queue_length: 0,
metric_export_submit_queue_length: 0
})
# Second pass: register one monitor entry and two metric export entries,
# matching the totals asserted below.
monitor_resources.resources["a0000000-0000-0000-0000-000000000000"] = true
metric_export_resources.resources["90000000-0000-0000-0000-000000000000"] = true
metric_export_resources.resources["a0000000-0000-0000-0000-000000000000"] = true
mr = MonitorableResource.new(nil)
# Gate queue: the stubbed open_resource_session waits on it for up to 1 second.
q2 = Queue.new
expect(mr).to receive(:open_resource_session).twice do
q2.pop(timeout: 1)
end
# Submit the same resource twice.
# NOTE(review): the expectations below (queue length 1, idle workers 0) look
# like one submission being processed and held on q2 while the other stays
# queued -- confirm against MonitorResourceType's worker pool.
2.times { monitor_resources.submit_queue.push(mr) }
monitor_runner.emit_metrics
hash = q.pop(timeout: 1)
expect(hash.keys).to eq [:monitor_metrics]
hash = hash[:monitor_metrics]
expect(hash.delete(:active_threads_count)).to be_a Integer
expect(hash).to eq({
threads_waiting_for_db_connection: 0,
total_monitor_resources: 1,
total_metric_export_resources: 2,
monitor_submit_queue_length: 1,
metric_export_submit_queue_length: 0,
monitor_idle_worker_threads: 0,
metric_export_idle_worker_threads: 1
})
# Release both stubbed open_resource_session calls that wait on q2.
2.times { q2.push nil }
end
end
# Verifies that #check_stuck_pulses emits "stuck" only once a job has been
# running for longer than the configured stuck-pulse threshold.
describe "#check_stuck_pulses" do
  it "emits for stuck pulses" do
    stuck_emits = 0
    expect(Clog).to receive(:emit).at_least(:once) do |message|
      stuck_emits += 1 if message == "stuck"
    end

    # No resources registered: nothing to report.
    monitor_runner.check_stuck_pulses
    expect(stuck_emits).to eq 0

    # A job that started 5 seconds ago is not reported.
    resource = MonitorableResource.new(VmHost.new_with_id)
    resource.monitor_job_started_at = Time.now - 5
    monitor_resources.resources[1] = resource
    metric_export_resources.resources[1] = resource
    monitor_runner.check_stuck_pulses
    expect(stuck_emits).to eq 0

    # A job that started 15 seconds ago is reported; the same object is
    # registered under both resource types, hence two emits.
    resource.monitor_job_started_at = Time.now - 15
    monitor_runner.check_stuck_pulses
    expect(stuck_emits).to eq 2
  end
end
# Verifies that #enqueue drains due entries from both run queues and returns
# how long the caller should sleep before the next pass.
describe "#enqueue" do
  # Use a large interval so the returned sleep times are easy to reason about.
  before { monitor_runner_args[:enqueue_every] = 10 }

  it "enqueues jobs for both resource types" do
    # Both entries finished 15 seconds ago, which is more than enqueue_every.
    monitorable = MonitorableResource.new(VmHost.new_with_id)
    monitorable.monitor_job_finished_at = Time.now - 15
    monitor_resources.run_queue << monitorable

    metrics_target = MetricsTargetResource.new(VmHost.new_with_id)
    metrics_target.monitor_job_finished_at = Time.now - 15
    metric_export_resources.run_queue << metrics_target

    expect(monitor_resources.run_queue.size).to eq 1
    expect(metric_export_resources.run_queue.size).to eq 1

    monitor_runner.enqueue

    expect(monitor_resources.run_queue.size).to eq 0
    expect(metric_export_resources.run_queue.size).to eq 0
  end

  it "returns enqueue_every if there are no jobs in either run queue" do
    expect(monitor_runner.enqueue).to eq 10
  end

  it "returns time to sleep based on next entry in run queue" do
    # Finished 4 seconds ago with enqueue_every = 10: about 6 seconds remain.
    monitorable = MonitorableResource.new(VmHost.new_with_id)
    monitorable.monitor_job_finished_at = Time.now - 4
    monitor_resources.run_queue << monitorable
    expect(monitor_resources.run_queue.size).to eq 1
    expect(monitor_runner.enqueue).to be_within(1).of(6)

    # A second entry that finished 8 seconds ago shortens the wait to about 2.
    metrics_target = MetricsTargetResource.new(VmHost.new_with_id)
    metrics_target.monitor_job_finished_at = Time.now - 8
    metric_export_resources.run_queue << metrics_target
    expect(monitor_runner.enqueue).to be_within(1).of(2)
    expect(metric_export_resources.run_queue.size).to eq 1

    # Once that entry is overdue it leaves the run queue, and the wait falls
    # back to the remaining monitor entry.
    metrics_target.monitor_job_finished_at = Time.now - 12
    expect(monitor_runner.enqueue).to be_within(1).of(6)
    expect(metric_export_resources.run_queue.size).to eq 0
  end
end
# Exercises the main #run loop, including the shutdown and failure paths.
describe "#run" do
before do
# Counts how many times the overridden #enqueue below has been called.
i = 0
monitor_runner_args[:enqueue_every] = 0
# Override #enqueue on this runner instance only: request shutdown on the
# call where the counter has reached 100, then delegate to the real
# implementation via super(). Referencing monitor_runner here also builds the
# runner, after the enqueue_every override above has been applied.
monitor_runner.define_singleton_method(:enqueue) do
shutdown! if i == 100
i += 1
super()
end
end
it "runs scan/report/check_stuck_pulse/enqueue loop with no resources" do
expect(monitor_runner.run).to be_nil
end
it "runs scan/report/check_stuck_pulse/enqueue loop with resources" do
vm1 = vm_host_with_id("90000000-0000-0000-0000-000000000000")
# Swap in private queues so the example can observe what the loop submits.
msq = monitor_resources.submit_queue
mesq = metric_export_resources.submit_queue
q1 = monitor_resources.submit_queue = Queue.new
q2 = metric_export_resources.submit_queue = Queue.new
# The runner was already built in the before hook, so these intervals are
# changed directly on the instance rather than through monitor_runner_args.
monitor_runner.instance_variable_set(:@report_every, 0)
monitor_runner.instance_variable_set(:@check_stuck_pulses_every, 0)
monitor_runner.run
# Non-blocking pops: the host must already be on both queues once run returns.
expect(q1.pop(timeout: 0).resource).to eq vm1
expect(q2.pop(timeout: 0).resource).to eq vm1
ensure
# Put the original queues back regardless of the example's outcome.
monitor_resources.submit_queue = msq
metric_export_resources.submit_queue = mesq
end
it "handles ClosedQueueError before shutdown" do
vm_host_with_id("90000000-0000-0000-0000-000000000000")
# Pushing to a closed Queue raises ClosedQueueError; with a host present,
# run is still expected to return nil.
monitor_resources.submit_queue.close
expect(monitor_runner.run).to be_nil
end
it "handles shutting down after scanning" do
# Run the real scan first, then request shutdown.
monitor_runner.define_singleton_method(:scan) do
super()
shutdown!
end
expect(monitor_runner.run).to be_nil
end
it "handles shutting down before scanning when there are resources" do
vm_host_with_id("90000000-0000-0000-0000-000000000000")
# Request shutdown first, then run the real scan with a host present.
monitor_runner.define_singleton_method(:scan) do
shutdown!
super()
end
expect(monitor_runner.run).to be_nil
end
it "thread prints and exits for other failures" do
exited = false
expect(ThreadPrinter).to receive(:run)
# Stub Kernel.exit! so the test process keeps running; only record the call.
expect(Kernel).to receive(:exit!).and_invoke(->(_) { exited = true })
expect(Clog).to receive(:emit).with("Pulse checking or resource scanning has failed.").and_call_original
# A bare raise with no active exception raises RuntimeError.
monitor_runner.define_singleton_method(:scan) { raise }
monitor_runner.run
expect(exited).to be true
end
end
end