ubicloud/prog/vnet/subnet_nexus.rb
Jeremy Evans 15b91ff0c4 Absorb leaf into reap
Reviewing leaf usage in progs shows that it always occurs right after reap.
Combining the leaf and reap methods avoids a redundant query for the
strand's children.

It's typical for nap or donate to be called after the leaf check that
follows reap.  Build this into reap as well: it calls donate by default,
or nap if a nap keyword argument is given.
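
For example, a call site that previously napped while waiting on children
can collapse into a single call (a minimal sketch based on the description
above; the label name is made up):

```ruby
# Before:
#   reap
#   hop_next_step if leaf?
#   nap 5
# After (sketch): hop once all children have exited, otherwise nap 5 seconds
reap(:next_step, nap: 5)
```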

There are a few cases where reap was called without leaf/donate.
Add a fallthrough keyword argument to support this, so that if there are
no children, reap calls neither nap nor donate.
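
A hedged sketch of one such call site (the surrounding code is hypothetical;
only the keyword comes from the description above):

```ruby
# Old style: reap children, then always continue to the code below.
# fallthrough preserves that flow: once no children remain, reap returns
# without calling nap or donate.
reap(fallthrough: true)
do_remaining_work  # hypothetical follow-on code
```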

Vm::HostNexus#wait_prep and Kubernetes::UpgradeKubernetesNode#wait_new_node
both need the return value of the reapable child(ren). Add a reaper
keyword argument for this, which is called once for each child.
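
A rough sketch of what such a call site might look like (the handler body
and label are invented; only the reaper keyword and its once-per-child
contract come from the description above):

```ruby
reap(:next_label, reaper: lambda do |child|
  # called once for each reaped child; collect whatever the child prog
  # returned (exit-value field name assumed for illustration)
  record_child_result(child.exitval)
end)
```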

The most common pattern for using reap/leaf/donate was:

```ruby
reap
hop_download_lb_cert if leaf?
donate
```

This turns into:

```ruby
reap(:download_lb_cert)
```

The second most common pattern was:

```ruby
reap
donate unless leaf?
pop "upgrade cancelled" # or other code
```

This turns into:

```ruby
reap { pop "upgrade cancelled" }
```

In a few places, I changed operations on strand.children to
strand.children_dataset.  Now that we are no longer using
cached children by default, it's better to do these checks
in the database instead of in Ruby (a sketch of the pattern follows
the list).  These places deserve careful review:

* Prog::Minio::MinioServerNexus#unavailable
* Prog::Postgres::PostgresResourceNexus#wait
* Prog::Postgres::PostgresServerNexus#unavailable
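
As an illustration of the kind of change involved (generic names, not the
exact diff), a check that used to load every child row into Ruby can be
pushed into the database:

```ruby
# Before: materializes all child strands, then filters in Ruby
strand.children.any? { |child| child.label == "destroy" }

# After: the database answers the same question with a single query
!strand.children_dataset.where(label: "destroy").empty?
```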

For Prog::Vnet::LoadBalancerNexus#wait_update_vm_load_balancers,
I removed a check on the children completely. It was checking
for an exitval using children_dataset directly after reap,
which should only be true if there was still an active lease
for the child.  This also deserves careful review.

These changes broke many mocked tests.  This commit fixes the mocked
tests to use database-backed objects, ensuring that we are testing
observable behavior rather than implementation details.
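
A hedged illustration of the direction (not code from the diff; the helper
names, columns, and matcher are assumed): rather than stubbing reap or
leaf?, a spec creates a real child strand so the prog's own database query
is what gets exercised:

```ruby
# Hypothetical spec shape, not taken from the repository
it "keeps waiting while a budded child is still running" do
  Strand.create(parent_id: st.id, prog: "Aws::Vpc", label: "create_vpc", stack: [{}])
  expect { nx.wait_vpc_created }.to nap(2)
end
```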
2025-06-26 03:49:53 +09:00

263 lines
8.0 KiB
Ruby

# frozen_string_literal: true

class Prog::Vnet::SubnetNexus < Prog::Base
  subject_is :private_subnet

  def self.assemble(project_id, name: nil, location_id: Location::HETZNER_FSN1_ID, ipv6_range: nil, ipv4_range: nil, allow_only_ssh: false, firewall_id: nil)
    unless (project = Project[project_id])
      fail "No existing project"
    end
    unless (location = Location[location_id])
      fail "No existing location"
    end
    if allow_only_ssh && firewall_id
      fail "Cannot specify both allow_only_ssh and firewall_id"
    end

    ubid = PrivateSubnet.generate_ubid
    name ||= PrivateSubnet.ubid_to_name(ubid)
    Validation.validate_name(name)
    ipv6_range ||= random_private_ipv6(location, project).to_s
    ipv4_range ||= random_private_ipv4(location, project).to_s

    DB.transaction do
      ps = PrivateSubnet.create(name: name, location_id: location.id, net6: ipv6_range, net4: ipv4_range, state: "waiting", project_id:) { it.id = ubid.to_uuid }
      firewall_dataset = project.firewalls_dataset.where(location_id:)
      if firewall_id
        unless (firewall = firewall_dataset.first(Sequel[:firewall][:id] => firewall_id))
          fail "Firewall with id #{firewall_id} and location #{location.name} does not exist"
        end
      else
        port_range = allow_only_ssh ? 22..22 : 0..65535
        fw_name = "#{name[0, 55]}-default"
        # As is typical when checking before inserting, there is a race condition here with
        # a user concurrently manually creating a firewall with the same name. However,
        # the worst case scenario is a bogus error message, and the user could try creating
        # the private subnet again.
        unless firewall_dataset.where(Sequel[:firewall][:name] => fw_name).empty?
          fw_name = "#{name[0, 47]}-default-#{Array.new(7) { UBID.from_base32(rand(32)) }.join}"
        end
        firewall = Firewall.create(name: fw_name, location_id: location.id, project_id:)
        ["0.0.0.0/0", "::/0"].each { |cidr| FirewallRule.create_with_id(firewall_id: firewall.id, cidr: cidr, port_range: Sequel.pg_range(port_range)) }
      end
      firewall.associate_with_private_subnet(ps, apply_firewalls: false)
      Strand.create(prog: "Vnet::SubnetNexus", label: "start") { it.id = ubid.to_uuid }
    end
  end
  def before_run
    when_destroy_set? do
      if strand.label != "destroy"
        register_deadline(nil, 10 * 60)
        hop_destroy
      end
    end
  end

  label def start
    if private_subnet.location.provider == "aws"
      PrivateSubnetAwsResource.create { it.id = private_subnet.id } unless private_subnet.private_subnet_aws_resource
      bud Prog::Aws::Vpc, {"subject_id" => private_subnet.id}, :create_vpc
      hop_wait_vpc_created
    else
      hop_wait
    end
  end
  label def wait_vpc_created
    # Hop to wait once the budded Aws::Vpc strand has exited; nap 2 seconds while it is still running.
    reap(:wait, nap: 2)
  end

  label def wait
    when_refresh_keys_set? do
      private_subnet.update(state: "refreshing_keys")
      hop_refresh_keys
    end

    when_add_new_nic_set? do
      private_subnet.update(state: "adding_new_nic")
      hop_add_new_nic
    end

    when_update_firewall_rules_set? do
      private_subnet.vms.map(&:incr_update_firewall_rules)
      decr_update_firewall_rules
    end

    if private_subnet.last_rekey_at < Time.now - 60 * 60 * 24
      private_subnet.incr_refresh_keys
    end

    nap 10 * 60
  end
  def gen_encryption_key
    "0x" + SecureRandom.bytes(36).unpack1("H*")
  end

  def gen_spi
    "0x" + SecureRandom.bytes(4).unpack1("H*")
  end

  def gen_reqid
    SecureRandom.random_number(100000) + 1
  end
  label def add_new_nic
    register_deadline("wait", 3 * 60)
    nics_snap = nics_to_rekey
    nap 10 if nics_snap.any? { |nic| nic.lock_set? }
    nics_snap.each do |nic|
      nic.update(encryption_key: gen_encryption_key, rekey_payload: {spi4: gen_spi, spi6: gen_spi, reqid: gen_reqid})
      nic.incr_start_rekey
      nic.incr_lock
      private_subnet.create_tunnels(nics_snap, nic)
    end
    decr_add_new_nic
    hop_wait_inbound_setup
  end

  label def refresh_keys
    decr_refresh_keys
    nics = active_nics
    nap 10 if nics.any? { |nic| nic.lock_set? }
    nics.each do |nic|
      nic.update(encryption_key: gen_encryption_key, rekey_payload: {spi4: gen_spi, spi6: gen_spi, reqid: gen_reqid})
      nic.incr_start_rekey
      nic.incr_lock
    end
    hop_wait_inbound_setup
  end
  label def wait_inbound_setup
    nics = rekeying_nics
    if nics.all? { |nic| nic.strand.label == "wait_rekey_outbound_trigger" }
      nics.each(&:incr_trigger_outbound_update)
      hop_wait_outbound_setup
    end
    nap 5
  end

  label def wait_outbound_setup
    nics = rekeying_nics
    if nics.all? { |nic| nic.strand.label == "wait_rekey_old_state_drop_trigger" }
      nics.each(&:incr_old_state_drop_trigger)
      hop_wait_old_state_drop
    end
    nap 5
  end

  label def wait_old_state_drop
    nics = rekeying_nics
    if nics.all? { |nic| nic.strand.label == "wait" }
      private_subnet.update(state: "waiting", last_rekey_at: Time.now)
      nics.each do |nic|
        nic.update(encryption_key: nil, rekey_payload: nil)
        nic.unlock
      end
      hop_wait
    end
    nap 5
  end
  label def destroy
    if private_subnet.nics.any? { |n| !n.vm_id.nil? }
      register_deadline(nil, 10 * 60, allow_extension: true) if private_subnet.nics.any? { |n| n.vm&.prevent_destroy_set? }
      Clog.emit("Cannot destroy subnet with active nics, first clean up the attached resources") { private_subnet }
      nap 5
    end

    decr_destroy
    strand.children.each { it.destroy }

    if private_subnet.location.provider == "aws"
      private_subnet.nics.map(&:incr_destroy)
      private_subnet.firewalls.map(&:destroy)
      bud Prog::Aws::Vpc, {"subject_id" => private_subnet.id}, :destroy
      hop_wait_aws_vpc_destroyed
    end

    private_subnet.firewalls.map { it.disassociate_from_private_subnet(private_subnet, apply_firewalls: false) }
    private_subnet.connected_subnets.each do |subnet|
      private_subnet.disconnect_subnet(subnet)
    end

    if private_subnet.nics.empty? && private_subnet.load_balancers.empty?
      private_subnet.destroy
      pop "subnet destroyed"
    else
      private_subnet.nics.map { |n| n.incr_destroy }
      private_subnet.load_balancers.map { |lb| lb.incr_destroy }
      nap 1
    end
  end

  label def wait_aws_vpc_destroyed
    # The block runs once all children have been reaped; until then, nap 10 seconds.
    reap(nap: 10) do
      private_subnet.private_subnet_aws_resource.destroy
      private_subnet.destroy
      pop "vpc destroyed"
    end
  end
  def self.random_private_ipv6(location, project)
    network_address = NetAddr::IPv6.new((SecureRandom.bytes(7) + 0xfd.chr).unpack1("Q<") << 64)
    network_mask = NetAddr::Mask128.new(64)
    selected_addr = NetAddr::IPv6Net.new(network_address, network_mask)
    selected_addr = random_private_ipv6(location, project) if project.private_subnets_dataset[net6: selected_addr.to_s, location_id: location.id]

    selected_addr
  end

  def self.random_private_ipv4(location, project, cidr_size = 26)
    raise ArgumentError, "CIDR size must be between 0 and 32" unless cidr_size.between?(0, 32)

    private_range = PrivateSubnet.random_subnet
    addr = NetAddr::IPv4Net.parse(private_range)
    selected_addr = if addr.netmask.prefix_len < cidr_size
      addr.nth_subnet(cidr_size, SecureRandom.random_number(2**(cidr_size - addr.netmask.prefix_len) - 1).to_i + 1)
    else
      random_private_ipv4(location, project, cidr_size)
    end
    selected_addr = random_private_ipv4(location, project, cidr_size) if PrivateSubnet::BANNED_IPV4_SUBNETS.any? { it.rel(selected_addr) } || project.private_subnets_dataset[net4: selected_addr.to_s, location_id: location.id]

    selected_addr
  end
  def active_nics
    nics_with_strand_label("wait").all
  end

  def nics_to_rekey
    nics_with_strand_label(%w[wait wait_setup]).all
  end

  def rekeying_nics
    all_connected_nics.eager(:strand).exclude(rekey_payload: nil).all
  end

  private

  def all_connected_nics
    private_subnet.find_all_connected_nics
  end

  def nics_with_strand_label(label)
    all_connected_nics.join(:strand, {id: :id, label:}).select_all(:nic)
  end
end