Files
ubicloud/bin/monitor
Benjamin Satzger f23d98b603 Add monitor_process? utility method
Add `monitor_process?` as a class method to `Util`. It allows checking
whether the current process is the monitor process.
2025-07-22 14:44:50 +02:00

102 lines
3.6 KiB
Ruby
Executable File

#!/usr/bin/env ruby
# frozen_string_literal: true
# Flag this process as the monitor process. The variable is set before the
# loader is required, so any code loaded below can already observe it.
# NOTE(review): presumably this is what `Util.monitor_process?` reads — confirm.
ENV["MONITOR_PROCESS"] = "1"
# Load the application through the project's loader file.
require_relative "../loader"
# NOTE(review): `clover_freeze` is not defined in this file (presumably it
# comes from the loader); confirm what it freezes before reordering these lines.
clover_freeze
# --- Health monitoring: shared state and work queue ---

# Registry of tracked resources (resource id => MonitorableResource).
# Every access elsewhere in this script happens under health_monitor_mutex.
health_monitor_resources = {}
health_monitor_mutex = Mutex.new

# Worker count: the configured maximum minus two, but never fewer than one.
health_monitor_thread_pool_size = (Config.max_health_monitor_threads - 2).clamp(1, nil)

# Sources of resources to monitor. Each entry must respond to `count` and `all`.
monitorable_resource_types = [
  VmHost,
  PostgresServer,
  # Only VMs for which no Sshable row with the same id exists.
  Vm.where(~Sshable.where(id: Sequel[:vm][:id]).exists),
  MinioServer,
  GithubRunner,
  VmHostSlice,
  LoadBalancerVmPort,
  KubernetesCluster,
  VictoriaMetricsServer
]

# The queue capacity is fixed at startup: one slot per worker plus 1.5x the
# number of resources that exist right now (rounded).
monitorable_resource_count = monitorable_resource_types.sum { |type| type.count }
health_monitor_queue_size = health_monitor_thread_pool_size + (monitorable_resource_count * 1.5).round
health_monitor_queue = SizedQueue.new(health_monitor_queue_size)
# --- Metrics export: shared state and work queue ---

# Registry of metrics targets (resource id => MetricsTargetResource).
# Every access elsewhere in this script happens under metrics_export_mutex.
metrics_target_resources = {}
metrics_export_mutex = Mutex.new

# Worker count: the configured maximum minus two, but never fewer than one.
metrics_export_thread_pool_size = (Config.max_metrics_export_threads - 2).clamp(1, nil)

# Sources of resources whose metrics are exported.
metrics_target_resource_types = [
  PostgresServer,
  VmHost
]

# The queue capacity is fixed at startup: one slot per worker plus 1.5x the
# number of target resources that exist right now (rounded).
metrics_target_resource_count = metrics_target_resource_types.sum { |type| type.count }
metrics_export_queue_size = metrics_export_thread_pool_size + (metrics_target_resource_count * 1.5).round
metrics_export_queue = SizedQueue.new(metrics_export_queue_size)
# Background thread that rescans the resource sources every 60 seconds and
# registers any resource that is not yet present in the shared registries.
# Entries that already exist are left untouched (`||=`). Any StandardError
# is logged, thread state is dumped, and the whole process exits immediately.
resource_scanner = Thread.new do
  loop do
    # Load the records before taking the mutex, so the lock is only held
    # while the registry hash itself is updated.
    discovered = monitorable_resource_types.flat_map { |type| type.all }
    health_monitor_mutex.synchronize do
      discovered.each { |resource| health_monitor_resources[resource.id] ||= MonitorableResource.new(resource) }
    end

    metrics_candidates = metrics_target_resource_types.flat_map { |type| type.all }
    metrics_export_mutex.synchronize do
      metrics_candidates.each { |resource| metrics_target_resources[resource.id] ||= MetricsTargetResource.new(resource) }
    end

    sleep 60
  end
rescue => error
  Clog.emit("Resource scanning has failed.") { {resource_scanning_failure: {exception: Util.exception_to_hash(error)}} }
  ThreadPrinter.run
  Kernel.exit!
end
# Fixed-size pool of worker threads for health checks. Each worker pops
# resources off the queue until it receives a falsy value (the nil sentinel
# pushed at shutdown), which ends its loop.
# NOTE(review): there is no rescue here, so an exception escaping the block
# ends that worker thread — confirm the resource methods handle their own errors.
health_monitor_thread_pool = Array.new(health_monitor_thread_pool_size) do
  Thread.new do
    while (resource = health_monitor_queue.pop)
      resource.lock_no_wait do
        resource.open_resource_session
        resource.process_event_loop
        resource.check_pulse
      end
    end
  end
end
# Fixed-size pool of worker threads for metrics export. Each worker pops
# targets off the queue until it receives a falsy value (the nil sentinel
# pushed at shutdown), which ends its loop.
# NOTE(review): there is no rescue here, so an exception escaping the block
# ends that worker thread — confirm the resource methods handle their own errors.
metrics_export_thread_pool = Array.new(metrics_export_thread_pool_size) do
  Thread.new do
    while (target = metrics_export_queue.pop)
      target.lock_no_wait do
        target.open_resource_session
        target.export_metrics
      end
    end
  end
end
# Main dispatch loop: every 5 seconds, enqueue every tracked resource for
# its worker pool and drop registry entries that are flagged as deleted.
# Any StandardError is logged, thread state is dumped, and the process exits.
begin
  loop do
    # Since the switch to a thread pool for monitored resources, this emits
    # the number of pulse threads + export threads, not the number of
    # monitor threads + pulse threads.
    # NOTE(review): the trailing "- 2" presumably discounts the main thread
    # and the resource scanner — confirm.
    Clog.emit("Active threads count.") do
      {active_threads_count: Thread.list.count - health_monitor_thread_pool_size - metrics_export_thread_pool_size - 2}
    end

    # Snapshot under the lock, then enqueue outside it: pushing onto a full
    # SizedQueue blocks, and that must not happen while holding the mutex.
    tracked_health = health_monitor_mutex.synchronize { health_monitor_resources.values }
    tracked_health.each do |resource|
      resource.force_stop_if_stuck
      health_monitor_queue.push(resource)
    end
    health_monitor_mutex.synchronize do
      health_monitor_resources.delete_if { |_id, resource| resource.deleted }
    end

    tracked_metrics = metrics_export_mutex.synchronize { metrics_target_resources.values }
    tracked_metrics.each do |target|
      target.force_stop_if_stuck
      metrics_export_queue.push(target)
    end
    metrics_export_mutex.synchronize do
      metrics_target_resources.delete_if { |_id, target| target.deleted }
    end

    sleep 5
  end
rescue => error
  Clog.emit("Pulse checking has failed.") { {pulse_checking_failure: {exception: Util.exception_to_hash(error)}} }
  ThreadPrinter.run
  Kernel.exit!
end
# Shutdown sequence: wait for the scanner, then stop each pool by pushing
# one nil sentinel per worker and joining the workers.
# NOTE(review): the dispatch loop preceding this has no `break` and its rescue
# path calls Kernel.exit!, so this looks reachable only if that loop ends
# some other way (e.g. StopIteration) — confirm whether this is intended.
resource_scanner.join

health_monitor_thread_pool.size.times { health_monitor_queue.push(nil) }
health_monitor_thread_pool.each { |worker| worker.join }

metrics_export_thread_pool.size.times { metrics_export_queue.push(nil) }
metrics_export_thread_pool.each { |worker| worker.join }