Files
ubicloud/spec/lib/metrics_target_resource_spec.rb
Jeremy Evans c520839442 Refactor monitor
Previously, monitor started the monitoring processing for every
resource every 5 seconds. If the resource was still being processed
from a previous run, it reported the resource was locked, and still
added it to the queue to process. When it came time for the worker
thread to process it, if it was still locked it was skipped.

This changes the behavior so that monitor will never attempt to
start a monitoring job for a resource currently being monitored.
Worker threads will pop jobs to process off the main queue (now called
the submit queue). When the worker thread is done processing, it will
push the job onto the finish queue. During each loop iteration,
the main thread will pop jobs off the finish queue, and add them
to the run queue.  Then it will run all jobs in the run queue that
finished more than 5 seconds ago.

So previously, jobs were run every 5 seconds. If a job took 4 seconds
to run, it was run 1 second after it last finished. This changes the
behavior so jobs are run 5 seconds after they finish. So jobs that
take longer to run are run less frequently.

When jobs finish, we record the finish time for the job, which is
used for scheduling the next execution.  Finished jobs are appended
to the run queue.  When processing jobs, we find all jobs that should
be run (jobs are ordered by finish time), and slice off the front of
the run queue, and then submit those jobs.

We no longer emit "Resource is locked." messages, because resources
are never locked. There are no mutexes used anymore. The "owning"
thread of a resource is the last thread that popped it off a queue.

To handle checking for stuck pulses, the main thread runs a check
of all resources every 5 seconds, and logs if there is an active job
for the resource and it has been running for more than the expected
number of seconds. Technically, this accesses the same object from
multiple threads (the main thread and the worker thread) at the same
time. However, it only calls an attr_reader method on the object,
and doing that from multiple threads in Ruby is safe, even if the
object is not frozen.

The MonitorResourceType#scan method now returns an array with the
new resources scanned.  These resources are immediately enqueued,
without going through the run queue. This simplifies things, as the
run queue only has to deal with resources that have a finish time.

The MonitorResourceType#enqueue method now takes a Time argument,
and will enqueue all jobs in the run queue that finished before that
time. It will also move jobs from the finish queue to the run queue.
It returns the finish time for the next job to run in the run queue
(or nil if the run queue is empty).

The main thread will consider the times returned by
MonitorResourceType#enqueue.  It will sleep for whatever the least
amount of time is until it is time to run the next job.  That means the
main thread iteration will run much more often than every 5 seconds,
but it will do much less work each time.  If there are no jobs in
either run queue, then it will sleep for 5 seconds, as it did before.

The metric information reported by monitor has been expanded. For
both resource types, it now shows:

* Total number of resources
* Submit queue length (shows whether there is a backlog of jobs, and
  how large it is)
* Number of idle worker threads (worker threads waiting on the submit
  queue, potentially indicating too many workers)

Since we are not waiting 5 seconds per iteration, we only emit the
logged information if it has been more than 5 seconds since the last
emit.

This is a pretty extensive change, and since we don't yet have a
test suite for monitor, fairly risky. We should add a test suite
for monitor.
2025-07-30 07:00:03 +09:00

117 lines
5.7 KiB
Ruby

