Files
ubicloud/prog/ai/inference_router_replica_nexus.rb
Jeremy Evans 15b91ff0c4 Absorb leaf into reap
Reviewing leaf usage in progs, it always occurs right after reap.
Combining leaf and reap methods avoids a redundant query for the
strand's children.

It's typical for nap or donate to be called after the leaf check
after reap.  Also build this into reap, by calling donate by default,
or nap if a nap keyword argument is given.

There are a few cases where reap was called without leaf/donate.
Add a fallthrough keyword argument to support this, so if there are
no children, it does not call either nap or donate.

Vm::HostNexus#wait_prep and Kubernetes::UpgradeKubernetesNode#wait_new_node
both need the return value of the reapable child(ren). Add a reaper
keyword argument for this, which is called once for each child.

The most common pattern for using reap/leaf/donate was:

```ruby
reap
hop_download_lb_cert if leaf?
donate
```

This turns into:

```ruby
reap(:download_lb_cert)
```

The second most common pattern was:

```ruby
reap
donate unless leaf?
pop "upgrade cancelled" # or other code
```

This turns into:

```ruby
reap { pop "upgrade cancelled" }
```

In a few places, I changed operations on strand.children to
strand.children_dataset.  Now that we are no longer using
cached children by default, it's better to do these checks
in the database instead of in Ruby.  These places deserve careful
review:

* Prog::Minio::MinioServerNexus#unavailable
* Prog::Postgres::PostgresResourceNexus#wait
* Prog::Postgres::PostgresServerNexus#unavailable

For Prog::Vnet::LoadBalancerNexus#wait_update_vm_load_balancers,
I removed a check on the children completely. It was checking
for an exitval using children_dataset directly after reap,
which should only be true if there was still an active lease
for the child.  This also deserves careful review.

This broke many mocked tests.  This fixes the mocked tests
to use database-backed objects, ensuring that we are testing
observable behavior and not implementation details.
2025-06-26 03:49:53 +09:00

347 lines
12 KiB
Ruby

# frozen_string_literal: true
require "digest"
require "excon"
require "forwardable"
require_relative "../../lib/util"
# Prog driving the lifecycle of a single inference router replica:
# provisioning its VM, installing and configuring the router binary,
# monitoring availability, billing token usage, and tearing everything
# down when the destroy semaphore is set.
class Prog::Ai::InferenceRouterReplicaNexus < Prog::Base
subject_is :inference_router_replica
extend Forwardable
# Shorthand accessors backed by the subject replica record.
def_delegators :inference_router_replica, :vm, :inference_router, :load_balancer_vm_port
# Creates the VM, replica record, and driving strand for a new inference
# router replica. Everything happens in one transaction so a partial
# failure leaves no orphaned rows.
def self.assemble(inference_router_id)
ubid = InferenceRouterReplica.generate_ubid
DB.transaction do
inference_router = InferenceRouter[inference_router_id]
# Provision an SSH-able VM in the shared inference service project,
# placed inside the router's load balancer subnet.
vm_st = Prog::Vm::Nexus.assemble_with_sshable(
Config.inference_endpoint_service_project_id,
sshable_unix_user: "ubi",
location_id: inference_router.location_id,
name: ubid.to_s,
size: inference_router.vm_size,
boot_image: "ubuntu-noble",
private_subnet_id: inference_router.load_balancer.private_subnet.id,
enable_ip4: true
)
# Register the new VM with the router's load balancer so health checks
# and traffic routing include it.
inference_router.load_balancer.add_vm(vm_st.subject)
replica = InferenceRouterReplica.create(
inference_router_id: inference_router_id,
vm_id: vm_st.id
) { it.id = ubid.to_uuid }
# The strand shares the replica's id and starts this prog at "start".
Strand.create(prog: "Ai::InferenceRouterReplicaNexus", label: "start") { it.id = replica.id }
end
end
# Runs before every label: once the destroy semaphore is set, move to the
# destroy label, or unwind any nested prog frame that is still running.
def before_run
  when_destroy_set? do
    if strand.label == "destroy"
      # Already destroying; cancel a nested frame if one exists.
      pop "operation is cancelled due to the destruction of the inference router replica" if strand.stack.count > 1
    else
      hop_destroy
    end
  end
end
# Entry point: hold here until the underlying VM has finished provisioning
# (its strand sits at "wait"), then begin bootstrapping rhizome.
label def start
  hop_bootstrap_rhizome if vm.strand.label == "wait"
  nap 5
