In monitor, we create 2 threads per resource; one for SSH event loop processing and one for actual pulse check. In previous version, each resource would keep their threads even after the pulse check is completed. This means the number of resources we can monitor at the same time is limited by the number of threads we can create. This commit changes the behavior so that after the pulse check is completed, the threads are released. This way, we can monitor significantly more resources at the same time. One drawback of the new approach is that we need to re-create the threads for each check. In my system creating 1000 threads takes about 0.025 seconds, so overhead is seems negligible. I also added a new helper method, needs_event_loop_for_pulse_check? to models. We actually don't need event loop for pulse check for most of the resources, only PostgresServer and MinioServer need it. Other resources rely on exec! to perform their pulse check which doesn't need event loop. In fact, I observed that extra event loop processing actually slows down the exec! calls. By taking this into consideration, we reduce the number of threads we create and also improve the speed of some pulse checks. Another change we are making is that removing the monitoring_interval from the model and hardcoding it in the monitor as 5 seconds. This removes capability of setting different monitoring intervals for different resources. Supporting this would require some work and since it is not used in the current implementation I decided to remove all together. If we need to support this in the future, we can add it back with some effort.
99 lines
2.7 KiB
Ruby
99 lines
2.7 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "net/ssh"
|
|
require_relative "../../model"
|
|
|
|
class MinioServer < Sequel::Model
|
|
one_to_one :strand, key: :id
|
|
many_to_one :project
|
|
many_to_one :vm, key: :vm_id, class: :Vm
|
|
one_to_many :active_billing_records, class: :BillingRecord, key: :resource_id, conditions: {Sequel.function(:upper, :span) => nil}
|
|
many_to_one :pool, key: :minio_pool_id, class: :MinioPool
|
|
one_through_many :cluster, [[:minio_server, :id, :minio_pool_id], [:minio_pool, :id, :cluster_id]], class: :MinioCluster
|
|
|
|
dataset_module Authorization::Dataset
|
|
|
|
include ResourceMethods
|
|
include SemaphoreMethods
|
|
include HealthMonitorMethods
|
|
|
|
semaphore :checkup, :destroy, :restart, :reconfigure, :refresh_certificates, :initial_provisioning
|
|
|
|
plugin :column_encryption do |enc|
|
|
enc.column :cert_key
|
|
end
|
|
|
|
def hostname
|
|
"#{cluster.name}#{index}.#{Config.minio_host_name}"
|
|
end
|
|
|
|
def private_ipv4_address
|
|
vm.nics.first.private_ipv4.network.to_s
|
|
end
|
|
|
|
def minio_volumes
|
|
cluster.pools.map do |pool|
|
|
pool.volumes_url
|
|
end.join(" ")
|
|
end
|
|
|
|
def ip4_url
|
|
"https://#{vm.ephemeral_net4}:9000"
|
|
end
|
|
|
|
def endpoint
|
|
cluster.dns_zone ? "#{hostname}:9000" : "#{vm.ephemeral_net4}:9000"
|
|
end
|
|
|
|
def init_health_monitor_session
|
|
socket_path = File.join(Dir.pwd, "var", "health_monitor_sockets", "ms_#{vm.ephemeral_net6.nth(2)}")
|
|
FileUtils.rm_rf(socket_path)
|
|
FileUtils.mkdir_p(socket_path)
|
|
|
|
ssh_session = vm.sshable.start_fresh_session
|
|
ssh_session.forward.local(UNIXServer.new(File.join(socket_path, "health_monitor_socket")), private_ipv4_address, 9000)
|
|
{
|
|
ssh_session: ssh_session,
|
|
minio_client: client(socket: File.join("unix://", socket_path, "health_monitor_socket"))
|
|
}
|
|
end
|
|
|
|
def check_pulse(session:, previous_pulse:)
|
|
reading = begin
|
|
server_data = JSON.parse(session[:minio_client].admin_info.body)["servers"].find { _1["endpoint"] == endpoint }
|
|
(server_data["state"] == "online" && server_data["drives"].all? { _1["state"] == "ok" }) ? "up" : "down"
|
|
rescue
|
|
"down"
|
|
end
|
|
pulse = aggregate_readings(previous_pulse: previous_pulse, reading: reading)
|
|
|
|
if pulse[:reading] == "down" && pulse[:reading_rpt] > 5 && Time.now - pulse[:reading_chg] > 30 && !reload.checkup_set?
|
|
incr_checkup
|
|
end
|
|
|
|
pulse
|
|
end
|
|
|
|
def needs_event_loop_for_pulse_check?
|
|
true
|
|
end
|
|
|
|
def server_url
|
|
cluster.url || ip4_url
|
|
end
|
|
|
|
def client(socket: nil)
|
|
Minio::Client.new(
|
|
endpoint: server_url,
|
|
access_key: cluster.admin_user,
|
|
secret_key: cluster.admin_password,
|
|
ssl_ca_file_data: cluster.root_certs,
|
|
socket: socket
|
|
)
|
|
end
|
|
|
|
def self.redacted_columns
|
|
super + [:cert]
|
|
end
|
|
end
|