Files
ubicloud/model/strand.rb
Jeremy Evans 6f70a6f765 Clear cached instance state in Strand#take_lease_and_reload
This avoids the use of instance_variable_delete, as that doesn't
play well with object shapes.  Internal code should always call
the subject method, and not access @subject directly.
2025-06-21 02:54:51 +09:00

268 lines
8.2 KiB
Ruby

# frozen_string_literal: true
require_relative "../model"
require "time"
class Strand < Sequel::Model
  # We need to unrestrict primary key so strand.add_child works in Prog::Base.
  unrestrict_primary_key

  Strand.plugin :defaults_setter, cache: true
  # New strands start with a stack holding one empty frame. A proc is used so
  # the default is computed per instance rather than being one shared array.
  Strand.default_values[:stack] = proc { [{}] }

  # Lease duration in seconds. #take_lease_and_reload sets the lease to
  # CURRENT_TIMESTAMP plus this many seconds.
  LEASE_EXPIRATION = 120

  many_to_one :parent, key: :parent_id, class: self
  one_to_many :children, key: :parent_id, class: self
  one_to_many :semaphores

  plugin ResourceMethods

  # Returns the object this strand operates on, decoded from the strand's own
  # UBID and memoized.
  #
  # #take_lease_and_reload invalidates the memo by assigning the sentinel
  # :reload instead of removing the instance variable (removing ivars does not
  # play well with object shapes). Internal code should therefore always call
  # this method and never read @subject directly.
  def subject
    return @subject if defined?(@subject) && @subject != :reload
    @subject = UBID.decode(ubid)
  end

  # Timing checkpoints for one pass of this strand through the scheduler.
  # They are filled in by #scan_picked_up!, #worker_started!, #lease_checked!
  # and #old_strand!. Each *_delay method is the difference between two
  # recorded times.
  RespirateMetrics = Struct.new(:scheduled, :scan_picked_up, :worker_started, :lease_checked, :lease_acquired, :queue_size, :available_workers, :old_strand) do
    # Time from the strand's scheduled time until a scan picked it up.
    def scan_delay
      scan_picked_up - scheduled
    end

    # Time from being picked up by a scan until a worker started on it.
    def queue_delay
      worker_started - scan_picked_up
    end

    # Time from the worker starting until the lease check completed.
    def lease_delay
      lease_checked - worker_started
    end

    # Time from the scheduled time until the lease check completed.
    def total_delay
      lease_checked - scheduled
    end
  end

  # Memoized metrics record, seeded with the schedule value at first access.
  def respirate_metrics
    @respirate_metrics ||= RespirateMetrics.new(scheduled: schedule)
  end

  def scan_picked_up!
    respirate_metrics.scan_picked_up = Time.now
  end

  def worker_started!
    respirate_metrics.worker_started = Time.now
  end

  # Records when the lease check finished, and whether it acquired a lease
  # (a truthy +affected+ means it did).
  def lease_checked!(affected)
    respirate_metrics.lease_checked = Time.now
    respirate_metrics.lease_acquired = true if affected
  end

  def old_strand!
    respirate_metrics.old_strand = true
  end

  # Attempts to take this strand's lease atomically and, on success, reloads
  # the model from the updated row, yields, and then releases the lease.
  #
  # The lease is taken only when all of these hold:
  # - the row is not locked by another transaction (FOR UPDATE SKIP LOCKED);
  # - the strand has not exited (exitval IS NULL);
  # - the current lease has expired.
  #
  # The same UPDATE also:
  # - extends the lease by LEASE_EXPIRATION seconds;
  # - increments +try+;
  # - pushes +schedule+ forward by a backoff that grows with the try count
  #   (randomized and capped at 600 seconds outside development).
  #
  # Returns false if the lease could not be taken; otherwise returns the
  # block's value.
  #
  # Raises RuntimeError ("BUG: ...") in two cases:
  # - on release, the lease no longer matches the one taken here;
  # - the strand was marked deleted but still exists.
  def take_lease_and_reload
    # The prepared statement is built once and then looked up by name.
    unless (ps = DB.prepared_statement(:strand_take_lease_and_reload))
      # :nocov:
      ps_sch = if Config.development?
        Sequel.function(:least, 5, :try)
      # :nocov:
      else
        Sequel.function(:least, Sequel[2]**Sequel.function(:least, :try, 20), 600) * Sequel.function(:random)
      end

      # Executed as UPDATE ... RETURNING. The :first type makes the call
      # return the updated row, or a falsy value if no row matched.
      ps = DB[:strand]
        .returning
        .where(
          Sequel[id: DB[:strand].select(:id).where(id: :$id).for_update.skip_locked, exitval: nil] &
          (Sequel[:lease] < Sequel::CURRENT_TIMESTAMP)
        )
        .prepare_sql_type(:update)
        .prepare(:first, :strand_take_lease_and_reload,
          lease: Sequel::CURRENT_TIMESTAMP + Sequel.cast("#{LEASE_EXPIRATION} seconds", :interval),
          try: Sequel[:try] + 1,
          schedule: Sequel::CURRENT_TIMESTAMP + (ps_sch * Sequel.cast("1 second", :interval)))
    end

    affected = ps.call(id:)
    lease_checked!(affected)
    return false unless affected

    # Remember the exact lease we took, so release only clears our own lease.
    lease_time = affected.fetch(:lease)

    # Also operate as reload query
    _refresh_set_values(affected)
    _clear_changed_columns(:refresh)
    # Invalidate the memoized subject without removing the ivar (see #subject).
    @subject = :reload

    begin
      yield
    ensure
      if @deleted
        if exists?
          fail "BUG: strand with @deleted set still exists in the database"
        end
      else
        DB.transaction do
          # Captured before the release so it can be logged if the lease
          # turns out to have been taken over.
          lease_clear_debug_snapshot = this.for_update.all
          num_updated = DB[<<~SQL, id, lease_time].update
            UPDATE strand
            SET lease = now() - '1000 years'::interval
            WHERE id = ? AND lease = ?
          SQL

          unless num_updated == 1
            Clog.emit("lease violated data") do
              {lease_clear_debug_snapshot: lease_clear_debug_snapshot}
            end
            fail "BUG: lease violated"
          end
        end
      end
    end
  end

  # Returns the prog's class name with the leading "Prog::" removed.
  # Raises RuntimeError if the class is not namespaced under Prog.
  def self.prog_verify(prog)
    case prog.name
    when /\AProg::(.*)\z/
      Regexp.last_match(1)
    else
      fail "BUG: prog must be in Prog module"
    end
  end

  # Instantiates ::Prog::<prog> for this strand with the given semaphore
  # snapshot.
  def load(snap = nil)
    Object.const_get("::Prog::#{prog}").new(self, snap)
  end

  # Runs the current label of the current prog once, without taking the lease.
  #
  # Before running, it does deadline bookkeeping:
  # - resolves the deadline page if the top frame's deadline target was
  #   reached;
  # - opens a page for every stack frame whose deadline has passed.
  #
  # The label must raise one of the Prog::Base flow-control exceptions, which
  # are handled inside the transaction:
  # - Nap: resets +try+ and reschedules the strand.
  # - Hop: records the label transition.
  # - Exit: stores the exit value and, for a root strand, self-reaps.
  #
  # Returns the flow-control exception object. A label that returns normally
  # raises RuntimeError, which also rolls back the transaction.
  def unsynchronized_run
    start_time = Time.now
    prog_label = "#{prog}.#{label}"
    top_frame = stack.first

    if label == top_frame["deadline_target"]
      Page.from_tag_parts("Deadline", id, prog, top_frame["deadline_target"])&.incr_resolve
      top_frame.delete("deadline_target")
      top_frame.delete("deadline_at")
      modified!(:stack)
    end

    # Frames linked to a parent prog are reported under that prog's name.
    effective_prog = prog
    stack.each do |frame|
      if (deadline_at = frame["deadline_at"])
        if Time.now > Time.parse(deadline_at.to_s)
          sbj = subject
          extra_data = case sbj
          when Vm
            {vm_host: sbj.vm_host&.ubid, data_center: sbj.vm_host&.data_center, boot_image: sbj.boot_image, location: sbj.location.display_name, arch: sbj.arch, vcpus: sbj.vcpus, ipv4: sbj.ephemeral_net4.to_s}
          when VmHost
            {data_center: sbj.data_center, location: sbj.location.display_name, arch: sbj.arch, ipv4: sbj.sshable.host, total_cores: sbj.total_cores, allocation_state: sbj.allocation_state, os_version: sbj.os_version, vm_count: sbj.vms_dataset.count}
          when GithubRunner
            {label: sbj.label, installation: sbj.installation.ubid, vm: sbj.vm&.ubid, vm_host: sbj.vm&.vm_host&.ubid, data_center: sbj.vm&.vm_host&.data_center}
          else
            {}
          end
          extra_data.compact!

          Prog::PageNexus.assemble("#{ubid} has an expired deadline! #{effective_prog}.#{label} did not reach #{frame["deadline_target"]} on time", ["Deadline", id, effective_prog, frame["deadline_target"]], ubid, extra_data:)
          modified!(:stack)
        end
      end

      if (link = frame["link"])
        effective_prog = link[0]
      end
    end

    unless top_frame["last_label_changed_at"]
      top_frame["last_label_changed_at"] = Time.now.to_s
      modified!(:stack)
    end

    DB.transaction do
      SemSnap.use(id) do |snap|
        prg = load(snap)
        prg.public_send(:before_run) if prg.respond_to?(:before_run)
        prg.public_send(label)
      end
    rescue Prog::Base::Nap => e
      save_changes

      scheduled = DB[<<~SQL, e.seconds, id].get
        UPDATE strand
        SET try = 0, schedule = now() + (? * '1 second'::interval)
        WHERE id = ?
        RETURNING schedule
      SQL

      # For convenience, reflect the updated record's schedule content
      # in the model object, but since it's fresh, remove it from the
      # changed columns so save_changes won't update it again.
      self.schedule = scheduled
      changed_columns.delete(:schedule)

      e
    rescue Prog::Base::Hop => hp
      last_changed_at = Time.parse(top_frame["last_label_changed_at"])
      Clog.emit("hopped") { {strand_hopped: {duration: Time.now - last_changed_at, from: prog_label, to: "#{hp.new_prog}.#{hp.new_label}"}} }
      top_frame["last_label_changed_at"] = Time.now.to_s
      modified!(:stack)

      update(**hp.strand_update_args, try: 0)

      hp
    rescue Prog::Base::Exit => ext
      last_changed_at = Time.parse(top_frame["last_label_changed_at"])
      Clog.emit("exited") { {strand_exited: {duration: Time.now - last_changed_at, from: prog_label}} }

      update(exitval: ext.exitval, retval: nil)
      if parent_id.nil?
        # No parent Strand to reap here, so self-reap.
        Semaphore.where(strand_id: id).destroy
        destroy
        @deleted = true
      end

      ext
    else
      fail "BUG: Prog #{prog}##{label} did not provide flow control"
    end
  ensure
    duration = Time.now - start_time
    Clog.emit("finished strand") { [self, {strand_finished: {duration:, prog_label:}}] } if duration > 1
  end

  # Takes the lease and runs steps repeatedly for a time budget of about
  # +seconds+. With the default of 0 this is normally a single step.
  #
  # It keeps looping on Hops and zero-second Naps while within the budget.
  # It returns the flow-control result as soon as any of these happens:
  # - the budget has elapsed;
  # - the strand naps for a non-zero time;
  # - the strand exits.
  #
  # Returns false if the lease could not be taken. Raises RuntimeError if
  # this strand already self-reaped.
  def run(seconds = 0)
    fail "already deleted" if @deleted

    deadline = Time.now + seconds
    take_lease_and_reload do
      loop do
        ret = unsynchronized_run
        now = Time.now
        if now > deadline ||
            (ret.is_a?(Prog::Base::Nap) && ret.seconds != 0) ||
            ret.is_a?(Prog::Base::Exit)
          # Non-local return from #run; take_lease_and_reload's ensure still
          # releases the lease.
          return ret
        end
      end
    end
  end
end
# Table: strand
# Columns:
# id | uuid | PRIMARY KEY
# parent_id | uuid |
# schedule | timestamp with time zone | NOT NULL DEFAULT now()
# lease | timestamp with time zone | NOT NULL DEFAULT (now() - '1000 years'::interval)
# prog | text | NOT NULL
# label | text | NOT NULL
# stack | jsonb | NOT NULL DEFAULT '[{}]'::jsonb
# exitval | jsonb |
# retval | jsonb |
# try | integer | NOT NULL DEFAULT 0
# Indexes:
# strand_pkey | PRIMARY KEY btree (id)
# Foreign key constraints:
# strand_parent_id_fkey | (parent_id) REFERENCES strand(id)
# Referenced By:
# semaphore | semaphore_strand_id_fkey | (strand_id) REFERENCES strand(id)
# strand | strand_parent_id_fkey | (parent_id) REFERENCES strand(id)