Files
ubicloud/lib/monitor_repartitioner.rb
Jeremy Evans b3f0b57d3f Split monitor into testable classes
Move the MonitorResourceType class into its own file.

Add MonitorRepartitioner class to handle repartitioning.

Add MonitorRunner class to handle the main monitor loop:

* scan (if needed)
* report/emit metrics (if needed)
* check stuck pulses (if needed)
* enqueue

Each of the sections of the MonitorRunner main loop
are in separate methods, so they can be tested
independently.

This doesn't add tests, it only splits monitor into
the 3 classes. Tests for each class will be in subsequent
commits.

This fixes bugs found when testing.
2025-07-30 07:00:03 +09:00

133 lines
4.5 KiB
Ruby

# frozen_string_literal: true
# Tracks which monitor partitions are alive, using NOTIFY/LISTEN on the
# +monitor+ channel, and computes the range of strand ids that the current
# process (identified by its 1-based partition number) is responsible for.
#
# Each process periodically NOTIFYs its own partition number. The highest
# partition number heard recently is taken as the total number of partitions,
# and the 32-bit id prefix space is split evenly between them.
class MonitorRepartitioner
  # Range of id strings (UUID-formatted) covered by this process's partition.
  attr_reader :strand_id_range

  # Flag that is set to true whenever the partitioning changes. The caller is
  # expected to reset it after rescanning with the new range.
  attr_accessor :repartitioned

  # partition_number :: 1-based partition number of the current process.
  # listen_timeout :: Seconds passed as the LISTEN timeout; this bounds how
  #                   long a shutdown request can go unnoticed.
  # recheck_seconds :: Seconds between NOTIFYs/stale checks.
  # stale_seconds :: Seconds without a notification after which a partition
  #                  is dropped.
  # max_partitions :: Highest partition number accepted from notifications.
  def initialize(partition_number, listen_timeout: 1, recheck_seconds: 18, stale_seconds: 40, max_partitions: 8)
    @partition_number = partition_number

    # Assume when starting that we are the final partition. For cases where we aren't,
    # this will quickly be updated after startup.
    @num_partitions = partition_number

    # Used for NOTIFY, since NOTIFY payload must be a string
    @partition_number_string = partition_number.to_s

    # Flag set when we have repartitioned, to ensure we do a scan using the new partition
    # before enqueuing additional resources.
    @repartitioned = true

    # This starts out empty, but will be filled in by notifications from the current
    # monitor process and other monitor processes.
    @partition_times = {}

    # Check for shutdown every second (with the default timeout).
    @listen_timeout = listen_timeout

    # Check for stale partitions and notify that the current process is still running
    # every 18 seconds (by default).
    @recheck_seconds = recheck_seconds

    # Remove a partition if we have not been notified about it in the last 40 seconds
    # (by default). Combined with the above two settings, this means that if the final
    # monitor partition process exits, other monitor processes will repartition in
    # 40-59 seconds.
    @stale_seconds = stale_seconds

    # Notifications carrying a partition number outside 1..max_partitions are
    # rejected as invalid.
    @max_partitions = max_partitions

    # The next deadline after which to check for stale partitions and notify.
    # The random offset (up to a second) staggers the first recheck.
    @partition_recheck_time = Time.now + recheck_seconds - rand

    @shutdown = false

    repartition(partition_number)
  end

  # Requests that the listen loop stop. The request is noticed the next time
  # a notification arrives or the listen timeout expires.
  def shutdown!
    @shutdown = true
  end

  # Notify the monitor channel that we exist, so that other monitor processes
  # can repartition appropriately if needed.
  def notify
    DB.notify(:monitor, payload: @partition_number_string)
  end

  # Listens on the monitor channel to determine what other monitor processes are
  # running, and updates the num_partitions information, so that the current process
  # scan thread will use the appropriate partition.
  def listen
    # If the maximum partition number after rechecking is lower than the currently
    # expected partitioning, repartition the current process to expand the
    # partition size.
    on_tick = proc do
      max_partition = repartition_check
      repartition(max_partition) if max_partition && max_partition < @num_partitions
    end

    # Continuously LISTENs for notifications on the monitor channel until shutdown.
    # If notified about a higher partition number than the currently expected
    # partitioning, repartition the current process to decrease the partition size.
    DB.listen(:monitor, loop: on_tick, after_listen: proc { notify }, timeout: @listen_timeout) do |_, _, payload|
      throw :stop if @shutdown

      # Partition numbers are 1-based, so zero and negative values are as
      # invalid as non-integers and values above the configured maximum.
      partition_num = Integer(payload, exception: false)
      unless partition_num&.between?(1, @max_partitions)
        Clog.emit("invalid monitor repartition notification") { {monitor_notify_payload: payload} }
        next
      end

      repartition(partition_num) if partition_num > @num_partitions
      @partition_times[partition_num] = Time.now
    end
  end

  private

  # Formats the lower boundary of the partition after +partition_num+ full
  # partitions of +partition_size+ ids, as a UUID-shaped string whose first
  # 8 hex digits carry the boundary value.
  def partition_boundary(partition_num, partition_size)
    "%08x-0000-0000-0000-000000000000" % (partition_num * partition_size).to_i
  end

  # This calculates the partition of the id space that this process will monitor.
  def calculate_strand_id_range
    # Rational division, so boundaries are only truncated once, when formatted.
    partition_size = (16**8) / @num_partitions.to_r
    start_id = partition_boundary(@partition_number - 1, partition_size)

    @strand_id_range = if @num_partitions == @partition_number
      # The final partition is inclusive up to the highest possible id.
      start_id.."ffffffff-ffff-ffff-ffff-ffffffffffff"
    else
      start_id...partition_boundary(@partition_number, partition_size)
    end
  end

  # Updates the total number of partitions, and sets the repartition flag, so the
  # next main loop iteration will run a scan query.
  def repartition(np)
    @num_partitions = np
    calculate_strand_id_range
    @repartitioned = true

    Clog.emit("monitor repartitioning") {
      {monitor_repartition: {
        partition_number: @partition_number,
        num_partitions: np,
        range: @strand_id_range
      }}
    }
  end

  # Called after every notification and every listen timeout. Used to exit the
  # listen loop on shutdown, and to NOTIFY about the current process and remove
  # stale processes when rechecking.
  #
  # Returns nil when no recheck was due or when no partition has been heard
  # from recently. Otherwise returns the highest live partition number, never
  # lower than the current process's own partition number.
  def repartition_check
    throw :stop if @shutdown

    now = Time.now
    return unless now > @partition_recheck_time

    @partition_recheck_time = now + @recheck_seconds
    notify

    cutoff = now - @stale_seconds
    @partition_times.reject! { |_, seen_at| seen_at < cutoff }

    # The current process is known to be alive even if its own notification
    # has gone stale (for example after a long stall). Shrinking below our own
    # partition number would produce a start boundary outside the id space.
    highest = @partition_times.keys.max
    highest && [highest, @partition_number].max
  end
end