Files
ubicloud/model/strand.rb
Enes Cakir 583902f656 Add more data to the deadline pages
Currently, the deadline pages only show the resource type, ID, and
expected target label. It would be helpful to include additional data
based on the resource type for the on-call engineer. For example, when a
host crashes, we receive multiple deadline pages from virtual machines,
making it easier for the on-call engineer to identify the host by
reviewing the incident details in Slack.

Additionally, while I haven't implemented this, we could choose whether
or not to trigger the page based on the resource type to avoid page
storms. For instance, when creating a new deadline page for a virtual
machine, we can check if its host is unavailable. If the host is down,
we can decide not to trigger the page for the virtual machine.
2025-02-03 08:34:22 +03:00

216 lines
6.9 KiB
Ruby

# frozen_string_literal: true
require_relative "../model"
require "time"
class Strand < Sequel::Model
Strand.plugin :defaults_setter, cache: true
Strand.default_values[:stack] = proc { [{}] }
LEASE_EXPIRATION = 120
many_to_one :parent, key: :parent_id, class: self
one_to_many :children, key: :parent_id, class: self
one_to_many :semaphores
include ResourceMethods
def subject
UBID.decode(ubid)
end
def take_lease_and_reload
affected = DB[<<SQL, id].first
UPDATE strand
SET lease = now() + '120 seconds', try = try + 1, schedule = #{SCHEDULE}
WHERE id = ? AND (lease IS NULL OR lease < now()) AND exitval IS NULL
RETURNING lease
SQL
return false unless affected
lease_time = affected.fetch(:lease)
verbose_logging = rand(1000) == 0
Clog.emit("obtained lease") { {lease_acquired: {time: lease_time, delay: Time.now - schedule}} } if verbose_logging
reload
begin
yield
ensure
if @deleted
unless DB["SELECT FROM strand WHERE id = ?", id].empty?
fail "BUG: strand with @deleted set still exists in the database"
end
else
DB.transaction do
lease_clear_debug_snapshot = this.for_update.all
num_updated = DB[<<SQL, id, lease_time].update
UPDATE strand
SET lease = NULL
WHERE id = ? AND lease = ?
SQL
Clog.emit("lease cleared") { {lease_cleared: {num_updated: num_updated}} } if verbose_logging
unless num_updated == 1
Clog.emit("lease violated data") do
{lease_clear_debug_snapshot: lease_clear_debug_snapshot}
end
fail "BUG: lease violated"
end
end
end
end
end
def self.prog_verify(prog)
case prog.name
when /\AProg::(.*)\z/
$1
else
fail "BUG: prog must be in Prog module"
end
end
# :nocov:
SCHEDULE = Config.development? ? "(now() + least(5, try) * '1 second'::interval)" : "(now() + least(2 ^ least(try, 20), 600) * random() * '1 second'::interval)"
# :nocov:
def load(snap = nil)
Object.const_get("::Prog::" + prog).new(self, snap)
end
def unsynchronized_run
start_time = Time.now
prog_label = "#{prog}.#{label}"
if label == stack.first["deadline_target"]
if (pg = Page.from_tag_parts("Deadline", id, prog, stack.first["deadline_target"]))
pg.incr_resolve
end
stack.first.delete("deadline_target")
stack.first.delete("deadline_at")
modified!(:stack)
end
effective_prog = prog
stack.each do |frame|
if (deadline_at = frame["deadline_at"])
if Time.now > Time.parse(deadline_at.to_s)
sbj = subject
extra_data = case sbj
when Vm
{vm_host: sbj.vm_host&.ubid, data_center: sbj.vm_host&.data_center, boot_image: sbj.boot_image, location: sbj.location, arch: sbj.arch, vcpus: sbj.vcpus, ipv4: sbj.ephemeral_net4.to_s}
when VmHost
{data_center: sbj.data_center, location: sbj.location, arch: sbj.arch, ipv4: sbj.sshable.host, total_cores: sbj.total_cores, allocation_state: sbj.allocation_state, os_version: sbj.os_version, vm_count: sbj.vms_dataset.count}
when GithubRunner
{label: sbj.label, installation: sbj.installation.ubid, vm: sbj.vm&.ubid, vm_host: sbj.vm&.vm_host&.ubid, data_center: sbj.vm&.vm_host&.data_center}
else
{}
end
extra_data.compact!
Prog::PageNexus.assemble("#{ubid} has an expired deadline! #{effective_prog}.#{label} did not reach #{frame["deadline_target"]} on time", ["Deadline", id, effective_prog, frame["deadline_target"]], ubid, extra_data:)
modified!(:stack)
end
end
if (link = frame["link"])
effective_prog = link[0]
end
end
unless stack.first["last_label_changed_at"]
stack.first["last_label_changed_at"] = Time.now.to_s
modified!(:stack)
end
DB.transaction do
SemSnap.use(id) do |snap|
prg = load(snap)
prg.public_send(:before_run) if prg.respond_to?(:before_run)
prg.public_send(label)
end
rescue Prog::Base::Nap => e
save_changes
scheduled = DB[<<SQL, e.seconds, id].get
UPDATE strand
SET try = 0, schedule = now() + (? * '1 second'::interval)
WHERE id = ?
RETURNING schedule
SQL
# For convenience, reflect the updated record's schedule content
# in the model object, but since it's fresh, remove it from the
# changed columns so save_changes won't update it again.
self.schedule = scheduled
changed_columns.delete(:schedule)
e
rescue Prog::Base::Hop => hp
last_changed_at = Time.parse(stack.first["last_label_changed_at"])
Clog.emit("hopped") { {strand_hopped: {duration: Time.now - last_changed_at, from: prog_label, to: "#{hp.new_prog}.#{hp.new_label}"}} }
stack.first["last_label_changed_at"] = Time.now.to_s
modified!(:stack)
update(**hp.strand_update_args.merge(try: 0))
hp
rescue Prog::Base::Exit => ext
last_changed_at = Time.parse(stack.first["last_label_changed_at"])
Clog.emit("exited") { {strand_exited: {duration: Time.now - last_changed_at, from: prog_label}} }
update(exitval: ext.exitval, retval: nil)
if parent_id.nil?
# No parent Strand to reap here, so self-reap.
Semaphore.where(strand_id: id).destroy
destroy
@deleted = true
end
ext
else
fail "BUG: Prog #{prog}##{label} did not provide flow control"
end
ensure
duration = Time.now - start_time
Clog.emit("finished strand") { [self, {strand_finished: {duration:, prog_label:}}] } if duration > 1
end
def run(seconds = 0)
fail "already deleted" if @deleted
deadline = Time.now + seconds
take_lease_and_reload do
loop do
ret = unsynchronized_run
now = Time.now
if now > deadline ||
(ret.is_a?(Prog::Base::Nap) && ret.seconds != 0) ||
ret.is_a?(Prog::Base::Exit)
return ret
end
end
end
end
end
# We need to unrestrict primary key so strand.add_child works in Prog::Base.
Strand.unrestrict_primary_key
# Table: strand
# Columns:
# id | uuid | PRIMARY KEY
# parent_id | uuid |
# schedule | timestamp with time zone | NOT NULL DEFAULT now()
# lease | timestamp with time zone |
# prog | text | NOT NULL
# label | text | NOT NULL
# stack | jsonb | NOT NULL DEFAULT '[{}]'::jsonb
# exitval | jsonb |
# retval | jsonb |
# try | integer | NOT NULL DEFAULT 0
# Indexes:
# strand_pkey | PRIMARY KEY btree (id)
# Foreign key constraints:
# strand_parent_id_fkey | (parent_id) REFERENCES strand(id)
# Referenced By:
# semaphore | semaphore_strand_id_fkey | (strand_id) REFERENCES strand(id)
# strand | strand_parent_id_fkey | (parent_id) REFERENCES strand(id)