diff --git a/Gemfile.lock b/Gemfile.lock index 08fe44e..a7454cd 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,6 +4,7 @@ PATH puma-status (1.7) net_http_unix (~> 0.2) parallel (~> 1) + websocket (~> 1.2) GEM remote: https://rubygems.org/ @@ -26,6 +27,7 @@ GEM rspec-support (~> 3.12.0) rspec-support (3.12.1) timecop (0.9.8) + websocket (1.2.11) PLATFORMS ruby diff --git a/lib/core.rb b/lib/core.rb index 5322966..3d78fa7 100644 --- a/lib/core.rb +++ b/lib/core.rb @@ -76,7 +76,7 @@ def hydrate_stats(stats, puma_state, state_file_path) end def format_stats(stats) - master_line = "#{stats.pid} (#{stats.state_file_path})" + master_line = "#{stats.origin} #{stats.pid} (#{stats.state_file_path})" master_line += " Version: #{stats.version} |" if stats.version master_line += " Uptime: #{seconds_to_human(stats.uptime)}" master_line += " | Phase: #{stats.phase}" if stats.phase diff --git a/lib/puma-status.rb b/lib/puma-status.rb index 4cdd6b8..a009367 100644 --- a/lib/puma-status.rb +++ b/lib/puma-status.rb @@ -1,6 +1,92 @@ require_relative './helpers' require_relative './core.rb' require 'parallel' +require 'open3' +require 'net/http' +require 'uri' +require 'openssl' +require 'websocket' + +def get_k8s_token_and_server + kube_config = YAML.load(File.read(File.expand_path("~/.kube/config"))) + + current_context = kube_config["current-context"] + context = kube_config["contexts"].find { |c| c["name"] == current_context }["context"] + cluster = kube_config["clusters"].find { |c| c["name"] == context["cluster"] }["cluster"] + user = kube_config["users"].find { |u| u["name"] == context["user"] }["user"] + + stdout_str, stderr_str, status = Open3.capture3(user.dig('exec', 'env').reduce({}) { |a,e| a[e['name']] = e['value']; a }, user.dig('exec', 'command'), *user.dig('exec', 'args')) + token = JSON.parse(stdout_str)["status"]["token"] + + return token, cluster['server'] +end + +def get_pods(token, server) + uri = URI.parse("#{server}/api/v1/namespaces/dev/pods?labelSelector=container%3Ddwe") + request = Net::HTTP::Get.new(uri) + request["Authorization"] = "Bearer #{token}" + + req_options = { + use_ssl: uri.scheme == "https", + verify_mode: OpenSSL::SSL::VERIFY_NONE, + } + + response = Net::HTTP.start(uri.hostname, uri.port, req_options) do |http| + http.request(request) + end + + pods = JSON.parse(response.body)["items"].map { |pod| pod["metadata"]["name"] } +end + +def get_stats_for_pod(token, server, pod, state_file_path) + uri = URI.parse("#{server}/api/v1/namespaces/dev/pods/#{pod}/exec") + uri.scheme = 'wss' + uri.port = 443 + + commands = <<~BASH.split("\n") + export $(awk -F": " '/:/ {print toupper($1)"="$2}' #{state_file_path}) + curl -s --unix-socket $(echo "$CONTROL_URL" | sed 's#^unix://##') http://localhost/stats?token=$CONTROL_AUTH_TOKEN + BASH + bash_command = ["/bin/bash", "-lc", commands.join(" && ")].map { URI.encode_www_form_component(_1) }.join("&command=") + + socket = TCPSocket.new(uri.host, uri.port) + ssl_context = OpenSSL::SSL::SSLContext.new + ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE + ssl_socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context) + uri.query = "command=#{bash_command}&stderr=true&stdin=false&stdout=true&tty=false" + + handshake = ::WebSocket::Handshake::Client.new :url => uri.to_s, headers: { "Authorization" => "Bearer #{token}", "Sec-WebSocket-Protocol" => "v4.channel.k8s.io" } + frame = ::WebSocket::Frame::Incoming::Client.new + + ssl_socket.connect + ssl_socket.write(handshake.to_s) + + until handshake.finished? + recv = ssl_socket.gets + handshake << recv + end + + while recv = ssl_socket.gets + frame << recv + end + + stats = nil + while d = frame.next + case d.type + when :binary + if d.to_s[0] == "\u0001" + stats = d.to_s[1..-1] + end + when :close + break + end + end + + ssl_socket.close + socket.close + + return Stats.new(JSON.parse(stats), origin: pod) +end def run debug "puma-status" @@ -12,11 +98,22 @@ def run end errors = [] - + + token, server = get_k8s_token_and_server + pods = get_pods(token, server) + outputs = Parallel.map(ARGV, in_threads: ARGV.count) do |state_file_path| begin debug "State file: #{state_file_path}" - format_stats(get_stats(state_file_path)) + + if true + pods.map do |pod| + stats = get_stats_for_pod(token, server, pod, state_file_path) + format_stats(stats) + end.join("\n") + else + format_stats(get_stats(state_file_path)) + end rescue Errno::ENOENT => e if e.message =~ /#{state_file_path}/ errors << "#{yellow(state_file_path)} doesn't exist" diff --git a/lib/stats.rb b/lib/stats.rb index bac3e09..2b17060 100644 --- a/lib/stats.rb +++ b/lib/stats.rb @@ -22,7 +22,7 @@ def mem=(mem) end def mem - @wstats['mem'] + @wstats['mem'] || 0 end def pcpu=(pcpu) @@ -30,7 +30,7 @@ def pcpu=(pcpu) end def pcpu - @wstats['pcpu'] + @wstats['pcpu'] || 0 end def booting? @@ -84,10 +84,15 @@ def last_checkin end end - def initialize(stats) + def initialize(stats, origin: 'local') + @origin = origin @stats = stats end + def origin + @origin + end + def workers @workers ||= (@stats['worker_status'] || [@stats]).map { |wstats| Worker.new(wstats) } end diff --git a/puma-status.gemspec b/puma-status.gemspec index e1de514..c605cbb 100644 --- a/puma-status.gemspec +++ b/puma-status.gemspec @@ -14,6 +14,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency "net_http_unix", '~> 0.2' s.add_runtime_dependency "parallel", '~> 1' + s.add_runtime_dependency "websocket", '~> 1.2' s.add_development_dependency "rspec", '~> 3.8' s.add_development_dependency "climate_control", '~> 0.2'