Make all arguments required keyword arguments. Change the monitor-specific comments to be generic. Change the repartition emit to use a message and key based on the channel, so that respirate processes are not emitting repartition information that appears to be coming from monitor.
115 lines
3.4 KiB
Ruby
Executable file
#!/usr/bin/env ruby
# frozen_string_literal: true

ENV["MONITOR_PROCESS"] = "1"
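
# The partition number comes from the first command-line argument when given;
# otherwise it is inferred from the process name (e.g. DYNO=monitor.3 under
# Heroku, or PS under Foreman, yields partition 3).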
partition_number = ARGV[0]
partition_number ||= if (match = /monitor\.(\d+)\z/.match(ENV["DYNO"] || ENV["PS"]))
  match[1] # Heroku/Foreman
end

# For simplicity, monitor always runs partitioned, even when only a single
# process is running, in which case the partition is the entire id space.
partition_number ||= "1"

max_partition = 8

partition_number = Integer(partition_number)
raise "invalid partition_number: #{partition_number}" if partition_number < 1 || partition_number > max_partition

require_relative "../loader"
clover_freeze
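
# A pulse that has not completed within the configured number of seconds is
# presumably reported as stuck using the message and log key given below.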
# Information (seconds, log message, log key) for stuck pulses for monitor jobs.
monitor_pulse_info = [120, "Pulse check has stuck.", :pulse_check_stuck].freeze

# Information (seconds, log message, log key) for stuck pulses for metric export jobs.
metric_export_pulse_info = [100, "Pulse check has stuck.", :pulse_check_stuck].freeze

if Config.test?
  # Run during monitor smoke tests
  $stdout.sync = $stderr.sync = true
  # Ensure clog output during smoke test
  Clog::Config = Struct.new(:test?).new(false)
  MonitorResourceStub.add(UBID.generate_vanity("et", "mr", "vp"))
  MonitorResourceStub.add(UBID.generate_vanity("et", "mr", "down"), pulse: "down")
  MonitorResourceStub.add(UBID.generate_vanity("et", "mr", "evloop"), need_event_loop: true)
  MonitorResourceStub.add(UBID.generate_vanity("et", "mr", "mc2"), metrics_count: 2)
  monitor_models = [MonitorResourceStub]
  metric_export_models = [MonitorResourceStub]
  runner_args = {scan_every: 1, report_every: 1, enqueue_every: 1, check_stuck_pulses_every: 1}
else
  runner_args = {ignore_threads: 2}
  monitor_models = [
    VmHost,
    PostgresServer,
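    # Only Vms without a corresponding Sshable row (~ negates the EXISTS subquery).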
    Vm.where(~Sshable.where(id: Sequel[:vm][:id]).exists),
    MinioServer,
    GithubRunner,
    VmHostSlice,
    LoadBalancerVmPort,
    KubernetesCluster,
    VictoriaMetricsServer
  ]

  metric_export_models = [
    PostgresServer,
    VmHost
  ]
end

# Handle both monitored resources and metric export resources.
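# MonitorResourceType.create presumably scans the given models, wraps each row
# in the given resource class, and runs the block for each resource using up to
# the configured number of threads.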
monitor_resources = MonitorResourceType.create(
  MonitorableResource,
  monitor_pulse_info,
  Config.max_health_monitor_threads,
  monitor_models
) do
  it.process_event_loop
  it.check_pulse
end

metric_export_resources = MonitorResourceType.create(
  MetricsTargetResource,
  metric_export_pulse_info,
  Config.max_metrics_export_threads,
  metric_export_models,
  &:export_metrics
)
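
# Presumably, each monitor process announces itself via NOTIFY on the :monitor
# channel, rechecks membership every recheck_seconds, and treats peers silent
# for stale_seconds as gone, repartitioning the id space among those remaining.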
repartitioner = Repartitioner.new(partition_number:, channel: :monitor, listen_timeout: 1, recheck_seconds: 18, stale_seconds: 40, max_partition:)

runner = MonitorRunner.new(monitor_resources:, metric_export_resources:, repartitioner:, **runner_args)

repartition_thread = Thread.new { repartitioner.listen }

# Only NOTIFY for the first 3-5 seconds, so that by the time we actually start monitoring,
# all monitor processes know the expected partitioning. The rand is to avoid thundering herd issues.
sleep(1 + rand)
3.times do
  repartitioner.notify
  sleep 1
end

do_shutdown = proc do
  repartitioner.shutdown!
  runner.shutdown!
end

Signal.trap("INT", &do_shutdown)
Signal.trap("TERM", &do_shutdown)

runner.run

# If not all threads exit within two seconds, exit 1 to indicate
# unclean shutdown.
exit_status = 1
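
# Thread#join with a timeout returns nil once the two seconds elapse, so if
# cleanup hangs, exit_status stays 1 and the process still exits below.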
Thread.new do
  repartition_thread.join
  [monitor_resources, metric_export_resources].each(&:wait_cleanup!)
  # If all threads exit within two seconds, exit 0 to indicate clean shutdown.
  exit_status = 0
end.join(2)

exit exit_status