Add initial plumbing for DRBD
This is 'quickly-hacked-together' and needs some love - it's working, but is ways to fragile. It's no more than a POC atm.
This commit is contained in:
parent
2c64a54cb9
commit
b9aa3eeb98
10 changed files with 257 additions and 9 deletions
|
@ -12,6 +12,7 @@ defmodule HAHandler do
|
|||
def http_port, do: Application.get_env(@otp_app, :http_port)
|
||||
def haproxy_socket, do: Application.get_env(@otp_app, :haproxy_socket)
|
||||
def pgsql_instances, do: Application.get_env(@otp_app, :pgsql_instances, [])
|
||||
def drbd_instances, do: Application.get_env(@otp_app, :drbd_instances, [])
|
||||
|
||||
def acme_challenge_path, do: Application.get_env(@otp_app, :acme_challenge_path)
|
||||
def static_path(), do: Application.app_dir(@otp_app, "priv/static/")
|
||||
|
|
|
@ -13,6 +13,7 @@ defmodule HAHandler.Application do
|
|||
{Plug.Cowboy,
|
||||
scheme: :http, plug: HAHandler.Web.Router, options: [port: HAHandler.http_port()]},
|
||||
{HAHandler.PGSQL.Supervisor, HAHandler.pgsql_instances()},
|
||||
{HAHandler.DRBD.Supervisor, HAHandler.drbd_instances()},
|
||||
{HAHandler.Control, []}
|
||||
]
|
||||
|
||||
|
|
|
@ -2,15 +2,19 @@ defmodule HAHandler.Control do
|
|||
@moduledoc """
|
||||
This module handles the decision-logic and actions to be
|
||||
taken regarding the current state of the infrastructure.
|
||||
|
||||
FIXME: POC quickly hacked together, there's a lot of weak code duplicated
|
||||
around.
|
||||
"""
|
||||
|
||||
@haproxy_pgsql_backend "pgsql"
|
||||
@haproxy_drbd_backend "sshfs"
|
||||
|
||||
use GenServer
|
||||
|
||||
require Logger
|
||||
|
||||
alias HAHandler.{PGSQL, HAProxy}
|
||||
alias HAHandler.{PGSQL, HAProxy, DRBD}
|
||||
|
||||
# How much do we wait (ms) between each check/decision-making round?
|
||||
@refresh 15_000
|
||||
|
@ -30,10 +34,7 @@ defmodule HAHandler.Control do
|
|||
{:ok, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:sync, state) do
|
||||
Logger.debug("Executing control logic.")
|
||||
|
||||
defp process_pgsql() do
|
||||
# Fetch PGSQL state, make sure HAProxy routes to the master
|
||||
# process.
|
||||
pgsql_state =
|
||||
|
@ -95,6 +96,75 @@ defmodule HAHandler.Control do
|
|||
Logger.warning("Unhandled PGSQL/HAProxy state: #{inspect(unknown)}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp process_drbd() do
|
||||
drbd_state =
|
||||
DRBD.get_instances()
|
||||
|> Enum.map(fn {hostname, pid} = instance ->
|
||||
haproxy_server =
|
||||
HAHandler.drbd_instances()
|
||||
|> Enum.filter(fn opts -> Keyword.get(opts, :hostname) == hostname end)
|
||||
|> Enum.at(0)
|
||||
|> Keyword.get(:haproxy_server)
|
||||
|
||||
%{
|
||||
haproxy_server: haproxy_server,
|
||||
drbd_watcher_pid: pid,
|
||||
drbd_state: DRBD.get_state(instance)
|
||||
}
|
||||
end)
|
||||
|
||||
haproxy_state =
|
||||
HAProxy.get_stats()
|
||||
|> Map.get("Server", [])
|
||||
|> Enum.filter(fn mapping -> mapping["pxname"] == @haproxy_drbd_backend end)
|
||||
|> Enum.map(fn mapping -> %{mapping["svname"] => mapping["status"]} end)
|
||||
|> Enum.reduce(&Map.merge/2)
|
||||
|
||||
for drbd_instance <- drbd_state do
|
||||
haproxy_state = Map.get(haproxy_state, drbd_instance.haproxy_server)
|
||||
|
||||
case {drbd_instance.drbd_state.mode, haproxy_state} do
|
||||
{"Secondary/Primary", "UP"} ->
|
||||
Logger.info(
|
||||
"Disabling routing SSHFS to (now) secondary #{drbd_instance.haproxy_server}."
|
||||
)
|
||||
|
||||
HAProxy.set_server(
|
||||
@haproxy_drbd_backend,
|
||||
drbd_instance.haproxy_server,
|
||||
"state",
|
||||
"maint"
|
||||
)
|
||||
{"Primary/Secondary", "UP"} ->
|
||||
:noop
|
||||
|
||||
{"Secondary/Primary", "MAINT"} ->
|
||||
:noop
|
||||
|
||||
{"Primary/Secondary", "MAINT"} ->
|
||||
Logger.info("Enabling routing SSHFS to (now) primary #{drbd_instance.haproxy_server}.")
|
||||
|
||||
HAProxy.set_server(
|
||||
@haproxy_pgsql_backend,
|
||||
drbd_instance.haproxy_server,
|
||||
"state",
|
||||
"ready"
|
||||
)
|
||||
|
||||
unknown ->
|
||||
Logger.warning("Unhandled DRBD/HAProxy state: #{inspect(unknown)}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:sync, state) do
|
||||
Logger.debug("Executing control logic.")
|
||||
|
||||
process_pgsql()
|
||||
process_drbd()
|
||||
|
||||
# Schedule next round.
|
||||
Process.send_after(self(), :sync, @refresh)
|
||||
|
|
74
lib/ha_handler/drbd.ex
Normal file
74
lib/ha_handler/drbd.ex
Normal file
|
@ -0,0 +1,74 @@
|
|||
defmodule HAHandler.DRBD do
|
||||
@supervisor HAHandler.DRBD.Supervisor
|
||||
|
||||
# There might be >1 resources configured in DRBD!
|
||||
@default_resource_id "1"
|
||||
|
||||
# We don't support DRBD 9 for the time being, as /proc/drbd does not have a
|
||||
# stable API.
|
||||
@supported_drbd_major_version "8"
|
||||
|
||||
# Parsing of /proc/drbd, assuming DRBD 8. Splitting the regexes helps humans
|
||||
# wrapping their head around what's going on. And yes, it's fragile: we need
|
||||
# drbd 9 to get a JSON interface to `drbdadm status`.
|
||||
@drbd_proc_cmd "cat /proc/drbd"
|
||||
@block_regex ~r/(?<version_block>(.|\n)*)\n(?<resource_block>\n\s(.|\n)*)/
|
||||
@version_regex ~r/version: (?<full>(?<major>\d+)\.(?<intermediate>\d+)\.(?<minor>\d))/
|
||||
@resource_split_regex ~r{(\n\s(\d+)\:\s)}
|
||||
@id_extraction_regex ~r/\n\s(?<id>\d+)\:\s/
|
||||
@data_extraction_regex ~r/cs:(?<cs>(\w|\/)+)\sro:(?<ro>(\w|\/)+)\sds:(?<ds>(\w|\/)+)\s/
|
||||
|
||||
def get_instances() do
|
||||
watchers = Supervisor.which_children(@supervisor)
|
||||
|
||||
for {hostname, pid, _type, _modules} <- watchers do
|
||||
{hostname, pid}
|
||||
end
|
||||
end
|
||||
|
||||
def get_stats() do
|
||||
get_instances()
|
||||
|> Enum.map(fn instance -> get_state(instance) end)
|
||||
end
|
||||
|
||||
def get_state({hostname, pid}) do
|
||||
case GenServer.call(pid, {:execute, @drbd_proc_cmd}) do
|
||||
{:ok, raw, 0} ->
|
||||
case Regex.named_captures(@block_regex, raw) do
|
||||
%{"version_block" => version_block, "resource_block" => resource_block} ->
|
||||
version = Regex.named_captures(@version_regex, version_block)
|
||||
|
||||
if Map.get(version, "major") != @supported_drbd_major_version do
|
||||
{:error, "unsupported DRBD version #{inspect(version)}"}
|
||||
else
|
||||
resources = Regex.split( @resource_split_regex, resource_block,
|
||||
[include_captures: true, trim: true])
|
||||
|> Enum.chunk_every(2)
|
||||
|> Enum.map(fn [raw_id, raw_data] ->
|
||||
%{}
|
||||
|> Map.merge(Regex.named_captures(@id_extraction_regex, raw_id))
|
||||
|> Map.merge(Regex.named_captures(@data_extraction_regex, raw_data))
|
||||
end)
|
||||
|
||||
default_resource = resources
|
||||
|> Enum.filter(fn r -> r["id"] == @default_resource_id end)
|
||||
|> Enum.at(0)
|
||||
|
||||
%{
|
||||
hostname: hostname,
|
||||
version: Map.get(version, "full"),
|
||||
mode: Map.get(default_resource, "ro"),
|
||||
status: Map.get(default_resource, "ds"),
|
||||
data: resources
|
||||
}
|
||||
end
|
||||
_ ->
|
||||
{:error, "could not parse /proc/drbd"}
|
||||
end
|
||||
{:ok, _, posix_err} ->
|
||||
{:error, posix_err}
|
||||
{:error, _err} = reply ->
|
||||
reply
|
||||
end
|
||||
end
|
||||
end
|
23
lib/ha_handler/drbd/supervisor.ex
Normal file
23
lib/ha_handler/drbd/supervisor.ex
Normal file
|
@ -0,0 +1,23 @@
|
|||
defmodule HAHandler.DRBD.Supervisor do
|
||||
use Supervisor
|
||||
|
||||
alias HAHandler.DRBD.Watcher, as: DRBDWatcher
|
||||
|
||||
def start_link(opts) do
|
||||
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(instances) do
|
||||
children =
|
||||
Enum.map(instances, fn conf ->
|
||||
%{
|
||||
id: Keyword.get(conf, :hostname),
|
||||
start: {DRBDWatcher, :start_link, [conf]}
|
||||
}
|
||||
end)
|
||||
|
||||
opts = [strategy: :one_for_one]
|
||||
Supervisor.init(children, opts)
|
||||
end
|
||||
end
|
51
lib/ha_handler/drbd/watcher.ex
Normal file
51
lib/ha_handler/drbd/watcher.ex
Normal file
|
@ -0,0 +1,51 @@
|
|||
defmodule HAHandler.DRBD.Watcher do
|
||||
# TODO: add support for SSH public keys authentication.
|
||||
|
||||
use GenServer
|
||||
|
||||
require Logger
|
||||
|
||||
def start_link(opts) do
|
||||
GenServer.start_link(__MODULE__, opts)
|
||||
end
|
||||
|
||||
defp connect(hostname, password) do
|
||||
case :inet.gethostbyname(to_charlist(hostname), :inet6) do
|
||||
{:ok, {:hostent, _name, _aliases, _addrtype, _length, [addr]}} ->
|
||||
SSHEx.connect(
|
||||
ip: addr,
|
||||
user: 'root',
|
||||
password: password,
|
||||
silently_accept_hosts: true
|
||||
)
|
||||
err ->
|
||||
err
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
def init(opts) do
|
||||
hostname = Keyword.get(opts, :hostname)
|
||||
password = Keyword.get(opts, :password)
|
||||
|
||||
{:ok, pid} = connect(hostname, password)
|
||||
|
||||
state = %{
|
||||
backend: pid,
|
||||
hostname: hostname,
|
||||
password: password
|
||||
}
|
||||
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_call({:execute, cmd}, _from, %{backend: backend} = state) do
|
||||
case SSHEx.run(backend, cmd) do
|
||||
{:ok, _output, _status} = reply->
|
||||
{:reply, reply, state}
|
||||
{:error, _err} = reply ->
|
||||
{:error, reply, state}
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,7 +1,7 @@
|
|||
defmodule HAHandler.Web.Controller do
|
||||
import Plug.Conn
|
||||
|
||||
alias HAHandler.{HAProxy, PGSQL}
|
||||
alias HAHandler.{HAProxy, PGSQL, DRBD}
|
||||
|
||||
@template_dir "lib/ha_handler/web/templates"
|
||||
@index_template EEx.compile_file(Path.join(@template_dir, "index.html.eex"))
|
||||
|
@ -19,10 +19,12 @@ defmodule HAHandler.Web.Controller do
|
|||
|
||||
haproxy_stats = HAProxy.get_stats(hide_error: true)
|
||||
pgsql_stats = PGSQL.get_stats()
|
||||
drbd_stats = DRBD.get_stats()
|
||||
|
||||
assigns = [
|
||||
haproxy_stats: haproxy_stats,
|
||||
pgsql_status: pgsql_stats,
|
||||
drbd_status: drbd_stats,
|
||||
hostname: hostname,
|
||||
otp_app: HAHandler.otp_app(),
|
||||
version: HAHandler.version(),
|
||||
|
|
|
@ -119,6 +119,31 @@
|
|||
<% end %>
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
<hr />
|
||||
|
||||
<h2>DRBD</h2>
|
||||
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Hostname</th>
|
||||
<th>Version</th>
|
||||
<th>Status</th>
|
||||
<th>Operation</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
<%= for entry <- drbd_status do %>
|
||||
<tr>
|
||||
<td><%= entry[:hostname] %></td>
|
||||
<td><%= entry[:version] %></td>
|
||||
<td><%= entry[:status] %></td>
|
||||
<td><%= entry[:mode] %></td>
|
||||
</tr>
|
||||
<% end %>
|
||||
</tbody>
|
||||
</table>
|
||||
</main>
|
||||
</body>
|
||||
</html>
|
||||
|
|
5
mix.exs
5
mix.exs
|
@ -4,7 +4,7 @@ defmodule HAHandler.MixProject do
|
|||
def project do
|
||||
[
|
||||
app: :ha_handler,
|
||||
version: "0.2.1",
|
||||
version: "0.3.0",
|
||||
elixir: "~> 1.12",
|
||||
start_permanent: Mix.env() == :prod,
|
||||
deps: deps(),
|
||||
|
@ -28,7 +28,8 @@ defmodule HAHandler.MixProject do
|
|||
{:replug, "~> 0.1.0"},
|
||||
{:procket, "~> 0.9"},
|
||||
{:poison, "~> 5.0"},
|
||||
{:postgrex, "~> 0.16.1"}
|
||||
{:postgrex, "~> 0.16.1"},
|
||||
{:sshex, "~> 2.2.1"}
|
||||
]
|
||||
end
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
main {
|
||||
width: 500px;
|
||||
width: 700px;
|
||||
margin-top: 50px;
|
||||
margin-left: auto;
|
||||
margin-right: auto;
|
||||
|
|
Loading…
Reference in a new issue