Files
ubicloud/prog/base.rb
Jeremy Evans 15b91ff0c4 Absorb leaf into reap
Reviewing leaf usage in progs, it always occurs right after reap.
Combining leaf and reap methods avoids a redundant query for the
strand's children.

It's typical for nap or donate to be called after the leaf check
after reap.  Also build this into reap, by calling donate by default,
or nap if a nap keyword argument is given.

There are a few cases where reap was called without leaf/donate.
Add a fallthrough keyword argument to support this, so if there are
no children, it does not call either nap or donate

Vm::HostNexus#wait_prep and Kubernetes::UpgradeKubernetesNode#wait_new_node
both need the return value of the reapable child(ren). Add a reaper
keyword argument for this, which is called once for each child.

The most common pattern for using reap/leaf/donate was:

```ruby
reap
hop_download_lb_cert if leaf?
donate
```

This turns into:

```ruby
reap(:download_lb_cert)
```

The second most common pattern was:

```ruby
reap
donate unless leaf?
pop "upgrade cancelled" # or other code
```

This turns into:

```ruby
reap { pop "upgrade cancelled" }
```

In a few places, I changed operations on strand.children to
strand.children_dataset.  Now that we are no longer using
cached children by default, it's better to do these checks
in the database intead of in Ruby.  These places deserve careful
review:

* Prog::Minio::MinioServerNexus#unavailable
* Prog::Postgres::PostgresResourceNexus#wait
* Prog::Postgres::PostgresServerNexus#unavailable

For Prog::Vnet::LoadBalancerNexus#wait_update_vm_load_balancers,
I removed a check on the children completely. It was checking
for an exitval using children_dataset directly after reap,
which should only be true if there was still an active lease
for the child.  This also deserves careful review.

This broke many mocked tests.  This fixes the mocked tests
to use database-backed objects, ensuring that we are testing
observable behavior and not implementation details.
2025-06-26 03:49:53 +09:00

271 lines
7.0 KiB
Ruby

