Reviewing leaf usage in progs, it always occurs right after reap. Combining the leaf and reap methods avoids a redundant query for the strand's children.

It's typical for nap or donate to be called after the leaf check after reap. Also build this into reap, by calling donate by default, or nap if a nap keyword argument is given.

There are a few cases where reap was called without leaf/donate. Add a fallthrough keyword argument to support this, so if there are no children, it does not call either nap or donate.

Vm::HostNexus#wait_prep and Kubernetes::UpgradeKubernetesNode#wait_new_node both need the return value of the reapable child(ren). Add a reaper keyword argument for this, which is called once for each child.

The most common pattern for using reap/leaf/donate was:

```ruby
reap
hop_download_lb_cert if leaf?
donate
```

This turns into:

```ruby
reap(:download_lb_cert)
```

The second most common pattern was:

```ruby
reap
donate unless leaf?
pop "upgrade cancelled" # or other code
```

This turns into:

```ruby
reap { pop "upgrade cancelled" }
```

In a few places, I changed operations on strand.children to strand.children_dataset. Now that we are no longer using cached children by default, it's better to do these checks in the database instead of in Ruby. These places deserve careful review:

* Prog::Minio::MinioServerNexus#unavailable
* Prog::Postgres::PostgresResourceNexus#wait
* Prog::Postgres::PostgresServerNexus#unavailable

For Prog::Vnet::LoadBalancerNexus#wait_update_vm_load_balancers, I removed a check on the children completely. It was checking for an exitval using children_dataset directly after reap, which should only be true if there was still an active lease for the child. This also deserves careful review.

This broke many mocked tests. This fixes the mocked tests to use database-backed objects, ensuring that we are testing observable behavior and not implementation details.
271 lines
7.0 KiB
Ruby
271 lines
7.0 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
class Prog::Base
|
|
attr_reader :strand, :subject_id
|
|
|
|
# Builds a prog bound to +strand+.
#
# strand:: the strand being run; its first stack entry is exposed as #frame
# snap::   optional semaphore snapshot; when omitted, a SemSnap is built
#          from the strand's id
#
# The subject id is taken from the frame's "subject_id" entry and falls
# back to the strand's own id when the frame has none.
def initialize(strand, snap = nil)
  # @strand must be set before #frame is used below, since #frame reads
  # the stack through the strand reader.
  @strand = strand
  @snap = snap || SemSnap.new(strand.id)
  @subject_id = frame["subject_id"] || strand.id
end
|
|
|
|
# Declares the subject model(s) of a prog class.
#
# For each name this defines a memoized instance reader that fetches the
# record with <tt>ModelClass[@subject_id]</tt>, where the model class name
# is the camelized form of the name. The model constant is resolved
# immediately with Object.const_get, so it must already be loadable. If
# the model class responds to +semaphore_names+, those names are also
# passed to .semaphore.
def self.subject_is(*names)
  names.each do |name|
    model_name = camelize(name.to_s)

    # The reader memoizes with ||=, so a nil lookup is retried on the
    # next call.
    class_eval <<~RUBY, __FILE__, __LINE__ + 1
      def #{name}
        @#{name} ||= #{model_name}[@subject_id]
      end
    RUBY

    model = Object.const_get(model_name)
    semaphore(*model.semaphore_names) if model.respond_to?(:semaphore_names)
  end
