This extends MonitorRepartitioner to be a bit more generic, and uses a subclass of MonitorRepartition to handle respirate partitioning.
143 lines
4.9 KiB
Ruby
143 lines
4.9 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
class MonitorRepartitioner
|
|
attr_reader :strand_id_range
|
|
|
|
attr_accessor :repartitioned
|
|
|
|
def initialize(partition_number, listen_timeout: 1, recheck_seconds: 18, stale_seconds: 40, max_partition: 8, channel: :monitor)
|
|
@partition_number = partition_number
|
|
|
|
# Assume when starting that we are the final partition. For cases where we aren't,
|
|
# this will quickly be updated after startup.
|
|
@num_partitions = partition_number
|
|
|
|
# Used for NOTIFY, since NOTIFY payload must be a string
|
|
@partition_number_string = partition_number.to_s
|
|
|
|
# Flag set when we have repartitioned, to ensure we do a scan using the new partition
|
|
# before enqueuing additional resources.
|
|
@repartitioned = true
|
|
|
|
# This starts out empty, but will be filled in by notifications from the current
|
|
# monitor process and other monitor processes.
|
|
@partition_times = {}
|
|
|
|
# Check for shutdown every second
|
|
@listen_timeout = listen_timeout
|
|
|
|
# Check for stale partitions and notify that the current process is still running
|
|
# every 18 seconds.
|
|
@recheck_seconds = recheck_seconds
|
|
|
|
# Remove a partition if we have not been notified about it in the last 40 seconds.
|
|
# Combined with the above two settings, this means that if the final monitor partition
|
|
# process exits, other monitor processes will repartition in 40-59 seconds.
|
|
@stale_seconds = stale_seconds
|
|
|
|
# The next deadline after which to check for stale partitions and notify.
|
|
@partition_recheck_time = Time.now + recheck_seconds - rand
|
|
|
|
# The channel to LISTEN and NOTIFY on.
|
|
@channel = channel
|
|
|
|
# The maximum partition we will consider valid.
|
|
@max_partition = max_partition
|
|
|
|
@shutdown = false
|
|
|
|
repartition(partition_number)
|
|
end
|
|
|
|
def shutdown!
|
|
@shutdown = true
|
|
end
|
|
|
|
# Notify the monitor channel that we exist, so that other monitor processes
|
|
# can repartition appropriately if needed.
|
|
def notify
|
|
DB.notify(@channel, payload: @partition_number_string)
|
|
end
|
|
|
|
# Listens on the monitor channel to determine what other monitor processes are
|
|
# running, and updates the num_partitions information, so that the current process
|
|
# scan thread will use the appropriate partition.
|
|
def listen
|
|
# If the maximum partition number after rechecking is lower than the currently
|
|
# expected partitioning, repartition the current process to expand the
|
|
# partition size.
|
|
loop = proc do
|
|
if (max_partition = repartition_check)&.<(@num_partitions)
|
|
repartition(max_partition)
|
|
end
|
|
end
|
|
|
|
allowed_partition_range = 1..@max_partition
|
|
emit_str = "invalid #{@channel} repartition notification"
|
|
emit_key = :"#{@channel}_notify_payload"
|
|
|
|
# Continuouly LISTENs for notifications on the monitor channel until shutdown.
|
|
# If notified about a higher partition number than the currently expected
|
|
# partitioning, repartition the current process to decrease the partition size.
|
|
DB.listen(@channel, loop:, after_listen: proc { notify }, timeout: @listen_timeout) do |_, _, payload|
|
|
throw :stop if @shutdown
|
|
|
|
unless (notify_partition_num = Integer(payload, exception: false)) && allowed_partition_range.cover?(notify_partition_num)
|
|
Clog.emit(emit_str) { {emit_key => payload} }
|
|
next
|
|
end
|
|
|
|
repartition(notify_partition_num) if notify_partition_num > @num_partitions
|
|
@partition_times[notify_partition_num] = Time.now
|
|
end
|
|
end
|
|
|
|
private
|
|
|
|
def partition_boundary(partition_num, partition_size)
|
|
"%08x-0000-0000-0000-000000000000" % (partition_num * partition_size).to_i
|
|
end
|
|
|
|
# This calculates the partition of the id space that this process will monitor.
|
|
def calculate_strand_id_range
|
|
partition_size = (16**8) / @num_partitions.to_r
|
|
start_id = partition_boundary(@partition_number - 1, partition_size)
|
|
|
|
@strand_id_range = if @num_partitions == @partition_number
|
|
start_id.."ffffffff-ffff-ffff-ffff-ffffffffffff"
|
|
else
|
|
start_id...partition_boundary(@partition_number, partition_size)
|
|
end
|
|
end
|
|
|
|
# Updates the total number of partitions, and sets the repartition flag, so the
|
|
# next main loop iteration will run a scan query.
|
|
def repartition(np)
|
|
@num_partitions = np
|
|
calculate_strand_id_range
|
|
@repartitioned = true
|
|
Clog.emit("monitor repartitioning") {
|
|
{monitor_repartition: {
|
|
partition_number: @partition_number,
|
|
num_partitions: np,
|
|
range: @strand_id_range
|
|
}}
|
|
}
|
|
end
|
|
|
|
# Called every second. Used to exit the listen loop on shutdown, and to NOTIFY
|
|
# about the current process and remove stale processes when rechecking.
|
|
def repartition_check
|
|
throw :stop if @shutdown
|
|
|
|
t = Time.now
|
|
if t > @partition_recheck_time
|
|
@partition_recheck_time = t + @recheck_seconds
|
|
notify
|
|
stale = t - @stale_seconds
|
|
@partition_times.reject! { |_, time| time < stale }
|
|
@partition_times.keys.max
|
|
end
|
|
end
|
|
end
|