ubicloud/kubernetes/csi/lib/ubi_csi/node_service.rb
mohi-kalantari 8753678853 Handle PV deletion in UbiCSI
When we are done with the old PV at the end of the "stage" phase of the
newly migrated PV, we set its reclaim policy back to "Delete", and the
Kubernetes provisioner then calls the DeleteVolume method.

We then assemble an ssh command that deletes the backing file on whichever
node it resides.
2025-07-21 19:31:08 +02:00
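The DeleteVolume handler itself lives in the controller service, not in this file. As a rough illustration of the flow the commit message describes, a minimal sketch follows; the helper names (find_pv_by_volume_id, extract_node_from_pv, get_node_ip) mirror the KubernetesClient calls used later in node_service.rb, while the class name, ssh key path, and exact ssh options are assumptions, not the actual UbiCSI implementation.

# Hypothetical sketch of a controller-side DeleteVolume: remove the backing
# file over ssh on whichever node hosts it. Class name, key path and ssh
# options are assumptions, not the actual UbiCSI code.
require "open3"
require "shellwords"

class DeleteVolumeSketch
  def initialize(client)
    @client = client # assumed to behave like the KubernetesClient used below
  end

  def delete_backing_file(volume_id)
    pv = @client.find_pv_by_volume_id(volume_id)
    node_ip = @client.get_node_ip(@client.extract_node_from_pv(pv))
    backing_file = File.join("/var/lib/ubicsi", "#{volume_id}.img")
    # Assemble the ssh command that deletes the file wherever it resides.
    cmd = [
      "ssh", "-o", "StrictHostKeyChecking=no",
      "-i", "/home/ubi/.ssh/id_ed25519", # key path assumed
      "ubi@#{node_ip}",
      "rm -f #{Shellwords.shellescape(backing_file)}"
    ]
    output, status = Open3.capture2e(*cmd)
    raise "failed to delete #{backing_file}: #{output}" unless status.success?
  end
end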


# frozen_string_literal: true
require "grpc"
require "fileutils"
require "open3"
require "socket"
require "logger"
require "yaml"
require "shellwords"
require "base64"
require_relative "errors"
require_relative "kubernetes_client"
require_relative "../csi_services_pb"
module Csi
module V1
class NodeService < Node::Service
MAX_VOLUMES_PER_NODE = 8
VOLUME_BASE_PATH = "/var/lib/ubicsi"
LOGGER = Logger.new($stdout)
OLD_PV_NAME_ANNOTATION_KEY = "csi.ubicloud.com/old-pv-name"
OLD_PVC_OBJECT_ANNOTATION_KEY = "csi.ubicloud.com/old-pvc-object"
def log_with_id(id, message)
LOGGER.info("[req_id=#{id}] [CSI NodeService] #{message}")
end
def node_name
ENV["NODE_ID"]
end
def node_get_capabilities(req, _call)
req_id = SecureRandom.uuid
log_with_id(req_id, "node_get_capabilities request: #{req.inspect}")
resp = NodeGetCapabilitiesResponse.new(
capabilities: [
NodeServiceCapability.new(
rpc: NodeServiceCapability::RPC.new(
type: NodeServiceCapability::RPC::Type::STAGE_UNSTAGE_VOLUME
)
)
]
)
log_with_id(req_id, "node_get_capabilities response: #{resp.inspect}")
resp
end
def node_get_info(req, _call)
req_id = SecureRandom.uuid
log_with_id(req_id, "node_get_info request: #{req.inspect}")
topology = Topology.new(
segments: {
"kubernetes.io/hostname" => node_name
}
)
resp = NodeGetInfoResponse.new(
node_id: node_name,
max_volumes_per_node: MAX_VOLUMES_PER_NODE,
accessible_topology: topology
)
log_with_id(req_id, "node_get_info response: #{resp.inspect}")
resp
end
def run_cmd(*cmd, req_id: nil)
log_with_id(req_id, "Running command: #{cmd}") unless req_id.nil?
# Return the combined stdout/stderr and a boolean success flag so callers
# can check the second value directly instead of a Process::Status object.
output, status = Open3.capture2e(*cmd)
[output, status.success?]
end
def run_cmd_output(*cmd, req_id: nil)
output, _ = run_cmd(*cmd, req_id:)
output
end
def is_mounted?(path, req_id: nil)
_, ok = run_cmd("mountpoint", "-q", path, req_id:)
ok
end
def find_loop_device(backing_file)
output, ok = run_cmd("losetup", "-j", backing_file)
if ok && !output.empty?
loop_device = output.split(":")[0].strip
return loop_device if loop_device.start_with?("/dev/loop")
end
nil
end
def self.backing_file_path(volume_id)
File.join(VOLUME_BASE_PATH, "#{volume_id}.img")
end
def pvc_needs_migration?(pvc)
old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
!old_pv_name.nil?
end
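# Stage the volume on this node: fetch the PVC (copying data from the old PV
# first when the migration annotation is present), prepare the backing
# file/loop device/mount, then restore the old PV's reclaim policy and drop
# the migration annotation from the PVC.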
def node_stage_volume(req, _call)
req_id = SecureRandom.uuid
log_with_id(req_id, "node_stage_volume request: #{req.inspect}")
client = KubernetesClient.new(req_id:)
pvc = fetch_and_migrate_pvc(req_id, client, req)
resp = perform_node_stage_volume(req_id, pvc, req, _call)
roll_back_reclaim_policy(req_id, client, req, pvc)
remove_old_pv_annotation(client, pvc)
resp
end
def fetch_and_migrate_pvc(req_id, client, req)
pvc = client.get_pvc(req.volume_context["csi.storage.k8s.io/pvc/namespace"],
req.volume_context["csi.storage.k8s.io/pvc/name"])
if pvc_needs_migration?(pvc)
migrate_pvc_data(req_id, client, pvc, req)
end
pvc
rescue CopyNotFinishedError => e
log_with_id(req_id, "Waiting for data copy to finish in node_stage_volume: #{e.message}")
raise GRPC::Internal, e.message
rescue => e
log_with_id(req_id, "Internal error in node_stage_volume: #{e.class} - #{e.message} - #{e.backtrace}")
raise GRPC::Internal, "Unexpected error: #{e.class} - #{e.message}"
end
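# Allocate the sparse backing file if needed, attach it to a loop device,
# create a filesystem when the device is new and the data was not copied
# from an old PV, and mount it onto the staging path.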
def perform_node_stage_volume(req_id, pvc, req, _call)
volume_id = req.volume_id
staging_path = req.staging_target_path
size_bytes = req.volume_context["size_bytes"].to_i
backing_file = NodeService.backing_file_path(volume_id)
unless Dir.exist?(VOLUME_BASE_PATH)
log_with_id(req_id, "Creating backing file's directory #{VOLUME_BASE_PATH}")
FileUtils.mkdir_p(VOLUME_BASE_PATH)
end
begin
unless File.exist?(backing_file)
output, ok = run_cmd("fallocate", "-l", size_bytes.to_s, backing_file, req_id:)
unless ok
log_with_id(req_id, "gRPC error in node_stage_volume: failed to fallocate: #{output}")
raise GRPC::ResourceExhausted, "Failed to allocate backing file: #{output}"
end
output, ok = run_cmd("fallocate", "--punch-hole", "--keep-size", "-o", "0", "-l", size_bytes.to_s, backing_file, req_id:)
unless ok
log_with_id(req_id, "gRPC error in node_stage_volume: failed to punchhole: #{output}")
raise GRPC::ResourceExhausted, "Failed to punch hole in backing file: #{output}"
end
end
loop_device = find_loop_device(backing_file)
is_new_loop_device = loop_device.nil?
if is_new_loop_device
log_with_id(req_id, "Setting up new loop device for: #{backing_file}")
output, ok = run_cmd("losetup", "--find", "--show", backing_file, req_id:)
loop_device = output.strip
unless ok && !loop_device.empty?
raise GRPC::Internal, "Failed to setup loop device: #{output}"
end
else
log_with_id(req_id, "Loop device already exists: #{loop_device}")
end
should_mkfs = is_new_loop_device
# For copied PVCs the previous node has already run mkfs; running it again
# would wipe the copied data, so we skip it here.
if is_copied_pvc(pvc)
should_mkfs = false
end
if !req.volume_capability.mount.nil? && should_mkfs
fs_type = req.volume_capability.mount.fs_type || "ext4"
output, ok = run_cmd("mkfs.#{fs_type}", loop_device, req_id:)
unless ok
log_with_id(req_id, "gRPC error in node_stage_volume: failed to format device: #{output}")
raise GRPC::Internal, "Failed to format device #{loop_device} with #{fs_type}: #{output}"
end
end
unless is_mounted?(staging_path, req_id:)
FileUtils.mkdir_p(staging_path)
output, ok = run_cmd("mount", loop_device, staging_path, req_id:)
unless ok
log_with_id(req_id, "gRPC error in node_stage_volume: failed to mount loop device: #{output}")
raise GRPC::Internal, "Failed to mount #{loop_device} to #{staging_path}: #{output}"
end
end
# If the capability is a raw block volume, nothing else needs to be done.
rescue GRPC::BadStatus => e
log_with_id(req_id, "gRPC error in node_stage_volume: #{e.class} - #{e.message}")
raise e
rescue => e
log_with_id(req_id, "Internal error in node_stage_volume: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}")
raise GRPC::Internal, "NodeStageVolume error: #{e.message}"
end
resp = NodeStageVolumeResponse.new
log_with_id(req_id, "node_stage_volume response: #{resp.inspect}")
resp
end
def remove_old_pv_annotation(client, pvc)
if !pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY).nil?
pvc["metadata"]["annotations"].delete(OLD_PV_NAME_ANNOTATION_KEY)
client.update_pvc(pvc)
end
end
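# Once staging of a migrated volume has succeeded, set the old PV's reclaim
# policy back to "Delete" so the provisioner's DeleteVolume call cleans up
# its backing file.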
def roll_back_reclaim_policy(req_id, client, req, pvc)
old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
if old_pv_name.nil?
return
end
pv = client.get_pv(old_pv_name)
if pv.dig("spec", "persistentVolumeReclaimPolicy") == "Retain"
pv["spec"]["persistentVolumeReclaimPolicy"] = "Delete"
client.update_pv(pv)
end
rescue => e
log_with_id(req_id, "Internal error in node_stage_volume: #{e.class} - #{e.message} - #{e.backtrace}")
raise GRPC::Internal, "Unexpected error: #{e.class} - #{e.message}"
end
def is_copied_pvc(pvc)
pvc_needs_migration?(pvc)
end
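# Copy the old PV's backing file from the node that holds it onto this node
# with rsync over ssh, run asynchronously on the host through daemonizer2
# (entered via nsenter into PID 1's namespaces). Raises CopyNotFinishedError
# until the copy completes, so the stage request is retried.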
def migrate_pvc_data(req_id, client, pvc, req)
old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
pv = client.get_pv(old_pv_name)
pv_node = client.extract_node_from_pv(pv)
old_node_ip = client.get_node_ip(pv_node)
old_data_path = NodeService.backing_file_path(pv["spec"]["csi"]["volumeHandle"])
current_data_path = NodeService.backing_file_path(req.volume_id)
daemonizer_unit_name = Shellwords.shellescape("copy_#{old_pv_name}")
case run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "check", daemonizer_unit_name, req_id:)
when "Succeeded"
run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "clean", daemonizer_unit_name, req_id:)
when "NotStarted"
copy_command = ["rsync", "-az", "--inplace", "--compress-level=9", "--partial", "--whole-file", "-e", "ssh -T -c aes128-gcm@openssh.com -o Compression=no -x -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -i /home/ubi/.ssh/id_ed25519", "ubi@#{old_node_ip}:#{old_data_path}", current_data_path]
run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "run", daemonizer_unit_name, *copy_command, req_id:)
raise CopyNotFinishedError, "Old PV data is not copied yet"
when "InProgress"
raise CopyNotFinishedError, "Old PV data is not copied yet"
when "Failed"
raise "Copy old PV data failed"
else
raise "Daemonizer2 returned unknown status"
end
end
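# If this node has been cordoned (unschedulable), the workload is assumed to
# be moving away, so prepare the volume's data for migration before
# unstaging it.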
def node_unstage_volume(req, _call)
req_id = SecureRandom.uuid
log_with_id(req_id, "node_unstage_volume request: #{req.inspect}")
begin
client = KubernetesClient.new(req_id:)
if !client.node_schedulable?(node_name)
prepare_data_migration(client, req_id, req.volume_id)
end
rescue => e
log_with_id(req_id, "Internal error in node_unstage_volume: #{e.class} - #{e.message} - #{e.backtrace}")
raise GRPC::Internal, "Unexpected error: #{e.class} - #{e.message}"
end
perform_node_unstage_volume(req_id, req, _call)
end
def perform_node_unstage_volume(req_id, req, _call)
staging_path = req.staging_target_path
begin
if is_mounted?(staging_path)
output, ok = run_cmd("umount", "-q", staging_path, req_id:)
unless ok
log_with_id(req_id, "gRPC error in node_unstage_volume: failed to umount device: #{output}")
raise GRPC::Internal, "Failed to unmount #{staging_path}: #{output}"
end
else
log_with_id(req_id, "#{staging_path} is not mounted, skipping umount")
end
rescue GRPC::BadStatus => e
log_with_id(req_id, "gRPC error in node_unstage_volume: #{e.class} - #{e.message}")
raise e
rescue => e
log_with_id(req_id, "Internal error in node_unstage_volume: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}")
raise GRPC::Internal, "NodeUnstageVolume error: #{e.message}"
end
resp = NodeUnstageVolumeResponse.new
log_with_id(req_id, "node_unstage_volume response: #{resp.inspect}")
resp
end
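# Switch the PV to "Retain" so its data survives PVC deletion, then recreate
# the PVC so it can be bound to a fresh PV on another node.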
def prepare_data_migration(client, req_id, volume_id)
log_with_id(req_id, "Retaining pv with volume_id #{volume_id}")
pv = retain_pv(req_id, client, volume_id)
log_with_id(req_id, "Recreating pvc with volume_id #{volume_id}")
recreate_pvc(req_id, client, pv)
end
def retain_pv(req_id, client, volume_id)
pv = client.find_pv_by_volume_id(volume_id) # todo: if pv is nil, react accordingly
log_with_id(req_id, "Found PV with volume_id #{volume_id}: #{pv}")
if pv.dig("spec", "persistentVolumeReclaimPolicy") != "Retain"
pv["spec"]["persistentVolumeReclaimPolicy"] = "Retain"
client.update_pv(pv)
log_with_id(req_id, "Updated PV to retain")
end
pv
end
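# Fetch the PVC (or restore it from the copy saved on the PV if it is
# already gone), store it base64-encoded in a PV annotation, then delete and
# recreate it annotated with the old PV's name so staging on the new node
# knows where to copy the data from.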
def recreate_pvc(req_id, client, pv)
pvc_namespace = pv["spec"]["claimRef"]["namespace"]
pvc_name = pv["spec"]["claimRef"]["name"]
begin
pvc = client.get_pvc(pvc_namespace, pvc_name)
rescue ObjectNotFoundError => e
old_pvc_object = pv.dig("metadata", "annotations", OLD_PVC_OBJECT_ANNOTATION_KEY)
# dig returns nil when the annotation is missing, so guard against nil as
# well as an empty value before falling back to the saved PVC object.
if old_pvc_object.nil? || old_pvc_object.empty?
raise e
end
pvc = YAML.load(Base64.decode64(old_pvc_object))
end
log_with_id(req_id, "Found matching PVC for PV #{pv["metadata"]["name"]}: #{pvc}")
pvc = trim_pvc(pvc, pv["metadata"]["name"])
log_with_id(req_id, "Trimmed PVC for recreation: #{pvc}")
rendered_pvc = YAML.dump(pvc)
base64_encoded_pvc = Base64.strict_encode64(rendered_pvc)
if pv.dig("metadata", "annotations", OLD_PVC_OBJECT_ANNOTATION_KEY) != base64_encoded_pvc
pv["metadata"]["annotations"][OLD_PVC_OBJECT_ANNOTATION_KEY] = base64_encoded_pvc
pv["metadata"].delete("resourceVersion")
client.update_pv(pv)
end
client.delete_pvc(pvc_namespace, pvc_name)
log_with_id(req_id, "Deleted PVC #{pvc_namespace}/#{pvc_name}")
client.create_pvc(rendered_pvc)
log_with_id(req_id, "Recreated PVC with the new spec")
end
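# Strip server-populated fields and the old volume binding so the PVC can be
# resubmitted, keeping only an annotation that points at the old PV.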
def trim_pvc(pvc, pv_name)
pvc["metadata"]["annotations"] = {OLD_PV_NAME_ANNOTATION_KEY => pv_name}
%w[resourceVersion uid creationTimestamp].each do |key|
pvc["metadata"].delete(key)
end
pvc["spec"].delete("volumeName")
pvc.delete("status")
pvc
end
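# Bind-mount the staged volume into the pod's target path.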
def node_publish_volume(req, _call)
req_id = SecureRandom.uuid
log_with_id(req_id, "node_publish_volume request: #{req.inspect}")
staging_path = req.staging_target_path
target_path = req.target_path
begin
FileUtils.mkdir_p(target_path)
output, ok = run_cmd("mount", "--bind", staging_path, target_path, req_id:)
unless ok
log_with_id(req_id, "gRPC error in node_publish_volume: failed to bind mount device: #{output}")
raise GRPC::Internal, "Failed to bind mount #{staging_path} to #{target_path}: #{output}"
end
rescue GRPC::BadStatus => e
log_with_id(req_id, "gRPC error in node_publish_volume: #{e.class} - #{e.message}")
raise e
rescue => e
log_with_id(req_id, "Internal error in node_publish_volume: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}")
raise GRPC::Internal, "NodePublishVolume error: #{e.message}"
end
resp = NodePublishVolumeResponse.new
log_with_id(req_id, "node_publish_volume response: #{resp.inspect}")
resp
end
def node_unpublish_volume(req, _call)
req_id = SecureRandom.uuid
log_with_id(req_id, "node_unpublish_volume request: #{req.inspect}")
target_path = req.target_path
begin
if is_mounted?(target_path)
output, ok = run_cmd("umount", "-q", target_path, req_id:)
unless ok
log_with_id(req_id, "gRPC error in node_unpublish_volume: failed to umount device: #{output}")
raise GRPC::Internal, "Failed to unmount #{target_path}: #{output}"
end
else
log_with_id(req_id, "#{target_path} is not mounted, skipping umount")
end
rescue GRPC::BadStatus => e
log_with_id(req_id, "gRPC error in node_unpublish_volume: #{e.class} - #{e.message}")
raise e
rescue => e
log_with_id(req_id, "Internal error in node_unpublish_volume: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}")
raise GRPC::Internal, "NodeUnpublishVolume error: #{e.message}"
end
resp = NodeUnpublishVolumeResponse.new
log_with_id(req_id, "node_unpublish_volume response: #{resp.inspect}")
resp
end
end
end
end