Files
ubicloud/model/strand.rb
Enes Cakir b8e0008d94 Log how much time spent on each label
We log the duration once the strand has finished running, but this only
pertains to a single run. Currently, we lack a metric to determine the
time spent on each label, which is crucial for identifying the label
that consumes most of our time. This PR adds the last label change
timestamp to the stack and logs the duration when the strand is hopped
or exited.
2024-07-23 10:01:32 +03:00

183 lines
5.3 KiB
Ruby

# frozen_string_literal: true
require_relative "../model"
require "time"
class Strand < Sequel::Model
# Give new Strand objects a default stack through Sequel's defaults_setter
# plugin. The default is a proc returning a single empty frame ([{}]);
# presumably the proc is used so each new strand gets its own array rather
# than a shared one — confirm against the defaults_setter documentation.
Strand.plugin :defaults_setter, cache: true
Strand.default_values[:stack] = proc { [{}] }
# NOTE(review): presumably the lease length in seconds. It is not referenced
# by the methods in this class; take_lease_and_reload hard-codes
# '120 seconds' in its SQL, so the two values must be kept in sync by hand.
LEASE_EXPIRATION = 120
# Strands form a tree through parent_id: parent and children are both Strands.
many_to_one :parent, key: :parent_id, class: self
one_to_many :children, key: :parent_id, class: self
one_to_many :semaphores
include ResourceMethods
# Returns whatever UBID.decode yields for this strand's own ubid.
# NOTE(review): presumably that is the resource record the strand operates
# on — confirm against UBID.decode.
def subject
  strand_ubid = ubid
  UBID.decode(strand_ubid)
end
# Tries to take the lease on this strand's row and, when that succeeds,
# reloads the model and yields to the caller's block while the lease is held.
#
# The lease is taken with a single UPDATE that only matches when the row has
# no lease or an expired one (lease < now()) and exitval IS NULL. The same
# statement sets lease to now() + 120 seconds, increments try, and sets
# schedule to the SCHEDULE SQL expression.
#
# Returns false, without yielding, when the UPDATE matched no row; otherwise
# returns the value of the block. After the block finishes (normally or by
# exception) the ensure section either verifies a self-deleted strand is
# really gone, or clears the lease; both checks fail with a "BUG: ..." error
# when they do not hold.
def take_lease_and_reload
affected = DB[<<SQL, id].first
UPDATE strand
SET lease = now() + '120 seconds', try = try + 1, schedule = #{SCHEDULE}
WHERE id = ? AND (lease IS NULL OR lease < now()) AND exitval IS NULL
RETURNING lease
SQL
# No row came back: the lease is held by someone else, or the strand has
# already exited (or no longer exists).
return false unless affected
# Remember the exact lease value we wrote; it is used below to make sure we
# only clear the lease we obtained ourselves.
lease_time = affected.fetch(:lease)
# The model is reloaded only on the next line, so `schedule` here is
# presumably still the value from before the UPDATE above (assuming
# Clog.emit evaluates its block immediately — confirm).
Clog.emit("obtained lease") { {lease_acquired: {time: lease_time, delay: Time.now - schedule}} }
reload
begin
yield
ensure
# @deleted is set by unsynchronized_run when the strand destroys itself.
if @deleted
# A deleted strand has no lease to clear; just check that the row is gone.
unless DB["SELECT FROM strand WHERE id = ?", id].empty?
fail "BUG: strand with @deleted set still exists in the database"
end
else
DB.transaction do
# Capture the row (selected FOR UPDATE) before clearing the lease so it can
# be logged if the lease turns out to have been violated.
lease_clear_debug_snapshot = this.for_update.all
# Clear the lease only if it still carries the value we wrote when
# acquiring it.
num_updated = DB[<<SQL, id, lease_time].update
UPDATE strand
SET lease = NULL
WHERE id = ? AND lease = ?
SQL
Clog.emit("lease cleared") { {lease_cleared: {num_updated: num_updated}} }
# Anything other than exactly one updated row means the lease value changed
# (or the row vanished) while we believed we held it.
unless num_updated == 1
Clog.emit("lease violated data") do
{lease_clear_debug_snapshot: lease_clear_debug_snapshot}
end
fail "BUG: lease violated"
end
end
end
end
end
# Checks that +prog+ is named inside the Prog namespace and returns its name
# with the leading "Prog::" removed (e.g. "Prog::Vm::Nexus" -> "Vm::Nexus").
#
# prog - any object responding to #name (normally a class).
#
# Fails with "BUG: prog must be in Prog module" when the name does not start
# with "Prog::" (this includes a nil name).
def self.prog_verify(prog)
  namespaced = /\AProg::(.*)\z/.match(prog.name)
  fail "BUG: prog must be in Prog module" unless namespaced

  namespaced[1]
end
# :nocov:
# SQL expression interpolated into the lease UPDATE to compute the strand's
# next schedule from its try counter:
# - development: now() plus least(5, try) seconds;
# - otherwise: now() plus random() times least(2 ^ least(try, 20), 600)
#   seconds, i.e. a randomized delay that grows with try and is capped at 600.
SCHEDULE = if Config.development?
  "(now() + least(5, try) * '1 second'::interval)"
else
  "(now() + least(2 ^ least(try, 20), 600) * random() * '1 second'::interval)"
end
# :nocov:
# Builds the prog object for this strand: looks up the class named by the
# strand's `prog` attribute under the top-level Prog namespace and
# instantiates it with this strand and +snap+.
#
# snap - passed through unchanged as the second constructor argument
#        (defaults to nil).
def load(snap = nil)
  qualified_name = "::Prog::" + prog
  prog_class = Object.const_get(qualified_name)
  prog_class.new(self, snap)
