ubicloud/kubernetes/csi/lib/ubi_csi/node_service.rb
mohi-kalantari 4c383de3ca Fix PVC recreation issue
Before this commit, we were recreating the PVC multiple times and the
operation was not idempotent. In certain cases this broke the flow and
the PVC would get stuck in a bad state.

With this change, we use the PVC UID to make sure we delete the PVC only
once, and if we are not the one recreating it, we make sure the
annotation is added so that the controller on the destination node
performs the migration.

Some changes were made to the Kubernetes client to patch resources
instead of updating them, which is simpler and idempotent.
2025-09-24 11:59:14 +02:00
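
The kubernetes_client.rb changes themselves are not part of this file. Purely as a
hedged illustration of why patching is idempotent while a read-modify-update cycle
is not, a helper along the following lines converges to the same result no matter
how often it is retried; the patch_annotation name and the kubectl invocation are
assumptions for this sketch, not the actual client code.

# Hypothetical sketch, not the actual KubernetesClient implementation:
# a JSON merge patch that sets a single annotation can be applied
# repeatedly without conflicting on resourceVersion the way an
# update of the full object can.
require "json"
require "open3"

def patch_annotation(kind, name, key, value, namespace: nil)
  patch = {"metadata" => {"annotations" => {key => value}}}.to_json
  cmd = ["kubectl", "patch", kind, name, "--type", "merge", "-p", patch]
  cmd += ["-n", namespace] if namespace
  _output, status = Open3.capture2e(*cmd)
  raise "kubectl patch #{kind}/#{name} failed" unless status.success?
end

# Usage (names are illustrative):
#   patch_annotation("pvc", "data-db-0", "csi.ubicloud.com/old-pv-name",
#                    "pvc-1234", namespace: "default")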


# frozen_string_literal: true
require "grpc"
require "fileutils"
require "socket"
require "yaml"
require "shellwords"
require "base64"
require_relative "errors"
require_relative "kubernetes_client"
require_relative "../csi_services_pb"
module Csi
module V1
class NodeService < Node::Service
include Csi::ServiceHelper
MAX_VOLUMES_PER_NODE = 8
VOLUME_BASE_PATH = "/var/lib/ubicsi"
OLD_PV_NAME_ANNOTATION_KEY = "csi.ubicloud.com/old-pv-name"
OLD_PVC_OBJECT_ANNOTATION_KEY = "csi.ubicloud.com/old-pvc-object"
ACCEPTABLE_FS = ["ext4", "xfs"].freeze
def self.mkdir_p
FileUtils.mkdir_p(VOLUME_BASE_PATH)
end
def initialize(logger:, node_id:)
@logger = logger
@node_id = node_id
end
attr_reader :node_id
def node_get_capabilities(req, _call)
log_request_response(req, "node_get_capabilities") do |req_id|
NodeGetCapabilitiesResponse.new(
capabilities: [
NodeServiceCapability.new(
rpc: NodeServiceCapability::RPC.new(
type: NodeServiceCapability::RPC::Type::STAGE_UNSTAGE_VOLUME
)
)
]
)
end
end
def node_get_info(req, _call)
log_request_response(req, "node_get_info") do |req_id|
topology = Topology.new(
segments: {
"kubernetes.io/hostname" => @node_id
}
)
NodeGetInfoResponse.new(
node_id: @node_id,
max_volumes_per_node: MAX_VOLUMES_PER_NODE,
accessible_topology: topology
)
end
end
def run_cmd_output(*cmd, req_id:)
output, _ = run_cmd(*cmd, req_id:)
output
end
def is_mounted?(path, req_id:)
_, status = run_cmd("mountpoint", "-q", path, req_id:)
status == 0
end
def find_loop_device(backing_file, req_id:)
output, ok = run_cmd("losetup", "-j", backing_file, req_id:)
if ok && !output.empty?
loop_device = output.split(":", 2)[0].strip
return loop_device if loop_device.start_with?("/dev/loop")
end
nil
end
def remove_loop_device(backing_file, req_id:)
loop_device = find_loop_device(backing_file, req_id:)
return unless loop_device
output, ok = run_cmd("losetup", "-d", loop_device, req_id:)
unless ok
raise "Could not remove loop device: #{output}"
end
end
def self.backing_file_path(volume_id)
File.join(VOLUME_BASE_PATH, "#{volume_id}.img")
end
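# A PVC still needs data migration when it carries the
# csi.ubicloud.com/old-pv-name annotation set during PVC recreation on the
# node it is moving away from.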
def pvc_needs_migration?(pvc)
old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
!old_pv_name.nil?
end
def find_file_system(loop_device, req_id:)
output, ok = run_cmd("blkid", "-o", "value", "-s", "TYPE", loop_device, req_id:)
unless ok
raise "Failed to get the loop device filesystem status: #{output}"
end
output.strip
end
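# Stages the volume on this node: fetch the PVC, migrate data from the old
# node if the PVC is marked for migration, set up the loop-device-backed
# volume, then roll the old PV's reclaim policy back and drop the migration
# annotation so that retries of this call stay idempotent.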
def node_stage_volume(req, _call)
log_request_response(req, "node_stage_volume") do |req_id|
client = KubernetesClient.new(req_id:, logger: @logger)
pvc = fetch_and_migrate_pvc(req_id, client, req)
perform_node_stage_volume(req_id, pvc, req, _call)
roll_back_reclaim_policy(req_id, client, req, pvc)
remove_old_pv_annotation_from_pvc(req_id, client, pvc)
NodeStageVolumeResponse.new
rescue => e
log_and_raise(req_id, e)
end
end
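# Looks up the PVC named in the volume context and, if it is marked for
# migration, copies the old PV's data onto this node before staging
# continues.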
def fetch_and_migrate_pvc(req_id, client, req)
pvc = client.get_pvc(req.volume_context["csi.storage.k8s.io/pvc/namespace"],
req.volume_context["csi.storage.k8s.io/pvc/name"])
if pvc_needs_migration?(pvc)
migrate_pvc_data(req_id, client, pvc, req)
end
pvc
end
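# Allocates a sparse backing file (fallocate followed by hole punching),
# attaches it to a loop device, formats it with the requested filesystem
# (ext4 or xfs) when none is present, and mounts it at the staging path.
# Every step is skipped if its result already exists, so the method can be
# retried safely.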
def perform_node_stage_volume(req_id, pvc, req, _call)
volume_id = req.volume_id
staging_path = req.staging_target_path
size_bytes = Integer(req.volume_context["size_bytes"], 10)
backing_file = NodeService.backing_file_path(volume_id)
unless File.exist?(backing_file)
output, ok = run_cmd("fallocate", "-l", size_bytes.to_s, backing_file, req_id:)
unless ok
raise GRPC::ResourceExhausted.new("Failed to allocate backing file: #{output}")
end
output, ok = run_cmd("fallocate", "--punch-hole", "--keep-size", "-o", "0", "-l", size_bytes.to_s, backing_file, req_id:)
unless ok
raise GRPC::ResourceExhausted.new("Failed to punch hole in backing file: #{output}")
end
end
loop_device = find_loop_device(backing_file, req_id:)
is_new_loop_device = loop_device.nil?
if is_new_loop_device
log_with_id(req_id, "Setting up new loop device for: #{backing_file}")
output, ok = run_cmd("losetup", "--find", "--show", backing_file, req_id:)
loop_device = output.strip
unless ok && !loop_device.empty?
raise "Failed to setup loop device: #{output}"
end
else
log_with_id(req_id, "Loop device already exists: #{loop_device}")
end
if req.volume_capability.mount
current_fs_type = find_file_system(loop_device, req_id:)
if !current_fs_type.empty? && !ACCEPTABLE_FS.include?(current_fs_type)
raise "Unacceptable file system type for #{loop_device}: #{current_fs_type}"
end
desired_fs_type = req.volume_capability.mount.fs_type || "ext4"
if current_fs_type != "" && current_fs_type != desired_fs_type
raise "Unexpected filesystem on volume. desired: #{desired_fs_type}, current: #{current_fs_type}"
elsif current_fs_type == ""
output, ok = run_cmd("mkfs.#{desired_fs_type}", loop_device, req_id:)
unless ok
raise "Failed to format device #{loop_device} with #{desired_fs_type}: #{output}"
end
end
end
unless is_mounted?(staging_path, req_id:)
FileUtils.mkdir_p(staging_path)
output, ok = run_cmd("mount", loop_device, staging_path, req_id:)
unless ok
raise "Failed to mount #{loop_device} to #{staging_path}: #{output}"
end
end
# For a block volume capability there is nothing else to do here
end
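# Clears the migration marker from the PVC once staging on this node has
# succeeded.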
def remove_old_pv_annotation_from_pvc(req_id, client, pvc)
namespace, name = pvc["metadata"].values_at("namespace", "name")
log_with_id(req_id, "Removing old pv annotation #{OLD_PV_NAME_ANNOTATION_KEY}")
client.remove_pvc_annotation(namespace, name, OLD_PV_NAME_ANNOTATION_KEY)
end
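# Once the data has been migrated, switches the old PV's reclaim policy
# back from Retain to Delete so the old volume can be cleaned up.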
def roll_back_reclaim_policy(req_id, client, req, pvc)
old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
if old_pv_name.nil?
return
end
pv = client.get_pv(old_pv_name)
if pv.dig("spec", "persistentVolumeReclaimPolicy") == "Retain"
pv["spec"]["persistentVolumeReclaimPolicy"] = "Delete"
client.update_pv(pv)
end
end
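# Copies the backing file from the old node with rsync over ssh. The copy
# runs on the host through daemonizer2 (entered via nsenter), and
# CopyNotFinishedError is raised until it has finished so that staging is
# retried later.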
def migrate_pvc_data(req_id, client, pvc, req)
old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
pv = client.get_pv(old_pv_name)
pv_node = client.extract_node_from_pv(pv)
old_node_ip = client.get_node_ip(pv_node)
old_data_path = NodeService.backing_file_path(pv["spec"]["csi"]["volumeHandle"])
current_data_path = NodeService.backing_file_path(req.volume_id)
daemonizer_unit_name = Shellwords.shellescape("copy_#{old_pv_name}")
case run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "check", daemonizer_unit_name, req_id:)
when "Succeeded"
run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "clean", daemonizer_unit_name, req_id:)
when "NotStarted"
copy_command = ["rsync", "-az", "--inplace", "--compress-level=9", "--partial", "--whole-file", "-e", "ssh -T -c aes128-gcm@openssh.com -o Compression=no -x -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -i /home/ubi/.ssh/id_ed25519", "ubi@#{old_node_ip}:#{old_data_path}", current_data_path]
run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "run", daemonizer_unit_name, *copy_command, req_id:)
raise CopyNotFinishedError, "Old PV data is not copied yet"
when "InProgress"
raise CopyNotFinishedError, "Old PV data is not copied yet"
when "Failed"
raise "Copy old PV data failed"
else
raise "Daemonizer2 returned unknown status"
end
end
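# Unstages the volume. If the node is no longer schedulable (e.g. it is
# being drained), the PV/PVC are first prepared for migration; then the
# loop device is detached and the staging path unmounted.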
def node_unstage_volume(req, _call)
log_request_response(req, "node_unstage_volume") do |req_id|
backing_file = NodeService.backing_file_path(req.volume_id)
client = KubernetesClient.new(req_id:, logger: @logger)
if !client.node_schedulable?(@node_id)
prepare_data_migration(client, req_id, req.volume_id)
end
remove_loop_device(backing_file, req_id:)
staging_path = req.staging_target_path
if is_mounted?(staging_path, req_id:)
output, ok = run_cmd("umount", "-q", staging_path, req_id:)
unless ok
raise "Failed to unmount #{staging_path}: #{output}"
end
end
NodeUnstageVolumeResponse.new
rescue => e
log_and_raise(req_id, e)
end
end
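# Marks the volume for migration off this node: keep the PV (and its data)
# by switching the reclaim policy to Retain, then recreate the PVC so it
# can be bound to a new PV on another node.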
def prepare_data_migration(client, req_id, volume_id)
log_with_id(req_id, "Retaining pv with volume_id #{volume_id}")
pv = retain_pv(req_id, client, volume_id)
log_with_id(req_id, "Recreating pvc with volume_id #{volume_id}")
recreate_pvc(req_id, client, pv)
end
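# Sets the PV's reclaim policy to Retain (if it is not already) so that
# deleting the PVC does not delete the backing data.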
def retain_pv(req_id, client, volume_id)
pv = client.find_pv_by_volume_id(volume_id)
log_with_id(req_id, "Found PV with volume_id #{volume_id}: #{pv}")
if pv.dig("spec", "persistentVolumeReclaimPolicy") != "Retain"
pv["spec"]["persistentVolumeReclaimPolicy"] = "Retain"
client.update_pv(pv)
log_with_id(req_id, "Updated PV to retain")
end
pv
end
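# Recreates the PVC exactly once. A dynamically provisioned PV is named
# "pvc-<uid>" after the PVC it was created for, so a matching UID means the
# bound PVC is still the original one and must be deleted and recreated
# here; a mismatch means it has already been recreated (by us or by the
# StatefulSet controller) and only the old-pv-name annotation needs to be
# patched in. The trimmed PVC object is also stored on the PV as an
# annotation so recreation can still proceed if the PVC disappears before
# we get to it.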
def recreate_pvc(req_id, client, pv)
pvc_namespace, pvc_name = pv["spec"]["claimRef"].values_at("namespace", "name")
pv_name = pv.dig("metadata", "name")
begin
pvc = client.get_pvc(pvc_namespace, pvc_name)
rescue ObjectNotFoundError
old_pvc_object = pv.dig("metadata", "annotations", OLD_PVC_OBJECT_ANNOTATION_KEY)
if old_pvc_object.empty?
raise
end
pvc = YAML.load(Base64.decode64(old_pvc_object))
end
log_with_id(req_id, "Found matching PVC for PV #{pv_name}: #{pvc}")
pvc_uid = pvc.dig("metadata", "uid")
pvc_deletion_timestamp = pvc.dig("metadata", "deletionTimestamp")
trim_pvc(pvc, pv_name)
log_with_id(req_id, "Trimmed PVC for recreation: #{pvc}")
client.patch_resource("pv", pv_name, OLD_PVC_OBJECT_ANNOTATION_KEY, Base64.strict_encode64(YAML.dump(pvc)))
if pvc_uid == pv_name.delete_prefix("pvc-")
if !pvc_deletion_timestamp
client.delete_pvc(pvc_namespace, pvc_name)
log_with_id(req_id, "Deleted PVC #{pvc_namespace}/#{pvc_name}")
client.remove_pvc_finalizers(pvc_namespace, pvc_name)
log_with_id(req_id, "Removed PVC finalizers #{pvc_namespace}/#{pvc_name}")
end
client.create_pvc(pvc)
log_with_id(req_id, "Recreated PVC with the new spec")
else
# The PVC has already been recreated at this point. We don't know whether
# we created it or the StatefulSet controller did; we only need to make
# sure the csi.ubicloud.com/old-pv-name annotation is set.
client.patch_resource("pvc", pvc_name, OLD_PV_NAME_ANNOTATION_KEY, pv_name, namespace: pvc_namespace)
end
end
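# Strips server-populated metadata (uid, resourceVersion, timestamps, bind
# annotations, volumeName, status) so the object can be resubmitted as a
# new PVC, and records the old PV name in the migration annotation.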
def trim_pvc(pvc, pv_name)
pvc["metadata"]["annotations"] ||= {}
%W[#{OLD_PVC_OBJECT_ANNOTATION_KEY} volume.kubernetes.io/selected-node pv.kubernetes.io/bind-completed].each do |key|
pvc["metadata"]["annotations"].delete(key)
end
%w[resourceVersion uid creationTimestamp deletionTimestamp deletionGracePeriodSeconds].each do |key|
pvc["metadata"].delete(key)
end
pvc["spec"].delete("volumeName")
pvc.delete("status")
pvc["metadata"]["annotations"][OLD_PV_NAME_ANNOTATION_KEY] = pv_name
pvc
end
def node_publish_volume(req, _call)
log_request_response(req, "node_publish_volume") do |req_id|
staging_path = req.staging_target_path
target_path = req.target_path
unless is_mounted?(target_path, req_id:)
FileUtils.mkdir_p(target_path)
output, ok = run_cmd("mount", "--bind", staging_path, target_path, req_id:)
unless ok
raise "Failed to bind mount #{staging_path} to #{target_path}: #{output}"
end
end
NodePublishVolumeResponse.new
rescue => e
log_and_raise(req_id, e)
end
end
def node_unpublish_volume(req, _call)
log_request_response(req, "node_unpublish_volume") do |req_id|
target_path = req.target_path
if is_mounted?(target_path, req_id:)
output, ok = run_cmd("umount", "-q", target_path, req_id:)
unless ok
raise "Failed to unmount #{target_path}: #{output}"
end
else
log_with_id(req_id, "#{target_path} is not mounted, skipping umount")
end
NodeUnpublishVolumeResponse.new
rescue => e
log_and_raise(req_id, e)
end
end
end
end
end