The next step after this is to set `NOT NULL` and change the dispatcher's scan predicate.

While optimizing some common queries against `strand`, I found that the exclusion of leased tuples is somewhat more complicated than necessary:

    => CREATE INDEX strand_scan_test ON strand (schedule, lease);
    => SET enable_seqscan = false; -- useful in test, with empty strand relation
    => EXPLAIN SELECT * FROM "strand" WHERE ((lease IS NULL OR lease < now()) AND schedule < now() AND exitval IS NULL) ORDER BY "schedule" LIMIT 80;
                                            QUERY PLAN
    ------------------------------------------------------------------------------------------
     Limit (cost=0.29..60.47 rows=80 width=259)
       -> Index Scan using strand_scan_test on strand (cost=0.29..304.95 rows=405 width=259)
             Index Cond: (schedule < now())
             Filter: ((lease IS NULL) OR (lease < now()))
    (4 rows)

This is mostly okay, since the fraction of tuples that are leased is generally small, but it would be better to evaluate the entire predicate in the index condition. Unfortunately, Postgres doesn't know how to start two distinct scans into the index (e.g. one to scan for `lease IS NULL` and a second for the `lease < now()` condition).
For illustration, a `UNION ALL` can accomplish this:

    => EXPLAIN (SELECT * FROM strand WHERE lease IS NULL AND schedule < now() AND exitval IS NULL ORDER BY schedule LIMIT 80) UNION ALL (SELECT * FROM strand WHERE lease < now() AND schedule < now() AND exitval IS NULL ORDER BY schedule LIMIT 80) ORDER BY schedule LIMIT 80;
                                                      QUERY PLAN
    -------------------------------------------------------------------------------------------------------------
     Limit (cost=0.27..16.35 rows=2 width=212)
       -> Merge Append (cost=0.27..16.35 rows=2 width=212)
             Sort Key: strand.schedule
             -> Limit (cost=0.13..8.15 rows=1 width=212)
                   -> Index Scan using strand_scan_test on strand (cost=0.13..8.15 rows=1 width=212)
                         Index Cond: ((schedule < now()) AND (lease IS NULL))
             -> Limit (cost=0.13..8.15 rows=1 width=212)
                   -> Index Scan using strand_scan_test on strand strand_1 (cost=0.13..8.15 rows=1 width=212)
                         Index Cond: ((schedule < now()) AND (lease < now()))

This patch pursues a different approach. It eliminates NULLs altogether, which removes the need for two distinct algebras (one for `IS NULL` and one for `<`). The following query, the second half of the `UNION ALL` above, is then sufficient to identify all scheduled work with an expired lease:

    SELECT * FROM strand WHERE lease < now() AND schedule < now() AND exitval IS NULL ORDER BY schedule LIMIT 80

To do this, when clearing a lease, set it to the current time minus 1000 years. Cleared leases therefore look like `1025-04-01 16:07:12.805075-07:52:58`. This format has a few positive properties:

1. It's early enough to maintain correctness of leasing mutual exclusion.
2. It can be identified at a glance by its leading `1`.
3. It doesn't have a zero-padded year (e.g. `0025`), which can sometimes confuse parsing routines.
4. It's never a negative year.
5. The lower-order digits tell you when the lease was released, i.e. the calculation is reversible.
6. It's easy to split the data set into cleared and uncleared leases by comparing against a constant (e.g. the year 2000):

       SELECT * FROM strand WHERE lease < '2000-01-01'
