diff --git a/lib/ha_handler.ex b/lib/ha_handler.ex index 056d944..345c045 100644 --- a/lib/ha_handler.ex +++ b/lib/ha_handler.ex @@ -12,6 +12,7 @@ defmodule HAHandler do def http_port, do: Application.get_env(@otp_app, :http_port) def haproxy_socket, do: Application.get_env(@otp_app, :haproxy_socket) def pgsql_instances, do: Application.get_env(@otp_app, :pgsql_instances, []) + def drbd_instances, do: Application.get_env(@otp_app, :drbd_instances, []) def acme_challenge_path, do: Application.get_env(@otp_app, :acme_challenge_path) def static_path(), do: Application.app_dir(@otp_app, "priv/static/") diff --git a/lib/ha_handler/application.ex b/lib/ha_handler/application.ex index 139126d..baea618 100644 --- a/lib/ha_handler/application.ex +++ b/lib/ha_handler/application.ex @@ -13,6 +13,7 @@ defmodule HAHandler.Application do {Plug.Cowboy, scheme: :http, plug: HAHandler.Web.Router, options: [port: HAHandler.http_port()]}, {HAHandler.PGSQL.Supervisor, HAHandler.pgsql_instances()}, + {HAHandler.DRBD.Supervisor, HAHandler.drbd_instances()}, {HAHandler.Control, []} ] diff --git a/lib/ha_handler/control.ex b/lib/ha_handler/control.ex index 20ec667..f72dbff 100644 --- a/lib/ha_handler/control.ex +++ b/lib/ha_handler/control.ex @@ -2,15 +2,19 @@ defmodule HAHandler.Control do @moduledoc """ This module handles the decision-logic and actions to be taken regarding the current state of the infrastructure. + + FIXME: POC quickly hacked together, there's a lot of weak code duplicated + around. """ @haproxy_pgsql_backend "pgsql" + @haproxy_drbd_backend "sshfs" use GenServer require Logger - alias HAHandler.{PGSQL, HAProxy} + alias HAHandler.{PGSQL, HAProxy, DRBD} # How much do we wait (ms) between each check/decision-making round? @refresh 15_000 @@ -30,10 +34,7 @@ defmodule HAHandler.Control do {:ok, state} end - @impl true - def handle_info(:sync, state) do - Logger.debug("Executing control logic.") - + defp process_pgsql() do # Fetch PGSQL state, make sure HAProxy routes to the master # process. pgsql_state = @@ -95,6 +96,75 @@ defmodule HAHandler.Control do Logger.warning("Unhandled PGSQL/HAProxy state: #{inspect(unknown)}") end end + end + + defp process_drbd() do + drbd_state = + DRBD.get_instances() + |> Enum.map(fn {hostname, pid} = instance -> + haproxy_server = + HAHandler.drbd_instances() + |> Enum.filter(fn opts -> Keyword.get(opts, :hostname) == hostname end) + |> Enum.at(0) + |> Keyword.get(:haproxy_server) + + %{ + haproxy_server: haproxy_server, + drbd_watcher_pid: pid, + drbd_state: DRBD.get_state(instance) + } + end) + + haproxy_state = + HAProxy.get_stats() + |> Map.get("Server", []) + |> Enum.filter(fn mapping -> mapping["pxname"] == @haproxy_drbd_backend end) + |> Enum.map(fn mapping -> %{mapping["svname"] => mapping["status"]} end) + |> Enum.reduce(&Map.merge/2) + + for drbd_instance <- drbd_state do + haproxy_state = Map.get(haproxy_state, drbd_instance.haproxy_server) + + case {drbd_instance.drbd_state.mode, haproxy_state} do + {"Secondary/Primary", "UP"} -> + Logger.info( + "Disabling routing SSHFS to (now) secondary #{drbd_instance.haproxy_server}." + ) + + HAProxy.set_server( + @haproxy_drbd_backend, + drbd_instance.haproxy_server, + "state", + "maint" + ) + {"Primary/Secondary", "UP"} -> + :noop + + {"Secondary/Primary", "MAINT"} -> + :noop + + {"Primary/Secondary", "MAINT"} -> + Logger.info("Enabling routing SSHFS to (now) primary #{drbd_instance.haproxy_server}.") + + HAProxy.set_server( + @haproxy_pgsql_backend, + drbd_instance.haproxy_server, + "state", + "ready" + ) + + unknown -> + Logger.warning("Unhandled DRBD/HAProxy state: #{inspect(unknown)}") + end + end + end + + @impl true + def handle_info(:sync, state) do + Logger.debug("Executing control logic.") + + process_pgsql() + process_drbd() # Schedule next round. Process.send_after(self(), :sync, @refresh) diff --git a/lib/ha_handler/drbd.ex b/lib/ha_handler/drbd.ex new file mode 100644 index 0000000..c5b1e76 --- /dev/null +++ b/lib/ha_handler/drbd.ex @@ -0,0 +1,74 @@ +defmodule HAHandler.DRBD do + @supervisor HAHandler.DRBD.Supervisor + + # There might be >1 resources configured in DRBD! + @default_resource_id "1" + + # We don't support DRBD 9 for the time being, as /proc/drbd does not have a + # stable API. + @supported_drbd_major_version "8" + + # Parsing of /proc/drbd, assuming DRBD 8. Splitting the regexes helps humans + # wrapping their head around what's going on. And yes, it's fragile: we need + # drbd 9 to get a JSON interface to `drbdadm status`. + @drbd_proc_cmd "cat /proc/drbd" + @block_regex ~r/(?(.|\n)*)\n(?\n\s(.|\n)*)/ + @version_regex ~r/version: (?(?\d+)\.(?\d+)\.(?\d))/ + @resource_split_regex ~r{(\n\s(\d+)\:\s)} + @id_extraction_regex ~r/\n\s(?\d+)\:\s/ + @data_extraction_regex ~r/cs:(?(\w|\/)+)\sro:(?(\w|\/)+)\sds:(?(\w|\/)+)\s/ + + def get_instances() do + watchers = Supervisor.which_children(@supervisor) + + for {hostname, pid, _type, _modules} <- watchers do + {hostname, pid} + end + end + + def get_stats() do + get_instances() + |> Enum.map(fn instance -> get_state(instance) end) + end + + def get_state({hostname, pid}) do + case GenServer.call(pid, {:execute, @drbd_proc_cmd}) do + {:ok, raw, 0} -> + case Regex.named_captures(@block_regex, raw) do + %{"version_block" => version_block, "resource_block" => resource_block} -> + version = Regex.named_captures(@version_regex, version_block) + + if Map.get(version, "major") != @supported_drbd_major_version do + {:error, "unsupported DRBD version #{inspect(version)}"} + else + resources = Regex.split( @resource_split_regex, resource_block, + [include_captures: true, trim: true]) + |> Enum.chunk_every(2) + |> Enum.map(fn [raw_id, raw_data] -> + %{} + |> Map.merge(Regex.named_captures(@id_extraction_regex, raw_id)) + |> Map.merge(Regex.named_captures(@data_extraction_regex, raw_data)) + end) + + default_resource = resources + |> Enum.filter(fn r -> r["id"] == @default_resource_id end) + |> Enum.at(0) + + %{ + hostname: hostname, + version: Map.get(version, "full"), + mode: Map.get(default_resource, "ro"), + status: Map.get(default_resource, "ds"), + data: resources + } + end + _ -> + {:error, "could not parse /proc/drbd"} + end + {:ok, _, posix_err} -> + {:error, posix_err} + {:error, _err} = reply -> + reply + end + end +end diff --git a/lib/ha_handler/drbd/supervisor.ex b/lib/ha_handler/drbd/supervisor.ex new file mode 100644 index 0000000..289488f --- /dev/null +++ b/lib/ha_handler/drbd/supervisor.ex @@ -0,0 +1,23 @@ +defmodule HAHandler.DRBD.Supervisor do + use Supervisor + + alias HAHandler.DRBD.Watcher, as: DRBDWatcher + + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(instances) do + children = + Enum.map(instances, fn conf -> + %{ + id: Keyword.get(conf, :hostname), + start: {DRBDWatcher, :start_link, [conf]} + } + end) + + opts = [strategy: :one_for_one] + Supervisor.init(children, opts) + end +end diff --git a/lib/ha_handler/drbd/watcher.ex b/lib/ha_handler/drbd/watcher.ex new file mode 100644 index 0000000..85f3361 --- /dev/null +++ b/lib/ha_handler/drbd/watcher.ex @@ -0,0 +1,51 @@ +defmodule HAHandler.DRBD.Watcher do + # TODO: add support for SSH public keys authentication. + + use GenServer + + require Logger + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + defp connect(hostname, password) do + case :inet.gethostbyname(to_charlist(hostname), :inet6) do + {:ok, {:hostent, _name, _aliases, _addrtype, _length, [addr]}} -> + SSHEx.connect( + ip: addr, + user: 'root', + password: password, + silently_accept_hosts: true + ) + err -> + err + end + end + + @impl true + def init(opts) do + hostname = Keyword.get(opts, :hostname) + password = Keyword.get(opts, :password) + + {:ok, pid} = connect(hostname, password) + + state = %{ + backend: pid, + hostname: hostname, + password: password + } + + {:ok, state} + end + + @impl true + def handle_call({:execute, cmd}, _from, %{backend: backend} = state) do + case SSHEx.run(backend, cmd) do + {:ok, _output, _status} = reply-> + {:reply, reply, state} + {:error, _err} = reply -> + {:error, reply, state} + end + end +end diff --git a/lib/ha_handler/web/controller.ex b/lib/ha_handler/web/controller.ex index 48756e8..babf10a 100644 --- a/lib/ha_handler/web/controller.ex +++ b/lib/ha_handler/web/controller.ex @@ -1,7 +1,7 @@ defmodule HAHandler.Web.Controller do import Plug.Conn - alias HAHandler.{HAProxy, PGSQL} + alias HAHandler.{HAProxy, PGSQL, DRBD} @template_dir "lib/ha_handler/web/templates" @index_template EEx.compile_file(Path.join(@template_dir, "index.html.eex")) @@ -19,10 +19,12 @@ defmodule HAHandler.Web.Controller do haproxy_stats = HAProxy.get_stats(hide_error: true) pgsql_stats = PGSQL.get_stats() + drbd_stats = DRBD.get_stats() assigns = [ haproxy_stats: haproxy_stats, pgsql_status: pgsql_stats, + drbd_status: drbd_stats, hostname: hostname, otp_app: HAHandler.otp_app(), version: HAHandler.version(), diff --git a/lib/ha_handler/web/templates/index.html.eex b/lib/ha_handler/web/templates/index.html.eex index 28fcb32..047dce4 100644 --- a/lib/ha_handler/web/templates/index.html.eex +++ b/lib/ha_handler/web/templates/index.html.eex @@ -119,6 +119,31 @@ <% end %> + +
+ +

DRBD

+ + + + + + + + + + + + <%= for entry <- drbd_status do %> + + + + + + + <% end %> + +
HostnameVersionStatusOperation
<%= entry[:hostname] %><%= entry[:version] %><%= entry[:status] %><%= entry[:mode] %>
diff --git a/mix.exs b/mix.exs index 656c772..880e01c 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule HAHandler.MixProject do def project do [ app: :ha_handler, - version: "0.2.1", + version: "0.3.0", elixir: "~> 1.12", start_permanent: Mix.env() == :prod, deps: deps(), @@ -28,7 +28,8 @@ defmodule HAHandler.MixProject do {:replug, "~> 0.1.0"}, {:procket, "~> 0.9"}, {:poison, "~> 5.0"}, - {:postgrex, "~> 0.16.1"} + {:postgrex, "~> 0.16.1"}, + {:sshex, "~> 2.2.1"} ] end diff --git a/priv/static/app.css b/priv/static/app.css index 7450ecd..37334a0 100644 --- a/priv/static/app.css +++ b/priv/static/app.css @@ -1,5 +1,5 @@ main { - width: 500px; + width: 700px; margin-top: 50px; margin-left: auto; margin-right: auto;