Files
ubicloud/model/strand.rb
Enes Cakir eef9c54107 Revert "Add the implicit_subquery extension"
This reverts commit 0085befc4a.

This commit caused puma to fail to start, breaking production.

You can reproduce it locally by running:

    RACK_ENV=production bundle exec puma -t 5:5 -p ${PORT:-9292} -e ${RACK_ENV:-production}

The head of the logs:

    2024-10-17 08:58:08  Starting process with command `bundle exec puma -t 5:5 -p ${PORT:-3000} -e ${RACK_ENV:-development}`
    2024-10-17 08:58:09  Puma starting in single mode...
    2024-10-17 08:58:09  * Puma version: 6.4.3 (ruby 3.2.5-p208) ("The Eagle of Durango")
    2024-10-17 08:58:09  *  Min threads: 5
    2024-10-17 08:58:09  *  Max threads: 5
    2024-10-17 08:58:09  *  Environment: production
    2024-10-17 08:58:09  *          PID: 2
    2024-10-17 08:58:11  ! Unable to load application: Sequel::DatabaseError: PG::SyntaxError: ERROR:  syntax error at or near ")"
    2024-10-17 08:58:11  LINE 1: ...T NULL AS "v" FROM (SHOW max_prepared_transactions) AS "t1" ...
    2024-10-17 08:58:11  bundler: failed to load command: puma (/app/vendor/bundle/ruby/3.2.0/bin/puma)
    2024-10-17 08:58:11  /app/vendor/bundle/ruby/3.2.0/gems/sequel-5.84.0/lib/sequel/adapters/postgres.rb:171:in `exec': PG::SyntaxError: ERROR:  syntax error at or near ")" (Sequel::DatabaseError)
    2024-10-17 08:58:11  LINE 1: ...T NULL AS "v" FROM (SHOW max_prepared_transactions) AS "t1" ...
    ...
    2024-10-17 08:58:11  	from /app/clover.rb:12:in `freeze'
    2024-10-17 08:58:11  	from config.ru:7:in `block (2 levels) in <top (required)>'
    ...
    2024-10-17 08:58:11  /app/vendor/bundle/ruby/3.2.0/gems/sequel-5.84.0/lib/sequel/adapters/postgres.rb:171:in `exec': ERROR:  syntax error at or near ")" (PG::SyntaxError)
    2024-10-17 08:58:11  LINE 1: ...T NULL AS "v" FROM (SHOW max_prepared_transactions) AS "t1" ...
    ...
    2024-10-17 08:58:11  	from /app/clover.rb:12:in `freeze'
    2024-10-17 08:58:11  	from config.ru:7:in `block (2 levels) in <top (required)>'
2024-10-17 12:15:05 +03:00

183 lines
5.3 KiB
Ruby

# frozen_string_literal: true
require_relative "../model"
require "time"
# Sequel model for a strand. The SQL issued by the methods below reads and
# writes the `strand` table, and each strand names a prog and a label that
# unsynchronized_run instantiates and invokes.
class Strand < Sequel::Model
# Let new model objects pick up column defaults via Sequel's defaults_setter
# plugin.
Strand.plugin :defaults_setter, cache: true
# The default stack is produced by a proc and consists of one empty frame.
Strand.default_values[:stack] = proc { [{}] }
# Lease lifetime; take_lease_and_reload leases a strand for 120 seconds.
LEASE_EXPIRATION = 120
# Strands form a tree through parent_id: both associations point back at this
# same class.
many_to_one :parent, key: :parent_id, class: self
one_to_many :children, key: :parent_id, class: self
one_to_many :semaphores
include ResourceMethods
# Returns whatever UBID.decode yields for this strand's own ubid.
def subject
  own_ubid = ubid
  UBID.decode(own_ubid)
