ubicloud/kubernetes/csi/lib/ubi_csi/controller_service.rb
mohi-kalantari 66bc21cdbc Handle PV deletion in UbiCSI
When we are done with an old PV at the end of the "stage" phase of the
newly migrated PV, we set the reclaim policy back to "Delete" and the
Kubernetes provisioner calls the DeleteVolume method.

We then assemble an ssh command to run and delete the backing file on
whichever node it resides.
2025-07-25 15:20:55 +02:00
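
As a hedged illustration of the flow described above (not code from this
file), the reclaim-policy flip could be a merge patch against the PV; the
release_old_pv helper and the use of kubectl here are assumptions for
illustration only:

require "json"

# Hypothetical sketch: set the PV's reclaim policy back to "Delete" so the
# external provisioner subsequently calls DeleteVolume on this controller.
def release_old_pv(pv_name)
  patch = {spec: {persistentVolumeReclaimPolicy: "Delete"}}.to_json
  system("kubectl", "patch", "pv", pv_name, "--type=merge", "-p", patch)
end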

# frozen_string_literal: true

require "grpc"
require "json"
require "securerandom"

require_relative "../csi_services_pb"

module Csi
  module V1
    class ControllerService < Controller::Service
      include Csi::ServiceHelper

      MAX_VOLUME_SIZE = 2 * 1024 * 1024 * 1024 # 2 GiB in bytes

      def initialize(logger:)
        @logger = logger
        @volume_store = {} # Maps volume name to volume details
        @mutex = Mutex.new
      end

      def controller_get_capabilities(req, _call)
        log_request_response(req, "controller_get_capabilities") do |req_id|
          ControllerGetCapabilitiesResponse.new(
            capabilities: [
              ControllerServiceCapability.new(
                rpc: ControllerServiceCapability::RPC.new(
                  type: ControllerServiceCapability::RPC::Type::CREATE_DELETE_VOLUME
                )
              )
            ]
          )
        end
      end

      # We advertise PluginCapability::Service::Type::VOLUME_ACCESSIBILITY_CONSTRAINTS
      # as one of the plugin capabilities in IdentityPlugin. This capability allows us
      # to stick PVs to the node they are scheduled on so they won't jump around
      # during regular pod deletes.
      #
      # This function is used in the CreateVolume method, telling the Kubernetes
      # cluster first not to select control-plane nodes, then selecting any of
      # the nodes that can host the PV.
      def select_worker_topology(req)
        preferred = req.accessibility_requirements.preferred
        requisite = req.accessibility_requirements.requisite
        selected = preferred.find { |topo| !topo.segments["kubernetes.io/hostname"].start_with?("kc") }
        selected ||= requisite.find { |topo| !topo.segments["kubernetes.io/hostname"].start_with?("kc") }
        if selected.nil?
          raise GRPC::FailedPrecondition.new("No suitable worker node topology found", GRPC::Core::StatusCodes::FAILED_PRECONDITION)
        end
        selected
      end
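
      # Illustrative input for the selection above (hostnames are hypothetical,
      # except for the "kc" control-plane prefix the filter relies on):
      #   preferred: [segments: {"kubernetes.io/hostname" => "kc1abc"}]
      #   requisite: [segments: {"kubernetes.io/hostname" => "worker-a"}]
      # Here the preferred entry is skipped as a control-plane node and the
      # requisite "worker-a" topology is returned.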

      def create_volume(req, _call)
        log_request_response(req, "create_volume") do |req_id|
          raise GRPC::InvalidArgument.new("Volume name is required", GRPC::Core::StatusCodes::INVALID_ARGUMENT) if req.name.nil? || req.name.empty?
          raise GRPC::InvalidArgument.new("Capacity range is required", GRPC::Core::StatusCodes::INVALID_ARGUMENT) if req.capacity_range.nil?
          raise GRPC::InvalidArgument.new("Required bytes must be positive", GRPC::Core::StatusCodes::INVALID_ARGUMENT) if req.capacity_range.required_bytes <= 0
          raise GRPC::InvalidArgument.new("Volume size exceeds maximum allowed size of 2GB", GRPC::Core::StatusCodes::OUT_OF_RANGE) if req.capacity_range.required_bytes > MAX_VOLUME_SIZE
          raise GRPC::InvalidArgument.new("Volume capabilities are required", GRPC::Core::StatusCodes::INVALID_ARGUMENT) if req.volume_capabilities.nil? || req.volume_capabilities.empty?
          raise GRPC::InvalidArgument.new("Topology requirement is required", GRPC::Core::StatusCodes::INVALID_ARGUMENT) if req.accessibility_requirements.nil? || req.accessibility_requirements.requisite.empty?
          existing = nil
          new_volume_id = nil
          selected_topology = nil
          @mutex.synchronize do
            unless (existing = @volume_store[req.name])
              selected_topology = select_worker_topology(req)
              new_volume_id = "vol-#{SecureRandom.uuid}"
              @volume_store[req.name] = {
                volume_id: new_volume_id,
                name: req.name.freeze,
                accessible_topology: selected_topology.freeze,
                capacity_bytes: req.capacity_range.required_bytes,
                parameters: req.parameters.to_h.transform_values(&:freeze).freeze,
                capabilities: req.volume_capabilities.map(&:to_h).freeze
              }.freeze
            end
          end
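
          # A repeated name must be a retry of the same request; any drift in
          # topology, size, parameters, or capabilities is rejected below.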
          if existing
            if req.accessibility_requirements.requisite.first != existing[:accessible_topology]
              raise GRPC::FailedPrecondition.new("Existing volume has incompatible topology", GRPC::Core::StatusCodes::FAILED_PRECONDITION)
            end
            if existing[:capacity_bytes] != req.capacity_range.required_bytes
              raise GRPC::FailedPrecondition.new("Volume with same name but different size exists", GRPC::Core::StatusCodes::FAILED_PRECONDITION)
            end
            if existing[:parameters] != req.parameters.to_h
              raise GRPC::FailedPrecondition.new("Volume with same name but different parameters exists", GRPC::Core::StatusCodes::FAILED_PRECONDITION)
            end
            existing_capabilities = existing[:capabilities].sort_by(&:to_json)
            new_capabilities = req.volume_capabilities.map(&:to_h).sort_by(&:to_json)
            if existing_capabilities != new_capabilities
              raise GRPC::FailedPrecondition.new("Volume with same name but different capabilities exists", GRPC::Core::StatusCodes::FAILED_PRECONDITION)
            end
          end

          volume_id = existing ? existing[:volume_id] : new_volume_id
          topology = existing ? existing[:accessible_topology] : selected_topology
          CreateVolumeResponse.new(
            volume: Volume.new(
              volume_id: volume_id,
              capacity_bytes: req.capacity_range.required_bytes,
              volume_context: req.parameters.to_h.merge("size_bytes" => req.capacity_range.required_bytes.to_s),
              accessible_topology: [topology]
            )
          )
        end
      end

      def delete_volume(req, _call)
        log_request_response(req, "delete_volume") do |req_id|
          raise GRPC::InvalidArgument.new("Volume ID is required", GRPC::Core::StatusCodes::INVALID_ARGUMENT) if req.volume_id.nil? || req.volume_id.empty?
          client = KubernetesClient.new(req_id:, logger: @logger)
          # Since we would have at most 8 PVCs per node, searching by value will not cause overhead
          pv_name = @mutex.synchronize { @volume_store.find { |_, d| d[:volume_id] == req.volume_id }&.first }
          pv = pv_name.nil? ? client.find_pv_by_volume_id(req.volume_id) : client.get_pv(pv_name)
          pv_node = client.extract_node_from_pv(pv)
          pv_node_ip = client.get_node_ip(pv_node)
          file_path = NodeService.backing_file_path(req.volume_id)
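
          # Delete the volume's backing file over SSH on the node that hosts
          # the PV; host-key checking is disabled since node host keys are not
          # pinned on the controller.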
delete_cmd = ["ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-i", "/ssh/id_ed25519", "ubi@#{pv_node_ip}",
"sudo", "rm", "-f", file_path]
output, status = run_cmd(*delete_cmd, req_id:)
if !status.success?
log_with_id(req_id, "Could not delete the PV's backing file: #{output}")
raise GRPC::Internal, "Could not delete the PV's backing file"
end
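
          # pv_name is nil when this controller process never stored the
          # volume (e.g. after a restart); Hash#delete(nil) is then a no-op.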
          @mutex.synchronize { @volume_store.delete(pv_name) }
          DeleteVolumeResponse.new
        rescue GRPC::InvalidArgument => e
          log_with_id(req_id, "Handled gRPC validation error in delete_volume: #{e.class} - #{e.message}")
          raise
        rescue => e
          log_with_id(req_id, "Internal error in delete_volume: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}")
          raise GRPC::Internal, "DeleteVolume error: #{e.message}"
        end
      end
    end
  end
end