ubicloud/lib/metrics_target_resource.rb
Jeremy Evans c520839442 Refactor monitor
Previously, monitor started the monitoring processing for every
resource every 5 seconds. If the resource was still being processed
from a previous run, it reported the resource was locked, and still
added it to the queue to process. When it came time for the worker
thread to process it, if it was still locked it was skipped.

This changes the behavior so that monitor will never attempt to
start a monitoring job for a resource currently being monitored.
Worker threads will pop jobs to process off the main queue (now called
the submit queue). When the worker thread is done processing, it will
push the job onto the finish queue. During each loop iteration,
the main thread will pop jobs off the finish queue, and add them
to the run queue.  Then it will run all jobs in the run queue that
finished more than 5 seconds ago.
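
The queue flow described above can be sketched roughly as follows. This is a minimal illustration and not the monitor's actual code: `Job`, the worker count, and the blocking drain of the finish queue are assumptions made to keep the example small and deterministic.

```ruby
# Hypothetical job object; the real monitor wraps resources differently.
Job = Struct.new(:name, :finished_at)

submit_queue = Queue.new  # main thread -> worker threads
finish_queue = Queue.new  # worker threads -> main thread
run_queue = []            # touched only by the main thread

workers = Array.new(2) do
  Thread.new do
    # Queue#pop returns nil once the queue is closed and empty.
    while (job = submit_queue.pop)
      # ... the monitoring work for the job would happen here ...
      job.finished_at = Time.now
      finish_queue.push(job)
    end
  end
end

jobs = [Job.new("a"), Job.new("b"), Job.new("c")]
jobs.each { |job| submit_queue.push(job) }

# One main-loop step: move finished jobs to the run queue.
# A real loop would drain without blocking; blocking pops are used
# here only so the example finishes deterministically.
jobs.size.times { run_queue << finish_queue.pop }

submit_queue.close
workers.each(&:join)
```

Each job is only ever held by the thread that last popped it off a queue, which is why no mutex is needed in this sketch.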

So previously, jobs were run every 5 seconds. If a job took 4 seconds
to run, it was run 1 second after it last finished. This changes the
behavior so jobs are run 5 seconds after they finish. So jobs that
take longer to run are run less frequently.

When jobs finish, we record the finish time for the job, which is
used for scheduling the next execution.  Finished jobs are appended
to the run queue.  When processing jobs, we find all jobs that should
be run (jobs are ordered by finish time), and slice off the front of
the run queue, and then submit those jobs.
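
As an illustration of slicing due jobs off the front of the run queue, here is a small sketch. `FinishedJob` and `take_due_jobs` are hypothetical names, and the code assumes the run queue is a plain array ordered by finish time, oldest first.

```ruby
# Hypothetical job record with the finish time used for scheduling.
FinishedJob = Struct.new(:name, :finished_at)

# Removes and returns the jobs at the front of run_queue that finished
# before cutoff. Relies on run_queue being ordered by finished_at.
def take_due_jobs(run_queue, cutoff)
  # Index of the first job that is not yet due, or the whole queue
  # if every job is due.
  count = run_queue.index { |job| job.finished_at >= cutoff } || run_queue.size
  run_queue.shift(count)
end

now = Time.at(1_000)
run_queue = [
  FinishedJob.new("a", now - 9),
  FinishedJob.new("b", now - 6),
  FinishedJob.new("c", now - 2)
]

# Jobs that finished more than 5 seconds ago are due.
due = take_due_jobs(run_queue, now - 5)

# Finish time of the next job still waiting, or nil if none remain.
next_finish_time = run_queue.first&.finished_at
```

Because the queue is ordered, the scan stops at the first job that is not due, so the remaining jobs are never inspected.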

We no longer emit "Resource is locked." messages, because resources
are never locked. There are no mutexes used anymore. The "owning"
thread of a resource is the last thread that popped it off a queue.

To handle checking for stuck pulses, the main thread runs a check
of all resources every 5 seconds, and logs if there is an active job
for the resource and it has been running for more than the expected
number of seconds. Technically, this accesses the same object from
multiple threads (the main thread and the worker thread) at the same
time. However, it only calls an attr_reader method on the object,
and doing that from multiple threads in Ruby is safe, even if the
object is not frozen.
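
A stuck-pulse check along these lines could look like the sketch below. `TrackedResource`, `stuck_resources`, and the threshold are hypothetical, and the code assumes a job counts as active while `monitor_job_started_at` is set (with the worker clearing it when the job ends). The commit does not say how an active job is actually detected.

```ruby
# Hypothetical stand-in for a monitored resource.
class TrackedResource
  attr_reader :name
  attr_accessor :monitor_job_started_at

  def initialize(name, monitor_job_started_at = nil)
    @name = name
    @monitor_job_started_at = monitor_job_started_at
  end
end

# Returns the resources whose active job has run longer than threshold
# seconds. Only a reader method is called on each resource.
def stuck_resources(resources, now:, threshold:)
  resources.select do |resource|
    # Read once into a local so the check uses a single consistent value
    # even if a worker thread updates the attribute concurrently.
    started_at = resource.monitor_job_started_at
    started_at && now - started_at > threshold
  end
end

now = Time.at(1_000)
resources = [
  TrackedResource.new("idle"),
  TrackedResource.new("recent", now - 3),
  TrackedResource.new("stuck", now - 120)
]

stuck = stuck_resources(resources, now: now, threshold: 60)
```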

The MonitorResourceType#scan method now returns an array with the
new resources scanned.  These resources are immediately enqueued,
without going through the run queue. This simplifies things, as the
run queue only has to deal with resources that have a finish time.

The MonitorResourceType#enqueue method now takes a Time argument,
and will enqueue all jobs in the run queue that finished before that
time. It will also move jobs from the finish queue to the run queue.
It returns the finish time for the next job to run in the run queue
(or nil if the run queue is empty).

The main thread will consider the times returned by
MonitorResourceType#enqueue.  It will sleep for whatever the least
amount of time is until it is time to run the next job.  That means the
main thread iteration will run much more often than every 5 seconds,
but it will do much less work each time.  If there are no jobs in
either run queue, then it will sleep for 5 seconds, as it did before.
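
The sleep calculation can be sketched as follows. `sleep_duration` is a hypothetical helper, not a method in the monitor. It assumes each run queue reports the finish time of its next job (or nil), and that clamping the result between 0 and the interval is acceptable.

```ruby
# Returns how long the main thread should sleep, in seconds.
# next_finish_times holds one entry per run queue: the finish time of
# the next job to run, or nil if that run queue is empty.
def sleep_duration(next_finish_times, now:, interval: 5)
  # A job becomes due `interval` seconds after it finished.
  earliest_run = next_finish_times.compact.map { |t| t + interval }.min

  # No jobs waiting in any run queue: fall back to the full interval.
  return interval unless earliest_run

  # Never sleep a negative amount, and never longer than the interval
  # (the clamp is an assumption of this sketch).
  (earliest_run - now).clamp(0, interval)
end

now = Time.at(1_000)
short_sleep = sleep_duration([now - 3, nil], now: now)
idle_sleep = sleep_duration([nil, nil], now: now)
overdue_sleep = sleep_duration([now - 10, now - 1], now: now)
```

With a job that finished 3 seconds ago, the next run is due in 2 seconds, so the main thread wakes well before the 5-second fallback.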

The metric information reported by monitor has been expanded. For
both resource types, it now shows:

* Total number of resources
* Submit queue length (shows whether there is a backlog of jobs, and
  how large it is)
* Number of idle worker threads (worker threads waiting on the submit
  queue, potentially indicating too many workers)
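
One way to gather these three numbers is with Ruby's built-in `Queue#length` and `Queue#num_waiting`. Whether monitor uses exactly these calls is an assumption of this sketch, and the resource list and stats hash keys are hypothetical.

```ruby
resources = %w[vm-1 vm-2 vm-3 vm-4]  # hypothetical resource list
submit_queue = Queue.new

workers = Array.new(3) do
  Thread.new do
    # Each worker blocks in pop while the submit queue is empty.
    while (job = submit_queue.pop)
      job.call
    end
  end
end

# Wait until every worker is blocked on the queue, so the numbers
# below are stable for this demo.
sleep 0.01 until submit_queue.num_waiting == workers.size

stats = {
  total_resources: resources.size,
  submit_queue_length: submit_queue.length,  # jobs waiting for a worker
  idle_workers: submit_queue.num_waiting     # threads blocked in pop
}

submit_queue.close
workers.each(&:join)
```

A persistently non-zero queue length suggests too few workers, while a persistently high idle count suggests too many.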

Since we are not waiting 5 seconds per iteration, we only emit the
logged information if it has been more than 5 seconds since the last
emit.
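
The throttling of log output can be illustrated with a small helper. `ThrottledEmitter` is a hypothetical class written for this sketch and does not appear in the monitor code.

```ruby
# Runs the given block only if more than `interval` seconds have
# passed since the last time it ran.
class ThrottledEmitter
  def initialize(interval: 5)
    @interval = interval
    @last_emit_at = nil
  end

  # Returns true if the block was run, false if it was skipped.
  def emit_if_due(now)
    return false if @last_emit_at && now - @last_emit_at <= @interval

    @last_emit_at = now
    yield
    true
  end
end

emitter = ThrottledEmitter.new(interval: 5)
start = Time.at(1_000)
emitted = []

# Simulate main-loop iterations at 0, 2, 4, 6 and 12 seconds.
[0, 2, 4, 6, 12].each do |offset|
  emitter.emit_if_due(start + offset) { emitted << offset }
end
```

In this run the block fires at offsets 0, 6, and 12, because each of those is more than 5 seconds after the previous emit.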

This is a pretty extensive change, and since we don't yet have a
test suite for monitor, fairly risky. We should add a test suite
for monitor.
2025-07-30 07:00:03 +09:00

57 lines
1.6 KiB
Ruby

# frozen_string_literal: true

class MetricsTargetResource
  attr_reader :deleted, :resource
  attr_accessor :monitor_job_started_at, :monitor_job_finished_at

  def initialize(resource)
    @resource = resource
    @session = nil
    @last_export_success = false
    @export_started_at = Time.now
    @deleted = false

    vmr = VictoriaMetricsResource.first(project_id: resource.metrics_config[:project_id])
    vms = vmr&.servers&.first
    @tsdb_client = vms&.client || (VictoriaMetrics::Client.new(endpoint: "http://localhost:8428") if Config.development?)
  end

  def open_resource_session
    return if @session && @last_export_success

    @session = @resource.reload.init_metrics_export_session
  rescue => ex
    if ex.is_a?(Sequel::NoExistingObject)
      Clog.emit("Resource is deleted.") { {resource_deleted: {ubid: @resource.ubid}} }
      @session = nil
      @deleted = true
    end
  end

  def export_metrics
    return if @deleted

    @export_started_at = Time.now
    begin
      count = @resource.export_metrics(session: @session, tsdb_client: @tsdb_client)
      Clog.emit("Metrics export has finished.") { {metrics_export_success: {ubid: @resource.ubid, count: count}} }
      @last_export_success = true
    rescue => ex
      @last_export_success = false
      close_resource_session
      Clog.emit("Metrics export has failed.") { {metrics_export_failure: {ubid: @resource.ubid, exception: Util.exception_to_hash(ex)}} }
    end
  end

  def close_resource_session
    return if @session.nil?

    @session[:ssh_session].shutdown!
    begin
      @session[:ssh_session].close
    rescue
    end
    @session = nil
  end
end