Adds logic to the VmHost model to serve the metrics config needed to start collecting node_exporter metrics on VmHosts. Also adds a new label to the VmHost prog that installs the scripts needed to collect the node_exporter metrics into the right directory for bin/monitor to pick up.
100 lines
3.5 KiB
Ruby
Executable File
100 lines
3.5 KiB
Ruby
Executable File
#!/usr/bin/env ruby
|
|
# frozen_string_literal: true
|
|
|
|
require_relative "../loader"
|
|
clover_freeze

# Registry of health-monitored resources, keyed by record id. The rest of this
# script only touches it while holding health_monitor_mutex.
health_monitor_resources = {}
health_monitor_mutex = Mutex.new

# Worker count: the configured maximum minus two, never less than one.
# NOTE(review): the "- 2" presumably accounts for the main and scanner threads — confirm.
health_monitor_thread_pool_size = (Config.max_health_monitor_threads - 2).clamp(1, nil)

# Sources scanned for health monitoring; each entry is asked for `count` and
# `all`. The Vm entry is narrowed to rows for which no Sshable with the same
# id exists.
monitorable_resource_types = [
  VmHost,
  PostgresServer,
  Vm.where(~Sshable.where(id: Sequel[:vm][:id]).exists),
  MinioServer,
  GithubRunner,
  VmHostSlice,
  LoadBalancerVmPort,
  KubernetesCluster,
  VictoriaMetricsServer
]

# Queue capacity: one slot per worker plus 1.5x the resource count observed
# at startup.
health_monitor_queue_size = health_monitor_thread_pool_size + (monitorable_resource_types.sum { |type| type.count } * 1.5).round
health_monitor_queue = SizedQueue.new(health_monitor_queue_size)
|
|
|
|
# Registry of metrics-export targets, keyed by record id. The rest of this
# script only touches it while holding metrics_export_mutex.
metrics_target_resources = {}
metrics_export_mutex = Mutex.new

# Worker count: the configured maximum minus two, never less than one.
# NOTE(review): the "- 2" presumably accounts for the main and scanner threads — confirm.
metrics_export_thread_pool_size = (Config.max_metrics_export_threads - 2).clamp(1, nil)

# Sources scanned for metrics export; each entry is asked for `count` and `all`.
metrics_target_resource_types = [
  PostgresServer,
  VmHost
]

# Queue capacity: one slot per worker plus 1.5x the resource count observed
# at startup.
metrics_export_queue_size = metrics_export_thread_pool_size + (metrics_target_resource_types.sum { |type| type.count } * 1.5).round
metrics_export_queue = SizedQueue.new(metrics_export_queue_size)
|
|
|
|
# Background thread that reloads every monitored source once a minute and
# registers any record not yet present in the two registries. Existing entries
# are left in place (`||=`), so their wrapper objects survive across scans.
# Any StandardError is logged, thread state is dumped, and the whole process
# is terminated immediately.
resource_scanner = Thread.new do
  loop do
    # Load outside the mutex so the lock is held only for the hash updates.
    health_records = monitorable_resource_types.flat_map { |type| type.all }
    health_monitor_mutex.synchronize do
      health_records.each do |record|
        health_monitor_resources[record.id] ||= MonitorableResource.new(record)
      end
    end

    metrics_records = metrics_target_resource_types.flat_map { |type| type.all }
    metrics_export_mutex.synchronize do
      metrics_records.each do |record|
        metrics_target_resources[record.id] ||= MetricsTargetResource.new(record)
      end
    end

    sleep 60
  end
rescue => error
  Clog.emit("Resource scanning has failed.") do
    {resource_scanning_failure: {exception: Util.exception_to_hash(error)}}
  end
  ThreadPrinter.run
  Kernel.exit!
end
|
|
|
|
# Pool of worker threads draining the health monitor queue. Each worker keeps
# popping until it receives a falsy item (the script pushes nil as a stop
# signal), then exits.
health_monitor_thread_pool = Array.new(health_monitor_thread_pool_size) do
  Thread.new(health_monitor_queue) do |queue|
    while (resource = queue.pop)
      # NOTE(review): lock_no_wait presumably skips the block when the resource
      # is already being processed — confirm in MonitorableResource.
      resource.lock_no_wait do
        resource.open_resource_session
        resource.process_event_loop
        resource.check_pulse
      end
    end
  end
end
|
|
|
|
# Pool of worker threads draining the metrics export queue. Each worker keeps
# popping until it receives a falsy item (the script pushes nil as a stop
# signal), then exits.
metrics_export_thread_pool = Array.new(metrics_export_thread_pool_size) do
  Thread.new(metrics_export_queue) do |queue|
    while (target = queue.pop)
      # NOTE(review): lock_no_wait presumably skips the block when the target
      # is already being processed — confirm in MetricsTargetResource.
      target.lock_no_wait do
        target.open_resource_session
        target.export_metrics
      end
    end
  end
end
|
|
|
|
# Main dispatch loop: every five seconds, log a thread count, hand every known
# resource to its worker queue, and drop resources flagged as deleted.
# Any StandardError is logged, thread state is dumped, and the process is
# terminated immediately.
begin
  loop do
    # With resources handled by thread pools, the figure logged here covers
    # pulse threads + export threads rather than monitor threads + pulse
    # threads: both pools and two further threads are subtracted from the total.
    # NOTE(review): the two extra threads are presumably main + scanner — confirm.
    Clog.emit("Active threads count.") do
      excluded_threads = health_monitor_thread_pool_size + metrics_export_thread_pool_size + 2
      {active_threads_count: Thread.list.count - excluded_threads}
    end

    # Snapshot the values under the lock, then enqueue outside it; push may
    # block when the sized queue is full.
    health_monitor_mutex.synchronize { health_monitor_resources.values }.each do |resource|
      resource.force_stop_if_stuck
      health_monitor_queue.push(resource)
    end
    health_monitor_mutex.synchronize do
      health_monitor_resources.delete_if { |_id, resource| resource.deleted }
    end

    metrics_export_mutex.synchronize { metrics_target_resources.values }.each do |target|
      target.force_stop_if_stuck
      metrics_export_queue.push(target)
    end
    metrics_export_mutex.synchronize do
      metrics_target_resources.delete_if { |_id, target| target.deleted }
    end

    sleep 5
  end
rescue => error
  Clog.emit("Pulse checking has failed.") do
    {pulse_checking_failure: {exception: Util.exception_to_hash(error)}}
  end
  ThreadPrinter.run
  Kernel.exit!
end
|
|
|
|
# Shutdown sequence: wait for the scanner thread, then stop each worker pool
# in turn (health monitoring first, metrics export second) by pushing one nil
# per worker onto its queue and joining every worker.
resource_scanner.join

[
  [health_monitor_thread_pool, health_monitor_queue],
  [metrics_export_thread_pool, metrics_export_queue]
].each do |workers, queue|
  workers.each { queue.push(nil) }
  workers.each { |worker| worker.join }
end
|