ubicloud/scheduling/dispatcher.rb
Jeremy Evans d1f8e9b332 Use FOR NO KEY UPDATE instead of FOR UPDATE
FOR UPDATE takes stronger locks, and is only needed if you plan on
deleting the row or modifying one of the key columns.  If you use FOR
UPDATE, you block other transactions from inserting rows that reference
a row locked with FOR UPDATE.  From looking at these cases, I don't
think we need that, so using FOR NO KEY UPDATE should provide
the assurances we need without unnecessarily blocking other
concurrent transactions.
2025-07-04 09:26:06 +09:00


# frozen_string_literal: true
# Scheduling::Dispatcher is used for running strands. It finds strands
# that should be run, and runs them using a thread pool. It keeps a
# record of which strands are currently being processed, so that it
# does not retrieve them again.
class Scheduling::Dispatcher
attr_reader :shutting_down
STRAND_RUNTIME = Strand::LEASE_EXPIRATION / 4
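# (For illustration with an assumed value: if Strand::LEASE_EXPIRATION were
# 120 seconds, STRAND_RUNTIME would be 30 seconds and the default
# apoptosis_timeout below would be 91 seconds.)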
APOPTOSIS_MUTEX = Mutex.new
# Arguments:
# apoptosis_timeout :: The number of seconds a strand is allowed to
# run before causing apoptosis.
# listen_timeout :: The number of seconds to wait for a single notification
# when listening. Only has an effect when the process uses
# partitioning. Generally only changed in the tests to make
# shutdown faster.
# pool_size :: The number of threads in the thread pool.
# partition_number :: The partition number of the current respirate process. A nil
# value means the process does not use partitioning.
def initialize(apoptosis_timeout: Strand::LEASE_EXPIRATION - 29, pool_size: Config.dispatcher_max_threads, partition_number: nil, listen_timeout: 1)
@shutting_down = false
# How long to wait in seconds from the start of strand run
# for the strand to finish until killing the process.
@apoptosis_timeout = apoptosis_timeout
# Hash of currently executing strands. This can be accessed by
# multiple threads concurrently, so access is protected via a mutex.
@current_strands = {}
# Mutex for current strands
@mutex = Mutex.new
# Set configured limits on pool size. This will raise if the maximum number
# of threads is lower than the minimum.
pool_size = pool_size.clamp(Config.dispatcher_min_threads, Config.dispatcher_max_threads)
needed_db_connections = 1
needed_db_connections += 1 if partition_number
# Ensure thread pool size is sane. It needs to be at least 1; we cannot
# use more threads than database connections (db_pool - 1); and we need
# separate database connections for the scan thread and the repartition
# thread.
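# For example (illustrative numbers), with Config.db_pool set to 12 and a
# partitioned process (needed_db_connections = 2), at most 9 strand threads
# can be used.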
pool_size = pool_size.clamp(1, Config.db_pool - 1 - needed_db_connections)
# The queue size is 4 times the size of the thread pool by default, as that should
# ensure that for a busy thread pool, there are always strands to run.
# This should only cause issues if the thread pool can process more than
# 4 times its size in the time it takes the main thread to refill the queue.
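# For example, a pool of 8 threads with the default ratio of 4 gives a
# queue size of 32.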
@queue_size = (pool_size * Config.dispatcher_queue_size_ratio).round.clamp(1, nil)
# The Queue that all threads in the thread pool pull from. This is a
# SizedQueue to allow for backoff in the case that the thread pool cannot
# process jobs fast enough. When the queue is full, the main thread
# blocks when pushing strands into the queue, until the queue is no longer full.
@strand_queue = SizedQueue.new(@queue_size)
# An array of thread pool data. This starts pool_size * 2 threads. Half
# of the threads are strand threads, responsible for running the strands.
# The other half are apoptosis threads, each responsible for monitoring
# the related strand thread. The strand thread notifies the apoptosis
# thread via a queue when it starts, and also when it finishes. If a
# strand thread does not notify the apoptosis thread of it finishing
# before the apoptosis timeout, then the process exits.
@thread_data = Array.new(pool_size) do
start_thread_pair(@strand_queue)
end
# The Queue for submitting metrics. This is pushed to for each strand run.
@metrics_queue = Queue.new
# The thread that processes the metrics queue and emits metrics.
@metrics_thread = Thread.new { metrics_thread(@metrics_queue) }
# The partition number for the current process.
@partition_number = partition_number
# The partition number as a string, used in NOTIFY statements.
@partition_number_string = partition_number.to_s
# How long to wait for each NOTIFY. By default, this is 1 second, to
# ensure that the heartbeat NOTIFY for the current process and any possible
# rebalancing take place about once every second.
@listen_timeout = listen_timeout
# The delay experienced for strands for this partition. Stays at 0 for
# an unpartitioned respirate. Partitioned respirate will update this
# every time it emits metrics, so that if it is backed up processing
# current strands, it assumes other respirate processes are also backed
# up, and will wait an additional amount of time before picking up their strands.
@current_strand_delay = 0
# Default number of strands per second. This will be updated
# based on metrics. Used for calculating the sleep duration.
@strands_per_second = 1
# Ensure initial unpartitioned prepared statement setup before listening
# for partition changes. In a partitioned environment, this should
# quickly be called by the repartition thread once it determines
# the number of partitions. Assume by default that this is the highest
# partition.
setup_prepared_statements(num_partitions: partition_number)
if partition_number
# The thread that listens for changes in the number of respirate processes
# and adjusts the partition range accordingly.
@repartition_thread = Thread.new { repartition_thread }
end
end
# Wait for all threads to exit after shutdown is set.
# Designed for use in the tests.
def shutdown_and_cleanup_threads
# Make idempotent
return if @cleaned_up
shutdown
@cleaned_up = true
# Signal all threads to shut down. This isn't done by
# default in shutdown as pushing to the queue can block.
# We use SizedQueue#close here to allow shutdown to proceed
# even if the scan thread is blocked on pushing to the queue
# and all strand threads are busy processing strands.
@strand_queue.close
# Close down the metrics processing. This pushes nil to the
# metrics_queue instead of using close, avoiding the need
# to rescue ClosedQueueError in the run_strand ensure block.
@metrics_queue.push(nil)
@metrics_thread.join
# Close down the repartition thread if it exists. Note that
# this can block for up to a second.
@repartition_thread&.join
# After all queues have been pushed to, it is safe to
# attempt joining them.
@thread_data.each do |data|
data[:apoptosis_thread].join
data[:strand_thread].join
end
end
# Signal that the dispatcher should shut down. This only sets a flag.
# Strand threads will continue running their current strand, and
# apoptosis threads will continue running until after their related
# strand thread signals them to exit.
def shutdown
@shutting_down = true
end
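# Announce the current process's partition number on the respirate
# notification channel. This serves both as a heartbeat and as the trigger
# for other respirate processes to adjust their partitioning.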
def notify_partition
DB.notify(:respirate, payload: @partition_number_string)
end
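# Send a heartbeat NOTIFY roughly every listen_timeout * 2 seconds, drop
# partitions that have not been heard from within listen_timeout * 4 seconds,
# and return the highest partition number still considered live (nil when it
# is not yet time to recheck). With the default listen_timeout of 1 second,
# that is a heartbeat about every 2 seconds and a 4 second staleness window.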
def repartition_check(partition_times)
throw :stop if @shutting_down
t = Time.now
if t > @partition_recheck_time
@partition_recheck_time = t + (@listen_timeout * 2)
notify_partition
stale = t - (@listen_timeout * 4)
partition_times.reject! { |_, time| time < stale }
partition_times.keys.max
end
end
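# The entry point for the repartition thread. Listens on the respirate
# channel and records when each partition was last heard from. The partition
# count grows as soon as a higher partition number is announced, and shrinks
# via repartition_check when a partition stops sending heartbeats; in either
# case the prepared statements are rebuilt for the new partition range.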
def repartition_thread
partition_times = {}
num_partitions = @partition_number
loop = proc do
if (max_partition = repartition_check(partition_times))&.<(num_partitions)
num_partitions = max_partition
setup_prepared_statements(num_partitions:)
end
end
# Set the initial recheck time. Use rand to prevent the thundering herd.
@partition_recheck_time = Time.now + (@listen_timeout * rand * 2)
DB.listen(:respirate, loop:, after_listen: proc { notify_partition }, timeout: @listen_timeout) do |_, _, payload|
unless (partition_num = Integer(payload, exception: false)) && (partition_num <= 256)
Clog.emit("invalid respirate repartition notification") { {payload:} }
next
end
if partition_num > num_partitions
num_partitions = partition_num
setup_prepared_statements(num_partitions:)
end
partition_times[partition_num] = Time.now
end
end
def setup_prepared_statements(num_partitions: nil)
# A prepared statement to get the strands to run. A prepared statement
# is used to reduce parsing/planning work in the database.
ds = Strand
.where(Sequel[:lease] < Sequel::CURRENT_TIMESTAMP)
.where(exitval: nil)
.order_by(:schedule)
.limit(@queue_size)
.exclude(id: Sequel.function(:ANY, Sequel.cast(:$skip_strands, "uuid[]")))
.select(:id, :schedule, :lease)
.for_no_key_update
.skip_locked
# If a partition is given, limit the strands to the partition.
# Create a separate prepared statement for older strands, not tied to
# the current partition, to allow for graceful degradation.
if @partition_number && num_partitions
# The old strand prepared statement does not change based on the
# number of partitions, so no reason to reprepare it.
@old_strand_ps ||= ds
.where(Sequel[:schedule] < Sequel::CURRENT_TIMESTAMP - Sequel.cast(Sequel.cast(:$old_strand_delay, String) + " seconds", :interval))
.prepare(:select, :get_old_strand_cohort)
partition = strand_id_range(num_partitions)
ds = ds.where(id: partition)
Clog.emit("respirate repartitioning") { {partition:} }
else
@old_strand_ps = nil
end
@strand_ps = ds
.where(Sequel[:schedule] < Sequel::CURRENT_TIMESTAMP)
.prepare(:select, :get_strand_cohort)
end
def partition_boundary(partition_num, partition_size)
"%08x-0000-0000-0000-000000000000" % (partition_num * partition_size).to_i
end
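# The range of strand ids handled by this partition, splitting the uuid
# space into num_partitions roughly equal slices via partition_boundary.
# For example, with 4 partitions, partition 2 covers
# "40000000-0000-0000-0000-000000000000"..."80000000-0000-0000-0000-000000000000"
# (exclusive), while the highest partition uses an inclusive range ending at
# "ffffffff-ffff-ffff-ffff-ffffffffffff" so no ids are left uncovered.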
def strand_id_range(num_partitions)
partition_size = (16**8) / num_partitions.to_r
start_id = partition_boundary(@partition_number - 1, partition_size)
if num_partitions == @partition_number
start_id.."ffffffff-ffff-ffff-ffff-ffffffffffff"
else
start_id...partition_boundary(@partition_number, partition_size)
end
end
# Thread responsible for collecting and emitting metrics. This emits after every
# METRICS_EVERY strand runs, as that allows easy calculation of the related metrics
# and is not too burdensome on the logging infrastructure, while still being helpful.
def metrics_thread(metrics_queue)
array = []
t = Time.now
while (metric = metrics_queue.pop)
array << metric
if array.size == METRICS_EVERY
new_t = Time.now
Clog.emit("respirate metrics") { {respirate_metrics: metrics_hash(array, new_t - t)} }
t = new_t
array.clear
end
end
end
METRIC_TYPES = %i[scan_delay queue_delay lease_delay total_delay queue_size available_workers].freeze
# The batch size for metrics output. Mezmo batches their real time graph in 30 second
# intervals, and a batch size of 1000 meant that there would be intervals where Mezmo
# would see no results. Even when it saw results, the results would be choppy. By
# outputting more frequently, we should get smoother and more accurate graphs.
METRICS_EVERY = 200
# Calculate the necessary offsets up front for the median/p75/p85/p95/p99 numbers,
# and the multiplier to get the lease acquired percentage.
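# With METRICS_EVERY = 200, these offsets are 100, 150, 170, 190, and 198,
# and each strand accounts for 0.5 percentage points.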
METRICS_MEDIAN = (METRICS_EVERY * 0.5r).floor
METRICS_P75 = (METRICS_EVERY * 0.75r).floor
METRICS_P85 = (METRICS_EVERY * 0.85r).floor
METRICS_P95 = (METRICS_EVERY * 0.95r).floor
METRICS_P99 = (METRICS_EVERY * 0.99r).floor
METRICS_PERCENTAGE = 100.0 / METRICS_EVERY
# Metrics to emit. This assumes an array size of METRICS_EVERY. The following metrics are emitted:
#
# Strand delay metrics:
#
# scan_delay :: Time between when the strand was scheduled, and when the scan query
# picked the strand up.
# queue_delay :: Time between when the scan query picked up the strand, and when a
# worker thread started working on the strand.
# lease_delay :: Time between when a worker thread started working on the strand, and
# when it acquired (or failed to acquire) the strand's lease.
#
# Respirate internals metrics:
#
# queue_size :: The size of the strand queue after the worker thread picked up the strand.
# This is the backlog of strands waiting to be processed.
# available_workers :: The number of idle worker threads that are waiting to work on strands.
# There should only be an idle worker if the queue size is currently 0.
#
# For the above metrics, we compute the average, median, P75, P85, P95, P99, and maximum
# values.
#
# In addition to the above metrics, there are additional metrics:
#
# lease_acquire_percentage :: Percentage of strands where the lease was successfully acquired.
# For single respirate processes, or normally running respirate
# partitioned processes (1 process per partition), this should be
# 100.0. For multi-process, non-partitioned respirate, this can
# be significantly lower, as multiple processes try to process
# the same strand concurrently, and some fail to acquire the lease.
# old_strand_percentage :: Percentage of strands that were processed that were outside the
# current partition. Should always be 0 if respirate is not partitioned.
def metrics_hash(array, elapsed_time)
respirate_metrics = {}
METRIC_TYPES.each do |metric_type|
metrics = array.map(&metric_type)
metrics.sort!
average = metrics.sum / METRICS_EVERY
median = metrics[METRICS_MEDIAN]
p75 = metrics[METRICS_P75]
p85 = metrics[METRICS_P85]
p95 = metrics[METRICS_P95]
p99 = metrics[METRICS_P99]
max = metrics.last
respirate_metrics[metric_type] = {average:, median:, p75:, p85:, p95:, p99:, max:}
end
respirate_metrics[:lease_acquire_percentage] = array.count(&:lease_acquired) * METRICS_PERCENTAGE
respirate_metrics[:old_strand_percentage] = array.count(&:old_strand) * METRICS_PERCENTAGE
respirate_metrics[:lease_expired_percentage] = array.count(&:lease_expired) * METRICS_PERCENTAGE
respirate_metrics[:strand_count] = METRICS_EVERY
@strands_per_second = METRICS_EVERY / elapsed_time
respirate_metrics[:strands_per_second] = @strands_per_second
# Use the p95 of total delay to set the delay for the current strands.
# Ignore the delay for old strands when calculating delay for current strands.
# Using the maximum/p99 numbers may cause too long a delay if there are
# outliers. Using lower numbers (e.g. median/p75) may not accurately reflect the
# current delay numbers if the delay is increasing rapidly.
array.reject!(&:old_strand)
array.map!(&:total_delay)
respirate_metrics[:current_strand_delay] = @current_strand_delay = array[(array.count * 0.95r).floor] || 0
respirate_metrics
end
# The amount of time to sleep if no strands or old strands were picked up
# during the last scan loop.
def sleep_duration
# Set base sleep duration based on queue size and the number of strands processed
# per second. If you are processing 10 strands per second, and there are 5 strands
# in the queue, it should take about 0.5 seconds to get through the existing strands.
# Multiply by 0.75, since @strands_per_second is only updated occasionally by
# the metrics thread.
sleep_duration = ((@strand_queue.size * 0.75) / @strands_per_second)
# You don't want to query the database too often, especially when the queue is empty
# and respirate is idle. Estimate how idle the respirate process is by looking at
# the percentage of available workers, and use that as a lower bound. If you have
# 6 available workers and 8 total workers, that's close to idle, so set a minimum
# sleep time of 0.75 seconds. If you have 2 available workers and 8 total workers,
# that's pretty busy, so set a minimum sleep time of 0.25 seconds.
available_workers = @strand_queue.num_waiting
workers = @thread_data.size
sleep_duration = sleep_duration.clamp(available_workers.fdiv(workers), nil)
# Finally, set absolute minimum sleep time to 0.2 seconds, to not overload the
# database, and set absolute maximum sleep time to 1 second, to not wait too long
# to pick up strands.
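# For example (illustrative numbers), with 5 strands queued at 10 strands
# per second the base duration is 0.375 seconds; if 6 of 8 workers are idle,
# the 0.75 second lower bound applies, and the final clamp leaves it at 0.75.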
sleep_duration.clamp(0.2, 1)
end
# Start a strand/apoptosis thread pair, where the strand thread will
# pull from the given strand queue.
#
# On strand run start:
#
# * The strand thread pushes the strand ubid to the start queue,
# and the apoptosis thread pops it and starts a timed pop on the
# finish queue.
#
# On strand run finish:
#
# * The strand thread pushes to the finish queue to signal to the
# apoptosis thread that it is finished. The strand thread goes back
# to monitoring the strand queue, and the apoptosis thread goes
# back to monitoring the start queue.
#
# On strand run timeout:
#
# * If the timed pop of the finish queue by the apoptosis thread
# does not complete in time, the apoptosis thread kills the process.
def start_thread_pair(strand_queue)
start_queue = Queue.new
finish_queue = Queue.new
{
start_queue:,
finish_queue:,
apoptosis_thread: Thread.new { apoptosis_thread(start_queue, finish_queue) },
strand_thread: Thread.new { DB.synchronize { strand_thread(strand_queue, start_queue, finish_queue) } }
}
end
# If the process is shutting down, return an empty array. Otherwise
# call the database prepared statement to find the strands to run,
# excluding the strands that are currently running in the thread pool.
# If respirate processes are partitioned, this will only return
# strands for the current partition.
def scan
@shutting_down ? [] : @strand_ps.call(skip_strands:).each(&:scan_picked_up!)
end
# Similar to scan, but return older strands which may not be
# related to the current partition. This is to allow for
# graceful degradation if a partitioned respirate process
# crashes or experiences apoptosis.
def scan_old
unless @shutting_down
@old_strand_ps&.call(skip_strands:, old_strand_delay:)&.each(&:scan_picked_up!)&.each(&:old_strand!)
end || []
end
# The number of seconds to wait before picking up strands from
# outside the current partition. This uses how long it is
# taking to process strands in the current partition, adds 20%
# to that, then adds an additional 5 seconds. The reason for
# 20% is that the current strand delay is only recalculated once every
# METRICS_EVERY strand runs, so this allows some padding in case the delay
# number is growing quickly.
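# For example, if strands in the current partition are running about 10
# seconds behind schedule, strands outside the partition are only picked up
# once they are more than 17 seconds behind.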
def old_strand_delay
(@current_strand_delay * 1.2) + 5
end
# A pg_array for the strand ids to skip. These are the strand
# ids that have been enqueued or are currently being processed
# by strand threads.
def skip_strands
Sequel.pg_array(@mutex.synchronize { @current_strands.keys })
end
# The number of strands the thread pool is currently running.
def num_current_strands
@mutex.synchronize { @current_strands.size }
end
# The entry point for apoptosis threads. This loops until the
# dispatcher shuts down, monitoring the start queue, and once
# signaled via the start queue, kills the process unless it is
# signaled via the finish queue within the apoptosis timeout.
def apoptosis_thread(start_queue, finish_queue)
timeout = @apoptosis_timeout
until @shutting_down
break unless apoptosis_run(timeout, start_queue, finish_queue)
end
rescue
apoptosis_failure
end
# Performs a single apoptosis run. Waits until signaled by the
# start queue, then does a timed pop of the finish queue, killing
# the process if the pop times out.
def apoptosis_run(timeout, start_queue, finish_queue)
return unless (strand_ubid = start_queue.pop)
Thread.current.name = "apoptosis:#{strand_ubid}"
unless finish_queue.pop(timeout:)
apoptosis_failure
end
true
end
# Handle timeout of a strand thread by killing the process.
def apoptosis_failure
# Timed out, dump threads and exit.
# Don't print thread dumps concurrently.
APOPTOSIS_MUTEX.synchronize do
ThreadPrinter.run
Kernel.exit!
end
end
# The entry point for strand threads. This loops until the
# dispatcher shuts down, monitoring the strand queue, signalling
# the related apoptosis thread when starting, running the strand,
# and then signalling the related apoptosis thread when the
# strand run finishes.
def strand_thread(strand_queue, start_queue, finish_queue)
while (strand = strand_queue.pop) && !@shutting_down
strand.worker_started!
metrics = strand.respirate_metrics
metrics.queue_size = strand_queue.size
metrics.available_workers = strand_queue.num_waiting
run_strand(strand, start_queue, finish_queue)
end
ensure
# Signal related apoptosis thread to shutdown
start_queue.push(nil)
end
# Handle the running of a single strand. Signals the apoptosis
# thread via the start queue when starting, and the finish queue
# when exiting.
def run_strand(strand, start_queue, finish_queue)
strand_ubid = strand.ubid.freeze
Thread.current.name = strand_ubid
start_queue.push(strand_ubid)
strand.run(STRAND_RUNTIME)
rescue => ex
Clog.emit("exception terminates strand run") { Util.exception_to_hash(ex) }
cause = ex
loop do
break unless (cause = cause.cause)
Clog.emit("nested exception") { Util.exception_to_hash(cause) }
end
ex
ensure
# Always signal apoptosis thread that the strand has finished,
# even for non-StandardError exits
finish_queue.push(true)
@mutex.synchronize { @current_strands.delete(strand.id) }
@metrics_queue.push(strand.respirate_metrics)
# If there are any sessions in the thread-local (really fiber-local) ssh
# cache after the strand run, close them eagerly to close the related
# file descriptors, then clear the cache to avoid a memory leak.
if (cache = Thread.current[:clover_ssh_cache]) && !cache.empty?
cache.each_value do
# closing the ssh connection shouldn't raise, but just in case it
# does, we want to ignore it so the strand thread doesn't exit.
it.close
rescue
end
cache.clear
end
end
# Find strands that need to be run, and push each onto the
# strand queue. This can block if the strand queue is full,
# to allow for backoff in the case of a busy thread pool.
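# Returns true if no strands were found, and false otherwise.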
def start_cohort(strands = scan)
strand_queue = @strand_queue
current_strands = @current_strands
strands.each do |strand|
break if @shutting_down
@mutex.synchronize { current_strands[strand.id] = true }
strand_queue.push(strand)
rescue ClosedQueueError
end
strands.size == 0
end
end