ubicloud/bin/monitor
Jeremy Evans 071fa3b560 Rename MonitorRepartitioner to Repartitioner
Make all arguments required keyword arguments.

Change the monitor-specific comments to be generic.

Change the repartition emit to use a message and key based on the
channel, so that respirate processes are not emitting repartition
information that appears to be coming from monitor.
2025-08-26 04:32:52 +09:00
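
As a rough illustration of that last change (a hypothetical sketch, not the actual Repartitioner code; the method name and payload shape are assumptions, and it presumes Clog's message-plus-block emit form), the emit might derive both the log message and the structured-log key from the configured channel instead of hard-coding "monitor":

# Hypothetical sketch: tag emitted repartition information with the
# repartitioner's own channel (:monitor, :respirate, ...), so logs identify
# which kind of process produced them.
def emit_repartition(num_partitions)
  Clog.emit("#{channel} repartitioned") { {"#{channel}_repartition": {num_partitions:}} }
end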

#!/usr/bin/env ruby
# frozen_string_literal: true
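
# Mark this process as the monitor process before the application is loaded.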
ENV["MONITOR_PROCESS"] = "1"
partition_number = ARGV[0]
partition_number ||= if (match = /monitor\.(\d+)\z/.match(ENV["DYNO"] || ENV["PS"]))
  match[1] # Heroku/Foreman
end
# For simplicity, monitor always runs partitioned, even if only running a single
# process, in which case the partition is the entire id space.
partition_number ||= "1"
max_partition = 8
partition_number = Integer(partition_number)
raise "invalid partition_number: #{partition_number}" if partition_number < 1 || partition_number > max_partition
require_relative "../loader"
clover_freeze
# Information (seconds, log message, log key) for stuck pulses for monitor jobs.
monitor_pulse_info = [120, "Pulse check has stuck.", :pulse_check_stuck].freeze
# Information (seconds, log message, log key) for stuck pulses for metric export jobs.
metric_export_pulse_info = [100, "Pulse check has stuck.", :pulse_check_stuck].freeze
if Config.test?
  # Run during monitor smoke tests
  $stdout.sync = $stderr.sync = true

  # Ensure clog output during smoke test
  Clog::Config = Struct.new(:test?).new(false)

  MonitorResourceStub.add(UBID.generate_vanity("et", "mr", "vp"))
  MonitorResourceStub.add(UBID.generate_vanity("et", "mr", "down"), pulse: "down")
  MonitorResourceStub.add(UBID.generate_vanity("et", "mr", "evloop"), need_event_loop: true)
  MonitorResourceStub.add(UBID.generate_vanity("et", "mr", "mc2"), metrics_count: 2)

  monitor_models = [MonitorResourceStub]
  metric_export_models = [MonitorResourceStub]
  runner_args = {scan_every: 1, report_every: 1, enqueue_every: 1, check_stuck_pulses_every: 1}
else
  runner_args = {ignore_threads: 2}

  monitor_models = [
    VmHost,
    PostgresServer,
    Vm.where(~Sshable.where(id: Sequel[:vm][:id]).exists),
    MinioServer,
    GithubRunner,
    VmHostSlice,
    LoadBalancerVmPort,
    KubernetesCluster,
    VictoriaMetricsServer
  ]

  metric_export_models = [
    PostgresServer,
    VmHost
  ]
end
# Handle both monitored resources and metric export resources.
monitor_resources = MonitorResourceType.create(
  MonitorableResource,
  monitor_pulse_info,
  Config.max_health_monitor_threads,
  monitor_models
) do
  it.process_event_loop
  it.check_pulse
end
metric_export_resources = MonitorResourceType.create(
  MetricsTargetResource,
  metric_export_pulse_info,
  Config.max_metrics_export_threads,
  metric_export_models,
  &:export_metrics
)
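
# Coordinate partition assignments with the other monitor processes over the
# :monitor notification channel.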
repartitioner = Repartitioner.new(partition_number:, channel: :monitor, listen_timeout: 1, recheck_seconds: 18, stale_seconds: 40, max_partition:)
runner = MonitorRunner.new(monitor_resources:, metric_export_resources:, repartitioner:, **runner_args)
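
# Listen for repartition notifications in a background thread.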
repartition_thread = Thread.new { repartitioner.listen }
# Only NOTIFY for the first 3-5 seconds, so that by the time we actually start monitoring,
# all monitor processes know the expected partitioning. The rand is to avoid thundering herd issues.
sleep(1 + rand)
3.times do
  repartitioner.notify
  sleep 1
end
do_shutdown = proc do
  repartitioner.shutdown!
  runner.shutdown!
end
Signal.trap("INT", &do_shutdown)
Signal.trap("TERM", &do_shutdown)
runner.run
# If not all threads exit within two seconds, exit 1 to indicate
# unclean shutdown.
exit_status = 1
Thread.new do
  repartition_thread.join
  [monitor_resources, metric_export_resources].each(&:wait_cleanup!)

  # If all threads exit within two seconds, exit 0 to indicate clean shutdown.
  exit_status = 0
end.join(2)
exit exit_status