diff --git a/lib/ha_handler/application.ex b/lib/ha_handler/application.ex
index d928238..725967f 100644
--- a/lib/ha_handler/application.ex
+++ b/lib/ha_handler/application.ex
@@ -11,7 +11,8 @@ defmodule HAHandler.Application do
   def start(_type, _args) do
     children = [
       {Plug.Cowboy, scheme: :http, plug: HAHandler.Web.Router, options: [port: HAHandler.http_port()]},
-      {HAHandler.PGSQL.Supervisor, HAHandler.pgsql_instances()}
+      {HAHandler.PGSQL.Supervisor, HAHandler.pgsql_instances()},
+      {HAHandler.Control, []}
     ]
 
     # See https://hexdocs.pm/elixir/Supervisor.html
diff --git a/lib/ha_handler/control.ex b/lib/ha_handler/control.ex
new file mode 100644
index 0000000..ea13960
--- /dev/null
+++ b/lib/ha_handler/control.ex
@@ -0,0 +1,115 @@
+defmodule HAHandler.Control do
+  @moduledoc """
+  This module handles the decision-logic and actions to be
+  taken regarding the current state of the infrastructure.
+
+  It is a `GenServer` that schedules a `:sync` message to itself `@refresh`
+  milliseconds after startup and after each round. On every round it compares
+  the operation mode reported for each PGSQL instance with the status HAProxy
+  reports for the matching server, and switches that server between the
+  `ready` and `maint` states accordingly.
+  """
+
+  @haproxy_pgsql_backend "pgsql"
+
+  use GenServer
+
+  require Logger
+
+  alias HAHandler.{PGSQL, HAProxy}
+
+  # How long do we wait (ms) between each check/decision-making round?
+  @refresh 15_000
+
+  @doc """
+  Starts the control process, registered under the module name.
+  """
+  def start_link(opts) do
+    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
+  end
+
+  @impl true
+  def init(_opts) do
+    state = []
+
+    # Let's skip the initial startup round so that other components are all up
+    # and running.
+    Process.send_after(self(), :sync, @refresh)
+
+    {:ok, state}
+  end
+
+  @impl true
+  def handle_info(:sync, state) do
+    Logger.debug("Executing control logic.")
+
+    # Fetch PGSQL state, make sure HAProxy routes to the master
+    # process.
+    pgsql_state =
+      PGSQL.get_instances()
+      |> Enum.map(fn {hostname, pid} = instance ->
+        haproxy_server =
+          HAHandler.pgsql_instances()
+          |> Enum.find(fn opts -> Keyword.get(opts, :hostname) == hostname end)
+          |> Keyword.get(:haproxy_server)
+
+        %{
+          haproxy_server: haproxy_server,
+          pgsql_watcher_pid: pid,
+          pgsql_operation_mode: PGSQL.get_operation_mode(instance)
+        }
+      end)
+
+    # Map of HAProxy server name => status for the PGSQL backend. The explicit
+    # `%{}` accumulator keeps this from raising `Enum.EmptyError` when HAProxy
+    # reports no server for that backend.
+    haproxy_state =
+      HAProxy.get_stats()
+      |> Map.get("Server", [])
+      |> Enum.filter(fn mapping -> mapping["pxname"] == @haproxy_pgsql_backend end)
+      |> Enum.map(fn mapping -> %{mapping["svname"] => mapping["status"]} end)
+      |> Enum.reduce(%{}, &Map.merge/2)
+
+    for pgsql_instance <- pgsql_state do
+      haproxy_status = Map.get(haproxy_state, pgsql_instance.haproxy_server)
+
+      case {pgsql_instance.pgsql_operation_mode, haproxy_status} do
+        {:primary, "UP"} ->
+          :noop
+
+        {:primary, "MAINT"} ->
+          Logger.info("Enabling routing PGSQL to (now) primary #{pgsql_instance.haproxy_server}.")
+          set_haproxy_server_state(pgsql_instance.haproxy_server, "ready")
+
+        {:secondary, "UP"} ->
+          Logger.info("Disabling routing PGSQL to (now) secondary #{pgsql_instance.haproxy_server}.")
+          set_haproxy_server_state(pgsql_instance.haproxy_server, "maint")
+
+        {:secondary, "MAINT"} ->
+          :noop
+
+        unknown ->
+          # `unknown` is a tuple, which does not implement `String.Chars`, so it
+          # has to go through `inspect/1` before being interpolated.
+          Logger.warning("Unhandled PGSQL/HAProxy state: #{inspect(unknown)}")
+      end
+    end
+
+    # Schedule next round.
+    Process.send_after(self(), :sync, @refresh)
+
+    {:noreply, state}
+  end
+
+  # Sets the state of `server` in the PGSQL backend, logging any failure
+  # instead of silently dropping it.
+  defp set_haproxy_server_state(server, server_state) do
+    case HAProxy.set_server(@haproxy_pgsql_backend, server, "state", server_state) do
+      :ok ->
+        :ok
+
+      {:error, err} ->
+        Logger.error("Failed to set HAProxy server #{server} to #{server_state}: #{inspect(err)}")
+    end
+  end
+end
diff --git a/lib/ha_handler/ha_proxy.ex b/lib/ha_handler/ha_proxy.ex
index efb2b3c..f3e400a 100644
--- a/lib/ha_handler/ha_proxy.ex
+++ b/lib/ha_handler/ha_proxy.ex
@@ -104,6 +104,26 @@ defmodule HAHandler.HAProxy do
     end
   end
 
+  @doc """
+  Set a server's properties, as per [1].
+
+  Returns `:ok` when the command yields an empty response. Any non-empty
+  response, or an error returned while executing the command, is returned as
+  `{:error, reason}`.
+
+  [1] https://www.haproxy.com/documentation/hapee/latest/api/runtime-api/set-server/
+  """
+  def set_server(backend, server, key, value) do
+    case execute("set server #{backend}/#{server} #{key} #{value}") do
+      {:ok, ""} ->
+        :ok
+      {:ok, err} ->
+        {:error, err}
+      {:error, err} ->
+        {:error, err}
+    end
+  end
+
   # Opens an UNIX socket - there is no built-in support in Elixir/Erlang so we
   # use low-level C bindings provided by :procket
   # (https://github.com/msantos/procket).