Files
ubicloud/model/strand.rb
Jeremy Evans d1f8e9b332 Use FOR NO KEY UPDATE instead of FOR UPDATE
FOR UPDATE takes stronger locks, and is only needed if you plan on
deleting the row or modifying one of its key columns.  If you use FOR
UPDATE, you block other transactions from inserting rows that reference a
row held under a FOR UPDATE lock.  From looking at these cases, I don't
think that is the case, so using FOR NO KEY UPDATE should allow
the assurances we need without unnecessarily blocking other
concurrent transactions.
2025-07-04 09:26:06 +09:00

292 lines
9.3 KiB
Ruby

# frozen_string_literal: true
require_relative "../model"
require "time"
class Strand < Sequel::Model
# We need to unrestrict primary key so strand.add_child works in Prog::Base.
unrestrict_primary_key
# Enable the defaults_setter plugin (cache: true) and register a default for
# the stack column: a proc that returns an array holding one empty hash.
Strand.plugin :defaults_setter, cache: true
Strand.default_values[:stack] = proc { [{}] }
# Lease duration; presumably in seconds (TAKE_LEASE_PS leases a strand for
# 120 seconds) -- TODO confirm the unit against other users of this constant.
LEASE_EXPIRATION = 120
# Self-referential tree: a strand may have a parent strand and child strands,
# both linked through parent_id. Semaphores are associated with the strand too.
many_to_one :parent, key: :parent_id, class: self
one_to_many :children, key: :parent_id, class: self
one_to_many :semaphores
plugin ResourceMethods
# Returns the object decoded from this strand's ubid, caching it in @subject.
# The cache is bypassed when @subject has never been set or holds the
# :reload sentinel (take_lease_and_reload assigns that sentinel).
def subject
  if !defined?(@subject) || @subject == :reload
    @subject = UBID.decode(ubid)
  end
  @subject
end
# Value object holding the timestamps and flags recorded for one strand as it
# is handled, plus helpers that subtract pairs of those timestamps.
RespirateMetrics = Struct.new(
  :scheduled,
  :scan_picked_up,
  :worker_started,
  :lease_checked,
  :lease_acquired,
  :queue_size,
  :available_workers,
  :old_strand,
  :lease_expired
) do
  # scan_picked_up minus scheduled.
  def scan_delay = scan_picked_up - scheduled

  # worker_started minus scan_picked_up.
  def queue_delay = worker_started - scan_picked_up

  # lease_checked minus worker_started.
  def lease_delay = lease_checked - worker_started

  # lease_checked minus scheduled (the whole span covered by the three delays).
  def total_delay = lease_checked - scheduled
end
# If the lease time is after this, we must be dealing with an
# expired lease, since normal lease times are either in the future
# or 1000 years in the past.
EXPIRED_LEASE_TIME = Time.utc(2025)

# Returns the RespirateMetrics for this strand, building and memoizing it on
# first use.
#
# When the lease is later than EXPIRED_LEASE_TIME the strand is treated as
# having an expired lease, and the lease time is used as the "scheduled"
# time; otherwise the schedule column is used. The lease is inspected only
# when the struct is first built, so later calls return the memoized object
# without re-reading or re-comparing the lease.
def respirate_metrics
  @respirate_metrics ||= begin
    lease_time = lease
    lease_expired = lease_time > EXPIRED_LEASE_TIME
    RespirateMetrics.new(scheduled: lease_expired ? lease_time : schedule, lease_expired:)
  end
end
# Stamps the scan_picked_up field of this strand's respirate metrics with the
# current time and returns that time.
def scan_picked_up!
  metrics = respirate_metrics
  metrics.scan_picked_up = Time.now
end
# Stamps the worker_started field of this strand's respirate metrics with the
# current time and returns that time.
def worker_started!
  metrics = respirate_metrics
  metrics.worker_started = Time.now
end
# Records the time of a lease check on this strand's respirate metrics.
#
# affected:: result of the lease attempt; when truthy, lease_acquired is
#            additionally set to true, otherwise it is left untouched.
def lease_checked!(affected)
  metrics = respirate_metrics
  metrics.lease_checked = Time.now
  metrics.lease_acquired = true if affected