end
# Tries to lease this strand and, when the lease is obtained, yields to the
# caller's block while the lease is held.
#
# The lease is claimed with a single UPDATE that only matches when the row has
# no unexpired lease and no exitval. The same statement bumps `try` and moves
# `schedule` forward using the SCHEDULE SQL expression. The lease lifetime
# comes from LEASE_EXPIRATION so the SQL cannot drift from the constant.
#
# Returns false (without yielding) when the lease could not be obtained,
# otherwise the value of the block.
#
# Raises RuntimeError ("BUG: ...") after the block if the strand was marked
# @deleted but its row still exists, or if the lease taken here could not be
# cleared because the row no longer carries it.
def take_lease_and_reload
  affected = DB[<<~SQL, id].first
    UPDATE strand
    SET lease = now() + '#{LEASE_EXPIRATION} seconds', try = try + 1, schedule = #{SCHEDULE}
    WHERE id = ? AND (lease IS NULL OR lease < now()) AND exitval IS NULL
    RETURNING lease
  SQL
  return false unless affected

  lease_time = affected.fetch(:lease)

  # Emitted before reload, so `schedule` is still the value this object held
  # before the UPDATE above moved it in the database.
  Clog.emit("obtained lease") { {lease_acquired: {time: lease_time, delay: Time.now - schedule}} }
  reload

  begin
    yield
  ensure
    if @deleted
      # A deleted strand has no lease to clear; just verify the row is gone.
      unless DB["SELECT FROM strand WHERE id = ?", id].empty?
        fail "BUG: strand with @deleted set still exists in the database"
      end
    else
      DB.transaction do
        # Lock and capture the row first so there is something to log if
        # clearing the lease turns out not to match.
        lease_clear_debug_snapshot = this.for_update.all

        # Only clear the lease that was taken above (matched by its timestamp).
        num_updated = DB[<<~SQL, id, lease_time].update
          UPDATE strand
          SET lease = NULL
          WHERE id = ? AND lease = ?
        SQL
        Clog.emit("lease cleared") { {lease_cleared: {num_updated: num_updated}} }
        unless num_updated == 1
          Clog.emit("lease violated data") do
            {lease_clear_debug_snapshot: lease_clear_debug_snapshot}
          end
          fail "BUG: lease violated"
        end
      end
    end
  end
end
# Returns the name of +prog+ with its leading "Prog::" namespace removed.
#
# Fails with a RuntimeError when prog.name does not start with "Prog::"
# (including when it has no name at all).
def self.prog_verify(prog)
  matched = /\AProg::(.*)\z/.match(prog.name)
  fail "BUG: prog must be in Prog module" unless matched

  matched[1]
end
# :nocov:
# SQL expression that take_lease_and_reload interpolates into its UPDATE to
# compute the strand's next schedule. In development it is now plus
# min(5, try) seconds; otherwise it is now plus a random fraction of
# min(2 ^ min(try, 20), 600) seconds.
SCHEDULE = if Config.development?
  "(now() + least(5, try) * '1 second'::interval)"
else
  "(now() + least(2 ^ least(try, 20), 600) * random() * '1 second'::interval)"
end
# :nocov:
# Builds the prog object for this strand: looks up the constant named by
# `prog` under ::Prog and instantiates it with this strand and +snap+.
def load(snap = nil)
  prog_class_name = "::Prog::" + prog
  prog_class = Object.const_get(prog_class_name)
  prog_class.new(self, snap)