end
|
|
|
|
# Declares semaphore helpers on a prog class.
#
# For each name (converted to a symbol) three instance methods are
# defined:
#
# * <tt>incr_NAME</tt> - calls <tt>@snap.incr(NAME)</tt>
# * <tt>decr_NAME</tt> - calls <tt>@snap.decr(NAME)</tt>
# * <tt>when_NAME_set?</tt> - yields to the given block only if
#   <tt>@snap.set?(NAME)</tt> is truthy
def self.semaphore(*names)
  names.map!(&:intern)
  names.each do |sem|
    define_method(:"incr_#{sem}") { @snap.incr(sem) }
    define_method(:"decr_#{sem}") { @snap.decr(sem) }

    # Built from source text rather than define_method, presumably
    # because the generated method has to yield to its caller's block.
    class_eval <<~RUBY, __FILE__, __LINE__ + 1
      def when_#{sem}_set?
        yield if @snap.set?(#{sem.inspect})
      end
    RUBY
  end
end
|
|
|
|
# Returns the labels declared on this class with .label, or an empty
# array when none have been declared yet.
def self.labels
  return @labels if @labels

  []
end
|
|
|
|
# Registers +label+ as a label of this class and defines an instance
# method <tt>hop_LABEL</tt> that calls dynamic_hop with that label.
def self.label(label)
  @labels ||= []
  @labels << label

  define_method(:"hop_#{label}") { dynamic_hop label }
end
|
|
|
|
# Raises a Nap flow-control exception carrying +seconds+ (30 by default).
# This method never returns normally.
def nap(seconds = 30)
  pause = Nap.new(seconds)
  fail pause
end
|
|
|
|
# Finishes the current prog with an output value. Never returns normally.
#
# arg:: a String (wrapped as <tt>{"msg" => arg}</tt>) or a Hash (used as
#       is); anything else fails with a BUG error
#
# When the current frame has a "link" back-reference, i.e. one prog called
# another within the same strand, a Hop is raised that drops the top stack
# entry, switches to the linked prog and label, and passes the output
# value as retval.
#
# Otherwise the strand must have at most one stack entry, and an Exit
# carrying the output value is raised.
#
# In both cases the "Deadline" page for the first stack entry's
# deadline_target, if one is found, gets incr_resolve called on it first.
def pop(arg)
  payload =
    case arg
    when String then {"msg" => arg}
    when Hash then arg
    else fail "BUG: must pop with string or hash"
    end
  outval = Sequel.pg_jsonb_wrap(payload)

  resolve_deadline_page = lambda do
    Page.from_tag_parts("Deadline", strand.id, strand.prog, strand.stack.first["deadline_target"])&.incr_resolve
  end

  link = (strand.stack.length > 0) ? frame["link"] : nil

  unless link
    # No back-link: this is the strand's own (outermost) prog.
    fail "BUG: expect no stacks exceeding depth 1 with no back-link" if strand.stack.length > 1

    resolve_deadline_page.call
    fail Exit.new(strand, outval)
  end

  # Back-link present: pop the stack entry and resume the linked prog.
  resolve_deadline_page.call
  return_prog, return_label = link

  fail Hop.new(strand.prog, strand.label, {
    retval: outval,
    stack: Sequel.pg_jsonb_wrap(@strand.stack[1..]),
    prog: return_prog,
    label: return_label
  })
end
|
|
|
|
# Common ancestor of the exceptions progs raise to steer execution
# (Exit, Hop and Nap). Each of them sets an empty backtrace on
# construction.
class FlowControl < RuntimeError
end

# Raised when a strand finishes; carries the strand and its exit value.
class Exit < FlowControl
  attr_reader :exitval

  def initialize(strand, exitval)
    @exitval = exitval
    @strand = strand
    set_backtrace([])
  end

  def to_s
    origin = "#{@strand.prog}##{@strand.label}"
    "Strand exits from #{origin} with #{@exitval}"
  end
end

# Raised to move a strand to another label and/or prog. The attributes to
# apply are carried in +strand_update_args+.
class Hop < FlowControl
  attr_reader :strand_update_args
  attr_reader :old_prog

  def initialize(old_prog, old_label, strand_update_args)
    @strand_update_args = strand_update_args
    @old_prog = old_prog
    @old_label = old_label
    set_backtrace([])
  end

  # Target label: the :label update argument, or the old label if absent.
  def new_label
    @strand_update_args[:label] || @old_label
  end

  # Target prog: the :prog update argument, or the old prog if absent.
  def new_prog
    @strand_update_args[:prog] || @old_prog
  end

  def to_s
    source = "#{@old_prog}##{@old_label}"
    target = "#{new_prog}##{new_label}"
    "hop #{source} -> #{target}"
  end
end

# Raised to pause a strand; carries the requested number of seconds.
class Nap < FlowControl
  attr_reader :seconds

  def initialize(seconds)
    @seconds = seconds
    set_backtrace([])
  end

  def to_s
    "nap for #{@seconds} seconds"
  end
end
|
|
|
|
# Returns a frozen copy of the strand's first stack entry. The copy is
# made on first use and then cached, so later changes to the strand's
# stack are not reflected in it.
def frame
  return @frame if @frame

  @frame = strand.stack.first.dup.freeze
end
|
|
|
|
# Returns the strand's current retval.
def retval = strand.retval
|
|
|
|
# Calls another prog within the same strand. Never returns normally.
#
# prog::      the prog to run; checked with Strand.prog_verify
# new_frame:: extra entries for the new stack frame; they override the
#             default "subject_id" and "link" entries
# label::     label to start the pushed prog at ("start" by default)
#
# The new frame records a "link" back to the current prog and label, is
# placed in front of the existing stack, and the change is delivered by
# raising a Hop that also sets retval to nil.
def push(prog, new_frame = {}, label = "start")
  caller_prog = strand.prog
  caller_label = strand.label
  defaults = {"subject_id" => @subject_id, "link" => [caller_prog, caller_label]}
  pushed_frame = defaults.merge(new_frame)

  fail Hop.new(caller_prog, caller_label, {
    prog: Strand.prog_verify(prog),
    label: label,
    stack: [pushed_frame] + strand.stack,
    retval: nil
  })
end
|
|
|
|
# Adds a child strand to the current strand and returns the result of
# strand.add_child.
#
# prog::      the prog for the child; checked with Strand.prog_verify
# new_frame:: extra entries for the child's single stack frame; they
#             override the default "subject_id" entry
# label::     label the child starts at ("start" by default)
def bud(prog, new_frame = {}, label = "start")
  child_frame = {"subject_id" => @subject_id}.merge(new_frame)

  child_id = Strand.generate_uuid
  child_prog = Strand.prog_verify(prog)
  child_stack = Sequel.pg_jsonb_wrap([child_frame])

  strand.add_child(id: child_id, prog: child_prog, label: label, stack: child_stack)
end
|
|
|
|
# Calls +run+ on each of the strand's children (fetched through
# children_dataset), then naps for one second.
def donate
  strand.children_dataset.each { |child| child.run }
  nap 1
end
|
|
|
|
# Process child strands
#
# Reapable children are those whose lease has expired and whose exitval
# is set. Each one has its semaphores destroyed and is then destroyed
# itself. If a reaper argument is given, it is called with each such
# child after the child is destroyed.
#
# If no active (non-reapable) children remain, i.e. the strand is now a
# leaf:
#
# * If hop is given: hops to the target
# * If block is given: yields to block
#
# If control gets past that point (there are still active children, or
# neither hop nor block was given, or the block returned):
#
# * If fallthrough is given: returns nil
# * If nap is given: naps for given time
# * Otherwise, donates to run a child process
def reap(hop = nil, reaper: nil, nap: nil, fallthrough: false)
  reapable_flag = Sequel.lit("lease < now() AND exitval IS NOT NULL").as(:reapable)
  children = strand.children_dataset.select_append(reapable_flag).all

  # The extra :reapable column is removed from each child's values while
  # partitioning, so it does not linger on the model instances.
  exited, running = children.partition { |child| child.values.delete(:reapable) }

  exited.each do |child|
    # Clear any semaphores that get added to an exited Strand prog,
    # since incr is entitled to be run at *any time* (including
    # after exitval is set, though it doesn't do anything) and any
    # such increments will prevent deletion of a Strand via
    # foreign_key
    child.semaphores_dataset.destroy
    child.destroy
    reaper&.call(child)
  end

  # Parent is now a leaf: hop to the given label, or yield if no label.
  if running.empty?
    if hop
      dynamic_hop(hop)
    elsif block_given?
      yield
    end
  end

  return if fallthrough

  # Nap for the given time, or donate if no nap time is given. Note that
  # the nap keyword shadows the method name; the parenthesized form is
  # the method call.
  if nap
    nap(nap)
  else
    donate
  end
end
|
|
|
|
# A hop is a kind of jump, as in, like a jump instruction.
#
# Raises a Hop that moves the strand to +label+ within the current prog
# and resets retval to nil. +label+ must be a Symbol that was declared
# on this class with .label; otherwise a BUG error is raised.
private def dynamic_hop(label)
  fail "BUG: #hop only accepts a symbol" unless label.is_a?(Symbol)

  declared = self.class.labels
  fail "BUG: not valid hop target" unless declared.include?(label)

  fail Hop.new(@strand.prog, @strand.label, {label: label.to_s, retval: nil})
end
|
|
|
|
# Records a deadline on the strand's first stack entry.
#
# deadline_target::  what the deadline is for; stored as "deadline_target"
# deadline_in::      offset added to Time.now to compute "deadline_at"
# allow_extension::  when true, always overwrite the existing deadline
#
# The frame is updated when any of the following holds:
#
# * no deadline is currently recorded
# * the target differs from the recorded one
# * allow_extension is true
# * the recorded deadline is later than the new one (tightening)
#
# When the target changes and a "Deadline" page exists for the previous
# target, incr_resolve is called on that page. Returns nil when nothing
# was updated.
def register_deadline(deadline_target, deadline_in, allow_extension: false)
  current_frame = strand.stack.first
  deadline_at = current_frame["deadline_at"]

  # Read the previous target unconditionally. It used to be assigned
  # inside the short-circuiting condition below, which left it nil
  # whenever deadline_at was nil, so the page lookup could be made with
  # a nil target instead of the frame's actual previous target.
  old_deadline_target = current_frame["deadline_target"]
  target_changed = old_deadline_target != deadline_target

  needs_update = deadline_at.nil? ||
    target_changed ||
    allow_extension ||
    Time.parse(deadline_at.to_s) > Time.now + deadline_in
  return unless needs_update

  if target_changed && (pg = Page.from_tag_parts("Deadline", strand.id, strand.prog, old_deadline_target))
    pg.incr_resolve
  end

  current_frame["deadline_target"] = deadline_target
  current_frame["deadline_at"] = Time.now + deadline_in

  # The stack was changed in place, so the column is flagged explicitly.
  strand.modified!(:stack)
end
|
|
|
|
# Copied from sequel/model/inflections.rb's camelize, to convert
# table names into idiomatic model class names.
#
# A slash followed by a character becomes "::" plus that character
# upcased; then the first character, and every character following an
# underscore, is upcased and the underscore dropped. For example,
# "vm_host" becomes "VmHost".
private_class_method def self.camelize(s)
  namespaced = s.gsub(%r{/(.?)}) { |separator| "::#{separator[-1].upcase}" }
  namespaced.gsub(/(^|_)(.)/) { |boundary| boundary[-1].upcase }
end
|
|
end
|