Files
ubicloud/prog/test/firewall_rules.rb
Jeremy Evans e6b7e5e879 Change rubocop TargetRubyVersion to 3.4
Disable Style/RedundantLineContinuation, as it incorrectly removes
line continutations in rhizome/host/lib/vm_setup.rb that are not
redundant.

All code changes are for _1 => it in blocks.
2025-04-26 06:51:19 +09:00

189 lines
7.0 KiB
Ruby

# frozen_string_literal: true
require "net/http"
require "uri"
class Prog::Test::FirewallRules < Prog::Test::Base
subject_is :firewall
label def start
vms = [vm1, vm2, vm_outside]
vms.each do |vm|
vm.sshable.cmd("sudo yum install -y nc") if vm.boot_image.include?("almalinux")
vm.sshable.cmd("sudo apt-get update && sudo apt-get install -y netcat-openbsd") if vm.boot_image.include?("debian")
end
vm1.sshable.cmd("echo '[Unit]
Description=A lightweight port 8080 listener
After=network.target
[Service]
Type=simple
ExecStart=/usr/bin/nc -l 8080
' | sudo tee /etc/systemd/system/listening_ipv4.service > /dev/null")
vm1.sshable.cmd("echo '[Unit]
Description=A lightweight port 8080 listener
After=network.target
[Service]
Type=simple
ExecStart=nc -l 8080 -6
' | sudo tee /etc/systemd/system/listening_ipv6.service > /dev/null")
vm1.sshable.cmd("sudo systemctl daemon-reload")
vm1.sshable.cmd("sudo systemctl enable listening_ipv4.service")
vm1.sshable.cmd("sudo systemctl enable listening_ipv6.service")
update_stack({
"vm_to_be_connected_id" => vm1.id
})
hop_perform_tests_none
end
label def perform_tests_none
update_firewall_rules(config: :perform_tests_none) unless frame["firewalls"] == "none"
update_stack({
"firewalls" => "none"
})
if firewall.private_subnets.first.update_firewall_rules_set? || firewall.private_subnets.first.vms.any? { |vm| vm.update_firewall_rules_set? }
nap 5
end
vm1.sshable.cmd("true")
vm2.sshable.cmd("true")
vm1.sshable.cmd("ping -c 2 google.com")
vm2.sshable.cmd("ping -c 2 google.com")
vm1.sshable.cmd("sudo systemctl start listening_ipv4.service")
test_connection(vm1.ephemeral_net4, vm2, should_fail: true, ipv4: true, hop_method_symbol: :hop_perform_tests_public_ipv4)
fail_test "#{vm2.inhost_name} should not be able to connect to #{vm1.inhost_name} on port 8080"
end
label def perform_tests_public_ipv4
update_firewall_rules(config: :perform_tests_public_ipv4) unless frame["firewalls"] == "public_ipv4"
update_stack({
"firewalls" => "public_ipv4"
})
if firewall.private_subnets.first.update_firewall_rules_set? || firewall.private_subnets.first.vms.any? { |vm| vm.update_firewall_rules_set? }
nap 5
end
test_connection(vm1.ephemeral_net4, vm2, should_fail: false, ipv4: true, hop_method_symbol: nil)
test_connection(vm1.ephemeral_net4, vm_outside, should_fail: true, ipv4: true, hop_method_symbol: :hop_perform_tests_public_ipv6)
fail_test "#{vm_outside.inhost_name} should not be able to connect to #{vm1.inhost_name} on port 8080"
end
label def perform_tests_public_ipv6
update_firewall_rules(config: :perform_tests_public_ipv6) unless frame["firewalls"] == "public_ipv6"
update_stack({
"firewalls" => "public_ipv6"
})
if firewall.private_subnets.first.update_firewall_rules_set? || firewall.private_subnets.first.vms.any? { |vm| vm.update_firewall_rules_set? }
nap 5
end
vm1.sshable.cmd("sudo systemctl stop listening_ipv4.service")
vm1.sshable.cmd("sudo systemctl start listening_ipv6.service")
test_connection(vm1.ephemeral_net6.nth(2), vm2, should_fail: false, ipv4: false, hop_method_symbol: nil)
test_connection(vm1.ephemeral_net6.nth(2), vm_outside, should_fail: true, ipv4: false, hop_method_symbol: :hop_perform_tests_private_ipv4)
fail_test "#{vm_outside.inhost_name} should not be able to connect to #{vm1.ephemeral_net6.nth(2)} on port 8080"
end
label def perform_tests_private_ipv4
update_firewall_rules(config: :perform_tests_private_ipv4) unless frame["firewalls"] == "private_ipv4"
update_stack({
"firewalls" => "private_ipv4"
})
if firewall.private_subnets.first.update_firewall_rules_set? || firewall.private_subnets.first.vms.any? { |vm| vm.update_firewall_rules_set? }
nap 5
end
vm1.sshable.cmd("sudo systemctl stop listening_ipv6.service")
vm1.sshable.cmd("sudo systemctl start listening_ipv4.service")
test_connection(vm1.nics.first.private_ipv4.nth(0), vm2, should_fail: false, ipv4: true, hop_method_symbol: nil)
test_connection(vm1.ephemeral_net4, vm_outside, should_fail: true, ipv4: true, hop_method_symbol: :hop_perform_tests_private_ipv6)
fail_test "#{vm_outside.inhost_name} should not be able to connect to #{vm1.nics.first.private_ipv4.nth(0)} on port 8080"
end
label def perform_tests_private_ipv6
update_firewall_rules(config: :perform_tests_private_ipv6) unless frame["firewalls"] == "private_ipv6"
update_stack({
"firewalls" => "private_ipv6"
})
if firewall.private_subnets.first.update_firewall_rules_set? || firewall.private_subnets.first.vms.any? { |vm| vm.update_firewall_rules_set? }
nap 5
end
vm1.sshable.cmd("sudo systemctl stop listening_ipv4.service")
vm1.sshable.cmd("sudo systemctl start listening_ipv6.service")
test_connection(vm1.private_ipv6, vm2, should_fail: false, ipv4: false, hop_method_symbol: nil)
test_connection(vm1.ephemeral_net6.nth(2), vm2, should_fail: true, ipv4: false, hop_method_symbol: :hop_finish)
fail_test "#{vm2.inhost_name} should not be able to connect to #{vm1.ephemeral_net6.nth(2)} on port 8080"
end
label def finish
pop "Verified Firewall Rules!"
end
label def failed
nap 15
end
def update_firewall_rules(config: nil)
uri = URI("https://api.ipify.org")
my_ip = Net::HTTP.get(uri)
firewall_rules = [
{cidr: "#{my_ip}/32", port_range: Sequel.pg_range(22..22)}
]
firewall_rules << case config
when :perform_tests_none
nil
when :perform_tests_public_ipv4
{cidr: vm2.ephemeral_net4.to_s, port_range: Sequel.pg_range(8080..8080)}
when :perform_tests_public_ipv6
{cidr: vm2.ephemeral_net6.nth(2).to_s, port_range: Sequel.pg_range(8080..8080)}
when :perform_tests_private_ipv4
{cidr: vm2.nics.first.private_ipv4.to_s, port_range: Sequel.pg_range(8080..8080)}
when :perform_tests_private_ipv6
{cidr: vm2.private_ipv6.to_s, port_range: Sequel.pg_range(8080..8080)}
else
raise "Unknown config: #{config}"
end
firewall.replace_firewall_rules(firewall_rules.compact)
end
def vm1
connected_id = frame["vm_to_be_connected_id"]
@vm1 ||= if connected_id
firewall.private_subnets.first.vms.find { |vm| vm.id == connected_id }
else
firewall.private_subnets.first.vms.first
end
end
def vm2
@vm2 ||= firewall.private_subnets.first.vms.reject { it.id == vm1.id }.first
end
def vm_outside
@vm_outside ||= PrivateSubnet.find { |ps| ps.id != vm1.private_subnets.first.id }.vms.first
end
def test_connection(to_connect_ip, connecting, should_fail: false, ipv4: true, hop_method_symbol: nil)
test_version_arg = ipv4 ? "" : "-6"
connecting.sshable.cmd("nc -zvw 1 #{to_connect_ip} 8080 #{test_version_arg}")
rescue
send(hop_method_symbol) if should_fail
fail_test "#{connecting.inhost_name} should be able to connect to #{to_connect_ip} on port 8080"
end
end