115 lines
3.4 KiB
Ruby
115 lines
3.4 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require_relative "spec_helper"
|
|
|
|
RSpec.describe Strand do
  # A fresh, unsaved strand positioned at the "start" label of Prog::Test.
  # Examples that need a database row call `save_changes` themselves.
  let(:st) {
    described_class.new(id: described_class.generate_uuid,
      prog: "Test",
      label: "start")
  }

  context "when leasing" do
    it "can take a lease only if one is not already taken" do
      st.save_changes
      did_it = st.take_lease_and_reload {
        # While the outer lease is held, a nested attempt on the same strand
        # must report failure by returning false.
        expect(st.take_lease_and_reload {
          :never_happens
        }).to be false

        :did_it
      }
      # A successful lease hands back the value of its block.
      expect(did_it).to be :did_it
    end

    it "clears a released lease by backdating it far into the past" do
      st.save_changes
      expect(st.take_lease_and_reload { :first }).to be :first

      # Released leases are no longer NULL: they are backdated by ~1000 years,
      # so a constant cutoff such as the year 2000 separates them from live
      # leases. A NULL lease would make this comparison raise and fail.
      expect(st.reload.lease).to be < Time.new(2000, 1, 1)

      # A backdated lease is already expired, so the strand can be leased again.
      expect(st.take_lease_and_reload { :second }).to be :second
    end

    it "does an integrity check that deleted records are gone" do
      st.label = "hop_exit"
      st.save_changes
      # Keep a handle on Sequel::Database's own #[] (stepping up one level if
      # DB's method is defined elsewhere) so unmatched queries still run.
      original = DB.method(:[])
      original = original.super_method unless original.owner == Sequel::Database
      expect(DB).to receive(:[]) do |*args, **kwargs|
        case args
        when ["SELECT FROM strand WHERE id = ?", st.id]
          # Pretend the row is still present after the strand was deleted.
          instance_double(Sequel::Dataset, empty?: false)
        else
          original.call(*args, **kwargs)
        end
      end.at_least(:once)

      expect { st.run }.to raise_error RuntimeError, "BUG: strand with @deleted set still exists in the database"
    end

    it "does an integrity check that the lease was modified as expected" do
      st.label = "napper"
      st.save_changes
      # Keep a handle on Sequel::Database's own #[] (stepping up one level if
      # DB's method is defined elsewhere) so unmatched queries still run.
      original = DB.method(:[])
      original = original.super_method unless original.owner == Sequel::Database
      expect(DB).to receive(:[]) do |*args, **kwargs|
        case args[0]
        # `SET lease = .*` is left open so the match does not depend on the
        # value written to the lease column.
        when /UPDATE strand\nSET lease = .*\nWHERE id = \? AND lease = \?\n/
          # Simulate the guarded UPDATE matching no rows.
          instance_double(Sequel::Dataset, update: 0)
        else
          original.call(*args, **kwargs)
        end
      end.at_least(:once)

      expect(Clog).to receive(:emit).with("lease violated data").and_call_original
      allow(Clog).to receive(:emit).and_call_original
      expect { st.run }.to raise_error RuntimeError, "BUG: lease violated"
    end
  end

  it "can load a prog" do
    expect(st.load).to be_instance_of Prog::Test
  end

  it "can hop" do
    st.save_changes
    st.label = "hop_entry"
    expect(st).to receive(:load).and_return Prog::Test.new(st)
    expect {
      st.unsynchronized_run
    }.to change(st, :label).from("hop_entry").to("hop_exit")
  end

  it "rejects prog names that are not in the right module" do
    expect {
      described_class.prog_verify(Object)
    }.to raise_error RuntimeError, "BUG: prog must be in Prog module"
  end

  it "crashes if a label does not provide flow control" do
    expect {
      st.unsynchronized_run
    }.to raise_error RuntimeError, "BUG: Prog Test#start did not provide flow control"
  end

  it "can run labels consecutively if a deadline is not reached" do
    st.label = "hop_entry"
    st.save_changes
    expect {
      st.run(10)
    }.to change { [st.label, st.exitval] }.from(["hop_entry", nil]).to(["hop_exit", {msg: "hop finished"}])
  end

  it "logs end of strand if it took long" do
    st.label = "napper"
    st.save_changes
    # The first Time.now reading is 10 seconds earlier than the later ones,
    # which makes the run appear slow.
    expect(Time).to receive(:now).and_return(Time.now - 10, Time.now, Time.now)
    expect(Clog).to receive(:emit).with("finished strand").and_call_original
    st.unsynchronized_run
  end

  it "occasionally logs acquisition and release of lease" do
    st.label = "napper"
    st.save_changes
    # Pin `rand` to 0 so the sampled lease logging is emitted.
    expect(st).to receive(:rand).and_return(0)
    expect(Clog).to receive(:emit).with("obtained lease").and_call_original
    expect(Clog).to receive(:emit).with("lease cleared").and_call_original
    st.take_lease_and_reload {}
  end
end
|