Files
ubicloud/scheduling/allocator.rb
Hadi Moshayedi 2790c5b95e Add IO rate limiting
This change enables setting limits on total IOPS, read bandwidth, and
write bandwidth for each `VmStorageVolume`. These limits can be
specified in `Nexus::assemble`:

```
Prog::Vm::Nexus.assemble(
  ...
  storage_volumes: [{
      size_gib: 40,
      encrypted: true,
      max_read_mbytes_per_sec: 200,
      max_write_mbytes_per_sec: 150,
      max_ios_per_sec: 25600
    }],
  ...)
```

The implementation uses SPDK's `bdev_set_qos_limit` RPC call during
volume setup on the Rhizome side.

This feature prepares for future support of burstable instances with
limited IO capacity and allows proportional IO allocation based on VM
size. These additional allocation strategies will be addressed in
upcoming PRs.
2024-11-20 13:26:27 -08:00

359 lines
14 KiB
Ruby

# frozen_string_literal: true
module Scheduling::Allocator
# :nocov:
def self.freeze
target_host_utilization
super
end
# :nocov:
def self.target_host_utilization
@target_host_utilization ||= Config.allocator_target_host_utilization
end
def self.allocate(vm, storage_volumes, distinct_storage_devices: false, gpu_count: 0, allocation_state_filter: ["accepting"], host_filter: [], host_exclusion_filter: [], location_filter: [], location_preference: [])
request = Request.new(
vm.id,
vm.cores,
vm.mem_gib,
storage_volumes.map { _1["size_gib"] }.sum,
storage_volumes.size.times.zip(storage_volumes).to_h.sort_by { |k, v| v["size_gib"] * -1 },
vm.boot_image,
distinct_storage_devices,
gpu_count,
vm.ip4_enabled,
target_host_utilization,
vm.arch,
allocation_state_filter,
host_filter,
host_exclusion_filter,
location_filter,
location_preference
)
allocation = Allocation.best_allocation(request)
fail "#{vm} no space left on any eligible host" unless allocation
allocation.update(vm)
Clog.emit("vm allocated") { {allocation: allocation.to_s, duration: Time.now - vm.created_at} }
end
Request = Struct.new(:vm_id, :cores, :mem_gib, :storage_gib, :storage_volumes, :boot_image, :distinct_storage_devices, :gpu_count, :ip4_enabled,
:target_host_utilization, :arch_filter, :allocation_state_filter, :host_filter, :host_exclusion_filter, :location_filter, :location_preference)
class Allocation
attr_reader :score
# :nocov:
def self.freeze
random_score
super
end
# :nocov:
def self.random_score
@max_random_score ||= Config.allocator_max_random_score
rand(0..@max_random_score)
end
def self.best_allocation(request)
candidate_hosts(request).map { Allocation.new(_1, request) }
.select { _1.is_valid }
.min_by { _1.score + random_score }
end
def self.candidate_hosts(request)
ds = DB[:vm_host]
.join(:storage_devices, vm_host_id: Sequel[:vm_host][:id])
.join(:total_ipv4, routed_to_host_id: Sequel[:vm_host][:id])
.join(:used_ipv4, routed_to_host_id: Sequel[:vm_host][:id])
.left_join(:gpus, vm_host_id: Sequel[:vm_host][:id])
.left_join(:vm_provisioning, vm_host_id: Sequel[:vm_host][:id])
.select(Sequel[:vm_host][:id].as(:vm_host_id), :total_cores, :used_cores, :total_hugepages_1g, :used_hugepages_1g, :location, :num_storage_devices, :available_storage_gib, :total_storage_gib, :storage_devices, :total_ipv4, :used_ipv4, Sequel.function(:coalesce, :num_gpus, 0).as(:num_gpus), Sequel.function(:coalesce, :available_gpus, 0).as(:available_gpus), :available_iommu_groups, Sequel.function(:coalesce, :vm_provisioning_count, 0).as(:vm_provisioning_count))
.where(arch: request.arch_filter)
.where { (total_hugepages_1g - used_hugepages_1g >= request.mem_gib) }
.where { (total_cores - used_cores >= request.cores) }
.with(:total_ipv4, DB[:address]
.select_group(:routed_to_host_id)
.select_append { round(sum(power(2, 32 - masklen(cidr)))).cast(:integer).as(total_ipv4) }
.where { (family(cidr) =~ 4) })
.with(:used_ipv4, DB[:address].left_join(:assigned_vm_address, address_id: :id)
.select_group(:routed_to_host_id)
.select_append { (count(Sequel[:assigned_vm_address][:id]) + 1).as(used_ipv4) })
.with(:storage_devices, DB[:storage_device]
.select_group(:vm_host_id)
.select_append { count.function.*.as(num_storage_devices) }
.select_append { sum(available_storage_gib).as(available_storage_gib) }
.select_append { sum(total_storage_gib).as(total_storage_gib) }
.select_append { json_agg(json_build_object(Sequel.lit("'id'"), Sequel[:storage_device][:id], Sequel.lit("'total_storage_gib'"), total_storage_gib, Sequel.lit("'available_storage_gib'"), available_storage_gib)).order(available_storage_gib).as(storage_devices) }
.where(enabled: true)
.having { sum(available_storage_gib) >= request.storage_gib }
.having { count.function.* >= (request.distinct_storage_devices ? request.storage_volumes.count : 1) })
.with(:gpus, DB[:pci_device]
.select_group(:vm_host_id)
.select_append { count.function.*.as(num_gpus) }
.select_append { sum(Sequel.case({{vm_id: nil} => 1}, 0)).as(available_gpus) }
.select_append { array_remove(array_agg(Sequel.case({{vm_id: nil} => :iommu_group}, nil)), nil).as(available_iommu_groups) }
.where(device_class: ["0300", "0302"]))
.with(:vm_provisioning, DB[:vm]
.select_group(:vm_host_id)
.select_append { count.function.*.as(vm_provisioning_count) }
.where(display_state: "creating"))
ds = ds.join(:boot_image, Sequel[:vm_host][:id] => Sequel[:boot_image][:vm_host_id])
.where(Sequel[:boot_image][:name] => request.boot_image)
.exclude(Sequel[:boot_image][:activated_at] => nil)
request.storage_volumes.select { _1[1]["read_only"] && _1[1]["image"] }.map { [_1[0], _1[1]["image"]] }.each do |idx, img|
table_alias = :"boot_image_#{idx}"
ds = ds.join(Sequel[:boot_image].as(table_alias), Sequel[:vm_host][:id] => Sequel[table_alias][:vm_host_id])
.where(Sequel[table_alias][:name] => img)
.exclude(Sequel[table_alias][:activated_at] => nil)
end
ds = ds.where { used_ipv4 < total_ipv4 } if request.ip4_enabled
ds = ds.where { available_gpus >= request.gpu_count } if request.gpu_count > 0
ds = ds.where(Sequel[:vm_host][:id] => request.host_filter) unless request.host_filter.empty?
ds = ds.exclude(Sequel[:vm_host][:id] => request.host_exclusion_filter) unless request.host_exclusion_filter.empty?
ds = ds.where(location: request.location_filter) unless request.location_filter.empty?
ds = ds.where(allocation_state: request.allocation_state_filter) unless request.allocation_state_filter.empty?
ds.all
end
def self.update_vm(vm_host, vm)
ip4, address = vm_host.ip4_random_vm_network if vm.ip4_enabled
fail "no ip4 addresses left" if vm.ip4_enabled && !ip4
vm.update(
vm_host_id: vm_host.id,
ephemeral_net6: vm_host.ip6_random_vm_network.to_s,
local_vetho_ip: vm_host.veth_pair_random_ip4_addr.to_s,
allocated_at: Time.now
)
AssignedVmAddress.create_with_id(dst_vm_id: vm.id, ip: ip4.to_s, address_id: address.id) if ip4
vm.sshable&.update(host: vm.ephemeral_net4 || NetAddr.parse_net(vm.ephemeral_net6).nth(2))
end
def initialize(candidate_host, request)
@candidate_host = candidate_host
@request = request
@vm_host_allocations = [VmHostAllocation.new(:used_cores, candidate_host[:total_cores], candidate_host[:used_cores], request.cores),
VmHostAllocation.new(:used_hugepages_1g, candidate_host[:total_hugepages_1g], candidate_host[:used_hugepages_1g], request.mem_gib)]
@device_allocations = [StorageAllocation.new(candidate_host, request)]
@device_allocations << GpuAllocation.new(candidate_host, request) if request.gpu_count > 0
@allocations = @vm_host_allocations + @device_allocations
@score = calculate_score
end
def is_valid
@allocations.all? { _1.is_valid }
end
def update(vm)
vm_host = VmHost[@candidate_host[:vm_host_id]]
DB.transaction do
Allocation.update_vm(vm_host, vm)
VmHost.dataset.where(id: @candidate_host[:vm_host_id]).update(@vm_host_allocations.map { _1.get_vm_host_update }.reduce(&:merge))
@device_allocations.each { _1.update(vm, vm_host) }
end
end
def to_s
"#{UBID.from_uuidish(@request.vm_id)} (arch=#{@request.arch_filter}, cpu=#{@request.cores}, mem=#{@request.mem_gib}, storage=#{@request.storage_gib}) -> #{UBID.from_uuidish(@candidate_host[:vm_host_id])} (cpu=#{@candidate_host[:used_cores]}/#{@candidate_host[:total_cores]}, mem=#{@candidate_host[:used_hugepages_1g]}/#{@candidate_host[:total_hugepages_1g]}, storage=#{@candidate_host[:total_storage_gib] - @candidate_host[:available_storage_gib]}/#{@candidate_host[:total_storage_gib]}), score=#{@score}"
end
private
def calculate_score
util = @allocations.map { _1.utilization }
# utilization score, in range [0, 2]
score = @request.target_host_utilization - util.sum.fdiv(util.size)
score = score.abs + 1 if score < 0
# imbalance score, in range [0, 1]
score += util.max - util.min
# penalty for ongoing vm provisionings on the host
score += @candidate_host[:vm_provisioning_count] * 0.5
# penalty for AX161, TODO: remove after migration to AX162
score += 0.5 if @candidate_host[:total_cores] == 32
# penalty of 5 if host has a GPU but VM doesn't require a GPU
score += 5 unless @request.gpu_count > 0 || @candidate_host[:num_gpus] == 0
# penalty of 10 if location preference is not honored
score += 10 unless @request.location_preference.empty? || @request.location_preference.include?(@candidate_host[:location])
score
end
end
class VmHostAllocation
attr_reader :total, :used, :requested
def initialize(column, total, used, requested)
fail "resource '#{column}' uses more than is available: #{used} > #{total}" if used > total
@column = column
@total = total
@used = used
@requested = requested
end
def is_valid
@requested + @used <= @total
end
def utilization
(@used + @requested).fdiv(@total)
end
def get_vm_host_update
{@column => Sequel[@column] + @requested}
end
end
class GpuAllocation
attr_reader
def initialize(candidate_host, request)
@used = candidate_host[:num_gpus] - candidate_host[:available_gpus]
@total = candidate_host[:num_gpus]
@requested = request.gpu_count
@iommu_groups = candidate_host[:available_iommu_groups].take(@requested)
end
def is_valid
@used < @total
end
def utilization
(@used + 1).fdiv(@total)
end
def update(vm, vm_host)
fail "concurrent GPU allocation" if
PciDevice.dataset
.where(vm_host_id: vm_host.id)
.where(vm_id: nil)
.where(iommu_group: @iommu_groups)
.update(vm_id: vm.id) < @requested
end
end
class StorageAllocation
attr_reader :is_valid, :total, :used, :requested, :volume_to_device_map
def initialize(candidate_host, request)
@candidate_host = candidate_host
@request = request
@is_valid = map_volumes_to_devices
end
def update(vm, vm_host)
@storage_device_allocations.each { _1.update }
create_storage_volumes(vm, vm_host)
end
def utilization
1 - (@candidate_host[:available_storage_gib] - @request.storage_gib).fdiv(@candidate_host[:total_storage_gib])
end
def self.allocate_spdk_installation(spdk_installations)
total_weight = spdk_installations.sum(&:allocation_weight)
fail "Total weight of all eligible spdk_installations shouldn't be zero." if total_weight == 0
rand_point = rand(0..total_weight - 1)
weight_sum = 0
rand_choice = spdk_installations.each { |si|
weight_sum += si.allocation_weight
break si if weight_sum > rand_point
}
rand_choice.id
end
private
def allocate_boot_image(vm_host, boot_image_name)
boot_image = BootImage.where(
vm_host_id: vm_host.id,
name: boot_image_name
).exclude(activated_at: nil).order_by(Sequel.desc(:version, nulls: :last)).first
boot_image.id
end
def map_volumes_to_devices
return false if @candidate_host[:available_storage_gib] < @request.storage_gib
@storage_device_allocations = @candidate_host[:storage_devices].map { StorageDeviceAllocation.new(_1["id"], _1["available_storage_gib"]) }
@volume_to_device_map = {}
@request.storage_volumes.each do |vol_id, vol|
dev = @storage_device_allocations.detect { |dev| dev.available_storage_gib >= vol["size_gib"] && !(@request.distinct_storage_devices && dev.allocated_storage_gib > 0) }
return false if dev.nil?
@volume_to_device_map[vol_id] = dev.id
dev.allocate(vol["size_gib"])
end
true
end
def create_storage_volumes(vm, vm_host)
@request.storage_volumes.each do |disk_index, volume|
spdk_installation_id = StorageAllocation.allocate_spdk_installation(vm_host.spdk_installations)
key_encryption_key = if volume["encrypted"]
key_wrapping_algorithm = "aes-256-gcm"
cipher = OpenSSL::Cipher.new(key_wrapping_algorithm)
key_wrapping_key = cipher.random_key
key_wrapping_iv = cipher.random_iv
StorageKeyEncryptionKey.create_with_id(
algorithm: key_wrapping_algorithm,
key: Base64.encode64(key_wrapping_key),
init_vector: Base64.encode64(key_wrapping_iv),
auth_data: "#{vm.inhost_name}_#{disk_index}"
)
end
image_id = if volume["boot"]
allocate_boot_image(vm_host, vm.boot_image)
elsif volume["read_only"]
allocate_boot_image(vm_host, volume["image"])
end
VmStorageVolume.create_with_id(
vm_id: vm.id,
boot: volume["boot"],
size_gib: volume["size_gib"],
use_bdev_ubi: SpdkInstallation[spdk_installation_id].supports_bdev_ubi? && volume["boot"],
boot_image_id: image_id,
skip_sync: volume["skip_sync"],
disk_index: disk_index,
key_encryption_key_1_id: key_encryption_key&.id,
spdk_installation_id: spdk_installation_id,
storage_device_id: @volume_to_device_map[disk_index],
max_ios_per_sec: volume["max_ios_per_sec"],
max_read_mbytes_per_sec: volume["max_read_mbytes_per_sec"],
max_write_mbytes_per_sec: volume["max_write_mbytes_per_sec"]
)
end
end
class StorageDeviceAllocation
attr_reader :id, :available_storage_gib, :allocated_storage_gib
def initialize(id, available_storage_gib)
@id = id
@available_storage_gib = available_storage_gib
@allocated_storage_gib = 0
end
def allocate(size_gib)
@available_storage_gib -= size_gib
@allocated_storage_gib += size_gib
end
def update
StorageDevice.dataset.where(id: id).update(available_storage_gib: Sequel[:available_storage_gib] - @allocated_storage_gib) if @allocated_storage_gib > 0
end
end
end
end