Before this commit, we were recreating the PVC multiple times and the process was not idempotent. In certain cases it broke the flow and the PVC would get stuck in a bad state. With this change, by using the PVC UID, we make sure we delete the PVC only once; and if we are not the one recreating the PVC, we make sure the annotation is added so that the controller on the destination node performs the migration. Some changes were also made to the Kubernetes client to patch resources instead of updating them, which is simpler and idempotent.
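At a high level, the idempotent recreation decision described above can be sketched as follows (a condensed illustration of the logic in recreate_pvc further down, not a literal excerpt; names such as trimmed_pvc, namespace and name are placeholders):

# A dynamically provisioned PV is named "pvc-<UID of the claim it was created for>",
# so comparing the current PVC's UID with the PV name tells us whether the old
# claim still exists (delete it exactly once, then recreate it) or whether it has
# already been recreated (only ensure the migration annotation is present).
if pvc_uid == pv_name.delete_prefix("pvc-")
  client.delete_pvc(namespace, name) unless pvc_deletion_timestamp
  client.create_pvc(trimmed_pvc)
else
  client.patch_resource("pvc", name, OLD_PV_NAME_ANNOTATION_KEY, pv_name, namespace: namespace)
end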
# frozen_string_literal: true

require "grpc"
require "fileutils"
require "socket"
require "yaml"
require "shellwords"
require "base64"
require_relative "errors"
require_relative "kubernetes_client"
require_relative "../csi_services_pb"

module Csi
  module V1
    class NodeService < Node::Service
      include Csi::ServiceHelper

      MAX_VOLUMES_PER_NODE = 8
      VOLUME_BASE_PATH = "/var/lib/ubicsi"
      OLD_PV_NAME_ANNOTATION_KEY = "csi.ubicloud.com/old-pv-name"
      OLD_PVC_OBJECT_ANNOTATION_KEY = "csi.ubicloud.com/old-pvc-object"
      ACCEPTABLE_FS = ["ext4", "xfs"].freeze

      def self.mkdir_p
        FileUtils.mkdir_p(VOLUME_BASE_PATH)
      end

      def initialize(logger:, node_id:)
        @logger = logger
        @node_id = node_id
      end

      attr_reader :node_id

      def node_get_capabilities(req, _call)
        log_request_response(req, "node_get_capabilities") do |req_id|
          NodeGetCapabilitiesResponse.new(
            capabilities: [
              NodeServiceCapability.new(
                rpc: NodeServiceCapability::RPC.new(
                  type: NodeServiceCapability::RPC::Type::STAGE_UNSTAGE_VOLUME
                )
              )
            ]
          )
        end
      end

      def node_get_info(req, _call)
        log_request_response(req, "node_get_info") do |req_id|
          topology = Topology.new(
            segments: {
              "kubernetes.io/hostname" => @node_id
            }
          )
          NodeGetInfoResponse.new(
            node_id: @node_id,
            max_volumes_per_node: MAX_VOLUMES_PER_NODE,
            accessible_topology: topology
          )
        end
      end

      def run_cmd_output(*cmd, req_id:)
        output, _ = run_cmd(*cmd, req_id:)
        output
      end

      def is_mounted?(path, req_id:)
        _, status = run_cmd("mountpoint", "-q", path, req_id:)
        status == 0
      end

      def find_loop_device(backing_file, req_id:)
        output, ok = run_cmd("losetup", "-j", backing_file, req_id:)
        if ok && !output.empty?
          loop_device = output.split(":", 2)[0].strip
          return loop_device if loop_device.start_with?("/dev/loop")
        end
        nil
      end

      def remove_loop_device(backing_file, req_id:)
        loop_device = find_loop_device(backing_file, req_id:)
        return unless loop_device
        output, ok = run_cmd("losetup", "-d", loop_device, req_id:)
        unless ok
          raise "Could not remove loop device: #{output}"
        end
      end

      def self.backing_file_path(volume_id)
        File.join(VOLUME_BASE_PATH, "#{volume_id}.img")
      end

      def pvc_needs_migration?(pvc)
        old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
        !old_pv_name.nil?
      end

      def find_file_system(loop_device, req_id:)
        output, ok = run_cmd("blkid", "-o", "value", "-s", "TYPE", loop_device, req_id:)
        unless ok
          raise "Failed to get the loop device filesystem status: #{output}"
        end
        output.strip
      end
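
      # CSI NodeStageVolume: fetches the PVC (migrating data from the old node
      # first if the PVC carries the old-PV annotation), stages the loop-device
      # backed volume, then restores the PV reclaim policy and removes the
      # migration annotation once staging has succeeded.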
      def node_stage_volume(req, _call)
        log_request_response(req, "node_stage_volume") do |req_id|
          client = KubernetesClient.new(req_id:, logger: @logger)
          pvc = fetch_and_migrate_pvc(req_id, client, req)
          perform_node_stage_volume(req_id, pvc, req, _call)
          roll_back_reclaim_policy(req_id, client, req, pvc)
          remove_old_pv_annotation_from_pvc(req_id, client, pvc)
          NodeStageVolumeResponse.new
        rescue => e
          log_and_raise(req_id, e)
        end
      end
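
      # Looks up the PVC referenced by the request and, if it is annotated with
      # the old PV name, copies the old PV's data over before staging.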
      def fetch_and_migrate_pvc(req_id, client, req)
        pvc = client.get_pvc(req.volume_context["csi.storage.k8s.io/pvc/namespace"],
          req.volume_context["csi.storage.k8s.io/pvc/name"])
        if pvc_needs_migration?(pvc)
          migrate_pvc_data(req_id, client, pvc, req)
        end
        pvc
      end
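
      # Creates the sparse backing file if needed, attaches it to a loop device,
      # formats it with the requested filesystem (for mount volumes) and mounts
      # it on the staging path. Each step is skipped when already done, so the
      # method is safe to retry.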
      def perform_node_stage_volume(req_id, pvc, req, _call)
        volume_id = req.volume_id
        staging_path = req.staging_target_path
        size_bytes = Integer(req.volume_context["size_bytes"], 10)
        backing_file = NodeService.backing_file_path(volume_id)

        unless File.exist?(backing_file)
          output, ok = run_cmd("fallocate", "-l", size_bytes.to_s, backing_file, req_id:)
          unless ok
            raise GRPC::ResourceExhausted.new("Failed to allocate backing file: #{output}")
          end
          output, ok = run_cmd("fallocate", "--punch-hole", "--keep-size", "-o", "0", "-l", size_bytes.to_s, backing_file, req_id:)
          unless ok
            raise GRPC::ResourceExhausted.new("Failed to punch hole in backing file: #{output}")
          end
        end

        loop_device = find_loop_device(backing_file, req_id:)
        is_new_loop_device = loop_device.nil?
        if is_new_loop_device
          log_with_id(req_id, "Setting up new loop device for: #{backing_file}")
          output, ok = run_cmd("losetup", "--find", "--show", backing_file, req_id:)
          loop_device = output.strip
          unless ok && !loop_device.empty?
            raise "Failed to setup loop device: #{output}"
          end
        else
          log_with_id(req_id, "Loop device already exists: #{loop_device}")
        end

        if req.volume_capability.mount
          current_fs_type = find_file_system(loop_device, req_id:)
          if !current_fs_type.empty? && !ACCEPTABLE_FS.include?(current_fs_type)
            raise "Unacceptable file system type for #{loop_device}: #{current_fs_type}"
          end

          desired_fs_type = req.volume_capability.mount.fs_type || "ext4"
          if current_fs_type != "" && current_fs_type != desired_fs_type
            raise "Unexpected filesystem on volume. desired: #{desired_fs_type}, current: #{current_fs_type}"
          elsif current_fs_type == ""
            output, ok = run_cmd("mkfs.#{desired_fs_type}", loop_device, req_id:)
            unless ok
              raise "Failed to format device #{loop_device} with #{desired_fs_type}: #{output}"
            end
          end
        end
        unless is_mounted?(staging_path, req_id:)
          FileUtils.mkdir_p(staging_path)
          output, ok = run_cmd("mount", loop_device, staging_path, req_id:)
          unless ok
            raise "Failed to mount #{loop_device} to #{staging_path}: #{output}"
          end
        end
        # If the volume capability is block, there is nothing else to do.
      end

      def remove_old_pv_annotation_from_pvc(req_id, client, pvc)
        namespace, name = pvc["metadata"].values_at("namespace", "name")
        log_with_id(req_id, "Removing old pv annotation #{OLD_PV_NAME_ANNOTATION_KEY}")
        client.remove_pvc_annotation(namespace, name, OLD_PV_NAME_ANNOTATION_KEY)
      end
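
      # After a migration, sets the old PV's reclaim policy back to "Delete" so
      # it can be garbage collected; it was switched to "Retain" while the data
      # was still needed.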
      def roll_back_reclaim_policy(req_id, client, req, pvc)
        old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
        if old_pv_name.nil?
          return
        end
        pv = client.get_pv(old_pv_name)
        if pv.dig("spec", "persistentVolumeReclaimPolicy") == "Retain"
          pv["spec"]["persistentVolumeReclaimPolicy"] = "Delete"
          client.update_pv(pv)
        end
      end
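
      # Copies the old PV's backing file from the old node to this node with
      # rsync over SSH, run through daemonizer2 on the host so the copy proceeds
      # in the background across retries. Raises CopyNotFinishedError until the
      # copy has completed, which makes the CSI call be retried later.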
      def migrate_pvc_data(req_id, client, pvc, req)
        old_pv_name = pvc.dig("metadata", "annotations", OLD_PV_NAME_ANNOTATION_KEY)
        pv = client.get_pv(old_pv_name)
        pv_node = client.extract_node_from_pv(pv)
        old_node_ip = client.get_node_ip(pv_node)
        old_data_path = NodeService.backing_file_path(pv["spec"]["csi"]["volumeHandle"])
        current_data_path = NodeService.backing_file_path(req.volume_id)

        daemonizer_unit_name = Shellwords.shellescape("copy_#{old_pv_name}")
        case run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "check", daemonizer_unit_name, req_id:)
        when "Succeeded"
          run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "clean", daemonizer_unit_name, req_id:)
        when "NotStarted"
          copy_command = ["rsync", "-az", "--inplace", "--compress-level=9", "--partial", "--whole-file", "-e", "ssh -T -c aes128-gcm@openssh.com -o Compression=no -x -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -i /home/ubi/.ssh/id_ed25519", "ubi@#{old_node_ip}:#{old_data_path}", current_data_path]
          run_cmd_output("nsenter", "-t", "1", "-a", "/home/ubi/common/bin/daemonizer2", "run", daemonizer_unit_name, *copy_command, req_id:)
          raise CopyNotFinishedError, "Old PV data is not copied yet"
        when "InProgress"
          raise CopyNotFinishedError, "Old PV data is not copied yet"
        when "Failed"
          raise "Copy old PV data failed"
        else
          raise "Daemonizer2 returned unknown status"
        end
      end
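
      # CSI NodeUnstageVolume: if this node is no longer schedulable (i.e. the
      # workload is being moved away), prepares the volume for migration by
      # retaining the PV and recreating the PVC; then detaches the loop device
      # and unmounts the staging path.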
      def node_unstage_volume(req, _call)
        log_request_response(req, "node_unstage_volume") do |req_id|
          backing_file = NodeService.backing_file_path(req.volume_id)
          client = KubernetesClient.new(req_id:, logger: @logger)
          if !client.node_schedulable?(@node_id)
            prepare_data_migration(client, req_id, req.volume_id)
          end
          remove_loop_device(backing_file, req_id:)
          staging_path = req.staging_target_path
          if is_mounted?(staging_path, req_id:)
            output, ok = run_cmd("umount", "-q", staging_path, req_id:)
            unless ok
              raise "Failed to unmount #{staging_path}: #{output}"
            end
          end
          NodeUnstageVolumeResponse.new
        rescue => e
          log_and_raise(req_id, e)
        end
      end
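
      # Marks the PV as retained and recreates its PVC so the workload can be
      # rescheduled onto another node while the old data stays available.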
      def prepare_data_migration(client, req_id, volume_id)
        log_with_id(req_id, "Retaining pv with volume_id #{volume_id}")
        pv = retain_pv(req_id, client, volume_id)
        log_with_id(req_id, "Recreating pvc with volume_id #{volume_id}")
        recreate_pvc(req_id, client, pv)
      end
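
      # Switches the PV's reclaim policy to "Retain" (if it is not already) so
      # the backing data is not deleted while the PVC is recreated.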
      def retain_pv(req_id, client, volume_id)
        pv = client.find_pv_by_volume_id(volume_id)
        log_with_id(req_id, "Found PV with volume_id #{volume_id}: #{pv}")
        if pv.dig("spec", "persistentVolumeReclaimPolicy") != "Retain"
          pv["spec"]["persistentVolumeReclaimPolicy"] = "Retain"
          client.update_pv(pv)
          log_with_id(req_id, "Updated PV to retain")
        end
        pv
      end
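
      # Recreates the PVC exactly once so the volume can be scheduled onto a new
      # node. The trimmed PVC object is stored on the PV as an annotation first,
      # so a retry can recover it even after the original PVC is gone. A
      # dynamically provisioned PV is named "pvc-<uid>", so the UID comparison
      # below tells us whether we still hold the original claim (delete it once,
      # then recreate it) or whether it has already been recreated, in which case
      # we only ensure the old-PV annotation is present for the destination node.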
      def recreate_pvc(req_id, client, pv)
        pvc_namespace, pvc_name = pv["spec"]["claimRef"].values_at("namespace", "name")
        pv_name = pv.dig("metadata", "name")
        begin
          pvc = client.get_pvc(pvc_namespace, pvc_name)
        rescue ObjectNotFoundError
          old_pvc_object = pv.dig("metadata", "annotations", OLD_PVC_OBJECT_ANNOTATION_KEY)
          if old_pvc_object.empty?
            raise
          end
          pvc = YAML.load(Base64.decode64(old_pvc_object))
        end
        log_with_id(req_id, "Found matching PVC for PV #{pv_name}: #{pvc}")
        pvc_uid = pvc.dig("metadata", "uid")
        pvc_deletion_timestamp = pvc.dig("metadata", "deletionTimestamp")
        trim_pvc(pvc, pv_name)
        log_with_id(req_id, "Trimmed PVC for recreation: #{pvc}")

        client.patch_resource("pv", pv_name, OLD_PVC_OBJECT_ANNOTATION_KEY, Base64.strict_encode64(YAML.dump(pvc)))

        if pvc_uid == pv_name.delete_prefix("pvc-")
          if !pvc_deletion_timestamp
            client.delete_pvc(pvc_namespace, pvc_name)
            log_with_id(req_id, "Deleted PVC #{pvc_namespace}/#{pvc_name}")
            client.remove_pvc_finalizers(pvc_namespace, pvc_name)
            log_with_id(req_id, "Removed PVC finalizers #{pvc_namespace}/#{pvc_name}")
          end
          client.create_pvc(pvc)
          log_with_id(req_id, "Recreated PVC with the new spec")
        else
          # The PVC has already been recreated. At this stage we don't know
          # whether we created it or the StatefulSet controller did; we just
          # need to make sure the csi.ubicloud.com/old-pv-name annotation is set.
          client.patch_resource("pvc", pvc_name, OLD_PV_NAME_ANNOTATION_KEY, pv_name, namespace: pvc_namespace)
        end
      end
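
      # Strips server-populated metadata, binding state and the status from a
      # PVC object so it can be resubmitted as a fresh claim, and records the
      # old PV name in its annotations for the destination node.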
      def trim_pvc(pvc, pv_name)
        pvc["metadata"]["annotations"] ||= {}
        %W[#{OLD_PVC_OBJECT_ANNOTATION_KEY} volume.kubernetes.io/selected-node pv.kubernetes.io/bind-completed].each do |key|
          pvc["metadata"]["annotations"].delete(key)
        end
        %w[resourceVersion uid creationTimestamp deletionTimestamp deletionGracePeriodSeconds].each do |key|
          pvc["metadata"].delete(key)
        end
        pvc["spec"].delete("volumeName")
        pvc.delete("status")
        pvc["metadata"]["annotations"][OLD_PV_NAME_ANNOTATION_KEY] = pv_name
        pvc
      end

      def node_publish_volume(req, _call)
        log_request_response(req, "node_publish_volume") do |req_id|
          staging_path = req.staging_target_path
          target_path = req.target_path

          unless is_mounted?(target_path, req_id:)
            FileUtils.mkdir_p(target_path)
            output, ok = run_cmd("mount", "--bind", staging_path, target_path, req_id:)
            unless ok
              raise "Failed to bind mount #{staging_path} to #{target_path}: #{output}"
            end
          end

          NodePublishVolumeResponse.new
        rescue => e
          log_and_raise(req_id, e)
        end
      end

      def node_unpublish_volume(req, _call)
        log_request_response(req, "node_unpublish_volume") do |req_id|
          target_path = req.target_path

          if is_mounted?(target_path, req_id:)
            output, ok = run_cmd("umount", "-q", target_path, req_id:)
            unless ok
              raise "Failed to unmount #{target_path}: #{output}"
            end
          else
            log_with_id(req_id, "#{target_path} is not mounted, skipping umount")
          end

          NodeUnpublishVolumeResponse.new
        rescue => e
          log_and_raise(req_id, e)
        end
      end
    end
  end
end