Files
ubicloud/prog/base.rb
Enes Cakir 1d6efd4c9b Get semaphores from the subject of prog
Right now, we define semaphores in both the model and prog, but they are
generally the same. We can retrieve the list of semaphores from the
subject in prog.

We can still define semaphores in prog if it doesn't have a subject,
like in Prog::Test::HetznerServer.
2024-12-26 12:14:33 +03:00

242 lines
6.1 KiB
Ruby

# frozen_string_literal: true
class Prog::Base
attr_reader :strand, :subject_id
def initialize(strand, snap = nil)
@snap = snap || SemSnap.new(strand.id)
@strand = strand
@subject_id = frame.dig("subject_id") || @strand.id
end
def self.subject_is(*names)
names.each do |name|
class_eval %(
def #{name}
@#{name} ||= #{camelize(name.to_s)}[@subject_id]
end
), __FILE__, __LINE__ - 4
subject_class = Object.const_get(camelize(name.to_s))
if subject_class.respond_to?(:semaphore_names)
semaphore(*subject_class.semaphore_names)
end
end
end
def self.semaphore(*names)
names.map!(&:intern)
names.each do |name|
define_method :"incr_#{name}" do
@snap.incr(name)
end
define_method :"decr_#{name}" do
@snap.decr(name)
end
class_eval %{
def when_#{name}_set?
if @snap.set?(#{name.inspect})
yield
end
end
}, __FILE__, __LINE__ - 6
end
end
def self.labels
@labels || []
end
def self.label(label)
(@labels ||= []) << label
define_method :"hop_#{label}" do
dynamic_hop label
end
end
def nap(seconds = 30)
fail Nap.new(seconds)
end
def pop(*args)
outval = Sequel.pg_jsonb_wrap(
case args
in [String => s]
{"msg" => s}
in [Hash => h]
h
else
fail "BUG: must pop with string or hash"
end
)
if strand.stack.length > 0 && (link = frame["link"])
# This is a multi-level stack with a back-link, i.e. one prog
# calling another in the same Strand of execution. The thing to
# do here is pop the stack entry.
pg = Page.from_tag_parts("Deadline", strand.id, strand.prog, strand.stack.first["deadline_target"])
pg&.incr_resolve
old_prog = strand.prog
old_label = strand.label
prog, label = link
fail Hop.new(old_prog, old_label,
{retval: outval,
stack: Sequel.pg_jsonb_wrap(@strand.stack[1..]),
prog: prog, label: label})
else
fail "BUG: expect no stacks exceeding depth 1 with no back-link" if strand.stack.length > 1
pg = Page.from_tag_parts("Deadline", strand.id, strand.prog, strand.stack.first["deadline_target"])
pg&.incr_resolve
# Child strand with zero or one stack frames, set exitval. Clear
# retval to avoid confusion, as it would have been set in a
# previous intra-strand stack pop.
fail Exit.new(strand, outval)
end
end
class FlowControl < RuntimeError; end
class Exit < FlowControl
attr_reader :exitval
def initialize(strand, exitval)
@strand = strand
@exitval = exitval
end
def to_s
"Strand exits from #{@strand.prog}##{@strand.label} with #{@exitval}"
end
end
class Hop < FlowControl
attr_reader :strand_update_args, :old_prog
def initialize(old_prog, old_label, strand_update_args)
@old_prog = old_prog
@old_label = old_label
@strand_update_args = strand_update_args
end
def new_label
@strand_update_args[:label] || @old_label
end
def new_prog
@strand_update_args[:prog] || @old_prog
end
def to_s
"hop #{@old_prog}##{@old_label} -> #{new_prog}##{new_label}"
end
end
class Nap < FlowControl
attr_reader :seconds
def initialize(seconds)
@seconds = seconds
end
def to_s
"nap for #{seconds} seconds"
end
end
def frame
@frame ||= strand.stack.first.dup.freeze
end
def retval
strand.retval
end
def push(prog, new_frame = {}, label = "start")
old_prog = strand.prog
old_label = strand.label
new_frame = {"subject_id" => @subject_id, "link" => [strand.prog, old_label]}.merge(new_frame)
fail Hop.new(old_prog, old_label,
{prog: Strand.prog_verify(prog), label: label,
stack: [new_frame] + strand.stack, retval: nil})
end
def bud(prog, new_frame = {}, label = "start")
new_frame = {"subject_id" => @subject_id}.merge(new_frame)
strand.add_child(
id: Strand.generate_uuid,
prog: Strand.prog_verify(prog),
label: label,
stack: Sequel.pg_jsonb_wrap([new_frame])
)
end
def donate
strand.children.map(&:run)
nap 1
end
def reap
reapable = strand.children_dataset.where(
Sequel.lit("(lease IS NULL OR lease < now()) AND exitval IS NOT NULL")
).all
reaped_ids = reapable.map do |child|
# Clear any semaphores that get added to a exited Strand prog,
# since incr is entitled to be run at *any time* (including
# after exitval is set, though it doesn't do anything) and any
# such incements will prevent deletion of a Strand via
# foreign_key
Semaphore.where(strand_id: child.id).destroy
child.destroy
child.id
end.freeze
strand.children.delete_if { reaped_ids.include?(_1.id) }
reapable
end
def leaf?
strand.children.empty?
end
# A hop is a kind of jump, as in, like a jump instruction.
private def dynamic_hop(label)
fail "BUG: #hop only accepts a symbol" unless label.is_a? Symbol
fail "BUG: not valid hop target" unless self.class.labels.include? label
label = label.to_s
fail Hop.new(@strand.prog, @strand.label, {label: label, retval: nil})
end
def register_deadline(deadline_target, deadline_in, allow_extension: false)
current_frame = strand.stack.first
if (deadline_at = current_frame["deadline_at"]).nil? ||
(old_deadline_target = current_frame["deadline_target"]) != deadline_target ||
allow_extension ||
Time.parse(deadline_at.to_s) > Time.now + deadline_in
if old_deadline_target != deadline_target && (pg = Page.from_tag_parts("Deadline", strand.id, strand.prog, old_deadline_target))
pg.incr_resolve
end
current_frame["deadline_target"] = deadline_target
current_frame["deadline_at"] = Time.now + deadline_in
strand.modified!(:stack)
end
end
# Copied from sequel/model/inflections.rb's camelize, to convert
# table names into idiomatic model class names.
private_class_method def self.camelize(s)
s.gsub(/\/(.?)/) { |x| "::#{x[-1..].upcase}" }.gsub(/(^|_)(.)/) { |x| x[-1..].upcase }
end
end