Files
ubicloud/model/strand.rb
Jeremy Evans 01a4956abd Avoid **hash.merge(other_hash) constructions
While this works, it's less efficient than alternatives.  If the method
accepts an options hash, the `**` can be dropped, avoiding a hash
allocation. If the method accepts keywords, `**hash, **other_hash` can
be used.

However, in the cases where we are using these constructions, one of
the hashes is a literal hash, so these constructions result in
2-3 hash allocations:

* 1 for the literal hash
* 1 for the Hash#merge
* 1 if the keyword splat is converted to a positional hash

using `**hash, kw: value` reduces this to a single hash allocation,
in addition to being simpler.

For the create_cache_entry change in the specs, use an anonymous keyword
splat parameter.  This is done for simplicity, it doesn't save
allocations in this case (as all callers are providing literal keywords).
2025-05-02 00:56:06 +09:00

228 lines
7.5 KiB
Ruby

# frozen_string_literal: true
require_relative "../model"
require "time"
class Strand < Sequel::Model
Strand.plugin :defaults_setter, cache: true
Strand.default_values[:stack] = proc { [{}] }
LEASE_EXPIRATION = 120
many_to_one :parent, key: :parent_id, class: self
one_to_many :children, key: :parent_id, class: self
one_to_many :semaphores
include ResourceMethods
def subject
UBID.decode(ubid)
end
def take_lease_and_reload
unless (ps = DB.prepared_statement(:strand_take_lease_and_reload))
# :nocov:
ps_sch = if Config.development?
Sequel.function(:least, 5, :try)
# :nocov:
else
Sequel.function(:least, Sequel[2]**Sequel.function(:least, :try, 20), 600) * Sequel.function(:random)
end
ps = DB[:strand]
.returning
.where(
Sequel[id: DB[:strand].select(:id).where(id: :$id).for_update.skip_locked, exitval: nil] &
(Sequel[:lease] < Sequel::CURRENT_TIMESTAMP)
)
.prepare(:update, :strand_take_lease_and_reload,
lease: Sequel::CURRENT_TIMESTAMP + Sequel.cast("120 seconds", :interval),
try: Sequel[:try] + 1,
schedule: Sequel::CURRENT_TIMESTAMP + (ps_sch * Sequel.cast("1 second", :interval)))
end
affected = ps.call(id:).first
return false unless affected
lease_time = affected.fetch(:lease)
verbose_logging = rand(1000) == 0
Clog.emit("obtained lease") { {lease_acquired: {time: lease_time, delay: Time.now - schedule}} } if verbose_logging
# Also operate as reload query
@values = affected
begin
yield
ensure
if @deleted
unless DB["SELECT FROM strand WHERE id = ?", id].empty?
fail "BUG: strand with @deleted set still exists in the database"
end
else
DB.transaction do
lease_clear_debug_snapshot = this.for_update.all
num_updated = DB[<<SQL, id, lease_time].update
UPDATE strand
SET lease = now() - '1000 years'::interval
WHERE id = ? AND lease = ?
SQL
Clog.emit("lease cleared") { {lease_cleared: {num_updated: num_updated}} } if verbose_logging
unless num_updated == 1
Clog.emit("lease violated data") do
{lease_clear_debug_snapshot: lease_clear_debug_snapshot}
end
fail "BUG: lease violated"
end
end
end
end
end
def self.prog_verify(prog)
case prog.name
when /\AProg::(.*)\z/
$1
else
fail "BUG: prog must be in Prog module"
end
end
def load(snap = nil)
Object.const_get("::Prog::" + prog).new(self, snap)
end
def unsynchronized_run
start_time = Time.now
prog_label = "#{prog}.#{label}"
if label == stack.first["deadline_target"]
if (pg = Page.from_tag_parts("Deadline", id, prog, stack.first["deadline_target"]))
pg.incr_resolve
end
stack.first.delete("deadline_target")
stack.first.delete("deadline_at")
modified!(:stack)
end
effective_prog = prog
stack.each do |frame|
if (deadline_at = frame["deadline_at"])
if Time.now > Time.parse(deadline_at.to_s)
sbj = subject
extra_data = case sbj
when Vm
{vm_host: sbj.vm_host&.ubid, data_center: sbj.vm_host&.data_center, boot_image: sbj.boot_image, location: sbj.location.display_name, arch: sbj.arch, vcpus: sbj.vcpus, ipv4: sbj.ephemeral_net4.to_s}
when VmHost
{data_center: sbj.data_center, location: sbj.location.display_name, arch: sbj.arch, ipv4: sbj.sshable.host, total_cores: sbj.total_cores, allocation_state: sbj.allocation_state, os_version: sbj.os_version, vm_count: sbj.vms_dataset.count}
when GithubRunner
{label: sbj.label, installation: sbj.installation.ubid, vm: sbj.vm&.ubid, vm_host: sbj.vm&.vm_host&.ubid, data_center: sbj.vm&.vm_host&.data_center}
else
{}
end
extra_data.compact!
Prog::PageNexus.assemble("#{ubid} has an expired deadline! #{effective_prog}.#{label} did not reach #{frame["deadline_target"]} on time", ["Deadline", id, effective_prog, frame["deadline_target"]], ubid, extra_data:)
modified!(:stack)
end
end
if (link = frame["link"])
effective_prog = link[0]
end
end
unless stack.first["last_label_changed_at"]
stack.first["last_label_changed_at"] = Time.now.to_s
modified!(:stack)
end
DB.transaction do
SemSnap.use(id) do |snap|
prg = load(snap)
prg.public_send(:before_run) if prg.respond_to?(:before_run)
prg.public_send(label)
end
rescue Prog::Base::Nap => e
save_changes
scheduled = DB[<<SQL, e.seconds, id].get
UPDATE strand
SET try = 0, schedule = now() + (? * '1 second'::interval)
WHERE id = ?
RETURNING schedule
SQL
# For convenience, reflect the updated record's schedule content
# in the model object, but since it's fresh, remove it from the
# changed columns so save_changes won't update it again.
self.schedule = scheduled
changed_columns.delete(:schedule)
e
rescue Prog::Base::Hop => hp
last_changed_at = Time.parse(stack.first["last_label_changed_at"])
Clog.emit("hopped") { {strand_hopped: {duration: Time.now - last_changed_at, from: prog_label, to: "#{hp.new_prog}.#{hp.new_label}"}} }
stack.first["last_label_changed_at"] = Time.now.to_s
modified!(:stack)
update(**hp.strand_update_args, try: 0)
hp
rescue Prog::Base::Exit => ext
last_changed_at = Time.parse(stack.first["last_label_changed_at"])
Clog.emit("exited") { {strand_exited: {duration: Time.now - last_changed_at, from: prog_label}} }
update(exitval: ext.exitval, retval: nil)
if parent_id.nil?
# No parent Strand to reap here, so self-reap.
Semaphore.where(strand_id: id).destroy
destroy
@deleted = true
end
ext
else
fail "BUG: Prog #{prog}##{label} did not provide flow control"
end
ensure
duration = Time.now - start_time
Clog.emit("finished strand") { [self, {strand_finished: {duration:, prog_label:}}] } if duration > 1
end
def run(seconds = 0)
fail "already deleted" if @deleted
deadline = Time.now + seconds
take_lease_and_reload do
loop do
ret = unsynchronized_run
now = Time.now
if now > deadline ||
(ret.is_a?(Prog::Base::Nap) && ret.seconds != 0) ||
ret.is_a?(Prog::Base::Exit)
return ret
end
end
end
end
end
# We need to unrestrict primary key so strand.add_child works in Prog::Base.
Strand.unrestrict_primary_key
# Table: strand
# Columns:
# id | uuid | PRIMARY KEY
# parent_id | uuid |
# schedule | timestamp with time zone | NOT NULL DEFAULT now()
# lease | timestamp with time zone | NOT NULL DEFAULT (now() - '1000 years'::interval)
# prog | text | NOT NULL
# label | text | NOT NULL
# stack | jsonb | NOT NULL DEFAULT '[{}]'::jsonb
# exitval | jsonb |
# retval | jsonb |
# try | integer | NOT NULL DEFAULT 0
# Indexes:
# strand_pkey | PRIMARY KEY btree (id)
# Foreign key constraints:
# strand_parent_id_fkey | (parent_id) REFERENCES strand(id)
# Referenced By:
# semaphore | semaphore_strand_id_fkey | (strand_id) REFERENCES strand(id)
# strand | strand_parent_id_fkey | (parent_id) REFERENCES strand(id)