end
# Buds a child prog that installs rhizome tooling on the replica VM.
label def bootstrap_rhizome
# Page if we haven't reached "wait" within 15 minutes.
register_deadline("wait", 15 * 60)
bud Prog::BootstrapRhizome, {"target_folder" => "inference_router", "subject_id" => vm.id, "user" => "ubi"}
hop_wait_bootstrap_rhizome
end
# Reaps the BootstrapRhizome child: hops to :setup once no children
# remain, otherwise donates the lease while the child still runs.
label def wait_bootstrap_rhizome
reap(:setup)
end
# Installs the inference-router release on the VM and starts it under
# systemd. Commands are order-dependent: user/dir setup, download,
# unpack, service install, start.
label def setup
# Push the initial config before the service ever starts.
update_config
workdir = "/ir/workdir"
release_tag = Config.inference_router_release_tag
access_token = Config.inference_router_access_token
asset_name = "inference-router-#{release_tag}-x86_64-unknown-linux-gnu"
# Dedicated non-login system user owns the working directory.
vm.sshable.cmd("id -u inference-router >/dev/null 2>&1 || sudo useradd --system --no-create-home --shell /usr/sbin/nologin inference-router")
vm.sshable.cmd("sudo chown -R inference-router:inference-router #{workdir}")
# NOTE(review): downloaded binaries are not checksum-verified here —
# integrity relies on TLS/GitHub; confirm that's acceptable.
vm.sshable.cmd("sudo wget -O #{workdir}/fetch_linux_amd64 https://github.com/gruntwork-io/fetch/releases/download/v0.4.6/fetch_linux_amd64")
vm.sshable.cmd("sudo chmod +x #{workdir}/fetch_linux_amd64")
# Fetch the release asset from the private inference-router repository
# using the configured OAuth token.
vm.sshable.cmd("sudo #{workdir}/fetch_linux_amd64 --github-oauth-token=\"#{access_token}\" --repo=\"https://github.com/ubicloud/inference-router\" --tag=\"#{release_tag}\" --release-asset=\"inference-router-*\" #{workdir}/")
vm.sshable.cmd("sudo tar -xzf #{workdir}/#{asset_name}.tar.gz -C #{workdir}")
write_inference_router_service(asset_name, workdir)
vm.sshable.cmd("sudo systemctl daemon-reload")
vm.sshable.cmd("sudo systemctl enable --now inference-router")
hop_wait_router_up
end
# Writes the systemd unit for the inference router onto the VM. The unit
# applies extensive sandboxing/hardening; heredoc contents are runtime
# data and must be kept byte-exact.
def write_inference_router_service(asset_name, workdir)
service_definition = <<~SERVICE
[Unit]
Description=Inference Router
After=network.target
[Service]
Type=simple
User=inference-router
Group=inference-router
WorkingDirectory=#{workdir}
Environment=RUST_BACKTRACE=1
Environment=RUST_LOG=INFO
ExecStart=#{workdir}/#{asset_name}/inference-router -c=#{workdir}/config.json
KillSignal=SIGINT
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
# File system and device restrictions
ReadOnlyPaths=/
ReadWritePaths=/ir/workdir
PrivateTmp=yes
PrivateMounts=yes
# User management
SupplementaryGroups=
# Kernel and system protections
ProtectKernelTunables=yes
ProtectKernelModules=yes
ProtectKernelLogs=yes
ProtectClock=yes
ProtectHostname=yes
ProtectControlGroups=yes
# Execution environment restrictions
NoNewPrivileges=yes
RestrictNamespaces=yes
RestrictRealtime=yes
RestrictSUIDSGID=yes
LockPersonality=yes
# Network restrictions
PrivateNetwork=no
# Additional hardening
KeyringMode=private
ProtectHome=yes
DynamicUser=yes
PrivateUsers=yes
CapabilityBoundingSet=
SystemCallFilter=@system-service
SystemCallFilter=~@privileged @resources @mount @debug @cpu-emulation @obsolete @raw-io @reboot @swap
SystemCallArchitectures=native
ProtectSystem=strict
DeviceAllow=
MemoryDenyWriteExecute=true
RemoveIPC=true
UMask=0077
# Resource limits
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
SERVICE
# Install the unit file via sudo tee; the quoted 'EOF' prevents remote
# shell expansion of the embedded content.
vm.sshable.cmd <<~CMD
sudo tee /etc/systemd/system/inference-router.service > /dev/null << 'EOF'
#{service_definition}
EOF
CMD
end
# Polls until the load balancer reports the replica's port as up, then
# settles into the steady-state wait loop.
label def wait_router_up
  nap 5 unless available?
  hop_wait
