Compare commits
13 Commits
Author | SHA1 | Date |
---|---|---|
Timothée Floure | 31fe677c20 | |
Timothée Floure | b4eb4f524d | |
Timothée Floure | 992ff7f5ef | |
Timothée Floure | fb3338b4d9 | |
Timothée Floure | 884796d50c | |
Timothée Floure | 4a2b6a4948 | |
Timothée Floure | aeb6db4f77 | |
Timothée Floure | 06b52b3b2a | |
Timothée Floure | fa05a3d7d3 | |
Timothée Floure | f4b6c0f929 | |
Timothée Floure | abcd3337dd | |
Timothée Floure | 77ebea3746 | |
Timothée Floure | 9915bff2a7 |
|
@ -4,7 +4,7 @@ name: default
|
|||
|
||||
steps:
|
||||
- name: build-release
|
||||
image: alpine:latest
|
||||
image: alpine:3.15
|
||||
environment:
|
||||
MIX_ENV: prod
|
||||
commands:
|
||||
|
@ -17,7 +17,7 @@ steps:
|
|||
- cd _build/prod/rel
|
||||
- tar czf "ha-handler-$(git describe --exact-match --tags $(git log -n1 --pretty='%h') || git rev-parse HEAD).tar.gz" ha_handler/
|
||||
- name: publish-release-archive
|
||||
image: alpine:latest
|
||||
image: alpine:3.15
|
||||
environment:
|
||||
LFTP_PASSWORD:
|
||||
from_secret: ssh_password
|
||||
|
|
16
changelog.md
16
changelog.md
|
@ -1,3 +1,19 @@
|
|||
# 2022-07-04 - v0.4.2
|
||||
|
||||
* Fix potential crash on failed DRBD backend.
|
||||
* Appsignal: ignore errors on backends (a failed PSQL backend currently generates
|
||||
errors, and floods appsignal).
|
||||
|
||||
# 2022-06-13 - v0.4.1
|
||||
|
||||
* Fix crash on SSHEx / Postgrex connection failure.
|
||||
|
||||
# 2022-06-09 - v0.4.0
|
||||
|
||||
* Add minimal clustering logic.
|
||||
* Fix crash on unavailable HAProxy socket.
|
||||
* Fix invalid drbd backend state computation for haproxy.
|
||||
|
||||
# 2022-04-20 - v0.3.0
|
||||
|
||||
* Add Appsignal support.
|
||||
|
|
|
@ -1,16 +1,20 @@
|
|||
import Config
|
||||
|
||||
config :ha_handler,
|
||||
http_port: 4000,
|
||||
http_port: 4040,
|
||||
acme_challenge_path: "acme-challenge",
|
||||
haproxy_socket: System.get_env("HAPROXY_SOCKET") || "/var/run/haproxy.sock",
|
||||
handler_instances: [
|
||||
:"ha_handler@fenschel",
|
||||
:"ha_handler2@fenschel"
|
||||
],
|
||||
pgsql_instances: [
|
||||
[
|
||||
hostname: "pgsql.lnth.ch.recycled.cloud",
|
||||
username: "postgres",
|
||||
database: "postgres",
|
||||
haproxy_server: "lnth",
|
||||
password: "eicheeR6ieph5jae7oozahf3vesio9Ae",
|
||||
password: "secret",
|
||||
socket_options: [:inet6],
|
||||
ssl: true
|
||||
],
|
||||
|
@ -19,7 +23,7 @@ config :ha_handler,
|
|||
haproxy_server: "fvil",
|
||||
username: "postgres",
|
||||
database: "postgres",
|
||||
password: "eicheeR6ieph5jae7oozahf3vesio9Ae",
|
||||
password: "secret",
|
||||
socket_options: [:inet6],
|
||||
ssl: true
|
||||
]
|
||||
|
@ -27,12 +31,12 @@ config :ha_handler,
|
|||
drbd_instances: [
|
||||
[
|
||||
hostname: "drbd.lnth.ch.recycled.cloud",
|
||||
password: "tu9laiz9teece6aithohjohph6eCh3qu",
|
||||
password: "secret",
|
||||
haproxy_server: "lnth"
|
||||
],
|
||||
[
|
||||
hostname: "drbd.fvil.ch.recycled.cloud",
|
||||
password: "tu9laiz9teece6aithohjohph6eCh3qu",
|
||||
password: "secret",
|
||||
haproxy_server: "fvil"
|
||||
]
|
||||
]
|
||||
|
@ -42,4 +46,5 @@ config :appsignal, :config,
|
|||
otp_app: :ha_handler,
|
||||
name: "ha-handler",
|
||||
push_api_key: "secret",
|
||||
ignore_namespaces: ["pgsql", "drbd"],
|
||||
env: config_env()
|
||||
|
|
|
@ -13,6 +13,7 @@ defmodule HAHandler do
|
|||
def haproxy_socket, do: Application.get_env(@otp_app, :haproxy_socket)
|
||||
def pgsql_instances, do: Application.get_env(@otp_app, :pgsql_instances, [])
|
||||
def drbd_instances, do: Application.get_env(@otp_app, :drbd_instances, [])
|
||||
def handler_instances, do: Application.get_env(@otp_app, :handler_instances, [])
|
||||
|
||||
def acme_challenge_path, do: Application.get_env(@otp_app, :acme_challenge_path)
|
||||
def static_path(), do: Application.app_dir(@otp_app, "priv/static/")
|
||||
|
|
|
@ -14,6 +14,7 @@ defmodule HAHandler.Application do
|
|||
scheme: :http, plug: HAHandler.Web.Router, options: [port: HAHandler.http_port()]},
|
||||
{HAHandler.PGSQL.Supervisor, HAHandler.pgsql_instances()},
|
||||
{HAHandler.DRBD.Supervisor, HAHandler.drbd_instances()},
|
||||
{HAHandler.Cluster, HAHandler.handler_instances()},
|
||||
{HAHandler.Control, []}
|
||||
]
|
||||
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
defmodule HAHandler.Cluster do
  @moduledoc """
  Minimal clustering logic for handler instances.

  When Erlang distribution is enabled, this process periodically tries to
  connect to every configured handler instance that is not yet part of the
  local node's network, and logs node up/down events. It also answers
  `:get_details` / `:get_instances` calls, which `get_instance_details/0`
  uses to build a status overview of all known instances.

  The GenServer state is the list of configured instance node names.
  """

  use GenServer

  require Logger

  # How much do we wait (ms) between each check/decision-making round?
  @refresh 30_000

  @doc """
  Starts the cluster process, registered locally under the module name.

  `opts` is the list of configured handler instance node names.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Returns one details map per known handler instance.

  Known instances are the local node, the currently connected nodes and the
  configured instances (deduplicated). Each map has the keys `:node`,
  `:otp_app`, `:version`, `:uptime` (minutes), `:env` and `:status`.
  Instances that cannot be queried are reported with `status: :down` and
  `:unknown` for every other field except `:node`.
  """
  def get_instance_details() do
    known_instances = [Node.self()] ++ Node.list() ++ GenServer.call(__MODULE__, :get_instances)

    known_instances
    |> Enum.uniq()
    |> Enum.map(&fetch_details/1)
  end

  @impl true
  def init(instances) do
    if Node.alive?() do
      Logger.info("Distribution/clustering is ENABLED.")
      Logger.info("Current handler instance is: #{Node.self()}")
      Logger.info("Configured handler instances: #{inspect(instances)}")
      :net_kernel.monitor_nodes(true)

      # Kick off the periodic sync loop (the only place where it is started).
      send(self(), :sync)
    else
      Logger.warning("Distribution is DISABLED - skipping clustering logic")
    end

    {:ok, instances}
  end

  @impl true
  def handle_info(:sync, instances) do
    connect_missing_instances(instances)

    # Schedule next round.
    Process.send_after(self(), :sync, @refresh)

    {:noreply, instances}
  end

  def handle_info({:nodedown, node}, instances) do
    Logger.warning("Node #{node} went down.")

    {:noreply, instances}
  end

  def handle_info({:nodeup, node}, instances) do
    Logger.info("Node #{node} came up.")

    # Re-check connectivity right away, but do NOT go through the :sync
    # message: :sync re-arms the periodic timer, so every node-up event would
    # otherwise start an additional, never-ending sync loop.
    connect_missing_instances(instances)

    {:noreply, instances}
  end

  def handle_info(unexpected, instances) do
    # Defining handle_info/2 replaces the GenServer default, so without this
    # clause any stray message would crash the process.
    Logger.warning("Ignoring unexpected message: #{inspect(unexpected)}")

    {:noreply, instances}
  end

  @impl true
  def handle_call(:get_details, _from, instances) do
    {uptime_ms, _} = :erlang.statistics(:wall_clock)

    local_details = %{
      node: Node.self(),
      otp_app: HAHandler.otp_app(),
      version: HAHandler.version(),
      # Wall-clock uptime, in minutes.
      uptime: round(uptime_ms / 1_000 / 60),
      env: HAHandler.env()
    }

    {:reply, local_details, instances}
  end

  def handle_call(:get_instances, _from, instances) do
    {:reply, instances, instances}
  end

  # Tries to connect to every configured instance that is not already part of
  # our network/cluster, logging the outcome of each attempt.
  defp connect_missing_instances(instances) do
    current_network = [Node.self() | Node.list()]

    instances
    |> Enum.reject(&(&1 in current_network))
    |> Enum.each(fn node_name ->
      case Node.connect(node_name) do
        true ->
          Logger.info("Connected to handler instance #{node_name}")

        _ ->
          Logger.warning("Could not connect to handler instance #{node_name}")
      end
    end)
  end

  # Queries the cluster process running on `node`. A failed call (exit) or a
  # reply that is not a map yields the "down" placeholder instead of crashing
  # the caller; a partial map is completed with :unknown values.
  defp fetch_details(node) do
    case GenServer.call({__MODULE__, node}, :get_details) do
      %{} = details ->
        node
        |> unknown_details()
        |> Map.merge(details)
        |> Map.put(:status, :up)

      other ->
        Logger.warning("Unexpected details from handler instance #{node}: #{inspect(other)}")
        unknown_details(node)
    end
  catch
    :exit, _err ->
      unknown_details(node)
  end

  # Placeholder details for an instance we could not get information from.
  defp unknown_details(node) do
    %{
      node: node,
      otp_app: :unknown,
      version: :unknown,
      uptime: :unknown,
      env: :unknown,
      status: :down
    }
  end
