Reviewing `leaf?` usage in progs, it always occurs right after `reap`. Combining the `leaf?` and `reap` methods avoids a redundant query for the strand's children. It is also typical for `nap` or `donate` to be called after the `leaf?` check that follows `reap`, so build that into `reap` as well: call `donate` by default, or `nap` if a `nap` keyword argument is given.

There are a few cases where `reap` was called without `leaf?`/`donate`. Add a `fallthrough` keyword argument to support this, so that if there are no children, neither `nap` nor `donate` is called.

`Vm::HostNexus#wait_prep` and `Kubernetes::UpgradeKubernetesNode#wait_new_node` both need the return value of the reapable child(ren). Add a `reaper` keyword argument for this, which is called once for each child.

The most common pattern for using `reap`/`leaf?`/`donate` was:

```ruby
reap
hop_download_lb_cert if leaf?
donate
```

This turns into:

```ruby
reap(:download_lb_cert)
```

The second most common pattern was:

```ruby
reap
donate unless leaf?
pop "upgrade cancelled" # or other code
```

This turns into:

```ruby
reap { pop "upgrade cancelled" }
```

In a few places, I changed operations on `strand.children` to `strand.children_dataset`. Now that we are no longer using cached children by default, it's better to do these checks in the database instead of in Ruby. These places deserve careful review:

* `Prog::Minio::MinioServerNexus#unavailable`
* `Prog::Postgres::PostgresResourceNexus#wait`
* `Prog::Postgres::PostgresServerNexus#unavailable`

For `Prog::Vnet::LoadBalancerNexus#wait_update_vm_load_balancers`, I removed a check on the children completely. It was checking for an exitval using `children_dataset` directly after `reap`, which should only be true if there was still an active lease for the child. This also deserves careful review.

These changes broke many mocked tests. This fixes the mocked tests to use database-backed objects, ensuring that we are testing observable behavior and not implementation details.
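To make the new keyword arguments concrete, here is a minimal sketch of what call sites could look like after the change. The `reap(:label)` form is taken from the patterns above; the `nap:`, `reaper:`, and `fallthrough:` usages, and all label and helper names, are illustrative assumptions about the API described here, not the exact implementation.

```ruby
# Hypothetical call sites illustrating the reap keyword arguments.

label def wait_children
  # Hop to :finish once all children are reaped; while children remain,
  # nap for 10 seconds instead of the default donate.
  reap(:finish, nap: 10)
end

label def wait_prep
  # reaper: is called once per reaped child, e.g. to collect its result.
  # fallthrough: true means neither nap nor donate is called when there
  # are no children, so execution continues below the reap call.
  reap(reaper: ->(child) { record_result(child) }, fallthrough: true)
  hop_next_step
end
```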
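For the `children_dataset` changes, the shape of the rewrite is to push a check that used to iterate cached children in Ruby down into a SQL query on the association dataset. A hypothetical before/after sketch, not the exact condition used in the progs listed above:

```ruby
# Before: materializes every child strand, then filters in Ruby.
waiting = strand.children.any? { |child| child.label == "wait" }

# After: the same existence check as a single query; Sequel's
# Dataset#empty? issues a LIMIT 1 query and instantiates no rows.
waiting = !strand.children_dataset.where(label: "wait").empty?
```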
347 lines · 12 KiB · Ruby
```ruby
# frozen_string_literal: true

require "digest"
require "excon"
require "forwardable"

require_relative "../../lib/util"

class Prog::Ai::InferenceRouterReplicaNexus < Prog::Base
  subject_is :inference_router_replica

  extend Forwardable
  def_delegators :inference_router_replica, :vm, :inference_router, :load_balancer_vm_port

  def self.assemble(inference_router_id)
    ubid = InferenceRouterReplica.generate_ubid
    DB.transaction do
      inference_router = InferenceRouter[inference_router_id]
      vm_st = Prog::Vm::Nexus.assemble_with_sshable(
        Config.inference_endpoint_service_project_id,
        sshable_unix_user: "ubi",
        location_id: inference_router.location_id,
        name: ubid.to_s,
        size: inference_router.vm_size,
        boot_image: "ubuntu-noble",
        private_subnet_id: inference_router.load_balancer.private_subnet.id,
        enable_ip4: true
      )

      inference_router.load_balancer.add_vm(vm_st.subject)

      replica = InferenceRouterReplica.create(
        inference_router_id: inference_router_id,
        vm_id: vm_st.id
      ) { it.id = ubid.to_uuid }

      Strand.create(prog: "Ai::InferenceRouterReplicaNexus", label: "start") { it.id = replica.id }
    end
  end

  def before_run
    when_destroy_set? do
      if strand.label != "destroy"
        hop_destroy
      elsif strand.stack.count > 1
        pop "operation is cancelled due to the destruction of the inference router replica"
      end
    end
  end

  label def start
    nap 5 unless vm.strand.label == "wait"

    hop_bootstrap_rhizome
  end

  label def bootstrap_rhizome
    register_deadline("wait", 15 * 60)

    bud Prog::BootstrapRhizome, {"target_folder" => "inference_router", "subject_id" => vm.id, "user" => "ubi"}
    hop_wait_bootstrap_rhizome
  end

  label def wait_bootstrap_rhizome
    reap(:setup)
  end

  label def setup
    update_config

    workdir = "/ir/workdir"
    release_tag = Config.inference_router_release_tag
    access_token = Config.inference_router_access_token
    asset_name = "inference-router-#{release_tag}-x86_64-unknown-linux-gnu"
    vm.sshable.cmd("id -u inference-router >/dev/null 2>&1 || sudo useradd --system --no-create-home --shell /usr/sbin/nologin inference-router")
    vm.sshable.cmd("sudo chown -R inference-router:inference-router #{workdir}")
    vm.sshable.cmd("sudo wget -O #{workdir}/fetch_linux_amd64 https://github.com/gruntwork-io/fetch/releases/download/v0.4.6/fetch_linux_amd64")
    vm.sshable.cmd("sudo chmod +x #{workdir}/fetch_linux_amd64")
    vm.sshable.cmd("sudo #{workdir}/fetch_linux_amd64 --github-oauth-token=\"#{access_token}\" --repo=\"https://github.com/ubicloud/inference-router\" --tag=\"#{release_tag}\" --release-asset=\"inference-router-*\" #{workdir}/")
    vm.sshable.cmd("sudo tar -xzf #{workdir}/#{asset_name}.tar.gz -C #{workdir}")
    write_inference_router_service(asset_name, workdir)
    vm.sshable.cmd("sudo systemctl daemon-reload")
    vm.sshable.cmd("sudo systemctl enable --now inference-router")

    hop_wait_router_up
  end

  def write_inference_router_service(asset_name, workdir)
    service_definition = <<~SERVICE
      [Unit]
      Description=Inference Router
      After=network.target

      [Service]
      Type=simple
      User=inference-router
      Group=inference-router
      WorkingDirectory=#{workdir}
      Environment=RUST_BACKTRACE=1
      Environment=RUST_LOG=INFO
      ExecStart=#{workdir}/#{asset_name}/inference-router -c=#{workdir}/config.json
      KillSignal=SIGINT
      Restart=always
      RestartSec=5
      StandardOutput=journal
      StandardError=journal

      # File system and device restrictions
      ReadOnlyPaths=/
      ReadWritePaths=/ir/workdir
      PrivateTmp=yes
      PrivateMounts=yes

      # User management
      SupplementaryGroups=

      # Kernel and system protections
      ProtectKernelTunables=yes
      ProtectKernelModules=yes
      ProtectKernelLogs=yes
      ProtectClock=yes
      ProtectHostname=yes
      ProtectControlGroups=yes

      # Execution environment restrictions
      NoNewPrivileges=yes
      RestrictNamespaces=yes
      RestrictRealtime=yes
      RestrictSUIDSGID=yes
      LockPersonality=yes

      # Network restrictions
      PrivateNetwork=no

      # Additional hardening
      KeyringMode=private
      ProtectHome=yes
      DynamicUser=yes
      PrivateUsers=yes
      CapabilityBoundingSet=
      SystemCallFilter=@system-service
      SystemCallFilter=~@privileged @resources @mount @debug @cpu-emulation @obsolete @raw-io @reboot @swap
      SystemCallArchitectures=native
      ProtectSystem=strict
      DeviceAllow=
      MemoryDenyWriteExecute=true
      RemoveIPC=true
      UMask=0077

      # Resource limits
      LimitNOFILE=65536

      [Install]
      WantedBy=multi-user.target
    SERVICE

    vm.sshable.cmd <<~CMD
      sudo tee /etc/systemd/system/inference-router.service > /dev/null << 'EOF'
      #{service_definition}
      EOF
    CMD
  end

  label def wait_router_up
    hop_wait if available?

    nap 5
  end

  label def wait
    hop_unavailable unless available?
    ping_inference_router

    nap 120
  end

  label def destroy
    decr_destroy

    resolve_page
    strand.children.each { it.destroy }
    inference_router.load_balancer.evacuate_vm(vm)
    inference_router.load_balancer.remove_vm(vm)
    vm.incr_destroy
    inference_router_replica.destroy

    pop "inference router replica is deleted"
  end

  label def unavailable
    if available?
      resolve_page
      hop_wait
    end

    create_page unless inference_router.maintenance_set?
    nap 30
  end

  def available?
    load_balancer_vm_port.reload.state == "up"
  end

  def create_page
    extra_data = {
      inference_router_ubid: inference_router.ubid,
      inference_router_location: inference_router.location.name,
      inference_router_name: inference_router.name,
      inference_router_replica_count: inference_router.replica_count,
      load_balancer_ubid: inference_router.load_balancer.ubid,
      private_subnet_ubid: inference_router.load_balancer.private_subnet.ubid,
      replica_ubid: inference_router_replica.ubid,
      vm_ubid: vm.ubid,
      vm_ip: vm.sshable.host,
      vm_host_ubid: vm.vm_host.ubid,
      vm_host_ip: vm.vm_host.sshable.host
    }
    Prog::PageNexus.assemble("Replica #{inference_router_replica.ubid.to_s[0..7]} of inference router #{inference_router.name} is unavailable",
      ["InferenceRouterReplicaUnavailable", inference_router_replica.ubid],
      inference_router_replica.ubid, severity: "warning", extra_data:)
  end

  def resolve_page
    Page.from_tag_parts("InferenceRouterReplicaUnavailable", inference_router_replica.ubid)&.incr_resolve
  end

  def update_config
    api_key_ds = DB[:api_key].where(
      owner_table: "project",
      used_for: "inference_endpoint",
      is_valid: true,
      owner_id: Sequel[:project][:id]
    ).exists

    eligible_projects_ds = Project.where(api_key_ds).order(:id)
    free_quota_exhausted_projects_ds = FreeQuota.get_exhausted_projects("inference-tokens")
    valid_payment_method_ds = DB[:payment_method]
      .where(fraud: false)
      .select_group(:billing_info_id)
      .select_append { Sequel.as(Sequel.lit("1"), :valid_payment_method) }
    eligible_projects_ds = eligible_projects_ds
      .left_outer_join(valid_payment_method_ds, [:billing_info_id])
      .exclude(valid_payment_method: nil, credit: 0.0, id: free_quota_exhausted_projects_ds)

    eligible_projects = eligible_projects_ds.all
      .select(&:active?)
      .map do
        {
          ubid: it.ubid,
          api_keys: it.api_keys
            .select { |k| k.used_for == "inference_endpoint" && k.is_valid }
            .sort_by { |k| k.id }
            .map { |k| Digest::SHA2.hexdigest(k.key) }
        }
      end

    targets = InferenceRouterTarget.order(:created_at).all.group_by(&:inference_router_model)
    routes = targets.map do |inference_router_model, targets_for_model|
      {
        model_name: inference_router_model.model_name,
        project_inflight_limit: inference_router_model.project_inflight_limit,
        project_prompt_tps_limit: inference_router_model.project_prompt_tps_limit,
        project_completion_tps_limit: inference_router_model.project_completion_tps_limit,
        endpoints: targets_for_model
          .group_by { |target| target.priority }
          .sort_by { |priority, _| priority }
          .map do |_, targets|
            targets
              .select { |target| target.enabled }
              .map do |target|
                target.values.slice(:host, :inflight_limit)
                  .merge(id: target.name, api_key: target.api_key)
                  .merge(target.extra_configs)
              end
          end
      }
    end
    new_config = {
      certificate: {
        cert: inference_router.load_balancer.active_cert.cert,
        key: OpenSSL::PKey.read(inference_router.load_balancer.active_cert.csr_key).to_pem
      },
      projects: eligible_projects,
      routes: routes
    }
    new_config = new_config.merge(JSON.parse(File.read("config/inference_router_config.json")))
    new_config_json = JSON.generate(new_config)
    new_md5 = Digest::MD5.hexdigest(new_config_json)
    config_path = "/ir/workdir/config.json"
    current_md5 = vm.sshable.cmd("md5sum #{config_path} | awk '{ print $1 }'").strip
    if current_md5 != new_md5
      # print("new_md5: #{new_md5}, current_md5: #{current_md5}\n") # Uncomment for obtaining md5 for testing.
      vm.sshable.cmd("sudo mkdir -p /ir/workdir && sudo tee #{config_path} > /dev/null", stdin: new_config_json)
      vm.sshable.cmd("sudo pkill -f -HUP inference-router")
      Clog.emit("Configuration updated successfully.")
    end
  end

  # pushes latest config to inference router and collects billing information
  def ping_inference_router
    update_config
    usage_response = vm.sshable.cmd("curl -k -m 10 --no-progress-meter https://localhost:8080/usage")
    project_usage = JSON.parse(usage_response)
    Clog.emit("Successfully pinged inference router.") { {inference_router: inference_router.ubid, replica: inference_router_replica.ubid, project_usage: project_usage} }
    update_billing_records(project_usage, "prompt_billing_resource", "prompt_token_count")
    update_billing_records(project_usage, "completion_billing_resource", "completion_token_count")
  end

  def update_billing_records(project_usage, billing_resource_key, usage_key)
    begin_time = Date.today.to_time
    end_time = (Date.today + 1).to_time

    project_usage.each do |usage|
      model = InferenceRouterModel.from_model_name(usage["model_name"])
      resource_family = model[billing_resource_key.to_sym]
      rate = BillingRate.from_resource_properties("InferenceTokens", resource_family, "global")
      next if rate["unit_price"].zero?
      rate_id = rate["id"]
      tokens = usage[usage_key]
      next if tokens.zero?
      project = Project[id: UBID.to_uuid(usage["ubid"])]

      begin
        today_record = BillingRecord
          .where(project_id: project.id, resource_id: inference_router.id, billing_rate_id: rate_id)
          .where { Sequel.pg_range(it.span).overlaps(Sequel.pg_range(begin_time...end_time)) }
          .first

        if today_record
          today_record.this.update(amount: Sequel[:amount] + tokens)
        else
          BillingRecord.create_with_id(
            project_id: project.id,
            resource_id: inference_router.id,
            resource_name: "#{resource_family} #{begin_time.strftime("%Y-%m-%d")}",
            billing_rate_id: rate_id,
            span: Sequel.pg_range(begin_time...end_time),
            amount: tokens
          )
        end
      rescue Sequel::Error => ex
        Clog.emit("Failed to update billing record") { {billing_record_update_error: {project_ubid: project.ubid, replica_ubid: inference_router_replica.ubid, tokens: tokens, exception: Util.exception_to_hash(ex)}} }
      end
    end
  end
end
```