end
# Steady state: every two minutes, verify availability, refresh config,
# and collect usage for billing. Hops to unavailable on health failure.
label def wait
  if available?
    ping_inference_router
    nap 120
  else
    hop_unavailable
  end
end
# Tears the replica down: cancels children, drains and detaches the VM
# from the load balancer, destroys the VM, then deletes the record.
label def destroy
decr_destroy
# Close any open unavailability page before the replica disappears.
resolve_page
strand.children.each { it.destroy }
# Drain connections, then deregister the VM from the load balancer.
inference_router.load_balancer.evacuate_vm(vm)
inference_router.load_balancer.remove_vm(vm)
vm.incr_destroy
inference_router_replica.destroy
pop "inference router replica is deleted"
end
# Unhealthy state: page (unless the router is under maintenance) and keep
# rechecking every 30 seconds; resolve the page and return to wait once
# the port comes back up.
label def unavailable
  if available?
    resolve_page
    hop_wait
  else
    create_page unless inference_router.maintenance_set?
    nap 30
  end
end
# The replica is considered available when the load balancer's health
# check reports its VM port as "up". Reloads to avoid a stale row.
def available?
  current_state = load_balancer_vm_port.reload.state
  current_state == "up"
end
# Opens a warning page for operators describing the unavailable replica,
# with enough context (router, LB, subnet, VM, host) to debug quickly.
def create_page
  replica = inference_router_replica
  lb = inference_router.load_balancer
  extra_data = {
    inference_router_ubid: inference_router.ubid,
    inference_router_location: inference_router.location.name,
    inference_router_name: inference_router.name,
    inference_router_replica_count: inference_router.replica_count,
    load_balancer_ubid: lb.ubid,
    private_subnet_ubid: lb.private_subnet.ubid,
    replica_ubid: replica.ubid,
    vm_ubid: vm.ubid,
    vm_ip: vm.sshable.host,
    vm_host_ubid: vm.vm_host.ubid,
    vm_host_ip: vm.vm_host.sshable.host
  }
  summary = "Replica #{replica.ubid.to_s[0..7]} of inference router #{inference_router.name} is unavailable"
  Prog::PageNexus.assemble(summary,
    ["InferenceRouterReplicaUnavailable", replica.ubid],
    replica.ubid, severity: "warning", extra_data:)
end
# Resolves the replica's unavailability page, if one is currently open.
def resolve_page
  page = Page.from_tag_parts("InferenceRouterReplicaUnavailable", inference_router_replica.ubid)
  page.incr_resolve if page