end
|
|
@ -147,14 +147,24 @@ defmodule HAHandler.Control do
|
|||
Logger.info("Enabling routing SSHFS to (now) primary #{drbd_instance.haproxy_server}.")
|
||||
|
||||
HAProxy.set_server(
|
||||
@haproxy_pgsql_backend,
|
||||
@haproxy_drbd_backend,
|
||||
drbd_instance.haproxy_server,
|
||||
"state",
|
||||
"ready"
|
||||
)
|
||||
|
||||
unknown ->
|
||||
Logger.warning("Unhandled DRBD/HAProxy state: #{inspect(unknown)}")
|
||||
Logger.warning("Unknown DRBD/HAProxy state: #{inspect(unknown)}")
|
||||
Logger.info(
|
||||
"Disabling routing SSHFS to (likely) failed #{drbd_instance.haproxy_server}."
|
||||
)
|
||||
|
||||
HAProxy.set_server(
|
||||
@haproxy_drbd_backend,
|
||||
drbd_instance.haproxy_server,
|
||||
"state",
|
||||
"maint"
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -163,8 +173,13 @@ defmodule HAHandler.Control do
|
|||
def handle_info(:sync, state) do
|
||||
Logger.debug("Executing control logic.")
|
||||
|
||||
process_pgsql()
|
||||
process_drbd()
|
||||
case HAProxy.get_stats() do
|
||||
%{} ->
|
||||
process_pgsql()
|
||||
process_drbd()
|
||||
{:error, err} ->
|
||||
Logger.error("Unable to fetch HAProxy state (#{inspect(err)}) - skipping control loop.")
|
||||
end
|
||||
|
||||
# Schedule next round.
|
||||
Process.send_after(self(), :sync, @refresh)
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
defmodule HAHandler.DRBD do
|
||||
require Logger
|
||||
|
||||
@supervisor HAHandler.DRBD.Supervisor
|
||||
|
||||
# There might be >1 resources configured in DRBD!
|
||||
|
@ -18,6 +20,15 @@ defmodule HAHandler.DRBD do
|
|||
@id_extraction_regex ~r/\n\s(?<id>\d+)\:\s/
|
||||
@data_extraction_regex ~r/cs:(?<cs>(\w|\/)+)\sro:(?<ro>(\w|\/)+)\sds:(?<ds>(\w|\/)+)\s/
|
||||
|
||||
# Empty state, when backend is not queryable for some reason.
|
||||
@empty_state %{
|
||||
hostname: "unknown",
|
||||
version: "",
|
||||
mode: "",
|
||||
status: "unknown",
|
||||
data: ""
|
||||
}
|
||||
|
||||
def get_instances() do
|
||||
watchers = Supervisor.which_children(@supervisor)
|
||||
|
||||
|
@ -32,6 +43,8 @@ defmodule HAHandler.DRBD do
|
|||
end
|
||||
|
||||
def get_state({hostname, pid}) do
|
||||
empty_reply = %{@empty_state | hostname: hostname}
|
||||
|
||||
case GenServer.call(pid, {:execute, @drbd_proc_cmd}) do
|
||||
{:ok, raw, 0} ->
|
||||
case Regex.named_captures(@block_regex, raw) do
|
||||
|
@ -54,21 +67,25 @@ defmodule HAHandler.DRBD do
|
|||
|> Enum.filter(fn r -> r["id"] == @default_resource_id end)
|
||||
|> Enum.at(0)
|
||||
|
||||
%{
|
||||
hostname: hostname,
|
||||
processed_reply = %{
|
||||
version: Map.get(version, "full"),
|
||||
mode: Map.get(default_resource, "ro"),
|
||||
status: Map.get(default_resource, "ds"),
|
||||
data: resources
|
||||
}
|
||||
Map.merge(empty_reply, processed_reply)
|
||||
end
|
||||
_ ->
|
||||
{:error, "could not parse /proc/drbd"}
|
||||
Logger.warning("Failed to query DRBD backend: could not parse /proc/drbd.")
|
||||
|
||||
end
|
||||
{:ok, _, posix_err} ->
|
||||
{:error, posix_err}
|
||||
{:error, _err} = reply ->
|
||||
reply
|
||||
Logger.warning("Failed to query DRBD backend: POSIX #{inspect(posix_err)}.")
|
||||
empty_reply
|
||||
|
||||
{:error, err} ->
|
||||
Logger.warning("Failed to query DRBD backend: #{inspect(err)}.")
|
||||
empty_reply
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -17,7 +17,9 @@ defmodule HAHandler.DRBD.Supervisor do
|
|||
}
|
||||
end)
|
||||
|
||||
opts = [strategy: :one_for_one]
|
||||
opts = [
|
||||
strategy: :one_for_one
|
||||
]
|
||||
Supervisor.init(children, opts)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -25,27 +25,52 @@ defmodule HAHandler.DRBD.Watcher do
|
|||
|
||||
@impl true
|
||||
def init(opts) do
|
||||
hostname = Keyword.get(opts, :hostname)
|
||||
password = Keyword.get(opts, :password)
|
||||
|
||||
{:ok, pid} = connect(hostname, password)
|
||||
# Configures this worker's jobs to report in the "drbd" namespace
|
||||
Appsignal.Span.set_namespace(Appsignal.Tracer.root_span(), "drbd")
|
||||
|
||||
state = %{
|
||||
backend: pid,
|
||||
hostname: hostname,
|
||||
password: password
|
||||
backend: nil,
|
||||
last_reconnect: nil,
|
||||
hostname: Keyword.get(opts, :hostname),
|
||||
password: Keyword.get(opts, :password),
|
||||
}
|
||||
|
||||
# This action will be processed once the GenServer is fully
|
||||
# started/operational. This process handles connection failures by itself,
|
||||
# as we don't want to crash loop into supervisor logic (which is only there
|
||||
# to handle unexpected failure).
|
||||
send self(), :reconnect
|
||||
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:reconnect, state = %{hostname: hostname, password: password}) do
|
||||
case connect(hostname, password) do
|
||||
{:ok, pid} ->
|
||||
{:noreply, %{state | backend: pid}}
|
||||
{:error, _err} ->
|
||||
# Nothing to do, as the next request will trigger the reconnect logic
|
||||
# (see :execute call).
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:execute, cmd}, _from, %{backend: backend} = state) do
|
||||
case SSHEx.run(backend, cmd) do
|
||||
{:ok, _output, _status} = reply->
|
||||
{:reply, reply, state}
|
||||
{:error, :closed} = reply ->
|
||||
# Asynchronously tries to reopen the connection to the backend.
|
||||
send self(), :reconnect
|
||||
|
||||
{:reply, reply, state}
|
||||
{:error, _err} = reply ->
|
||||
{:error, reply, state}
|
||||
# Do not take action on unknown error.
|
||||
{:reply, reply, state}
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -8,18 +8,27 @@ defmodule HAHandler.PGSQL.Watcher do
|
|||
|
||||
@impl true
|
||||
def init(opts) do
|
||||
# Configures this worker's jobs to report in the "pgsql" namespace
|
||||
Appsignal.Span.set_namespace(Appsignal.Tracer.root_span(), "pgsql")
|
||||
|
||||
# Starts a Postgrex child, which does not mean the connection was
|
||||
# successful.
|
||||
# TODO: set dbconnections backoff and connect hooks
|
||||
# See https://github.com/elixir-ecto/db_connection/blob/master/lib/db_connection.ex#L343
|
||||
{:ok, pid} = Postgrex.start_link(opts)
|
||||
case Postgrex.start_link(opts) do
|
||||
{:ok, pid} ->
|
||||
state = %{
|
||||
backend: pid,
|
||||
hostname: Keyword.get(opts, :hostname)
|
||||
}
|
||||
|
||||
state = %{
|
||||
backend: pid,
|
||||
hostname: Keyword.get(opts, :hostname)
|
||||
}
|
||||
|
||||
{:ok, state}
|
||||
{:ok, state}
|
||||
{:error, err} ->
|
||||
# Will be caught by the supervisor if anything happens. It should not
|
||||
# be triggered even if a PGSQL node is down, since Postgrex has its own
|
||||
# supervision tree.
|
||||
{:error, err}
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
defmodule HAHandler.Web.Controller do
|
||||
import Plug.Conn
|
||||
|
||||
alias HAHandler.{HAProxy, PGSQL, DRBD}
|
||||
alias HAHandler.{HAProxy, PGSQL, DRBD, Cluster}
|
||||
|
||||
@template_dir "lib/ha_handler/web/templates"
|
||||
@index_template EEx.compile_file(Path.join(@template_dir, "index.html.eex"))
|
||||
|
@ -20,11 +20,13 @@ defmodule HAHandler.Web.Controller do
|
|||
haproxy_stats = HAProxy.get_stats(hide_error: true)
|
||||
pgsql_stats = PGSQL.get_stats()
|
||||
drbd_stats = DRBD.get_stats()
|
||||
handler_stats = Cluster.get_instance_details()
|
||||
|
||||
assigns = [
|
||||
haproxy_stats: haproxy_stats,
|
||||
pgsql_status: pgsql_stats,
|
||||
drbd_status: drbd_stats,
|
||||
handler_status: handler_stats,
|
||||
hostname: hostname,
|
||||
otp_app: HAHandler.otp_app(),
|
||||
version: HAHandler.version(),
|
||||
|
|
|
@ -25,7 +25,32 @@
|
|||
|
||||
<h2>Handler</h2>
|
||||
|
||||
<%= otp_app %> <b>v<%= version %></b> (<%= env %>) running on <b><%= hostname %></b>
|
||||
<p>
|
||||
<b>Local instance:</b> <%= otp_app %> <b>v<%= version %></b> (<%= env %>) running on <b><%= hostname %></b>
|
||||
</p>
|
||||
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Instance</th>
|
||||
<th>Version</th>
|
||||
<th>Env</th>
|
||||
<th>Status</th>
|
||||
<th>Uptime</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<%= for instance <- handler_status do %>
|
||||
<tr>
|
||||
<td><%= instance.node %></td>
|
||||
<td><%= instance.version %></td>
|
||||
<td><%= instance.env %></td>
|
||||
<td><%= instance.status %></td>
|
||||
<td><%= instance.uptime %>m</td>
|
||||
</tr>
|
||||
<% end %>
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
<hr />
|
||||
|
||||
|
|
Loading…
Reference in New Issue