defmodule HAHandler.Control do @moduledoc """ This module handles the decision-logic and actions to be taken regarding the current state of the infrastructure. FIXME: POC quickly hacked together, there's a lot of weak code duplicated around. """ @haproxy_pgsql_backend "pgsql" @haproxy_drbd_backend "sshfs" use GenServer require Logger alias HAHandler.{PGSQL, HAProxy, DRBD} # How much do we wait (ms) between each check/decision-making round? @refresh 15_000 def start_link(opts) do GenServer.start_link(__MODULE__, opts, name: __MODULE__) end @impl true def init(_opts) do state = [] # Let's skip the initial startup round so that other components are all up # and running. Process.send_after(self(), :sync, @refresh) {:ok, state} end defp process_pgsql() do # Fetch PGSQL state, make sure HAProxy routes to the master # process. pgsql_state = PGSQL.get_instances() |> Enum.map(fn {hostname, pid} = instance -> haproxy_server = HAHandler.pgsql_instances() |> Enum.filter(fn opts -> Keyword.get(opts, :hostname) == hostname end) |> Enum.at(0) |> Keyword.get(:haproxy_server) %{ haproxy_server: haproxy_server, pgsql_watcher_pid: pid, pgsql_operation_mode: PGSQL.get_operation_mode(instance) } end) haproxy_state = HAProxy.get_stats() |> Map.get("Server", []) |> Enum.filter(fn mapping -> mapping["pxname"] == @haproxy_pgsql_backend end) |> Enum.map(fn mapping -> %{mapping["svname"] => mapping["status"]} end) |> Enum.reduce(&Map.merge/2) for pgsql_instance <- pgsql_state do haproxy_state = Map.get(haproxy_state, pgsql_instance.haproxy_server) case {pgsql_instance.pgsql_operation_mode, haproxy_state} do {:primary, "UP"} -> :noop {:primary, "MAINT"} -> Logger.info("Enabling routing PGSQL to (now) primary #{pgsql_instance.haproxy_server}.") HAProxy.set_server( @haproxy_pgsql_backend, pgsql_instance.haproxy_server, "state", "ready" ) {:secondary, "UP"} -> Logger.info( "Disabling routing PGSQL to (now) secondary #{pgsql_instance.haproxy_server}." ) HAProxy.set_server( @haproxy_pgsql_backend, pgsql_instance.haproxy_server, "state", "maint" ) {:secondary, "MAINT"} -> :noop unknown -> Logger.warning("Unhandled PGSQL/HAProxy state: #{inspect(unknown)}") end end end defp process_drbd() do drbd_state = DRBD.get_instances() |> Enum.map(fn {hostname, pid} = instance -> haproxy_server = HAHandler.drbd_instances() |> Enum.filter(fn opts -> Keyword.get(opts, :hostname) == hostname end) |> Enum.at(0) |> Keyword.get(:haproxy_server) %{ haproxy_server: haproxy_server, drbd_watcher_pid: pid, drbd_state: DRBD.get_state(instance) } end) haproxy_state = HAProxy.get_stats() |> Map.get("Server", []) |> Enum.filter(fn mapping -> mapping["pxname"] == @haproxy_drbd_backend end) |> Enum.map(fn mapping -> %{mapping["svname"] => mapping["status"]} end) |> Enum.reduce(&Map.merge/2) for drbd_instance <- drbd_state do haproxy_state = Map.get(haproxy_state, drbd_instance.haproxy_server) case {drbd_instance.drbd_state.mode, haproxy_state} do {"Secondary/Primary", "UP"} -> Logger.info( "Disabling routing SSHFS to (now) secondary #{drbd_instance.haproxy_server}." ) HAProxy.set_server( @haproxy_drbd_backend, drbd_instance.haproxy_server, "state", "maint" ) {"Primary/Secondary", "UP"} -> :noop {"Secondary/Primary", "MAINT"} -> :noop {"Primary/Secondary", "MAINT"} -> Logger.info("Enabling routing SSHFS to (now) primary #{drbd_instance.haproxy_server}.") HAProxy.set_server( @haproxy_drbd_backend, drbd_instance.haproxy_server, "state", "ready" ) unknown -> Logger.warning("Unknown DRBD/HAProxy state: #{inspect(unknown)}") Logger.info( "Disabling routing SSHFS to (likely) failed #{drbd_instance.haproxy_server}." ) HAProxy.set_server( @haproxy_drbd_backend, drbd_instance.haproxy_server, "state", "maint" ) end end end @impl true def handle_info(:sync, state) do Logger.debug("Executing control logic.") case HAProxy.get_stats() do %{} -> process_pgsql() process_drbd() {:error, err} -> Logger.error("Unable to fetch HAProxy state (#{inspect(err)}) - skipping control loop.") end # Schedule next round. Process.send_after(self(), :sync, @refresh) {:noreply, state} end end