# frozen_string_literal: true
require "spec_helper"
# Specs for MetricsTargetResource, using a PostgresServer as the wrapped
# resource. Private state (@resource, @session, @tsdb_client, ...) is read and
# written directly through instance variables.
RSpec.describe MetricsTargetResource do
  let(:pg_project_id) { "4d8f9896-26a3-4784-8f52-2ed5d5e55c0e" }
  let(:postgres_server) { PostgresServer.new { |server| server.id = "c068cac7-ed45-82db-bf38-a003582b36ee" } }
  let(:resource) { described_class.new(postgres_server) }

  # Reads an instance variable from the object under test. Note that the first
  # call in an example is what instantiates the lazy `resource`.
  def ivar(name)
    resource.instance_variable_get(name)
  end

  # Expects a VictoriaMetricsResource lookup for the given project and makes it
  # return a double whose only server exposes "tsdb_client" as its client.
  def expect_victoria_metrics_lookup(project_id)
    vm_resource = instance_double(VictoriaMetricsResource, project_id: project_id)
    expect(VictoriaMetricsResource).to receive(:first).with(project_id: project_id).and_return(vm_resource)
    expect(vm_resource).to receive(:servers).and_return([instance_double(VictoriaMetricsServer, client: "tsdb_client")])
  end

  describe "#initialize" do
    # NOTE(review): the example name mentions a nil tsdb client, but nothing
    # here asserts on @tsdb_client -- consider adding that check.
    it "initializes with a resource and nil tsdb client when VictoriaMetrics is not found" do
      expect(Config).to receive(:postgres_service_project_id).and_return(pg_project_id)

      expect(ivar(:@resource)).to eq(postgres_server)
    end

    it "initializes with a resource and a tsdb client when VictoriaMetrics is found" do
      expect(Config).to receive(:postgres_service_project_id).at_least(:once).and_return(pg_project_id)
      project = Project.create_with_id(name: "pg-project") { |p| p.id = Config.postgres_service_project_id }
      expect_victoria_metrics_lookup(project.id)

      expect(ivar(:@resource)).to eq(postgres_server)
      expect(ivar(:@tsdb_client)).to eq("tsdb_client")
    end

    it "initializes with a resource and a tsdb client when VictoriaMetrics is not found in development" do
      expect(Config).to receive(:development?).and_return(true)
      expect(Config).to receive(:postgres_service_project_id).at_least(:once).and_return(pg_project_id)
      project = Project.create_with_id(name: "pg-project") { |p| p.id = Config.postgres_service_project_id }
      # No VictoriaMetricsResource is found, so a client for the local endpoint is built.
      expect(VictoriaMetricsResource).to receive(:first).with(project_id: project.id).and_return(nil)
      expect(VictoriaMetrics::Client).to receive(:new).with(endpoint: "http://localhost:8428").and_return("tsdb_client")

      expect(ivar(:@resource)).to eq(postgres_server)
      expect(ivar(:@tsdb_client)).to eq("tsdb_client")
      expect(ivar(:@deleted)).to be(false)
    end
  end

  describe "#open_resource_session" do
    it "opens a resource session if not already open" do
      expect(postgres_server).to receive(:reload).and_return(postgres_server)
      expect(postgres_server).to receive(:init_metrics_export_session).and_return("session")

      resource.open_resource_session

      expect(ivar(:@session)).to eq("session")
    end

    it "doesn't reopen a session if already open and last export was successful" do
      resource.instance_variable_set(:@session, "session")
      resource.instance_variable_set(:@last_export_success, true)

      # The existing session must be reused: no reload, no new export session.
      expect(postgres_server).not_to receive(:reload)
      expect(postgres_server).not_to receive(:init_metrics_export_session)

      resource.open_resource_session

      expect(ivar(:@session)).to eq("session")
    end

    it "marks resource as deleted when Sequel::NoExistingObject is raised" do
      expect(postgres_server).to receive(:reload).and_raise(Sequel::NoExistingObject)
      expect(Clog).to receive(:emit).with("Resource is deleted.").and_yield

      resource.open_resource_session

      expect(resource.deleted).to be(true)
      expect(ivar(:@session)).to be_nil
    end

    it "ignores other exceptions" do
      expect(postgres_server).to receive(:reload).and_raise(StandardError)

      expect { resource.open_resource_session }.not_to raise_error
    end
  end

  describe "#export_metrics" do
    # Every example here constructs the resource with a VictoriaMetrics-backed
    # client that resolves to "tsdb_client".
    before do
      project = Project.create_with_id(name: "vm-project") { |p| p.id = "4d8f9896-26a3-4784-8f52-2ed5d5e55c0d" }
      expect(Config).to receive(:postgres_service_project_id).and_return(project.id)
      expect_victoria_metrics_lookup(project.id)
    end

    it "calls export_metrics on the resource and updates last_export_success" do
      resource.instance_variable_set(:@session, "session")
      expect(postgres_server).to receive(:export_metrics).with(session: "session", tsdb_client: "tsdb_client")

      expect { resource.export_metrics }
        .to change { ivar(:@last_export_success) }.from(false).to(true)
    end

    it "swallows exceptions and logs them" do
      expect(postgres_server).to receive(:export_metrics).and_raise(StandardError.new("Export failed"))
      expect(Clog).to receive(:emit).and_call_original

      expect { resource.export_metrics }.not_to raise_error
    end

    it "skips export if resource is deleted" do
      resource.instance_variable_set(:@deleted, true)
      expect(postgres_server).not_to receive(:export_metrics)

      expect { resource.export_metrics }.not_to raise_error
    end
  end

  describe "#close_resource_session" do
    it "returns if session is nil" do
      resource.instance_variable_set(:@session, nil)

      expect { resource.close_resource_session }.not_to raise_error
    end

    it "closes the session and sets it to nil" do
      ssh_session = instance_double(Net::SSH::Connection::Session)
      resource.instance_variable_set(:@session, {ssh_session: ssh_session})
      expect(ssh_session).to receive(:shutdown!)
      expect(ssh_session).to receive(:close)

      expect { resource.close_resource_session }
        .to change { ivar(:@session) }.from({ssh_session: ssh_session}).to(nil)
    end
  end
end