end
# Sets the old_strand flag on this strand's respirate metrics to true.
def old_strand!
  metrics = respirate_metrics
  metrics.old_strand = true
end
# SQL expression giving how far to push a strand's schedule forward when its
# lease is taken; TAKE_LEASE_PS multiplies it by a '1 second' interval.
# :nocov:
ps_sch = if Config.development?
# Development: least(5, try).
Sequel.function(:least, 5, :try)
# :nocov:
else
# Otherwise: least(2 ** least(try, 20), 600) * random().
Sequel.function(:least, Sequel[2]**Sequel.function(:least, :try, 20), 600) * Sequel.function(:random)
end
# Prepared UPDATE ... RETURNING statement that tries to take the lease on one
# strand, called with the :id placeholder.
#
# The row is updated only if it can be locked with FOR NO KEY UPDATE SKIP
# LOCKED, has no exitval, and its lease is earlier than the current
# timestamp. On success it sets the lease LEASE_EXPIRATION seconds into the
# future, increments try, and pushes schedule forward by ps_sch seconds.
# Prepared with the :first type, so a call yields the returned row or nil.
TAKE_LEASE_PS = DB[:strand]
  .returning
  .where(
    Sequel[id: DB[:strand].select(:id).where(id: :$id).for_no_key_update.skip_locked, exitval: nil] &
      (Sequel[:lease] < Sequel::CURRENT_TIMESTAMP)
  )
  .prepare_sql_type(:update)
  .prepare(:first, :strand_take_lease_and_reload,
    # Derived from LEASE_EXPIRATION so the lease length has a single source.
    lease: Sequel::CURRENT_TIMESTAMP + Sequel.cast("#{LEASE_EXPIRATION} seconds", :interval),
    try: Sequel[:try] + 1,
    schedule: Sequel::CURRENT_TIMESTAMP + (ps_sch * Sequel.cast("1 second", :interval)))
# Prepared statement that releases a lease: it resets lease to 1000 years in
# the past, but only for the row whose id AND current lease value match the
# :id and :lease_time arguments. take_lease_and_reload invokes it with
# .call(id:, lease_time:) and treats any result other than 1 as a lease
# violation.
RELEASE_LEASE_PS = DB[<<SQL, :$id, :$lease_time].prepare(:update, :strand_release_lease)
UPDATE strand SET lease = now() - '1000 years'::interval WHERE id = ? AND lease = ?
SQL
# Tries to take the lease on this strand and, on success, yields with the
# model refreshed from the row returned by the lease statement.
#
# Returns false without yielding when the lease could not be taken;
# otherwise returns the value of the block.
#
# After the block finishes (normally or by exception):
# * if the strand was deleted (@deleted), verifies the row is really gone;
# * otherwise releases the lease, failing with "BUG: lease violated" if the
#   release did not affect exactly one row;
# * if the strand exited (@exited), schedules the parent immediately once no
#   sibling is still active.
def take_lease_and_reload
  affected = TAKE_LEASE_PS.call(id:)
  lease_checked!(affected)
  return false unless affected
  lease_time = affected.fetch(:lease)

  # Also operate as reload query
  _refresh_set_values(affected)
  _clear_changed_columns(:refresh)
  # Force the next call to #subject to decode again.
  @subject = :reload

  begin
    yield
  ensure
    if @deleted
      if exists?
        fail "BUG: strand with @deleted set still exists in the database"
      end
    else
      begin
        unless RELEASE_LEASE_PS.call(id:, lease_time:) == 1
          Clog.emit("lease violated data") { {lease_clear_debug_snapshot: this.all} }
          fail "BUG: lease violated"
        end
      ensure
        if @exited
          # A sibling is "active" when it is NOT reapable, i.e. it is not the
          # case that its lease has lapsed and it has an exitval. The filter
          # must be negated (exclude): this strand has just released its
          # lease and has an exitval, so a positive match on the reapable
          # condition would always find it and the parent would never be
          # scheduled.
          active_siblings_ds = Strand.from { strand.as(:siblings) }
            .where(parent_id: Sequel[:strand][:id])
            .exclude(Sequel.lit("lease < now() AND exitval IS NOT NULL"))
            .select(1)

          # If exited child has no active siblings, schedule parent immediately,
          # so all exited children can be reaped.
          #
          # To avoid race conditions, we do this after the lease for the child
          # has been released. It's possible that multiple children could be
          # calling this update concurrently, but that is fine. We must avoid
          # the case where this is not called by the last exiting child, as
          # that otherwise can result in up to 120s delay in parent strand
          # execution.
          Strand
            .where(id: parent_id)
            .exclude(active_siblings_ds.exists)
            .update(schedule: Sequel::CURRENT_TIMESTAMP)
        end
      end
    end
  end