# frozen_string_literal: true
class Prog::Base
attr_reader :strand, :subject_id
def initialize(strand, snap = nil)
@snap = snap || SemSnap.new(strand.id)
@strand = strand
@subject_id = frame.dig("subject_id") || @strand.id
end
def self.subject_is(*names)
names.each do |name|
class_eval %(
def #{name}
@#{name} ||= #{camelize(name.to_s)}[@subject_id]
end
), __FILE__, __LINE__ - 4
subject_class = Object.const_get(camelize(name.to_s))
if subject_class.respond_to?(:semaphore_names)
semaphore(*subject_class.semaphore_names)
end
end
end
def self.semaphore(*names)
names.map!(&:intern)
names.each do |name|
define_method :"incr_#{name}" do
@snap.incr(name)
end
define_method :"decr_#{name}" do
@snap.decr(name)
end
class_eval %{
def when_#{name}_set?
if @snap.set?(#{name.inspect})
yield
end
end
}, __FILE__, __LINE__ - 6
end
end
def self.labels
@labels || []
end
def self.label(label)
(@labels ||= []) << label
define_method :"hop_#{label}" do
dynamic_hop label
end
end
def nap(seconds = 30)
fail Nap.new(seconds)
end
def pop(arg)
outval = Sequel.pg_jsonb_wrap(
case arg
when String
{"msg" => arg}
when Hash
arg
else
fail "BUG: must pop with string or hash"
end
)
if strand.stack.length > 0 && (link = frame["link"])
# This is a multi-level stack with a back-link, i.e. one prog
# calling another in the same Strand of execution. The thing to
# do here is pop the stack entry.
pg = Page.from_tag_parts("Deadline", strand.id, strand.prog, strand.stack.first["deadline_target"])
pg&.incr_resolve
old_prog = strand.prog
old_label = strand.label
prog, label = link
fail Hop.new(old_prog, old_label,
{retval: outval,
stack: Sequel.pg_jsonb_wrap(@strand.stack[1..]),
prog: prog, label: label})
else
fail "BUG: expect no stacks exceeding depth 1 with no back-link" if strand.stack.length > 1
pg = Page.from_tag_parts("Deadline", strand.id, strand.prog, strand.stack.first["deadline_target"])
pg&.incr_resolve
# Child strand with zero or one stack frames, set exitval. Clear
# retval to avoid confusion, as it would have been set in a
# previous intra-strand stack pop.
fail Exit.new(strand, outval)
end
end
class FlowControl < RuntimeError; end
class Exit < FlowControl
attr_reader :exitval
def initialize(strand, exitval)
@strand = strand
@exitval = exitval
set_backtrace []
end
def to_s
"Strand exits from #{@strand.prog}##{@strand.label} with #{@exitval}"
end
end
class Hop < FlowControl
attr_reader :strand_update_args, :old_prog
def initialize(old_prog, old_label, strand_update_args)
@old_prog = old_prog
@old_label = old_label
@strand_update_args = strand_update_args
set_backtrace []
end
def new_label
@strand_update_args[:label] || @old_label
end
def new_prog
@strand_update_args[:prog] || @old_prog
end
def to_s
"hop #{@old_prog}##{@old_label} -> #{new_prog}##{new_label}"
end
end
class Nap < FlowControl
attr_reader :seconds
def initialize(seconds)
@seconds = seconds
set_backtrace []
end
def to_s
"nap for #{seconds} seconds"
end
end
def frame
@frame ||= strand.stack.first.dup.freeze
end
def retval
strand.retval
end
def push(prog, new_frame = {}, label = "start")
old_prog = strand.prog
old_label = strand.label
new_frame = {"subject_id" => @subject_id, "link" => [strand.prog, old_label]}.merge(new_frame)
fail Hop.new(old_prog, old_label,
{prog: Strand.prog_verify(prog), label: label,
stack: [new_frame] + strand.stack, retval: nil})
end
def bud(prog, new_frame = {}, label = "start")
new_frame = {"subject_id" => @subject_id}.merge(new_frame)
strand.add_child(
id: Strand.generate_uuid,
prog: Strand.prog_verify(prog),
label: label,
stack: Sequel.pg_jsonb_wrap([new_frame])
)
end
def donate
strand.children_dataset.each(&:run)
nap 1
end
# Process child strands
#
# Reapable children (child strands that have exited) are destroyed.
# If a reaper argument is given, it is called with each child after
# the child is destroyed.
#
# If there are no reapable children:
#
# * If hop is given: hops to the target
# * If block is given: yields to block
#
# If there are still active children:
#
# * If fallthrough is given: returns nil
# * If nap is given: naps for given time
# * Otherwise, donates to run a child process
def reap(hop = nil, reaper: nil, nap: nil, fallthrough: false)
children = strand
.children_dataset
.select_append(Sequel.lit("lease < now() AND exitval IS NOT NULL").as(:reapable))
.all
reapable_children, active_children = children.partition { it.values.delete(:reapable) }
reapable_children.each do |child|
# Clear any semaphores that get added to a exited Strand prog,
# since incr is entitled to be run at *any time* (including
# after exitval is set, though it doesn't do anything) and any
# such incements will prevent deletion of a Strand via
# foreign_key
child.semaphores_dataset.destroy
child.destroy
reaper&.call(child)
end
# Parent is now a leaf, hop to given label, or yield if no label
if active_children.empty?
if hop
dynamic_hop(hop)
elsif block_given?
yield
end
end
unless fallthrough
# Parent is not a leaf, nap for given time, or donate if no
# nap time is given.
nap ? nap(nap) : donate
end
end
# A hop is a kind of jump, as in, like a jump instruction.
private def dynamic_hop(label)
fail "BUG: #hop only accepts a symbol" unless label.is_a? Symbol
fail "BUG: not valid hop target" unless self.class.labels.include? label
label = label.to_s
fail Hop.new(@strand.prog, @strand.label, {label: label, retval: nil})
end
def register_deadline(deadline_target, deadline_in, allow_extension: false)
current_frame = strand.stack.first
if (deadline_at = current_frame["deadline_at"]).nil? ||
(old_deadline_target = current_frame["deadline_target"]) != deadline_target ||
allow_extension ||
Time.parse(deadline_at.to_s) > Time.now + deadline_in
if old_deadline_target != deadline_target && (pg = Page.from_tag_parts("Deadline", strand.id, strand.prog, old_deadline_target))
pg.incr_resolve
end
current_frame["deadline_target"] = deadline_target
current_frame["deadline_at"] = Time.now + deadline_in
strand.modified!(:stack)
end
end
# Copied from sequel/model/inflections.rb's camelize, to convert
# table names into idiomatic model class names.
private_class_method def self.camelize(s)
s.gsub(/\/(.?)/) { |x| "::#{x[-1..].upcase}" }.gsub(/(^|_)(.)/) { |x| x[-1..].upcase }
end
end