This commit updates bin/monitor to scan for resources with metrics export capability (currently only PG servers), periodically export their metrics, and store them in VictoriaMetrics. Each resource that exports metrics includes `MetricsTargetMethods`, which carries the config and a common algorithm for fetching scraped data from a given list of endpoints. Roughly, the logic looks like this:

Producer side (PG server): a systemd-backed timer executes `postgres/bin/metrics-collector` every 15s (default interval), which scrapes the list of endpoints and stores the results on disk. This location is the `pending` buffer, which can grow up to 120 entries by default, providing recovery for 30 minutes of outages in other parts of the system.

Consumer side (bin/monitor): monitor periodically connects to each metrics target resource, fetches up to 4 scrapes at a time, writes them to VictoriaMetrics, and moves the successfully exported scrapes to the `done` buffer, which is cleaned up periodically by the `metrics-collector`.

Currently, we create only a single global VictoriaMetrics instance for simplicity, although it can be configured to be location-aware.
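For illustration, here is a minimal sketch of what one consumer-side export cycle could look like. It is not the actual `MetricsTargetMethods` implementation: the directory paths, batch size constant, and the VictoriaMetrics import URL are assumptions, and the real flow runs these steps over the resource's SSH session rather than on the local filesystem.

```ruby
# Hypothetical sketch of one export cycle; the paths, batch size, and
# VictoriaMetrics URL below are illustrative assumptions.
require "net/http"
require "fileutils"
require "uri"

PENDING_DIR = "/var/lib/metrics/pending" # producer writes scrapes here (assumed path)
DONE_DIR = "/var/lib/metrics/done"       # exported scrapes parked here for cleanup (assumed path)
BATCH_SIZE = 4                           # "up to 4 scrapes at a time"
VM_IMPORT_URL = URI("http://victoria-metrics:8428/api/v1/import/prometheus")

def export_pending_scrapes
  FileUtils.mkdir_p(DONE_DIR)
  Dir.glob(File.join(PENDING_DIR, "*.prom")).sort.first(BATCH_SIZE).each do |path|
    response = Net::HTTP.post(VM_IMPORT_URL, File.read(path))
    # Stop on the first failure so unexported scrapes stay in the pending
    # buffer and are retried on the next cycle.
    break unless response.is_a?(Net::HTTPSuccess)
    FileUtils.mv(path, File.join(DONE_DIR, File.basename(path)))
  end
end
```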
bin/monitor (100 lines, 3.5 KiB, Ruby, executable)
#!/usr/bin/env ruby
# frozen_string_literal: true

require_relative "../loader"
clover_freeze

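# Health monitor bookkeeping: one MonitorableResource per monitored entity,
# fed to a bounded worker pool through a sized queue.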
health_monitor_resources = {}
health_monitor_mutex = Mutex.new
health_monitor_thread_pool_size = (Config.max_health_monitor_threads - 2).clamp(1, nil)
monitorable_resource_types = [VmHost, PostgresServer, Vm.where(~Sshable.where(id: Sequel[:vm][:id]).exists), MinioServer, GithubRunner, VmHostSlice, LoadBalancerVmPort, KubernetesCluster, VictoriaMetricsServer]
health_monitor_queue_size = health_monitor_thread_pool_size + (monitorable_resource_types.sum(&:count) * 1.5).round
health_monitor_queue = SizedQueue.new(health_monitor_queue_size)

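# Metrics export bookkeeping mirrors the health monitor setup; currently only
# PostgresServer is a metrics target resource type.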
metrics_target_resources = {}
metrics_export_mutex = Mutex.new
metrics_export_thread_pool_size = (Config.max_metrics_export_threads - 2).clamp(1, nil)
metrics_target_resource_types = [PostgresServer]
metrics_export_queue_size = metrics_export_thread_pool_size + (metrics_target_resource_types.sum(&:count) * 1.5).round
metrics_export_queue = SizedQueue.new(metrics_export_queue_size)

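# The resource scanner refreshes both resource maps every minute, registering
# newly created resources for health monitoring and metrics export.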
resource_scanner = Thread.new do
  loop do
    monitorable_resources = monitorable_resource_types.flat_map(&:all)
    health_monitor_mutex.synchronize do
      monitorable_resources.each do |r|
        health_monitor_resources[r.id] ||= MonitorableResource.new(r)
      end
    end
    metrics_resources = metrics_target_resource_types.flat_map(&:all)
    metrics_export_mutex.synchronize do
      metrics_resources.each do |r|
        metrics_target_resources[r.id] ||= MetricsTargetResource.new(r)
      end
    end

    sleep 60
  end
rescue => ex
  Clog.emit("Resource scanning has failed.") { {resource_scanning_failure: {exception: Util.exception_to_hash(ex)}} }
  ThreadPrinter.run
  Kernel.exit!
end

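# Health monitor workers pop resources off the queue, run their event loop and
# check their pulse; a nil sentinel shuts a worker down.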
health_monitor_thread_pool = Array.new(health_monitor_thread_pool_size) do
  Thread.new do
    while (r = health_monitor_queue.pop)
      r.lock_no_wait do
        r.open_resource_session
        r.process_event_loop
        r.check_pulse
      end
    end
  end
end

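# Metrics export workers do the same for metrics targets, exporting each
# resource's buffered scrapes to VictoriaMetrics via export_metrics.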
metrics_export_thread_pool = Array.new(metrics_export_thread_pool_size) do
  Thread.new do
    while (r = metrics_export_queue.pop)
      r.lock_no_wait do
        r.open_resource_session
        r.export_metrics
      end
    end
  end
end

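# Main loop: every 5 seconds, enqueue all known resources for health checks and
# metrics export, and drop resources that have been deleted.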
begin
  loop do
    # Since the switch to use a thread pool for monitored resources,
    # this emits the number of pulse threads + export threads, not the number of monitor threads + pulse threads
    Clog.emit("Active threads count.") { {active_threads_count: Thread.list.count - health_monitor_thread_pool_size - metrics_export_thread_pool_size - 2} }

    health_monitor_rs = health_monitor_mutex.synchronize { health_monitor_resources.values }
    health_monitor_rs.each do |r|
      r.force_stop_if_stuck
      health_monitor_queue.push(r)
    end
    health_monitor_mutex.synchronize { health_monitor_resources.delete_if { |_, r| r.deleted } }

    metrics_export_rs = metrics_export_mutex.synchronize { metrics_target_resources.values }
    metrics_export_rs.each do |r|
      r.force_stop_if_stuck
      metrics_export_queue.push(r)
    end
    metrics_export_mutex.synchronize { metrics_target_resources.delete_if { |_, r| r.deleted } }

    sleep 5
  end
rescue => ex
  Clog.emit("Pulse checking has failed.") { {pulse_checking_failure: {exception: Util.exception_to_hash(ex)}} }
  ThreadPrinter.run
  Kernel.exit!
end

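# Shutdown path: join the scanner thread, then drain both worker pools by
# sending one nil sentinel per worker.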
resource_scanner.join
health_monitor_thread_pool.each { health_monitor_queue.push(nil) }
health_monitor_thread_pool.each(&:join)
metrics_export_thread_pool.each { metrics_export_queue.push(nil) }
metrics_export_thread_pool.each(&:join)