ha-handler/lib/ha_handler/control.ex
Timothée Floure abcd3337dd
All checks were successful
continuous-integration/drone/push Build is passing
Add minimal handler clustering logic
2022-05-22 14:30:44 +02:00

189 lines
5.1 KiB
Elixir

defmodule HAHandler.Control do
@moduledoc """
This module handles the decision-logic and actions to be
taken regarding the current state of the infrastructure.
FIXME: this is a quickly hacked proof of concept; the logic is fragile and
largely duplicated between the PGSQL and DRBD code paths.
"""
@haproxy_pgsql_backend "pgsql"
@haproxy_drbd_backend "sshfs"
use GenServer
require Logger
alias HAHandler.{PGSQL, HAProxy, DRBD}
# How long (in ms) do we wait between two check/decision-making rounds?
@refresh 15_000
@doc """
Starts the control process and registers it locally under the module name.

`opts` is handed over unchanged to `init/1`.
"""
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
@impl true
def init(_opts) do
  # No check is run at startup: the first :sync round is only scheduled after
  # one full refresh interval, leaving the other components time to come up.
  Process.send_after(self(), :sync, @refresh)

  # The server keeps no state of its own; an empty list is used as placeholder.
  {:ok, []}
end
# Reconciles HAProxy routing for the PGSQL backend with the operation mode
# reported for each PGSQL instance:
#
#   * a primary whose HAProxy server is in "MAINT" is set to "ready";
#   * a secondary whose HAProxy server is "UP" is set to "maint";
#   * any other combination is only logged.
#
# Returns the list of per-instance results, or an empty list when the HAProxy
# statistics could not be fetched.
defp process_pgsql() do
  pgsql_state =
    PGSQL.get_instances()
    |> Enum.map(fn {hostname, _pid} = instance ->
      # Look up the HAProxy server name configured for this hostname. A watcher
      # without matching configuration still raises here, as it did before.
      haproxy_server =
        HAHandler.pgsql_instances()
        |> Enum.find(fn opts -> Keyword.get(opts, :hostname) == hostname end)
        |> Keyword.get(:haproxy_server)

      %{
        haproxy_server: haproxy_server,
        pgsql_operation_mode: PGSQL.get_operation_mode(instance)
      }
    end)

  case HAProxy.get_stats() do
    %{} = stats ->
      # Map of server name => status for the PGSQL backend. Map.new/2 yields an
      # empty map when no server matches, where Enum.reduce/2 without an
      # initial accumulator would raise Enum.EmptyError.
      haproxy_state =
        stats
        |> Map.get("Server", [])
        |> Enum.filter(fn mapping -> mapping["pxname"] == @haproxy_pgsql_backend end)
        |> Map.new(fn mapping -> {mapping["svname"], mapping["status"]} end)

      for %{haproxy_server: server, pgsql_operation_mode: mode} <- pgsql_state do
        case {mode, Map.get(haproxy_state, server)} do
          {:primary, "UP"} ->
            :noop

          {:primary, "MAINT"} ->
            Logger.info("Enabling routing PGSQL to (now) primary #{server}.")
            HAProxy.set_server(@haproxy_pgsql_backend, server, "state", "ready")

          {:secondary, "UP"} ->
            Logger.info("Disabling routing PGSQL to (now) secondary #{server}.")
            HAProxy.set_server(@haproxy_pgsql_backend, server, "state", "maint")

          {:secondary, "MAINT"} ->
            :noop

          unknown ->
            Logger.warning("Unhandled PGSQL/HAProxy state: #{inspect(unknown)}")
        end
      end

    {:error, err} ->
      # The statistics may become unavailable between the caller's check and
      # this call; skip the round instead of crashing on a non-map value.
      Logger.error("Unable to fetch HAProxy state (#{inspect(err)}) - skipping PGSQL round.")
      []
  end
end
# Reconciles HAProxy routing for the SSHFS backend with the DRBD mode reported
# for each DRBD instance:
#
#   * "Primary/Secondary" with the HAProxy server in "MAINT" sets it to "ready";
#   * "Secondary/Primary" with the HAProxy server "UP" sets it to "maint";
#   * any unrecognised combination is treated as a likely failure and the
#     server is set to "maint".
#
# Returns the list of per-instance results, or an empty list when the HAProxy
# statistics could not be fetched.
defp process_drbd() do
  drbd_state =
    DRBD.get_instances()
    |> Enum.map(fn {hostname, _pid} = instance ->
      # Look up the HAProxy server name configured for this hostname. A watcher
      # without matching configuration still raises here, as it did before.
      haproxy_server =
        HAHandler.drbd_instances()
        |> Enum.find(fn opts -> Keyword.get(opts, :hostname) == hostname end)
        |> Keyword.get(:haproxy_server)

      %{haproxy_server: haproxy_server, drbd_state: DRBD.get_state(instance)}
    end)

  case HAProxy.get_stats() do
    %{} = stats ->
      # Map of server name => status for the SSHFS backend. Map.new/2 yields an
      # empty map when no server matches, where Enum.reduce/2 without an
      # initial accumulator would raise Enum.EmptyError.
      haproxy_state =
        stats
        |> Map.get("Server", [])
        |> Enum.filter(fn mapping -> mapping["pxname"] == @haproxy_drbd_backend end)
        |> Map.new(fn mapping -> {mapping["svname"], mapping["status"]} end)

      for %{haproxy_server: server, drbd_state: drbd} <- drbd_state do
        case {drbd.mode, Map.get(haproxy_state, server)} do
          {"Secondary/Primary", "UP"} ->
            Logger.info("Disabling routing SSHFS to (now) secondary #{server}.")
            HAProxy.set_server(@haproxy_drbd_backend, server, "state", "maint")

          {"Primary/Secondary", "UP"} ->
            :noop

          {"Secondary/Primary", "MAINT"} ->
            :noop

          {"Primary/Secondary", "MAINT"} ->
            Logger.info("Enabling routing SSHFS to (now) primary #{server}.")
            # Must target the DRBD/SSHFS backend: this call previously used the
            # PGSQL backend, so the SSHFS server was never set back to "ready".
            HAProxy.set_server(@haproxy_drbd_backend, server, "state", "ready")

          unknown ->
            Logger.warning("Unknown DRBD/HAProxy state: #{inspect(unknown)}")
            Logger.info("Disabling routing SSHFS to (likely) failed #{server}.")
            HAProxy.set_server(@haproxy_drbd_backend, server, "state", "maint")
        end
      end

    {:error, err} ->
      # The statistics may become unavailable between the caller's check and
      # this call; skip the round instead of crashing on a non-map value.
      Logger.error("Unable to fetch HAProxy state (#{inspect(err)}) - skipping DRBD round.")
      []
  end
end
# Periodic control round, triggered by the :sync message this process sends to
# itself. The PGSQL and DRBD reconciliation only runs when the HAProxy
# statistics are reachable; the next round is scheduled in every case.
@impl true
def handle_info(:sync, state) do
  Logger.debug("Executing control logic.")

  case HAProxy.get_stats() do
    %{} ->
      process_pgsql()
      process_drbd()

    {:error, err} ->
      Logger.error("Unable to fetch HAProxy state (#{inspect(err)}) - skipping control loop.")
  end

  # Schedule next round.
  Process.send_after(self(), :sync, @refresh)
  {:noreply, state}
end

# Catch-all: defining handle_info/2 replaces the default implementation that
# `use GenServer` provides, so without this clause any stray message would
# crash the control loop with a FunctionClauseError.
def handle_info(message, state) do
  Logger.warning("Ignoring unexpected message: #{inspect(message)}")
  {:noreply, state}
end
end