Previously, monitor started monitoring processing for every resource every 5 seconds. If the resource was still being processed from a previous run, it reported that the resource was locked, but still added it to the queue to process. When it came time for the worker thread to process it, the resource was skipped if it was still locked.

This changes the behavior so that monitor never attempts to start a monitoring job for a resource that is currently being monitored. Worker threads pop jobs to process off the main queue (now called the submit queue). When a worker thread is done processing, it pushes the job onto the finish queue. During each loop iteration, the main thread pops jobs off the finish queue and adds them to the run queue. It then runs all jobs in the run queue that finished more than 5 seconds ago.

So previously, jobs were run every 5 seconds: if a job took 4 seconds to run, it was run again 1 second after it last finished. This changes the behavior so jobs are run 5 seconds after they finish, which means jobs that take longer to run are run less frequently.

When a job finishes, we record its finish time, which is used for scheduling the next execution. Finished jobs are appended to the run queue. When processing jobs, we find all jobs that should be run (jobs are ordered by finish time), slice them off the front of the run queue, and submit them.

We no longer emit "Resource is locked." messages, because resources are never locked. No mutexes are used anymore. The "owning" thread of a resource is the last thread that popped it off a queue.

To handle checking for stuck pulses, the main thread runs a check of all resources every 5 seconds, and logs if there is an active job for the resource that has been running for more than the expected number of seconds. Technically, this accesses the same object from multiple threads (the main thread and the worker thread) at the same time.
However, it only calls an attr_reader method on the object, and doing that from multiple threads in Ruby is safe, even if the object is not frozen.

The MonitorResourceType#scan method now returns an array of the newly scanned resources. These resources are enqueued immediately, without going through the run queue. This simplifies things, as the run queue only has to deal with resources that have a finish time.

The MonitorResourceType#enqueue method now takes a Time argument, and enqueues all jobs in the run queue that finished before that time. It also moves jobs from the finish queue to the run queue. It returns the finish time of the next job to run in the run queue (or nil if the run queue is empty).

The main thread considers the times returned by MonitorResourceType#enqueue, and sleeps only until the earliest time at which the next job is due to run. That means the main thread iteration runs much more often than every 5 seconds, but does much less work each time. If there are no jobs in either run queue, it sleeps for 5 seconds, as it did before.

The metric information reported by monitor has been expanded. For both resource types, it now shows:

* Total number of resources
* Submit queue length (shows whether there is a backlog of jobs, and how large it is)
* Number of idle worker threads (worker threads waiting on the submit queue, potentially indicating too many workers)

Since we are no longer waiting 5 seconds per iteration, we only emit the logged information if it has been more than 5 seconds since the last emit.

This is a pretty extensive change and, since we don't yet have a test suite for monitor, fairly risky. We should add a test suite for monitor.
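
The queue hand-off described above can be illustrated with a minimal sketch. It is not the actual monitor code: `Job`, `RunScheduler`, `finish`, and `sleep_seconds` are hypothetical names, and only the `enqueue` behavior (take a cutoff Time, move finished jobs to the run queue, submit the jobs that finished before the cutoff, return the next finish time or nil) follows the description in this message.

```ruby
# Hypothetical sketch of the submit/finish/run queue flow; names are illustrative.
Job = Struct.new(:name, :finish_time)

class RunScheduler
  attr_reader :submit_queue, :finish_queue, :run_queue

  def initialize
    @submit_queue = Queue.new # main thread pushes, worker threads pop
    @finish_queue = Queue.new # worker threads push, main thread pops
    @run_queue = []           # touched only by the main thread
  end

  # Called by a worker thread when it is done with a job.
  def finish(job, now = Time.now)
    job.finish_time = now
    @finish_queue.push(job)
  end

  # Called by the main thread. Submits every job that finished before
  # `before` and returns the finish time of the next job (nil if none).
  def enqueue(before)
    # The main thread is the only consumer, so empty?/pop cannot block here.
    @run_queue << @finish_queue.pop until @finish_queue.empty?

    # Jobs are appended in finish order, so the due jobs are at the front.
    due = @run_queue.index { |job| job.finish_time >= before } || @run_queue.size
    @run_queue.shift(due).each { |job| @submit_queue.push(job) }
    @run_queue.first&.finish_time
  end
end

# Seconds the main thread should sleep, given one next-finish time per
# resource type (nil for an empty run queue). Assumes a 5 second delay.
def sleep_seconds(next_finish_times, now, delay: 5)
  waits = next_finish_times.compact.map { |t| t + delay - now }
  return delay if waits.empty?

  waits.min.clamp(0, delay)
end

# Usage with fixed times, so the result does not depend on the clock.
t0 = Time.at(1_000)
scheduler = RunScheduler.new
scheduler.finish(Job.new("a"), t0)
scheduler.finish(Job.new("b"), t0 + 3)

now = t0 + 6
next_finish = scheduler.enqueue(now - 5) # submits "a"; "b" stays in the run queue
wait = sleep_seconds([next_finish, nil], now) # 2.0: "b" is due at t0 + 8
```

Because each queue has a single owner on one side, no mutex appears in the sketch: workers only pop from the submit queue and push to the finish queue, while the run queue is a plain Array private to the main thread.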
57 lines
1.6 KiB
Ruby
# frozen_string_literal: true

class MetricsTargetResource
  attr_reader :deleted, :resource
  attr_accessor :monitor_job_started_at, :monitor_job_finished_at

  def initialize(resource)
    @resource = resource
    @session = nil
    @last_export_success = false
    @export_started_at = Time.now
    @deleted = false

    vmr = VictoriaMetricsResource.first(project_id: resource.metrics_config[:project_id])
    vms = vmr&.servers&.first
    @tsdb_client = vms&.client || (VictoriaMetrics::Client.new(endpoint: "http://localhost:8428") if Config.development?)
  end

  def open_resource_session
    return if @session && @last_export_success

    @session = @resource.reload.init_metrics_export_session
  rescue => ex
    if ex.is_a?(Sequel::NoExistingObject)
      Clog.emit("Resource is deleted.") { {resource_deleted: {ubid: @resource.ubid}} }
      @session = nil
      @deleted = true
    end
  end

  def export_metrics
    return if @deleted

    @export_started_at = Time.now
    begin
      count = @resource.export_metrics(session: @session, tsdb_client: @tsdb_client)
      Clog.emit("Metrics export has finished.") { {metrics_export_success: {ubid: @resource.ubid, count: count}} }
      @last_export_success = true
    rescue => ex
      @last_export_success = false
      close_resource_session
      Clog.emit("Metrics export has failed.") { {metrics_export_failure: {ubid: @resource.ubid, exception: Util.exception_to_hash(ex)}} }
    end
  end

  def close_resource_session
    return if @session.nil?

    @session[:ssh_session].shutdown!
    begin
      @session[:ssh_session].close
    rescue
    end
    @session = nil
  end
end
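
The `monitor_job_started_at` and `monitor_job_finished_at` accessors are the kind of state that the stuck-job check described in the commit message could read from the main thread. The following is only a sketch of such a check: `TrackedResource` and `stuck_resources` are hypothetical, and the rule used to decide whether a job is "active" (started, and not finished since that start) is an assumption rather than the monitor's actual logic.

```ruby
# Hypothetical stand-in exposing the same two accessors as the class above.
class TrackedResource
  attr_reader :name
  attr_accessor :monitor_job_started_at, :monitor_job_finished_at

  def initialize(name)
    @name = name
  end
end

# Returns the resources whose active job has run for more than `limit`
# seconds. Only reader methods are called, so the main thread never
# mutates objects that a worker thread may currently own.
def stuck_resources(resources, now, limit)
  resources.select do |resource|
    started = resource.monitor_job_started_at
    finished = resource.monitor_job_finished_at
    next false unless started # no job has been started yet

    # Assumption: a job is active if it has not finished since it started.
    active = finished.nil? || finished < started
    active && (now - started) > limit
  end
end

# Usage with fixed times.
t0 = Time.at(0)
done = TrackedResource.new("done")
done.monitor_job_started_at = t0
done.monitor_job_finished_at = t0 + 2

hung = TrackedResource.new("hung")
hung.monitor_job_started_at = t0

stuck = stuck_resources([done, hung], t0 + 60, 5).map(&:name) # ["hung"]
```

The two reads are not atomic as a pair, so a check like this can briefly see a stale combination; for a log-only warning that is usually acceptable, but it would not be a safe basis for taking ownership of the resource.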