control: route pgsql traffic to primary servers
This commit is contained in:
parent
d1d31ca883
commit
0ec71ea8bb
3 changed files with 100 additions and 1 deletions
|
@ -11,7 +11,8 @@ defmodule HAHandler.Application do
|
||||||
def start(_type, _args) do
|
def start(_type, _args) do
|
||||||
children = [
|
children = [
|
||||||
{Plug.Cowboy, scheme: :http, plug: HAHandler.Web.Router, options: [port: HAHandler.http_port()]},
|
{Plug.Cowboy, scheme: :http, plug: HAHandler.Web.Router, options: [port: HAHandler.http_port()]},
|
||||||
{HAHandler.PGSQL.Supervisor, HAHandler.pgsql_instances()}
|
{HAHandler.PGSQL.Supervisor, HAHandler.pgsql_instances()},
|
||||||
|
{HAHandler.Control, []}
|
||||||
]
|
]
|
||||||
|
|
||||||
# See https://hexdocs.pm/elixir/Supervisor.html
|
# See https://hexdocs.pm/elixir/Supervisor.html
|
||||||
|
|
82
lib/ha_handler/control.ex
Normal file
82
lib/ha_handler/control.ex
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
defmodule HAHandler.Control do
|
||||||
|
@moduledoc """
|
||||||
|
This module handles the decision-logic and actions to be
|
||||||
|
taken regarding the current state of the infrastructure.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@haproxy_pgsql_backend "pgsql"
|
||||||
|
|
||||||
|
use GenServer
|
||||||
|
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
alias HAHandler.{PGSQL, HAProxy}
|
||||||
|
|
||||||
|
# How much do we wait (ms) between each check/decision-making round?
|
||||||
|
@refresh 15_000
|
||||||
|
|
||||||
|
def start_link(opts) do
|
||||||
|
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def init(_opts) do
|
||||||
|
state = []
|
||||||
|
|
||||||
|
# Let's skip the initial startup round so that other components are all up
|
||||||
|
# and running.
|
||||||
|
Process.send_after self(), :sync, @refresh
|
||||||
|
|
||||||
|
{:ok, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_info(:sync, state) do
|
||||||
|
Logger.debug("Executing control logic.")
|
||||||
|
|
||||||
|
# Fetch PGSQL state, make sure HAProxy routes to the master
|
||||||
|
# process.
|
||||||
|
pgsql_state = PGSQL.get_instances()
|
||||||
|
|> Enum.map(fn {hostname, pid} = instance->
|
||||||
|
haproxy_server = HAHandler.pgsql_instances()
|
||||||
|
|> Enum.filter(fn opts -> Keyword.get(opts, :hostname) == hostname end)
|
||||||
|
|> Enum.at(0)
|
||||||
|
|> Keyword.get(:haproxy_server)
|
||||||
|
|
||||||
|
%{
|
||||||
|
haproxy_server: haproxy_server,
|
||||||
|
pgsql_watcher_pid: pid,
|
||||||
|
pgsql_operation_mode: PGSQL.get_operation_mode(instance)
|
||||||
|
}
|
||||||
|
end)
|
||||||
|
haproxy_state = HAProxy.get_stats()
|
||||||
|
|> Map.get("Server", [])
|
||||||
|
|> Enum.filter(fn mapping -> mapping["pxname"] == @haproxy_pgsql_backend end)
|
||||||
|
|> Enum.map(fn mapping -> %{mapping["svname"] => mapping["status"]} end)
|
||||||
|
|> Enum.reduce(&Map.merge/2)
|
||||||
|
|
||||||
|
for pgsql_instance <- pgsql_state do
|
||||||
|
haproxy_state = Map.get(haproxy_state, pgsql_instance.haproxy_server)
|
||||||
|
|
||||||
|
case {pgsql_instance.pgsql_operation_mode, haproxy_state} do
|
||||||
|
{:primary, "UP"} ->
|
||||||
|
:noop
|
||||||
|
{:primary, "MAINT"} ->
|
||||||
|
Logger.info("Enabling routing PGSQL to (now) primary #{pgsql_instance.haproxy_server}.")
|
||||||
|
HAProxy.set_server(@haproxy_pgsql_backend, pgsql_instance.haproxy_server, "state", "ready")
|
||||||
|
{:secondary, "UP"} ->
|
||||||
|
Logger.info("Disabling routing PGSQL to (now) secondary #{pgsql_instance.haproxy_server}.")
|
||||||
|
HAProxy.set_server(@haproxy_pgsql_backend, pgsql_instance.haproxy_server, "state", "maint")
|
||||||
|
{:secondary, "MAINT"} ->
|
||||||
|
:noop
|
||||||
|
unknown ->
|
||||||
|
Logger.warning("Unhandled PGSQL/HAProxy state: #{unknown}")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Schedule next round.
|
||||||
|
Process.send_after self(), :sync, @refresh
|
||||||
|
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
end
|
|
@ -104,6 +104,22 @@ defmodule HAHandler.HAProxy do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Set a server's properties, as per [1].
|
||||||
|
|
||||||
|
[1] https://www.haproxy.com/documentation/hapee/latest/api/runtime-api/set-server/
|
||||||
|
"""
|
||||||
|
def set_server(backend, server, key, value) do
|
||||||
|
case execute("set server #{backend}/#{server} #{key} #{value}") do
|
||||||
|
{:ok, ""} ->
|
||||||
|
:ok
|
||||||
|
{:ok, err} ->
|
||||||
|
{:error, err}
|
||||||
|
{:error, err} ->
|
||||||
|
{:error, err}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
# Opens an UNIX socket - there is no built-in support in Elixir/Erlang so we
|
# Opens an UNIX socket - there is no built-in support in Elixir/Erlang so we
|
||||||
# use low-level C bindings provided by :procket
|
# use low-level C bindings provided by :procket
|
||||||
# (https://github.com/msantos/procket).
|
# (https://github.com/msantos/procket).
|
||||||
|
|
Loading…
Reference in a new issue