ubicloud/bin/monitor
shikharbhardwaj f6c2aef351 Update bin/monitor to export metrics to VictoriaMetrics
This commit updates bin/monitor to scan for resources that have metrics
export capability (currently only PG servers) and to periodically export
their metrics and store them in VictoriaMetrics.

Each resource that exports metrics should include `MetricsTargetMethods`,
which contains the configuration and a common algorithm for fetching
scraped data from a given list of endpoints. Roughly, the logic looks
like this:

Producer side (PG server):
A systemd-backed timer runs `postgres/bin/metrics-collector` every 15s
(the default interval); it scrapes the list of endpoints and stores the
results on disk. This location is the `pending` buffer, which can hold
up to 120 entries by default (120 × 15s = 30 minutes), enough to ride
out up to 30 minutes of outages in other parts of the system.
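To make the retention arithmetic concrete, here is a minimal Ruby sketch
of a bounded buffer. `PendingBuffer` is a hypothetical in-memory stand-in
for the on-disk `pending` location, and discarding the oldest scrape once
the cap is reached is an assumption, not necessarily what
`metrics-collector` does.

```ruby
# Hypothetical in-memory model of the `pending` buffer.
# Assumption: once the cap is reached, the oldest scrape is discarded.
class PendingBuffer
  attr_reader :entries

  def initialize(capacity: 120)
    @capacity = capacity
    @entries = []
  end

  # Append one scrape, evicting the oldest entries beyond the cap.
  def push(scrape)
    @entries << scrape
    @entries.shift while @entries.size > @capacity
    self
  end

  # Seconds of outage the buffer can absorb at a given scrape interval.
  def retention_seconds(interval_seconds)
    @capacity * interval_seconds
  end
end

buffer = PendingBuffer.new
buffer.retention_seconds(15) # => 1800, i.e. 30 minutes
```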

Consumer side (bin/monitor):
Monitor periodically connects to each metrics target resource, fetches
up to 4 scrapes at a time, writes them to VictoriaMetrics, and moves the
successfully exported scrapes to the `done` buffer, which
`metrics-collector` cleans up periodically. Currently, we create only a
single global VictoriaMetrics instance for simplicity, although it can
be configured to be location-aware.
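One consumer pass can be sketched as follows, assuming scrapes that fail
to export stay in `pending` for the next pass. `export_pass` and its
block-based exporter are hypothetical names; the real monitor works on
files on the PG server rather than on Ruby arrays.

```ruby
# Hypothetical model of one export pass. `pending` and `done` are arrays
# standing in for the on-disk buffers; the block plays the role of the
# VictoriaMetrics writer and returns true when a scrape was exported.
def export_pass(pending, done, batch_size: 4)
  # Take at most `batch_size` of the oldest pending scrapes.
  batch = pending.shift(batch_size)
  exported, failed = batch.partition { |scrape| yield(scrape) }

  # Assumption: failed scrapes return to the front of `pending` for retry.
  pending.unshift(*failed)
  # Only successfully exported scrapes move to `done`.
  done.concat(exported)
  exported.size
end

pending = %w[s1 s2 s3 s4 s5 s6]
done = []
export_pass(pending, done) { |s| s != "s2" } # => 3
pending # => ["s2", "s5", "s6"]
done    # => ["s1", "s3", "s4"]
```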
2025-05-09 18:26:44 +05:30


#!/usr/bin/env ruby
# frozen_string_literal: true

require_relative "../loader"
clover_freeze

health_monitor_resources = {}
health_monitor_mutex = Mutex.new
health_monitor_thread_pool_size = (Config.max_health_monitor_threads - 2).clamp(1, nil)
monitorable_resource_types = [VmHost, PostgresServer, Vm.where(~Sshable.where(id: Sequel[:vm][:id]).exists), MinioServer, GithubRunner, VmHostSlice, LoadBalancerVmPort, KubernetesCluster, VictoriaMetricsServer]
health_monitor_queue_size = health_monitor_thread_pool_size + (monitorable_resource_types.sum(&:count) * 1.5).round
health_monitor_queue = SizedQueue.new(health_monitor_queue_size)

metrics_target_resources = {}
metrics_export_mutex = Mutex.new
metrics_export_thread_pool_size = (Config.max_metrics_export_threads - 2).clamp(1, nil)
metrics_target_resource_types = [PostgresServer]
metrics_export_queue_size = metrics_export_thread_pool_size + (metrics_target_resource_types.sum(&:count) * 1.5).round
metrics_export_queue = SizedQueue.new(metrics_export_queue_size)

resource_scanner = Thread.new do
  loop do
    monitorable_resources = monitorable_resource_types.flat_map(&:all)
    health_monitor_mutex.synchronize do
      monitorable_resources.each do |r|
        health_monitor_resources[r.id] ||= MonitorableResource.new(r)
      end
    end

    metrics_resources = metrics_target_resource_types.flat_map(&:all)
    metrics_export_mutex.synchronize do
      metrics_resources.each do |r|
        metrics_target_resources[r.id] ||= MetricsTargetResource.new(r)
      end
    end

    sleep 60
  end
rescue => ex
  Clog.emit("Resource scanning has failed.") { {resource_scanning_failure: {exception: Util.exception_to_hash(ex)}} }
  ThreadPrinter.run
  Kernel.exit!
end

health_monitor_thread_pool = Array.new(health_monitor_thread_pool_size) do
  Thread.new do
    while (r = health_monitor_queue.pop)
      r.lock_no_wait do
        r.open_resource_session
        r.process_event_loop
        r.check_pulse
      end
    end
  end
end

metrics_export_thread_pool = Array.new(metrics_export_thread_pool_size) do
  Thread.new do
    while (r = metrics_export_queue.pop)
      r.lock_no_wait do
        r.open_resource_session
        r.export_metrics
      end
    end
  end
end

begin
  loop do
    # Since the switch to use a thread pool for monitored resources,
    # this emits the number of pulse threads + export threads, not the number of monitor threads + pulse threads
    Clog.emit("Active threads count.") { {active_threads_count: Thread.list.count - health_monitor_thread_pool_size - metrics_export_thread_pool_size - 2} }

    health_monitor_rs = health_monitor_mutex.synchronize { health_monitor_resources.values }
    health_monitor_rs.each do |r|
      r.force_stop_if_stuck
      health_monitor_queue.push(r)
    end
    health_monitor_mutex.synchronize { health_monitor_resources.delete_if { |_, r| r.deleted } }

    metrics_export_rs = metrics_export_mutex.synchronize { metrics_target_resources.values }
    metrics_export_rs.each do |r|
      r.force_stop_if_stuck
      metrics_export_queue.push(r)
    end
    metrics_export_mutex.synchronize { metrics_target_resources.delete_if { |_, r| r.deleted } }

    sleep 5
  end
rescue => ex
  Clog.emit("Pulse checking has failed.") { {pulse_checking_failure: {exception: Util.exception_to_hash(ex)}} }
  ThreadPrinter.run
  Kernel.exit!
end

resource_scanner.join
health_monitor_thread_pool.each { health_monitor_queue.push(nil) }
health_monitor_thread_pool.each(&:join)
metrics_export_thread_pool.each { metrics_export_queue.push(nil) }
metrics_export_thread_pool.each(&:join)
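The thread-pool wiring in bin/monitor follows a common Ruby pattern: a
fixed set of worker threads draining a bounded `SizedQueue`, stopped by
pushing one `nil` per worker. The standalone sketch below reproduces only
that pattern and the two sizing formulas; `run_pool`, `pool_size`,
`queue_size`, the integer "resources", and the doubling step are
illustrative placeholders, not ubicloud code.

```ruby
# Worker count: same result as `(max_threads - 2).clamp(1, nil)` in
# bin/monitor, i.e. subtract 2 but never drop below one worker.
def pool_size(max_threads)
  [max_threads - 2, 1].max
end

# Queue capacity: one slot per worker plus 1.5x the resource count.
def queue_size(pool_size, resource_count)
  pool_size + (resource_count * 1.5).round
end

def run_pool(resources, max_threads:)
  size = pool_size(max_threads)
  queue = SizedQueue.new(queue_size(size, resources.count))
  results = Queue.new

  workers = Array.new(size) do
    Thread.new do
      # pop blocks until work arrives; a nil sentinel ends the loop.
      while (r = queue.pop)
        results << r * 2 # placeholder for the per-resource work
      end
    end
  end

  resources.each { |r| queue.push(r) } # blocks while the queue is full
  workers.each { queue.push(nil) }     # one shutdown sentinel per worker
  workers.each(&:join)

  # Workers finish in arbitrary order, so sort for a stable result.
  Array.new(results.size) { results.pop }.sort
end

run_pool([1, 2, 3], max_threads: 4) # => [2, 4, 6]
```

Because `SizedQueue#push` blocks when the queue is full, a slow pool
applies backpressure to the producer instead of letting the backlog grow
without bound; in bin/monitor that producer is the 5-second loop that
re-enqueues every resource.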