end
# Runs the strand's current label once and applies the flow-control result
# (Nap, Hop or Exit) that the prog raises. It does not take the lease
# itself; `run` calls it inside take_lease_and_reload.
#
# The value of the DB.transaction call is the method's result. `run` treats
# it as the rescued Nap/Hop/Exit object, which assumes DB.transaction passes
# the block's value through.
#
# Fails with a "BUG: ..." error when the label method returns without
# raising any of the three flow-control exceptions.
def unsynchronized_run
# Captured up front so the final "finished strand" log reports the label
# this run started with.
start_time = Time.now
prog_label = "#{prog}.#{label}"
Clog.emit("starting strand") { [self, {strand_started: {prog_label: prog_label}}] }
# The strand has reached the label registered as the deadline target in the
# first stack frame: resolve the matching "Deadline" page, if one exists,
# and drop the deadline from the frame.
if label == stack.first["deadline_target"].to_s
if (pg = Page.from_tag_parts("Deadline", id, prog, stack.first["deadline_target"]))
pg.incr_resolve
end
stack.first.delete("deadline_target")
stack.first.delete("deadline_at")
modified!(:stack)
end
# Check every frame for an expired deadline. effective_prog starts as the
# strand's prog and, after each frame that has a "link", becomes link[0] for
# the frames that follow.
effective_prog = prog
stack.each do |frame|
if (deadline_at = frame["deadline_at"])
if Time.now > Time.parse(deadline_at.to_s)
Prog::PageNexus.assemble("#{ubid} has an expired deadline! #{effective_prog}.#{label} did not reach #{frame["deadline_target"]} on time", ["Deadline", id, effective_prog, frame["deadline_target"]], ubid)
modified!(:stack)
end
end
if (link = frame["link"])
effective_prog = link[0]
end
end
# Seed the label timestamp when the first frame does not have one yet, so
# the Hop/Exit handlers below can always parse it.
# NOTE(review): Time#to_s drops sub-second precision, so the per-label
# durations logged below are only accurate to about a second.
unless stack.first["last_label_changed_at"]
stack.first["last_label_changed_at"] = Time.now.to_s
modified!(:stack)
end
# The rescue clauses belong to the transaction block itself, so the
# flow-control exceptions are handled inside the block and it returns
# normally with the rescued exception object as its value.
DB.transaction do
SemSnap.use(id) do |snap|
prg = load(snap)
prg.public_send(:before_run) if prg.respond_to?(:before_run)
prg.public_send(label)
end
rescue Prog::Base::Nap => e
# Nap: persist pending model changes, then reset try and push the schedule
# e.seconds into the future directly in SQL.
save_changes
scheduled = DB[<<SQL, e.seconds, id].get
UPDATE strand
SET try = 0, schedule = now() + (? * '1 second'::interval)
WHERE id = ?
RETURNING schedule
SQL
# For convenience, reflect the updated record's schedule content
# in the model object, but since it's fresh, remove it from the
# changed columns so save_changes won't update it again.
self.schedule = scheduled
changed_columns.delete(:schedule)
e
rescue Prog::Base::Hop => hp
# Hop: log how long the strand stayed on the label it is leaving, restart
# the label timer, then apply the hop's column updates with try reset to 0.
last_changed_at = Time.parse(stack.first["last_label_changed_at"])
Clog.emit("hopped") { {strand_hopped: {duration: Time.now - last_changed_at, from: prog_label, to: "#{hp.new_prog}.#{hp.new_label}"}} }
stack.first["last_label_changed_at"] = Time.now.to_s
modified!(:stack)
update(**hp.strand_update_args.merge(try: 0))
hp
rescue Prog::Base::Exit => ext
# Exit: log the time spent on the final label and record the exit value.
last_changed_at = Time.parse(stack.first["last_label_changed_at"])
Clog.emit("exited") { {strand_exited: {duration: Time.now - last_changed_at, from: prog_label}} }
update(exitval: ext.exitval, retval: nil)
if parent_id.nil?
# No parent Strand to reap here, so self-reap.
Semaphore.where(strand_id: id).destroy
destroy
# Lets take_lease_and_reload and run know the row no longer exists.
@deleted = true
end
ext
else
# Reached only when the label method returned without raising.
fail "BUG: Prog #{prog}##{label} did not provide flow control"
end
ensure
# Always log the wall-clock duration of this single run, even on failure.
Clog.emit("finished strand") { [self, {strand_finished: {duration: Time.now - start_time, prog_label: prog_label}}] }
end
# Takes the lease and runs the strand's labels back to back until it is time
# to stop.
#
# seconds - time budget; once Time.now is past the start time plus this
#           value, the run in progress is the last one (default 0).
#
# Returns the result of the last unsynchronized_run call. The loop stops
# when the budget is used up, when the strand exited, or when it asked to
# nap for a non-zero number of seconds. Returns take_lease_and_reload's own
# result (false) when the lease could not be taken.
#
# Fails with "already deleted" when this object has already destroyed its
# strand.
def run(seconds = 0)
  fail "already deleted" if @deleted

  stop_after = Time.now + seconds
  take_lease_and_reload do
    loop do
      outcome = unsynchronized_run
      out_of_time = Time.now > stop_after

      return outcome if out_of_time
      return outcome if outcome.is_a?(Prog::Base::Nap) && outcome.seconds != 0
      return outcome if outcome.is_a?(Prog::Base::Exit)
    end
  end
end
end
# We need to unrestrict primary key so strand.add_child works in Prog::Base.
# NOTE(review): presumably add_child assigns the child's id explicitly, which
# Sequel's default restriction on mass-assigning the primary key would
# reject — confirm against Prog::Base.
Strand.unrestrict_primary_key