end
# Checks that +prog+ is defined under the Prog namespace and returns its name
# with the leading "Prog::" removed (e.g. "Vm::Nexus" for Prog::Vm::Nexus).
#
# Fails with "BUG: prog must be in Prog module" when prog.name does not start
# with "Prog::" (this includes a nil name).
def self.prog_verify(prog)
  matched = /\AProg::(.*)\z/.match(prog.name)
  fail "BUG: prog must be in Prog module" unless matched
  matched[1]
end
# Builds the prog object for this strand: looks up the class named by the
# prog attribute under the top-level Prog namespace and instantiates it with
# this strand and +snap+ (nil unless given).
def load(snap = nil)
  prog_class = Object.const_get("::Prog::" + prog)
  prog_class.new(self, snap)
end
# Runs the strand's current label once, without taking the lease itself
# (the run method calls this inside take_lease_and_reload).
#
# Before dispatching to the prog it:
# * clears the top frame's deadline (and calls incr_resolve on the matching
#   "Deadline" page, if one is found) when the current label is the
#   deadline target;
# * assembles a page for every stack frame whose deadline_at has passed;
# * stamps the top frame with last_label_changed_at when it is missing.
#
# The label is then invoked inside a DB transaction and is expected to raise
# one of Prog::Base::Nap, Prog::Base::Hop or Prog::Base::Exit. The rescued
# object is the value of the transaction block; the run method inspects the
# return value of this method with is_a?(Prog::Base::Nap) / is_a?(Prog::Base::Exit).
# Fails with a "BUG" error when the label returns without raising one of them.
#
# Emits a "finished strand" log entry when the whole call took more than 1 second.
def unsynchronized_run
start_time = Time.now
prog_label = "#{prog}.#{label}"
top_frame = stack.first
# The deadline target label has been reached: drop the deadline from the frame.
if label == top_frame["deadline_target"]
Page.from_tag_parts("Deadline", id, prog, top_frame["deadline_target"])&.incr_resolve
top_frame.delete("deadline_target")
top_frame.delete("deadline_at")
modified!(:stack)
end
# effective_prog starts as this strand's prog and is replaced by link[0] of
# each frame that carries a "link", so frames visited later in the loop are
# reported under that linked prog name.
effective_prog = prog
stack.each do |frame|
if (deadline_at = frame["deadline_at"])
if Time.now > Time.parse(deadline_at.to_s)
# Deadline missed: gather subject-specific details for the page.
sbj = subject
extra_data = case sbj
when Vm
{vm_host: sbj.vm_host&.ubid, data_center: sbj.vm_host&.data_center, boot_image: sbj.boot_image, location: sbj.location.display_name, arch: sbj.arch, vcpus: sbj.vcpus, ipv4: sbj.ephemeral_net4.to_s}
when VmHost
{data_center: sbj.data_center, location: sbj.location.display_name, arch: sbj.arch, ipv4: sbj.sshable.host, total_cores: sbj.total_cores, allocation_state: sbj.allocation_state, os_version: sbj.os_version, vm_count: sbj.vms_dataset.count}
when GithubRunner
{label: sbj.label, installation: sbj.installation.ubid, vm: sbj.vm&.ubid, vm_host: sbj.vm&.vm_host&.ubid, data_center: sbj.vm&.vm_host&.data_center}
else
{}
end
# Drop entries whose value is nil.
extra_data.compact!
Prog::PageNexus.assemble("#{ubid} has an expired deadline! #{effective_prog}.#{label} did not reach #{frame["deadline_target"]} on time", ["Deadline", id, effective_prog, frame["deadline_target"]], ubid, extra_data:)
modified!(:stack)
end
end
if (link = frame["link"])
effective_prog = link[0]
end
end
# Stored as a string; the Hop and Exit branches below parse it back with
# Time.parse to compute how long the label was active.
unless top_frame["last_label_changed_at"]
top_frame["last_label_changed_at"] = Time.now.to_s
modified!(:stack)
end
DB.transaction do
SemSnap.use(id) do |snap|
prg = load(snap)
# before_run is optional on the prog.
prg.public_send(:before_run) if prg.respond_to?(:before_run)
prg.public_send(label)
end
rescue Prog::Base::Nap => e
# Nap: persist pending changes, reset try and reschedule e.seconds from now.
save_changes
scheduled = DB[<<SQL, e.seconds, id].get
UPDATE strand
SET try = 0, schedule = now() + (? * '1 second'::interval)
WHERE id = ?
RETURNING schedule
SQL
# For convenience, reflect the updated record's schedule content
# in the model object, but since it's fresh, remove it from the
# changed columns so save_changes won't update it again.
self.schedule = scheduled
changed_columns.delete(:schedule)
e
rescue Prog::Base::Hop => hp
# Hop: log how long the previous label lasted, restart the label timer and
# apply the hop's update arguments with try reset to 0.
last_changed_at = Time.parse(top_frame["last_label_changed_at"])
Clog.emit("hopped") { {strand_hopped: {duration: Time.now - last_changed_at, from: prog_label, to: "#{hp.new_prog}.#{hp.new_label}"}} }
top_frame["last_label_changed_at"] = Time.now.to_s
modified!(:stack)
update(**hp.strand_update_args, try: 0)
hp
rescue Prog::Base::Exit => ext
# Exit: record the exit value. A strand with a parent is only flagged as
# exited here; a strand without one destroys its semaphores and itself.
last_changed_at = Time.parse(top_frame["last_label_changed_at"])
Clog.emit("exited") { {strand_exited: {duration: Time.now - last_changed_at, from: prog_label}} }
update(exitval: ext.exitval, retval: nil)
if parent_id
@exited = true
else
# No parent Strand to reap here, so self-reap.
semaphores_dataset.destroy
destroy
@deleted = true
end
ext
else
# Reached only when the label returned without raising.
fail "BUG: Prog #{prog}##{label} did not provide flow control"
end
ensure
duration = Time.now - start_time
Clog.emit("finished strand") { [self, {strand_finished: {duration:, prog_label:}}] } if duration > 1
end
# Takes the lease and runs the strand's labels repeatedly until a stopping
# condition is met, returning the last value from unsynchronized_run.
#
# seconds:: time budget; the loop stops after the first iteration that
#           finishes later than (start time + seconds).
#
# The loop also stops as soon as an iteration yields a Prog::Base::Exit, or a
# Prog::Base::Nap whose seconds is not 0. Fails with "already deleted" when
# the strand has been marked deleted. The result is whatever
# take_lease_and_reload returns when the loop is never entered.
def run(seconds = 0)
  fail "already deleted" if @deleted

  deadline = Time.now + seconds
  take_lease_and_reload do
    loop do
      outcome = unsynchronized_run
      checked_at = Time.now

      finished = checked_at > deadline || case outcome
      when Prog::Base::Exit then true
      when Prog::Base::Nap then outcome.seconds != 0
      else false
      end

      return outcome if finished
    end
  end
end
end
# Table: strand
# Columns:
# id | uuid | PRIMARY KEY
# parent_id | uuid |
# schedule | timestamp with time zone | NOT NULL DEFAULT now()
# lease | timestamp with time zone | NOT NULL DEFAULT (now() - '1000 years'::interval)
# prog | text | NOT NULL
# label | text | NOT NULL
# stack | jsonb | NOT NULL DEFAULT '[{}]'::jsonb
# exitval | jsonb |
# retval | jsonb |
# try | integer | NOT NULL DEFAULT 0
# Indexes:
# strand_pkey | PRIMARY KEY btree (id)
# Foreign key constraints:
# strand_parent_id_fkey | (parent_id) REFERENCES strand(id)
# Referenced By:
# semaphore | semaphore_strand_id_fkey | (strand_id) REFERENCES strand(id)
# strand | strand_parent_id_fkey | (parent_id) REFERENCES strand(id)