end
# Runs a single step of this strand's program without taking the lease
# itself; `run` is the caller in this file that wraps it in
# take_lease_and_reload.
#
# The step instantiates the prog via `load` and invokes the method named by
# `label`. The prog is expected to end the step by raising one of
# Prog::Base::Nap, Prog::Base::Hop or Prog::Base::Exit. Each is rescued below
# and is the last expression of its rescue clause; `run` inspects the value
# returned from here with is_a?, so presumably DB.transaction passes the block
# value through. If the label method returns normally, the step fails with a
# "BUG: ... did not provide flow control" RuntimeError.
def unsynchronized_run
start_time = Time.now
prog_label = "#{prog}.#{label}"
Clog.emit("starting strand") { [self, {strand_started: {prog_label: prog_label}}] }
# The current label is the deadline target recorded in the top stack frame:
# call incr_resolve on the matching "Deadline" page if one is found, then
# drop the deadline keys from that frame.
if label == stack.first["deadline_target"].to_s
if (pg = Page.from_tag_parts("Deadline", id, prog, stack.first["deadline_target"]))
pg.incr_resolve
end
stack.first.delete("deadline_target")
stack.first.delete("deadline_at")
modified!(:stack)
end
# Check every frame for an expired deadline. effective_prog starts as this
# strand's prog and, after a frame carrying a "link", becomes link[0] for the
# frames that follow (presumably the prog name that frame links back to;
# confirm against Prog::Base).
effective_prog = prog
stack.each do |frame|
if (deadline_at = frame["deadline_at"])
if Time.now > Time.parse(deadline_at.to_s)
Prog::PageNexus.assemble("#{ubid} has an expired deadline! #{effective_prog}.#{label} did not reach #{frame["deadline_target"]} on time", ["Deadline", id, effective_prog, frame["deadline_target"]], ubid)
# NOTE(review): nothing in this branch mutates stack before it is marked
# modified; confirm whether this call is still needed.
modified!(:stack)
end
end
if (link = frame["link"])
effective_prog = link[0]
end
end
# Record when the current label was entered, if not recorded yet, so the
# Hop and Exit handlers below can report how long was spent on it.
unless stack.first["last_label_changed_at"]
stack.first["last_label_changed_at"] = Time.now.to_s
modified!(:stack)
end
# The rescue/else clauses below are attached to this transaction block, so
# the flow-control exceptions are handled inside the transaction.
DB.transaction do
SemSnap.use(id) do |snap|
prg = load(snap)
prg.public_send(:before_run) if prg.respond_to?(:before_run)
prg.public_send(label)
end
rescue Prog::Base::Nap => e
# Nap: save pending model changes, then reset try and move schedule
# e.seconds into the future directly in SQL.
save_changes
scheduled = DB[<<SQL, e.seconds, id].get
UPDATE strand
SET try = 0, schedule = now() + (? * '1 second'::interval)
WHERE id = ?
RETURNING schedule
SQL
# For convenience, reflect the updated record's schedule content
# in the model object, but since it's fresh, remove it from the
# changed columns so save_changes won't update it again.
self.schedule = scheduled
changed_columns.delete(:schedule)
e
rescue Prog::Base::Hop => hp
# Hop: log the time spent on the label being left, restart the label timer,
# and apply the hop's strand_update_args with try reset to 0.
last_changed_at = Time.parse(stack.first["last_label_changed_at"])
Clog.emit("hopped") { {strand_hopped: {duration: Time.now - last_changed_at, from: prog_label, to: "#{hp.new_prog}.#{hp.new_label}"}} }
stack.first["last_label_changed_at"] = Time.now.to_s
modified!(:stack)
update(**hp.strand_update_args.merge(try: 0))
hp
rescue Prog::Base::Exit => ext
# Exit: log the time spent on the final label and store the exit value.
last_changed_at = Time.parse(stack.first["last_label_changed_at"])
Clog.emit("exited") { {strand_exited: {duration: Time.now - last_changed_at, from: prog_label}} }
update(exitval: ext.exitval, retval: nil)
if parent_id.nil?
# No parent Strand to reap here, so self-reap.
Semaphore.where(strand_id: id).destroy
destroy
# take_lease_and_reload and run both check this flag.
@deleted = true
end
ext
else
# The label method returned without raising Nap, Hop or Exit.
fail "BUG: Prog #{prog}##{label} did not provide flow control"
end
ensure
# Always log how long the step took, even when it raised.
Clog.emit("finished strand") { [self, {strand_finished: {duration: Time.now - start_time, prog_label: prog_label}}] }
end
# Runs this strand under a lease, repeating unsynchronized_run until one of
# the following holds after a step:
#   * more than +seconds+ have elapsed since run was called,
#   * the step ended in a Prog::Base::Nap with a non-zero duration,
#   * the step ended in a Prog::Base::Exit.
# The result of that last step is returned. When take_lease_and_reload does
# not run the block, its own return value is returned instead.
#
# Fails with "already deleted" if this object has been marked @deleted.
def run(seconds = 0)
  fail "already deleted" if @deleted

  deadline = Time.now + seconds
  take_lease_and_reload do
    loop do
      outcome = unsynchronized_run
      past_deadline = Time.now > deadline
      return outcome if past_deadline
      return outcome if outcome.is_a?(Prog::Base::Nap) && outcome.seconds != 0
      return outcome if outcome.is_a?(Prog::Base::Exit)
    end
  end
end
end
# We need to unrestrict primary key so strand.add_child works in Prog::Base.
Strand.unrestrict_primary_key