defmodule HAHandler.Control do
  @moduledoc """
  This module handles the decision-logic and actions to be taken regarding the
  current state of the infrastructure.

  The process sends itself a `:sync` message every `@refresh` milliseconds.
  On each round it compares the operation mode reported for every PGSQL
  instance with the status of the matching server in the `"pgsql"` HAProxy
  backend, and asks HAProxy to enable routing to primaries and to put
  secondaries in maintenance.
  """

  use GenServer

  require Logger

  alias HAHandler.{HAProxy, PGSQL}

  # Name of the HAProxy backend whose servers are the PGSQL instances.
  @haproxy_pgsql_backend "pgsql"

  # How much do we wait (ms) between each check/decision-making round?
  @refresh 15_000

  @doc """
  Starts the control process and registers it under the module name.

  `opts` is handed to `init/1`, which currently ignores it.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    state = []

    # Let's skip the initial startup round so that other components are all up
    # and running.
    Process.send_after(self(), :sync, @refresh)

    {:ok, state}
  end

  @impl true
  def handle_info(:sync, state) do
    Logger.debug("Executing control logic.")

    # Fetch PGSQL state, make sure HAProxy routes to the master
    # process.
    pgsql_state = fetch_pgsql_state()
    haproxy_state = fetch_haproxy_state()

    Enum.each(pgsql_state, &reconcile(&1, haproxy_state))

    # Schedule next round.
    Process.send_after(self(), :sync, @refresh)

    {:noreply, state}
  end

  # Builds one map per PGSQL instance holding its watcher pid, its operation
  # mode and the name of the HAProxy server configured for its hostname.
  defp fetch_pgsql_state do
    instances = PGSQL.get_instances()

    # The configuration does not depend on the instance being inspected, so it
    # is read once per round instead of once per instance.
    configured = HAHandler.pgsql_instances()

    Enum.map(instances, fn {hostname, pid} = instance ->
      # An instance without a matching configuration entry makes `Enum.find/2`
      # return nil and `Keyword.get/2` raise, exactly as the previous
      # filter/at(0) lookup did.
      haproxy_server =
        configured
        |> Enum.find(fn opts -> Keyword.get(opts, :hostname) == hostname end)
        |> Keyword.get(:haproxy_server)

      %{
        haproxy_server: haproxy_server,
        pgsql_watcher_pid: pid,
        pgsql_operation_mode: PGSQL.get_operation_mode(instance)
      }
    end)
  end

  # Returns a `%{server_name => status}` map for the servers of the PGSQL
  # backend, or an empty map when HAProxy reports none.
  defp fetch_haproxy_state do
    HAProxy.get_stats()
    |> Map.get("Server", [])
    |> Enum.filter(fn mapping -> mapping["pxname"] == @haproxy_pgsql_backend end)
    # Folding into an explicit `%{}` keeps an empty server list from raising
    # `Enum.EmptyError`; `Map.put_new/3` lets the first entry win on duplicate
    # names, like the previous `Map.merge/2`-based fold.
    |> Enum.reduce(%{}, fn mapping, acc ->
      Map.put_new(acc, mapping["svname"], mapping["status"])
    end)
  end

  # Aligns HAProxy routing for a single instance with its operation mode.
  # Combinations that are not explicitly handled (including a server missing
  # from the HAProxy state, i.e. a nil status) are only logged.
  defp reconcile(%{haproxy_server: server, pgsql_operation_mode: mode}, haproxy_state) do
    case {mode, Map.get(haproxy_state, server)} do
      {:primary, "UP"} ->
        :noop

      {:primary, "MAINT"} ->
        Logger.info("Enabling routing PGSQL to (now) primary #{server}.")
        HAProxy.set_server(@haproxy_pgsql_backend, server, "state", "ready")

      {:secondary, "UP"} ->
        Logger.info("Disabling routing PGSQL to (now) secondary #{server}.")
        HAProxy.set_server(@haproxy_pgsql_backend, server, "state", "maint")

      {:secondary, "MAINT"} ->
        :noop

      unknown ->
        Logger.warning("Unhandled PGSQL/HAProxy state: #{inspect(unknown)}")
    end
  end
end