With the help of this client, the commands we need in order to implement the data copy and migration are added. During the process we need to create/update/get/list PVs, get nodes and get/create/delete/update PVCs. The Dockerfile is updated to install kubectl on the docker images. Note that we won't use any Ruby gems for interacting with Kubernetes, since most are not well maintained and the best they do is assemble a REST client and hand back the whole response, which kubectl does with no effort. Also, kubectl will use the credentials available in the pod and won't be affected by token rotations, as it reads the token on every command. The ClusterRole of node_plugin is also updated to give enough access to perform the commands.
92 lines
2.3 KiB
Ruby
92 lines
2.3 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "open3"
|
|
require "yaml"
|
|
require_relative "errors"
|
|
|
|
module Csi
  # Thin wrapper around the `kubectl` command-line tool for the node, PV and
  # PVC operations defined below.
  #
  # Every command is executed through `run_cmd`, which is not defined in this
  # class (presumably supplied by ServiceHelper — confirm), and kubectl's YAML
  # output is parsed with YAML.safe_load.
  class KubernetesClient
    include ServiceHelper

    # @param req_id [Object] request identifier handed to `run_cmd` with every command
    # @param logger [#debug] logger used by {#log_with_id}
    def initialize(req_id:, logger:)
      @req_id = req_id
      @logger = logger
    end

    # Emits a debug-level log line prefixed with the given request id.
    #
    # @param req_id [Object] id to print in the prefix (the argument, not @req_id)
    # @param message [String] text to log
    def log_with_id(req_id, message)
      @logger.debug("[req_id=#{req_id}] #{message}")
    end

    # Runs `kubectl` with the given arguments and returns its output.
    #
    # @param args [Array<String>] arguments appended after `kubectl`
    # @param yaml_data [Object, nil] when truthy, dumped to YAML and passed to
    #   `run_cmd` as stdin_data (used together with `-f -`)
    # @return [String] command output as returned by `run_cmd`
    # @raise [ObjectNotFoundError] when the command fails and its stripped
    #   output ends with "not found"
    # @raise [RuntimeError] for any other unsuccessful exit status
    def run_kubectl(*args, yaml_data: nil)
      cmd = ["kubectl", *args]
      stdin_data = yaml_data ? YAML.dump(yaml_data) : nil
      output, status = run_cmd(*cmd, req_id: @req_id, stdin_data:)
      return output if status.success?

      # A failure whose output ends with "not found" is reported with a
      # dedicated error class so callers can tell it apart from other failures.
      raise ObjectNotFoundError, output if output.strip.end_with?("not found")

      raise "Command failed: #{cmd.join(" ")}\nOutput: #{output}"
    end

    # Runs kubectl with the given positional arguments and parses the output
    # as YAML.
    #
    # @return [Object] whatever YAML.safe_load produces for the output
    def yaml_load_kubectl(*)
      YAML.safe_load(run_kubectl(*))
    end

    # @param name [String] node name
    # @return [Object] parsed output of `kubectl get node <name> -oyaml`
    def get_node(name)
      yaml_load_kubectl("get", "node", name, "-oyaml")
    end

    # Returns the "address" field of the first entry under status.addresses.
    #
    # NOTE(review): the first entry is taken regardless of its "type"; confirm
    # callers never need a specific address type (e.g. InternalIP).
    #
    # @param name [String] node name
    # @return [Object, nil] the address, or nil when the path is absent
    def get_node_ip(name)
      get_node(name).dig("status", "addresses", 0, "address")
    end

    # @param name [String] PersistentVolume name
    # @return [Object] parsed output of `kubectl get pv <name> -oyaml`
    def get_pv(name)
      yaml_load_kubectl("get", "pv", name, "-oyaml")
    end

    # Reads the first value of the first match expression of the first node
    # selector term in the PV's required node affinity.
    #
    # @param pv [Hash] parsed PersistentVolume document
    # @return [Object, nil] that value, or nil when any step of the path is absent
    def extract_node_from_pv(pv)
      pv.dig("spec", "nodeAffinity", "required", "nodeSelectorTerms", 0, "matchExpressions", 0, "values", 0)
    end

    # Feeds the given data as YAML to `kubectl create -f -`.
    #
    # @param yaml_data [Object] manifest data, serialized with YAML.dump
    # @return [String] kubectl output
    def create_pv(yaml_data)
      run_kubectl("create", "-f", "-", yaml_data:)
    end

    # Feeds the given data as YAML to `kubectl apply -f -`.
    #
    # @param yaml_data [Object] manifest data, serialized with YAML.dump
    # @return [String] kubectl output
    def update_pv(yaml_data)
      run_kubectl("apply", "-f", "-", yaml_data:)
    end

    # @param namespace [String] namespace passed to `-n`
    # @param name [String] PersistentVolumeClaim name
    # @return [Object] parsed output of `kubectl -n <namespace> get pvc <name> -oyaml`
    def get_pvc(namespace, name)
      yaml_load_kubectl("-n", namespace, "get", "pvc", name, "-oyaml")
    end

    # Feeds the given data as YAML to `kubectl create -f -`.
    #
    # @param yaml_data [Object] manifest data, serialized with YAML.dump
    # @return [String] kubectl output
    def create_pvc(yaml_data)
      run_kubectl("create", "-f", "-", yaml_data:)
    end

    # Feeds the given data as YAML to `kubectl apply -f -`.
    #
    # @param yaml_data [Object] manifest data, serialized with YAML.dump
    # @return [String] kubectl output
    def update_pvc(yaml_data)
      run_kubectl("apply", "-f", "-", yaml_data:)
    end

    # Runs `kubectl delete pvc` with `--wait=false` and
    # `--ignore-not-found=true`.
    #
    # @param namespace [String] namespace passed to `-n`
    # @param name [String] PersistentVolumeClaim name
    # @return [String] kubectl output
    def delete_pvc(namespace, name)
      run_kubectl("-n", namespace, "delete", "pvc", name, "--wait=false", "--ignore-not-found=true")
    end

    # @param name [String] node name
    # @return [Boolean] false when spec.unschedulable is truthy, true otherwise
    #   (including when the field is absent)
    def node_schedulable?(name)
      !get_node(name).dig("spec", "unschedulable")
    end

    # Lists all PVs and returns the first one whose spec.csi.volumeHandle
    # equals the given id.
    #
    # @param volume_id [String] volume handle to look for
    # @return [Hash] the matching PersistentVolume document
    # @raise [ObjectNotFoundError] when no PV matches, including when the
    #   listing has no "items" at all
    def find_pv_by_volume_id(volume_id)
      pv_list = yaml_load_kubectl("get", "pv", "-oyaml")
      # Tolerate an empty document or a missing/null "items" key so the caller
      # gets the not-found error below rather than a NoMethodError on nil.
      candidates = Array(pv_list && pv_list["items"])
      match = candidates.find { |candidate| candidate.dig("spec", "csi", "volumeHandle") == volume_id }
      raise ObjectNotFoundError, "PersistentVolume with volumeHandle '#{volume_id}' not found" unless match

      match
    end
  end
end
|