From 722bb6941594e091d0e66cfb8ccb2e1486e6855f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timoth=C3=A9e=20Floure?= Date: Tue, 25 Jan 2022 09:45:11 +0100 Subject: [PATCH] Refactor HAProxy interface --- lib/ha_handler/ha_proxy.ex | 139 +++++++++++++++++++++++++++---------- 1 file changed, 104 insertions(+), 35 deletions(-) diff --git a/lib/ha_handler/ha_proxy.ex b/lib/ha_handler/ha_proxy.ex index 15df271..122c4d3 100644 --- a/lib/ha_handler/ha_proxy.ex +++ b/lib/ha_handler/ha_proxy.ex @@ -1,23 +1,108 @@ defmodule HAHandler.HAProxy do + @moduledoc """ + Interface to HAProxy's runtime API: + https://www.haproxy.com/documentation/hapee/latest/api/runtime-api/ + """ + + # TODO: can we keep the socket open by keeping the port state in a GenServer + # and abusing the prompt mode of HAProxy's socket somehow? The later is + # somewhat clunky, but I'm not sure if there is a way around it since HAProxy + # seems to return EOF once a command is executed (unless prompt mode is + # enabled). + + import HAHandler, only: [haproxy_socket: 0] + alias :procket, as: Socket - @haproxy_socket "/run/haproxy.sock" + # How long do we wait for an answer - in milliseconds. + @socket_read_timeout 5_000 - def open_socket() do - pad = 8 * (Socket.unix_path_max() - byte_size(@haproxy_socket)) + @doc """ + Sends a command to HAProxy's socket - please refer to HAProxy's runtime API + documentation for available commands (see moduledoc). - sockaddr = - Socket.sockaddr_common(1, byte_size(@haproxy_socket)) <> @haproxy_socket <> <<0::size(pad)>> + This method will either return an answer with `{:ok, answer}` or error with + `{:error, err}`. + """ + def execute(command) when is_binary(command) do + case open_socket(haproxy_socket()) do + {:ok, socket} -> + send(socket, {self(), {:command, command <> "\n"}}) + read_from_socket(socket) + {:error, err} -> + {:error, err} + end + end - family = 1 + @doc """ + Executes and parse the output of the `show stats` HAProxy command, returning + a list of Maps. - {:ok, socket} = Socket.socket(family, 1, 0) + TODO: extract more informations from stats output. + """ + def get_stats() do + case execute("show stat json") do + {:ok, raw} -> + case Poison.decode(raw) do + {:ok, data} -> + for entry <- data do + attrs = + entry + |> Enum.filter(fn m -> get_in(m, ["field", "name"]) == "pxname" end) + |> Enum.at(0) + + %{ + type: attrs |> Map.get("objType"), + name: get_in(attrs, ["value", "value"]) + } + end + + {:error, err} -> + {:error, err} + end + + {:error, err} -> + {:error, err} + end + end + + # Opens an UNIX socket - there is no built-in support in Elixir/Erlang so we + # use low-level C bindings provided by :procket + # (https://github.com/msantos/procket). + # + # Heavily inspired by the `NVim.Link` module: + # https://github.com/kbrw/neovim-elixir/blob/master/lib/link.ex + defp open_socket(path) do + family = Socket.family(:unix) + type = :stream + # 0 means use the default protocol in the family. + protocol = 0 + + pad = 8 * (Socket.unix_path_max() - byte_size(path)) + sockaddr = Socket.sockaddr_common(family, byte_size(path)) <> path <> <<0::size(pad)>> + + {:ok, socket} = Socket.socket(family, type, protocol) case Socket.connect(socket, sockaddr) do :ok -> - {stdin, stdout} = {socket, socket} - port = Port.open({:fd, stdin, stdout}, [{:line, 10_000}, :binary]) - Process.link(port) + # We connect our UNIX socket to an erlang Port: + # + # > Ports provide a mechanism to start operating system processes + # > external to the Erlang VM and communicate with them via message + # > passing. + # See https://hexdocs.pm/elixir/Port.html for details. + # + # See https://www.erlang.org/doc/man/erlang.html#open_port-2 for more + # port options. + # + # * :binary: all I/O from the port is binary data objects as opposed to + # lists of bytes. + # * Messages are delivered on a per line basis. Each line (delimited by + # the OS-dependent newline sequence) is delivered in a single message. + # The message data format is {Flag, Line}, where Flag is eol or noeol, + # and Line is the data delivered (without the newline sequence). + {fdin, fdout} = {socket, socket} + port = Port.open({:fd, fdin, fdout}, [{:line, 10_000}, :binary]) {:ok, port} @@ -26,37 +111,21 @@ defmodule HAHandler.HAProxy do end end + # Messages may be split due to the `{:line, L}` option specific in + # `open_socket/1`. defp read_from_socket(socket, acc \\ "") do receive do {_port, {:data, {:noeol, data}}} -> read_from_socket(socket, acc <> data) + {_port, {:data, {:eol, data}}} -> - Poison.decode(acc <> data) - _ -> - {:error, :unexpected} - after 5000 -> + {:ok, acc <> data} + + msg -> + {:error, {:unexpected_message, msg}} + after + @socket_read_timeout -> {:error, :timeout} end end - - def get_stats() do - case open_socket() do - {:ok, socket} -> - send socket, {self(), {:command, "show stat json\n"}} - case read_from_socket(socket) do - {:ok, raw} -> - for entry <- raw do - attrs = entry - |> Enum.filter(fn m -> get_in(m, ["field", "name"]) == "pxname" end) - |> Enum.at(0) - - %{type: attrs |> Map.get("objType"), name: get_in(attrs, ["value", "value"])} - end - - {:error, err} -> - {:error, err} - end - {:error, err} -> {:error, err} - end - end end