end
# Builds the router's JSON config (eligible projects, model routes, TLS
# cert) and pushes it to the VM, reloading the router only when the
# config's MD5 actually changed.
def update_config
# Correlated EXISTS subquery: projects holding a valid inference API key.
api_key_ds = DB[:api_key].where(
owner_table: "project",
used_for: "inference_endpoint",
is_valid: true,
owner_id: Sequel[:project][:id]
).exists
eligible_projects_ds = Project.where(api_key_ds).order(:id)
# Projects that have used up their free inference-token quota.
free_quota_exhausted_projects_ds = FreeQuota.get_exhausted_projects("inference-tokens")
# One marker row per billing_info with at least one non-fraud payment method.
valid_payment_method_ds = DB[:payment_method]
.where(fraud: false)
.select_group(:billing_info_id)
.select_append { Sequel.as(Sequel.lit("1"), :valid_payment_method) }
# Exclude only projects that simultaneously have no valid payment method,
# zero credit, AND an exhausted free quota.
eligible_projects_ds = eligible_projects_ds
.left_outer_join(valid_payment_method_ds, [:billing_info_id])
.exclude(valid_payment_method: nil, credit: 0.0, id: free_quota_exhausted_projects_ds)
# Final filtering (active?) happens in Ruby; keys are hashed so the
# router never holds plaintext API keys.
eligible_projects = eligible_projects_ds.all
.select(&:active?)
.map do
{
ubid: it.ubid,
api_keys: it.api_keys
.select { |k| k.used_for == "inference_endpoint" && k.is_valid }
.sort_by { |k| k.id }
.map { |k| Digest::SHA2.hexdigest(k.key) }
}
end
# Routes: group targets per model, then bucket enabled endpoints into
# priority tiers (lower priority value first).
targets = InferenceRouterTarget.order(:created_at).all.group_by(&:inference_router_model)
routes = targets.map do |inference_router_model, targets_for_model|
{
model_name: inference_router_model.model_name,
project_inflight_limit: inference_router_model.project_inflight_limit,
project_prompt_tps_limit: inference_router_model.project_prompt_tps_limit,
project_completion_tps_limit: inference_router_model.project_completion_tps_limit,
endpoints: targets_for_model
.group_by { |target| target.priority }
.sort_by { |priority, _| priority }
.map do |_, targets|
targets
.select { |target| target.enabled }
.map do |target|
target.values.slice(:host, :inflight_limit)
.merge(id: target.name, api_key: target.api_key)
.merge(target.extra_configs)
end
end
}
end
new_config = {
certificate: {
cert: inference_router.load_balancer.active_cert.cert,
key: OpenSSL::PKey.read(inference_router.load_balancer.active_cert.csr_key).to_pem
},
projects: eligible_projects,
routes: routes
}
# NOTE: Hash#merge lets keys from the on-disk file override the computed
# config above — presumably intentional; verify if keys ever collide.
new_config = new_config.merge(JSON.parse(File.read("config/inference_router_config.json")))
new_config_json = JSON.generate(new_config)
new_md5 = Digest::MD5.hexdigest(new_config_json)
config_path = "/ir/workdir/config.json"
current_md5 = vm.sshable.cmd("md5sum #{config_path} | awk '{ print $1 }'").strip
if current_md5 != new_md5
# print("new_md5: #{new_md5}, current_md5: #{current_md5}\n") # Uncomment for obtaining md5 for testing.
# Write the new config and signal the router to reload it (SIGHUP).
vm.sshable.cmd("sudo mkdir -p /ir/workdir && sudo tee #{config_path} > /dev/null", stdin: new_config_json)
vm.sshable.cmd("sudo pkill -f -HUP inference-router")
Clog.emit("Configuration updated successfully.")
end
end
# Pushes the latest config to the inference router and collects billing
# information from its local /usage endpoint.
def ping_inference_router
  update_config
  raw_usage = vm.sshable.cmd("curl -k -m 10 --no-progress-meter https://localhost:8080/usage")
  project_usage = JSON.parse(raw_usage)
  log_details = {inference_router: inference_router.ubid, replica: inference_router_replica.ubid, project_usage: project_usage}
  Clog.emit("Successfully pinged inference router.") { log_details }
  # Prompt and completion tokens are billed separately, each under its own rate.
  update_billing_records(project_usage, "prompt_billing_resource", "prompt_token_count")
  update_billing_records(project_usage, "completion_billing_resource", "completion_token_count")
end
# Folds per-project token usage into BillingRecords, one record per
# (project, rate) per day. `billing_resource_key` names the model column
# carrying the billing resource family; `usage_key` names the token count
# field in each usage entry.
def update_billing_records(project_usage, billing_resource_key, usage_key)
# Bill into today's [midnight, next midnight) span.
begin_time = Date.today.to_time
end_time = (Date.today + 1).to_time
project_usage.each do |usage|
model = InferenceRouterModel.from_model_name(usage["model_name"])
resource_family = model[billing_resource_key.to_sym]
rate = BillingRate.from_resource_properties("InferenceTokens", resource_family, "global")
# Skip free-tier rates and entries with no usage.
next if rate["unit_price"].zero?
rate_id = rate["id"]
tokens = usage[usage_key]
next if tokens.zero?
project = Project[id: UBID.to_uuid(usage["ubid"])]
begin
# Accumulate into an existing record overlapping today's span, or open
# a fresh one for today.
today_record = BillingRecord
.where(project_id: project.id, resource_id: inference_router.id, billing_rate_id: rate_id)
.where { Sequel.pg_range(it.span).overlaps(Sequel.pg_range(begin_time...end_time)) }
.first
if today_record
today_record.this.update(amount: Sequel[:amount] + tokens)
else
BillingRecord.create_with_id(
project_id: project.id,
resource_id: inference_router.id,
resource_name: "#{resource_family} #{begin_time.strftime("%Y-%m-%d")}",
billing_rate_id: rate_id,
span: Sequel.pg_range(begin_time...end_time),
amount: tokens
)
end
rescue Sequel::Error => ex
# Billing failures are logged but never interrupt the ping loop.
Clog.emit("Failed to update billing record") { {billing_record_update_error: {project_ubid: project.ubid, replica_ubid: inference_router_replica.ubid, tokens: tokens, exception: Util.exception_to_hash(ex)}} }
end
end
end
end