ubicloud/kubernetes/csi/lib/ubi_csi/node_service.rb
Daniel Farina · 7963a0ec01 · Outline mkdir_p for CSI directory initialization
It's unusual to have such a side effect upon object initialization.

In production, this object is not created more than once per
startup, so it's no big deal... but once testing and mocks get
involved, it clutters the tests, so abide by the more conventional
approach of outlining the side effect.

In passing, remove the check for the directory's existence before
`mkdir_p`: it is idempotent and will check internally whether there
is anything to do anyway.
2025-07-25 15:20:55 +02:00
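
To make the new call pattern concrete, here is a minimal sketch of a daemon entry point under this change; the bootstrap itself is hypothetical, and only `NodeService.mkdir_p` and `NodeService.new(logger:, node_id:)` come from the file below:

    require "logger"
    require "socket"

    # Perform the directory side effect once, explicitly, at startup ...
    Csi::V1::NodeService.mkdir_p
    # ... so that constructing the service itself stays side-effect free
    # (and trivial to do repeatedly in tests).
    service = Csi::V1::NodeService.new(logger: Logger.new($stdout),
      node_id: Socket.gethostname)
    # Registering `service` with the gRPC server then proceeds as before.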

363 lines · 15 KiB · Ruby

# frozen_string_literal: true

require "grpc"
require "fileutils"
require "socket"
require "yaml"
require "shellwords"
require "base64"
require_relative "errors"
require_relative "kubernetes_client"
require_relative "../csi_services_pb"

module Csi
  module V1
    class NodeService < Node::Service
      include Csi::ServiceHelper

      MAX_VOLUMES_PER_NODE = 8
      VOLUME_BASE_PATH = "/var/lib/ubicsi"
      OLD_PV_NAME_ANNOTATION_KEY = "csi.ubicloud.com/old-pv-name"
      OLD_PVC_OBJECT_ANNOTATION_KEY = "csi.ubicloud.com/old-pvc-object"
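
      # Creates the directory that holds the volume backing files. Outlined
      # from #initialize so that construction has no filesystem side effect;
      # intended to be called once at process startup.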
      def self.mkdir_p
        FileUtils.mkdir_p(VOLUME_BASE_PATH)
      end

      def initialize(logger:, node_id:)
        @logger = logger
        @node_id = node_id
      end

      attr_reader :node_id

      def node_get_capabilities(req, _call)
        log_request_response(req, "node_get_capabilities") do |req_id|
          NodeGetCapabilitiesResponse.new(
            capabilities: [
              NodeServiceCapability.new(
                rpc: NodeServiceCapability::RPC.new(
                  type: NodeServiceCapability::RPC::Type::STAGE_UNSTAGE_VOLUME
                )
              )
            ]
          )
        end
      end

      def node_get_info(req, _call)
        log_request_response(req, "node_get_info") do |req_id|
          topology = Topology.new(
            segments: {
              "kubernetes.io/hostname" => @node_id
            }
          )
          NodeGetInfoResponse.new(
            node_id: @node_id,
            max_volumes_per_node: MAX_VOLUMES_PER_NODE,
            accessible_topology: topology
          )
        end
      end

      def run_cmd_output(*cmd, req_id:)
        output, _ = run_cmd(*cmd, req_id:)
        output
      end

      def is_mounted?(path, req_id:)
        _, status = run_cmd("mountpoint", "-q", path, req_id:)
        status == 0
      end
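
      # Returns the loop device currently attached to backing_file by parsing
      # `losetup -j` output ("/dev/loopN: ..."), or nil if none is attached.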
      def find_loop_device(backing_file, req_id:)
        output, ok = run_cmd("losetup", "-j", backing_file, req_id:)
        if ok && !output.empty?
          loop_device = output.split(":", 2)[0].strip
          return loop_device if loop_device.start_with?("/dev/loop")
        end
        nil
      end

      def self.backing_file_path(volume_id)
        File.join(VOLUME_BASE_PATH, "#{volume_id}.img")
      end
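
      # A recreated PVC carries its old PV's name as an annotation until its
      # data has been migrated and staged on the new node.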
      def pvc_needs_migration?(pvc)
        old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
        !old_pv_name.nil?
      end
      alias_method :is_copied_pvc?, :pvc_needs_migration?
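
      # Stages the volume on this node. If the PVC was recreated during a
      # drain, its data is first copied over from the old PV; afterwards the
      # old PV's reclaim policy is restored and the migration annotation
      # removed.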
      def node_stage_volume(req, _call)
        log_request_response(req, "node_stage_volume") do |req_id|
          client = KubernetesClient.new(req_id:, logger: @logger)
          pvc = fetch_and_migrate_pvc(req_id, client, req)
          perform_node_stage_volume(req_id, pvc, req, _call)
          roll_back_reclaim_policy(req_id, client, req, pvc)
          remove_old_pv_annotation(client, pvc)
          NodeStageVolumeResponse.new
        end
      end

      def fetch_and_migrate_pvc(req_id, client, req)
        pvc = client.get_pvc(req.volume_context["csi.storage.k8s.io/pvc/namespace"],
          req.volume_context["csi.storage.k8s.io/pvc/name"])
        if pvc_needs_migration?(pvc)
          migrate_pvc_data(req_id, client, pvc, req)
        end
        pvc
      rescue CopyNotFinishedError => e
        log_with_id(req_id, "Waiting for data copy to finish in node_stage_volume: #{e.message}")
        raise GRPC::Internal, e.message
      rescue => e
        log_with_id(req_id, "Internal error in node_stage_volume: #{e.class} - #{e.message} - #{e.backtrace}")
        raise GRPC::Internal, "Unexpected error: #{e.class} - #{e.message}"
      end
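
      # Backs the volume with a sparse file (fallocate, then hole punch),
      # attaches it to a loop device, formats it on first use (skipped for
      # copied PVCs), and mounts it at the staging path. Every step is
      # guarded so that retries are safe.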
      def perform_node_stage_volume(req_id, pvc, req, _call)
        volume_id = req.volume_id
        staging_path = req.staging_target_path
        size_bytes = Integer(req.volume_context["size_bytes"], 10)
        backing_file = NodeService.backing_file_path(volume_id)
        begin
          unless File.exist?(backing_file)
            output, ok = run_cmd("fallocate", "-l", size_bytes.to_s, backing_file, req_id:)
            unless ok
              log_with_id(req_id, "gRPC error in node_stage_volume: failed to fallocate: #{output}")
              raise GRPC::ResourceExhausted, "Failed to allocate backing file: #{output}"
            end
            output, ok = run_cmd("fallocate", "--punch-hole", "--keep-size", "-o", "0", "-l", size_bytes.to_s, backing_file, req_id:)
            unless ok
              log_with_id(req_id, "gRPC error in node_stage_volume: failed to punch hole: #{output}")
              raise GRPC::ResourceExhausted, "Failed to punch hole in backing file: #{output}"
            end
          end
          loop_device = find_loop_device(backing_file, req_id:)
          is_new_loop_device = loop_device.nil?
          if is_new_loop_device
            log_with_id(req_id, "Setting up new loop device for: #{backing_file}")
            output, ok = run_cmd("losetup", "--find", "--show", backing_file, req_id:)
            loop_device = output.strip
            unless ok && !loop_device.empty?
              raise GRPC::Internal, "Failed to setup loop device: #{output}"
            end
          else
            log_with_id(req_id, "Loop device already exists: #{loop_device}")
          end
          should_mkfs = is_new_loop_device
          # For a copied PVC, the previous node already ran mkfs; running it
          # again would wipe the copied data, so we skip formatting here.
          if is_copied_pvc?(pvc)
            should_mkfs = false
          end
          if !req.volume_capability.mount.nil? && should_mkfs
            fs_type = req.volume_capability.mount.fs_type || "ext4"
            output, ok = run_cmd("mkfs.#{fs_type}", loop_device, req_id:)
            unless ok
              log_with_id(req_id, "gRPC error in node_stage_volume: failed to format device: #{output}")
              raise GRPC::Internal, "Failed to format device #{loop_device} with #{fs_type}: #{output}"
            end
          end
          unless is_mounted?(staging_path, req_id:)
            FileUtils.mkdir_p(staging_path)
            output, ok = run_cmd("mount", loop_device, staging_path, req_id:)
            unless ok
              log_with_id(req_id, "gRPC error in node_stage_volume: failed to mount loop device: #{output}")
              raise GRPC::Internal, "Failed to mount #{loop_device} to #{staging_path}: #{output}"
            end
          end
          # For block volume capabilities, there is nothing further to do.
        rescue GRPC::BadStatus => e
          log_with_id(req_id, "gRPC error in node_stage_volume: #{e.class} - #{e.message}")
          raise e
        rescue => e
          log_with_id(req_id, "Internal error in node_stage_volume: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}")
          raise GRPC::Internal, "NodeStageVolume error: #{e.message}"
        end
      end

      def remove_old_pv_annotation(client, pvc)
        if !pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY).nil?
          pvc["metadata"]["annotations"].delete(OLD_PV_NAME_ANNOTATION_KEY)
          client.update_pvc(pvc)
        end
      end
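
      # Migration set the old PV's reclaim policy to Retain so its data
      # survived the PVC deletion; once staging succeeds, flip it back to
      # Delete so the old PV is cleaned up normally.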
      def roll_back_reclaim_policy(req_id, client, req, pvc)
        old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
        if old_pv_name.nil?
          return
        end
        pv = client.get_pv(old_pv_name)
        if pv.dig("spec", "persistentVolumeReclaimPolicy") == "Retain"
          pv["spec"]["persistentVolumeReclaimPolicy"] = "Delete"
          client.update_pv(pv)
        end
      rescue => e
        log_with_id(req_id, "Internal error in node_stage_volume: #{e.class} - #{e.message} - #{e.backtrace}")
        raise GRPC::Internal, "Unexpected error: #{e.class} - #{e.message}"
      end
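
      # Copies the old PV's backing file from the previous node with rsync
      # over ssh, supervised by daemonizer2 on the host (reached via nsenter).
      # Staging retries poll the unit: NotStarted kicks off the copy, and any
      # status short of Succeeded raises so the gRPC caller retries later.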
      def migrate_pvc_data(req_id, client, pvc, req)
        old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
        pv = client.get_pv(old_pv_name)
        pv_node = client.extract_node_from_pv(pv)
        old_node_ip = client.get_node_ip(pv_node)
        old_data_path = NodeService.backing_file_path(pv["spec"]["csi"]["volumeHandle"])
        current_data_path = NodeService.backing_file_path(req.volume_id)
        daemonizer_unit_name = Shellwords.shellescape("copy_#{old_pv_name}")
        case run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "check", daemonizer_unit_name, req_id:)
        when "Succeeded"
          run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "clean", daemonizer_unit_name, req_id:)
        when "NotStarted"
          copy_command = ["rsync", "-az", "--inplace", "--compress-level=9", "--partial", "--whole-file", "-e", "ssh -T -c aes128-gcm@openssh.com -o Compression=no -x -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -i /home/ubi/.ssh/id_ed25519", "ubi@#{old_node_ip}:#{old_data_path}", current_data_path]
          run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "run", daemonizer_unit_name, *copy_command, req_id:)
          raise CopyNotFinishedError, "Old PV data is not copied yet"
        when "InProgress"
          raise CopyNotFinishedError, "Old PV data is not copied yet"
        when "Failed"
          raise "Copy old PV data failed"
        else
          raise "Daemonizer2 returned unknown status"
        end
      end
      def node_unstage_volume(req, _call)
        log_request_response(req, "node_unstage_volume") do |req_id|
          begin
            client = KubernetesClient.new(req_id:, logger: @logger)
            if !client.node_schedulable?(@node_id)
              prepare_data_migration(client, req_id, req.volume_id)
            end
            staging_path = req.staging_target_path
            if is_mounted?(staging_path, req_id:)
              output, ok = run_cmd("umount", "-q", staging_path, req_id:)
              unless ok
                log_with_id(req_id, "gRPC error in node_unstage_volume: failed to umount device: #{output}")
                raise GRPC::Internal, "Failed to unmount #{staging_path}: #{output}"
              end
            end
          rescue GRPC::BadStatus => e
            log_with_id(req_id, "gRPC error in node_unstage_volume: #{e.class} - #{e.message}")
            raise e
          rescue => e
            log_with_id(req_id, "Internal error in node_unstage_volume: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}")
            raise GRPC::Internal, "NodeUnstageVolume error: #{e.class} - #{e.message}"
          end
          NodeUnstageVolumeResponse.new
        end
      end
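
      # Called while the node is unschedulable (draining): mark the PV
      # Retain so its data survives, then delete and recreate the PVC so the
      # scheduler can bind it to a fresh PV on the pod's next node.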
      def prepare_data_migration(client, req_id, volume_id)
        log_with_id(req_id, "Retaining pv with volume_id #{volume_id}")
        pv = retain_pv(req_id, client, volume_id)
        log_with_id(req_id, "Recreating pvc with volume_id #{volume_id}")
        recreate_pvc(req_id, client, pv)
      end

      def retain_pv(req_id, client, volume_id)
        pv = client.find_pv_by_volume_id(volume_id)
        log_with_id(req_id, "Found PV with volume_id #{volume_id}: #{pv}")
        if pv.dig("spec", "persistentVolumeReclaimPolicy") != "Retain"
          pv["spec"]["persistentVolumeReclaimPolicy"] = "Retain"
          client.update_pv(pv)
          log_with_id(req_id, "Updated PV to retain")
        end
        pv
      end

      def recreate_pvc(req_id, client, pv)
        pvc_namespace, pvc_name = pv["spec"]["claimRef"].values_at("namespace", "name")
        begin
          pvc = client.get_pvc(pvc_namespace, pvc_name)
        rescue ObjectNotFoundError => e
          # Fall back to the copy of the PVC stashed on the PV; re-raise if a
          # previous run never stored one.
          old_pvc_object = pv.dig("metadata", "annotations", OLD_PVC_OBJECT_ANNOTATION_KEY)
          if old_pvc_object.nil? || old_pvc_object.empty?
            raise e
          end
          pvc = YAML.load(Base64.decode64(old_pvc_object))
        end
        log_with_id(req_id, "Found matching PVC for PV #{pv["metadata"]["name"]}: #{pvc}")
        pvc = trim_pvc(pvc, pv["metadata"]["name"])
        log_with_id(req_id, "Trimmed PVC for recreation: #{pvc}")
        base64_encoded_pvc = Base64.strict_encode64(YAML.dump(pvc))
        if pv.dig("metadata", "annotations", OLD_PVC_OBJECT_ANNOTATION_KEY) != base64_encoded_pvc
          pv["metadata"]["annotations"][OLD_PVC_OBJECT_ANNOTATION_KEY] = base64_encoded_pvc
          pv["metadata"].delete("resourceVersion")
          client.update_pv(pv)
        end
        client.delete_pvc(pvc_namespace, pvc_name)
        log_with_id(req_id, "Deleted PVC #{pvc_namespace}/#{pvc_name}")
        client.create_pvc(pvc)
        log_with_id(req_id, "Recreated PVC with the new spec")
      end
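
      # Strips the server-populated fields so the PVC can be resubmitted as
      # a new object, and records the old PV's name so staging knows a data
      # migration is needed.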
      def trim_pvc(pvc, pv_name)
        pvc["metadata"]["annotations"] = {OLD_PV_NAME_ANNOTATION_KEY => pv_name}
        %w[resourceVersion uid creationTimestamp].each do |key|
          pvc["metadata"].delete(key)
        end
        pvc["spec"].delete("volumeName")
        pvc.delete("status")
        pvc
      end

      def node_publish_volume(req, _call)
        log_request_response(req, "node_publish_volume") do |req_id|
          staging_path = req.staging_target_path
          target_path = req.target_path
          begin
            FileUtils.mkdir_p(target_path)
            output, ok = run_cmd("mount", "--bind", staging_path, target_path, req_id:)
            unless ok
              log_with_id(req_id, "gRPC error in node_publish_volume: failed to bind mount device: #{output}")
              raise GRPC::Internal, "Failed to bind mount #{staging_path} to #{target_path}: #{output}"
            end
          rescue GRPC::BadStatus => e
            log_with_id(req_id, "gRPC error in node_publish_volume: #{e.class} - #{e.message}")
            raise e
          rescue => e
            log_with_id(req_id, "Internal error in node_publish_volume: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}")
            raise GRPC::Internal, "NodePublishVolume error: #{e.message}"
          end
          NodePublishVolumeResponse.new
        end
      end

      def node_unpublish_volume(req, _call)
        log_request_response(req, "node_unpublish_volume") do |req_id|
          target_path = req.target_path
          begin
            if is_mounted?(target_path, req_id:)
              output, ok = run_cmd("umount", "-q", target_path, req_id:)
              unless ok
                log_with_id(req_id, "gRPC error in node_unpublish_volume: failed to umount device: #{output}")
                raise GRPC::Internal, "Failed to unmount #{target_path}: #{output}"
              end
            else
              log_with_id(req_id, "#{target_path} is not mounted, skipping umount")
            end
          rescue GRPC::BadStatus => e
            log_with_id(req_id, "gRPC error in node_unpublish_volume: #{e.class} - #{e.message}")
            raise e
          rescue => e
            log_with_id(req_id, "Internal error in node_unpublish_volume: #{e.class} - #{e.message}\n#{e.backtrace.join("\n")}")
            raise GRPC::Internal, "NodeUnpublishVolume error: #{e.message}"
          end
          NodeUnpublishVolumeResponse.new
        end
      end
    end
  end
end