# ubicloud/scheduling/dispatcher.rb
# frozen_string_literal: true
# Scheduling::Dispatcher is used for running strands. It finds strands
# that should be run, and runs them using a thread pool. It keeps a
# record of which strands are currently being processed, so that it
# does not retrieve them again.
class Scheduling::Dispatcher
attr_reader :shutting_down
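# Maximum runtime passed to Strand#run for a single strand run, kept
# well under the lease expiration.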
STRAND_RUNTIME = Strand::LEASE_EXPIRATION / 4
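# Mutex ensuring only one apoptosis failure dumps threads and exits
# the process at a time.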
APOPTOSIS_MUTEX = Mutex.new
class Repartitioner < MonitorRepartitioner
def initialize(*, dispatcher:, **)
@dispatcher = dispatcher
super(*, **)
end
# Update the dispatcher prepared statement when repartitioning.
def repartition(num_partitions)
super
@dispatcher.setup_prepared_statements(strand_id_range:)
end
end
# Arguments:
# apoptosis_timeout :: The number of seconds a strand is allowed to
# run before causing apoptosis.
# listen_timeout :: The number of seconds to wait for a single notification
# when listening. Only has an effect when the process uses
# partitioning. Generally only changed in the tests to make
# shutdown faster.
# pool_size :: The number of threads in the thread pool.
# partition_number :: The partition number of the current respirate process. A nil
# value means the process does not use partitioning.
def initialize(apoptosis_timeout: Strand::LEASE_EXPIRATION - 29, pool_size: Config.dispatcher_max_threads, partition_number: nil, listen_timeout: 1)
@shutting_down = false
# How long to wait, in seconds, from the start of a strand run
# for the strand to finish before killing the process.
@apoptosis_timeout = apoptosis_timeout
# Hash of currently executing strands. This can be accessed by
# multiple threads concurrently, so access is protected via a mutex.
@current_strands = {}
# Mutex for current strands
@mutex = Mutex.new
# Set configured limits on pool size. This will raise if the maximum number
# of threads is lower than the minimum.
pool_size = pool_size.clamp(Config.dispatcher_min_threads, Config.dispatcher_max_threads)
needed_db_connections = 1
needed_db_connections += 1 if partition_number
# Ensure thread pool size is sane. It needs to be at least 1, we cannot
# use more threads than database connections (db_pool - 1), and we need
# separate database connections for the scan thread and the repartition
# thread.
pool_size = pool_size.clamp(1, Config.db_pool - 1 - needed_db_connections)
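# Illustrative example (hypothetical numbers): with Config.db_pool = 16
# and a partitioned process (needed_db_connections = 2), the pool is
# capped at 16 - 1 - 2 = 13 strand threads.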
# The queue size is the thread pool size multiplied by
# Config.dispatcher_queue_size_ratio (4 by default), as that should
# ensure that for a busy thread pool, there are always strands to run.
# This should only cause issues if the thread pool can process more than
# 4 times its size in the time it takes the main thread to refill the queue.
@queue_size = (pool_size * Config.dispatcher_queue_size_ratio).round.clamp(1, nil)
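# Illustrative example (hypothetical numbers): with a pool of 8 threads
# and the default ratio of 4, the queue holds up to 8 * 4 = 32 strands.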
# The Queue that all threads in the thread pool pull from. This is a
# SizedQueue to allow for backoff in the case that the thread pool cannot
# process jobs fast enough. When the queue is full, the main thread
# blocks when pushing strands until the queue is no longer full.
@strand_queue = SizedQueue.new(@queue_size)
# An array of thread pool data. This starts pool_size * 2 threads. Half
# of the threads are strand threads, responsible for running the strands.
# The other half are apoptosis threads, each responsible for monitoring
# the related strand thread. The strand thread notifies the apoptosis
# thread via a queue when it starts, and also when it finishes. If a
# strand thread does not notify the apoptosis thread of it finishing
# before the apoptosis timeout, then the process exits.
@thread_data = Array.new(pool_size) do
start_thread_pair(@strand_queue)
end
# The Queue for submitting metrics. This is pushed to for each strand run.
@metrics_queue = Queue.new
# The thread that processes the metrics queue and emits metrics.
@metrics_thread = Thread.new { metrics_thread(@metrics_queue) }
# The partition number for the current process.
@partition_number = partition_number
# The partition number as a string, used in NOTIFY statements.
@partition_number_string = partition_number.to_s
# How long to wait for each NOTIFY. By default, this is 1 second, to
# ensure that the heartbeat NOTIFY for the current process and any
# rebalancing take place about once every second.
@listen_timeout = listen_timeout
# The delay experienced for strands in this partition. Stays at 0 for
# an unpartitioned respirate. Partitioned respirate will update this
# every time it emits metrics, so that if it is backed up processing
# current strands, it assumes other respirate processes are also backed
# up, and waits an additional amount of time before picking up their strands.
@current_strand_delay = 0
# Default number of strands per second. This will be updated based
# on metrics. Used for calculating the sleep duration.
@strands_per_second = 1
if partition_number
# Handles repartitioning when new partitions show up or old partitions
# go stale.
@repartitioner = Repartitioner.new(partition_number, channel: :respirate,
max_partition: 256, dispatcher: self, listen_timeout:,
recheck_seconds: listen_timeout * 2, stale_seconds: listen_timeout * 4)
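# With the default listen_timeout of 1 second, this rechecks peer
# partitions every 2 seconds and treats a partition as stale after 4
# seconds without a heartbeat (derived from the arguments above).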
# The thread that listens for changes in the number of respirate processes
# and adjusts the partition range accordingly.
@repartition_thread = Thread.new { @repartitioner.listen }
else
# Set up an unpartitioned prepared statement.
# This method is called implicitly with the appropriate partition when
# initializing the repartitioner in the if branch above.
setup_prepared_statements(strand_id_range: nil)
end
end
# Wait for all threads to exit after shutdown is set.
# Designed for use in the tests.
def shutdown_and_cleanup_threads
# Make idempotent
return if @cleaned_up
shutdown
@cleaned_up = true
# Signal all threads to shutdown. This isn't done by
# default in shutdown as pushing to the queue can block.
# We use SizedQueue#close here to allow shutdown to proceed
# even if the scan thread is blocked on pushing to the queue
# and all strand threads are busy processing strands.
@strand_queue.close
# Close down the metrics processing. This pushes nil to the
# metrics_queue instead of using close, avoiding the need
# to rescue ClosedQueueError in the run_strand ensure block.
@metrics_queue.push(nil)
@metrics_thread.join
# Close down the repartition thread if it exists. Note that
# this can block for up to a second.
@repartition_thread&.join
# After all queues have been pushed to, it is safe to
# attempt joining the threads.
@thread_data.each do |data|
data[:apoptosis_thread].join
data[:strand_thread].join
end
end
# Signal that the dispatcher should shut down. This only sets a flag.
# Strand threads will continue running their current strand, and
# apoptosis threads will continue running until after their related
# strand thread signals them to exit.
def shutdown
@shutting_down = true
@repartitioner&.shutdown!
end
def setup_prepared_statements(strand_id_range:)
# A prepared statement to get the strands to run. A prepared statement
# is used to reduce parsing/planning work in the database.
ds = Strand
.where(Sequel[:lease] < Sequel::CURRENT_TIMESTAMP)
.where(exitval: nil)
.order_by(:schedule)
.limit(@queue_size)
.exclude(id: Sequel.function(:ANY, Sequel.cast(:$skip_strands, "uuid[]")))
.select(:id, :schedule, :lease)
.for_no_key_update
.skip_locked
# If a partition is given, limit the strands to the partition.
# Create a separate prepared statement for older strands, not tied to
# the current partition, to allow for graceful degradation.
if strand_id_range
# The old strand prepared statement does not change based on the
# number of partitions, so no reason to reprepare it.
@old_strand_ps ||= ds
.where(Sequel[:schedule] < Sequel::CURRENT_TIMESTAMP - Sequel.cast(Sequel.cast(:$old_strand_delay, String) + " seconds", :interval))
.prepare(:select, :get_old_strand_cohort)
ds = ds.where(id: strand_id_range)
Clog.emit("respirate repartitioning") { {partition: strand_id_range} }
else
@old_strand_ps = nil
end
@strand_ps = ds
.where(Sequel[:schedule] < Sequel::CURRENT_TIMESTAMP)
.prepare(:select, :get_strand_cohort)
end
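# Roughly, the @strand_ps prepared statement above corresponds to the
# following SQL (a sketch; the exact text is generated by Sequel and may
# differ, and a partitioned process adds an id range filter):
#
#   SELECT id, schedule, lease FROM strand
#   WHERE lease < CURRENT_TIMESTAMP AND exitval IS NULL
#     AND NOT (id = ANY(CAST($skip_strands AS uuid[])))
#     AND schedule < CURRENT_TIMESTAMP
#   ORDER BY schedule LIMIT <queue_size>
#   FOR NO KEY UPDATE SKIP LOCKED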
# Thread responsible for collecting and emitting metrics. This emits after every
# METRICS_EVERY strand runs, as that allows easy calculation of the related metrics
# and is not too burdensome on the logging infrastructure, while still being helpful.
def metrics_thread(metrics_queue)
array = []
t = Time.now
while (metric = metrics_queue.pop)
array << metric
if array.size == METRICS_EVERY
new_t = Time.now
Clog.emit("respirate metrics") { {respirate_metrics: metrics_hash(array, new_t - t)} }
t = new_t
array.clear
end
end
end
METRIC_TYPES = %i[scan_delay queue_delay lease_delay total_delay queue_size available_workers].freeze
# The batch size for metrics output. Mezmo batches their real time graph in 30 second
# intervals, and a batch size of 1000 meant that there would be intervals where Mezmo
# would see no results. Even when it saw results, the results would be choppy. By
# outputting more frequently, we should get smoother and more accurate graphs.
METRICS_EVERY = 200
# Calculate the necessary offsets up front for the median/p75/p85/p95/p99 numbers,
# and the multiplier to get the lease acquired percentage.
METRICS_MEDIAN = (METRICS_EVERY * 0.5r).floor
METRICS_P75 = (METRICS_EVERY * 0.75r).floor
METRICS_P85 = (METRICS_EVERY * 0.85r).floor
METRICS_P95 = (METRICS_EVERY * 0.95r).floor
METRICS_P99 = (METRICS_EVERY * 0.99r).floor
METRICS_PERCENTAGE = 100.0 / METRICS_EVERY
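# With METRICS_EVERY = 200, these work out to offsets 100 (median),
# 150 (p75), 170 (p85), 190 (p95), and 198 (p99) into the sorted
# metrics array, and a multiplier of 0.5 per strand for the percentages.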
# Metrics to emit. This assumes an array of METRICS_EVERY entries. The following metrics are emitted:
#
# Strand delay metrics:
#
# scan_delay :: Time between when the strand was scheduled, and when the scan query
# picked the strand up.
# queue_delay :: Time between when the scan query picked up the strand, and when a
# worker thread started working on the strand.
# lease_delay :: Time between when a worker thread started working on the strand, and
# when it acquired (or failed to acquire) the strand's lease.
#
# Respirate internals metrics:
#
# queue_size :: The size of the strand queue after the worker thread picked up the strand.
# This is the backlog of strands waiting to be processed.
# available_workers :: The number of idle worker threads that are waiting to work on strands.
# There should only be an idle worker if the queue size is currently 0.
#
# For the above metrics, we compute the average, median, P75, P85, P95, P99, and maximum
# values.
#
# In addition to the above metrics, there are additional metrics:
#
# lease_acquire_percentage :: Percentage of strands where the lease was successfully acquired.
# For single respirate processes, or normally running respirate
# partitioned processes (1 process per partition), this should be
# 100.0. For multi-process, non-partitioned respirate, this can
# be significantly lower, as multiple processes try to process
# the same strand concurrently, and some fail to acquire the lease.
# old_strand_percentage :: Percentage of strands that were processed that were outside the
# current partition. Should always be 0 if respirate is not partitioned.
def metrics_hash(array, elapsed_time)
respirate_metrics = {}
METRIC_TYPES.each do |metric_type|
metrics = array.map(&metric_type)
metrics.sort!
average = metrics.sum / METRICS_EVERY
median = metrics[METRICS_MEDIAN]
p75 = metrics[METRICS_P75]
p85 = metrics[METRICS_P85]
p95 = metrics[METRICS_P95]
p99 = metrics[METRICS_P99]
max = metrics.last
respirate_metrics[metric_type] = {average:, median:, p75:, p85:, p95:, p99:, max:}
end
respirate_metrics[:lease_acquire_percentage] = array.count(&:lease_acquired) * METRICS_PERCENTAGE
respirate_metrics[:old_strand_percentage] = array.count(&:old_strand) * METRICS_PERCENTAGE
respirate_metrics[:lease_expired_percentage] = array.count(&:lease_expired) * METRICS_PERCENTAGE
respirate_metrics[:strand_count] = METRICS_EVERY
@strands_per_second = METRICS_EVERY / elapsed_time
respirate_metrics[:strands_per_second] = @strands_per_second
# Use the p95 of total delay to set the delay for the current strands.
# Ignore the delay for old strands when calculating delay for current strands.
# Using the maximum/p99 numbers may cause too long a delay if there are
# outliers. Using lower numbers (e.g. median/p75) may not accurately reflect the
# current delay numbers if the delay is increasing rapidly.
array.reject!(&:old_strand)
array.map!(&:total_delay)
respirate_metrics[:current_strand_delay] = @current_strand_delay = array[(array.count * 0.95r).floor] || 0
respirate_metrics
end
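# The emitted hash has roughly this shape (illustrative values only):
#
#   {scan_delay: {average: 0.4, median: 0.3, p75: 0.5, p85: 0.6, p95: 0.9, p99: 1.2, max: 2.0},
#    queue_delay: {...}, lease_delay: {...}, total_delay: {...},
#    queue_size: {...}, available_workers: {...},
#    lease_acquire_percentage: 100.0, old_strand_percentage: 0.0,
#    lease_expired_percentage: 0.0, strand_count: 200,
#    strands_per_second: 40.0, current_strand_delay: 0.8}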
# The amount of time to sleep if no strands or old strands were picked up
# during the last scan loop.
def sleep_duration
# Set the base sleep duration based on queue size and the number of strands processed
# per second. If you are processing 10 strands per second, and there are 5 strands
# in the queue, it should take about 0.5 seconds to get through the existing strands.
# Multiply by 0.75, since @strands_per_second is only updated occasionally by
# the metrics thread.
sleep_duration = ((@strand_queue.size * 0.75) / @strands_per_second)
# You don't want to query the database too often, especially when the queue is empty
# and respirate is idle. Estimate how idle the respirate process is by looking at
# the percentage of available workers, and use that as a lower bound. If you have
# 6 available workers and 8 total workers, that's close to idle, so set a minimum
# sleep time of 0.75 seconds. If you have 2 available workers and 8 total workers,
# that's pretty busy, so set a minimum sleep time of 0.25 seconds.
available_workers = @strand_queue.num_waiting
workers = @thread_data.size
sleep_duration = sleep_duration.clamp(available_workers.fdiv(workers), nil)
# Finally, set the absolute minimum sleep time to 0.2 seconds, to not overload the
# database, and the absolute maximum sleep time to 1 second, to not wait too long
# to pick up strands.
sleep_duration.clamp(0.2, 1)
end
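# Worked example (hypothetical numbers): with 5 strands queued,
# 10 strands per second, and 2 of 8 workers idle, the base duration is
# (5 * 0.75) / 10 = 0.375, the lower bound is 2/8 = 0.25, and the final
# clamp to 0.2..1 leaves a sleep of 0.375 seconds.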
# Start a strand/apoptosis thread pair, where the strand thread will
# pull from the given strand queue.
#
# On strand run start:
#
# * The strand thread pushes the strand ubid to the start queue,
# and the apoptosis thread pops it and starts a timed pop on the
# finish queue.
#
# On strand run finish:
#
# * The strand thread pushes to the finish queue to signal to the
# apoptosis thread that it is finished. The strand thread goes back
# to monitoring the strand queue, and the apoptosis thread goes
# back to monitoring the start queue.
#
# On strand run timeout:
#
# * If the timed pop of the finish queue by the apoptosis thread
# does not complete in time, the apoptosis thread kills the process.
def start_thread_pair(strand_queue)
start_queue = Queue.new
finish_queue = Queue.new
{
start_queue:,
finish_queue:,
apoptosis_thread: Thread.new { apoptosis_thread(start_queue, finish_queue) },
strand_thread: Thread.new { DB.synchronize { strand_thread(strand_queue, start_queue, finish_queue) } }
}
end
# If the process is shutting down, return an empty array. Otherwise
# call the database prepared statement to find the strands to run,
# excluding the strands that are currently running in the thread pool.
# If respirate processes are partitioned, this will only return
# strands for the current partition.
def scan
@shutting_down ? [] : @strand_ps.call(skip_strands:).each(&:scan_picked_up!)
end
# Similar to scan, but return older strands which may not be
# related to the current partition. This is to allow for
# graceful degradation if a partitioned respirate process
# crashes or experiences apoptosis.
def scan_old
unless @shutting_down
@old_strand_ps&.call(skip_strands:, old_strand_delay:)&.each(&:scan_picked_up!)&.each(&:old_strand!)
end || []
end
# The number of seconds to wait before picking up strands from
# outside the current partition. This takes the current delay for
# processing strands in this partition, adds 20% to that, then adds
# an additional 5 seconds. The reason for the 20% is that the current
# strand delay is only recalculated every METRICS_EVERY strand runs,
# so this allows some padding in case the delay number is growing
# quickly.
def old_strand_delay
(@current_strand_delay * 1.2) + 5
end
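# For example, if the current strand delay is 10 seconds, old strands
# are picked up once they are (10 * 1.2) + 5 = 17 seconds overdue.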
# A pg_array for the strand ids to skip. These are the strand
# ids that have been enqueued or are currently being processed
# by strand threads.
def skip_strands
Sequel.pg_array(@mutex.synchronize { @current_strands.keys })
end
# The number of strands the thread pool is currently running.
def num_current_strands
@mutex.synchronize { @current_strands.size }
end
# The entry point for apoptosis threads. This loops until the
# dispatcher shuts down, monitoring the start queue, and once
# signaled via the start queue, kills the process unless it is
# signaled via the finish queue within the apoptosis timeout.
def apoptosis_thread(start_queue, finish_queue)
timeout = @apoptosis_timeout
until @shutting_down
break unless apoptosis_run(timeout, start_queue, finish_queue)
end
rescue
apoptosis_failure
end
# Performs a single apoptosis run. Waits until signaled by the
# start queue, then does a timed pop of the finish queue, killing
# the process if the pop times out.
def apoptosis_run(timeout, start_queue, finish_queue)
return unless (strand_ubid = start_queue.pop)
Thread.current.name = "apoptosis:#{strand_ubid}"
unless finish_queue.pop(timeout:)
apoptosis_failure
end
true
end
# Handle timeout of a strand thread by killing the process.
def apoptosis_failure
# Timed out, dump threads and exit.
# Don't print thread dumps concurrently.
APOPTOSIS_MUTEX.synchronize do
ThreadPrinter.run
Kernel.exit! 2
end
end
# The entry point for strand threads. This loops until the
# dispatcher shuts down, monitoring the strand queue, signalling
# the related apoptosis thread when starting, running the strand,
# and then signalling the related apoptosis thread when the
# strand run finishes.
def strand_thread(strand_queue, start_queue, finish_queue)
while (strand = strand_queue.pop) && !@shutting_down
strand.worker_started!
metrics = strand.respirate_metrics
metrics.queue_size = strand_queue.size
metrics.available_workers = strand_queue.num_waiting
run_strand(strand, start_queue, finish_queue)
end
ensure
# Signal related apoptosis thread to shutdown
start_queue.push(nil)
end
# Handle the running of a single strand. Signals the apoptosis
# thread via the start queue when starting, and the finish queue
# when exiting.
def run_strand(strand, start_queue, finish_queue)
strand_ubid = strand.ubid.freeze
Thread.current.name = strand_ubid
start_queue.push(strand_ubid)
strand.run(STRAND_RUNTIME)
rescue => ex
Clog.emit("exception terminates strand run") { Util.exception_to_hash(ex) }
cause = ex
loop do
break unless (cause = cause.cause)
Clog.emit("nested exception") { Util.exception_to_hash(cause) }
end
ex
ensure
# Always signal apoptosis thread that the strand has finished,
# even for non-StandardError exits
finish_queue.push(true)
@mutex.synchronize { @current_strands.delete(strand.id) }
@metrics_queue.push(strand.respirate_metrics)
# If there are any sessions in the thread-local (really fiber-local) ssh
# cache after the strand run, close them eagerly to close the related
# file descriptors, then clear the cache to avoid a memory leak.
if (cache = Thread.current[:clover_ssh_cache]) && !cache.empty?
cache.each_value do
# closing the ssh connection shouldn't raise, but just in case it
# does, we want to ignore it so the strand thread doesn't exit.
it.close
rescue
end
cache.clear
end
end
# Find strands that need to be run, and push each onto the
# strand queue. This can block if the strand queue is full,
# to allow for backoff in the case of a busy thread pool.
def start_cohort(strands = scan)
strand_queue = @strand_queue
current_strands = @current_strands
strands.each do |strand|
break if @shutting_down
@mutex.synchronize { current_strands[strand.id] = true }
strand_queue.push(strand)
rescue ClosedQueueError
end
strands.size == 0
end
end
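# A minimal usage sketch (hypothetical; the actual respirate entry point
# lives elsewhere and may differ):
#
#   dispatcher = Scheduling::Dispatcher.new(partition_number: 1)
#   until dispatcher.shutting_down
#     # Sleep only when neither the current partition nor older strands
#     # produced any work.
#     idle = dispatcher.start_cohort
#     idle &= dispatcher.start_cohort(dispatcher.scan_old)
#     sleep dispatcher.sleep_duration if idle
#   end
#   dispatcher.shutdown_and_cleanup_threads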