diff --git a/lib/fluxter.ex b/lib/fluxter.ex index af785c3..44a9949 100644 --- a/lib/fluxter.ex +++ b/lib/fluxter.ex @@ -305,9 +305,14 @@ defmodule Fluxter do def start_link(options \\ []) do import Supervisor.Spec - {host, port, prefix} = Fluxter.load_config(__MODULE__, options) - conn = Fluxter.Conn.new(host, port) - conn = %{conn | header: [conn.header | prefix]} + conn = + case Fluxter.load_config(__MODULE__, options) do + {:inet, {host, port, prefix}} -> + conn = Fluxter.Conn.new(host, port) + conn = %{conn | header: [conn.header | prefix]} + {:local, socket_path} -> + Fluxter.Conn.new(:local, socket_path) + end Enum.map(@worker_names, &worker(Fluxter.Conn, [conn, &1], id: &1)) |> Supervisor.start_link(strategy: :one_for_one) @@ -362,11 +367,18 @@ defmodule Fluxter do Application.get_all_env(:fluxter) |> Keyword.pop(module, []) - host = options[:host] || loc_env[:host] || glob_env[:host] - port = options[:port] || loc_env[:port] || glob_env[:port] - prefix = build_prefix(glob_env[:prefix], loc_env[:prefix], options[:prefix]) + local? = options[:local] || loc_env[:local] || glob_env[:local] - {host, port, prefix} + if local? do + socket_path = options[:socket_path] || loc_env[:socket_path] || glob_env[:socket_path] + {:local, socket_path} + else + host = options[:host] || loc_env[:host] || glob_env[:host] + port = options[:port] || loc_env[:port] || glob_env[:port] + prefix = build_prefix(glob_env[:prefix], loc_env[:prefix], options[:prefix]) + + {:inet, {host, port, prefix}} + end end defp build_prefix(part1, part2, part3) do diff --git a/lib/fluxter/conn.ex b/lib/fluxter/conn.ex index 3e5ee7f..336a67f 100644 --- a/lib/fluxter/conn.ex +++ b/lib/fluxter/conn.ex @@ -7,7 +7,7 @@ defmodule Fluxter.Conn do require Logger - defstruct [:sock, :header] + defstruct [:sock, :header, :type] def new(host, port) when is_binary(host) do new(string_to_charlist(host), port) @@ -15,8 +15,13 @@ defmodule Fluxter.Conn do def new(host, port) when is_list(host) or is_tuple(host) do {:ok, addr} = :inet.getaddr(host, :inet) - header = Packet.header(addr, port) - %__MODULE__{header: header} + header = Packet.header(:inet, addr, port) + %__MODULE__{header: header, type: :inet} + end + + def new(:local, socket_path) do + header = Packet.header(:local, socket_path) + %__MODULE__{header: header, type: :local} end def start_link(%__MODULE__{} = conn, worker) do @@ -33,11 +38,16 @@ defmodule Fluxter.Conn do end end - def init(conn) do + def init(%__MODULE__{type: :inet} = conn) do {:ok, sock} = :gen_udp.open(0, [active: false]) {:ok, %{conn | sock: sock}} end + def init(%__MODULE__{type: :local} = conn) do + {:ok, sock} = :gen_udp.open(0, [:local, active: false]) + {:ok, %{conn | sock: sock}} + end + def handle_cast({:write, name, tags, fields}, conn) do packet = Packet.build(conn.header, name, tags, fields, nil) send(conn.sock, {self(), {:command, packet}}) diff --git a/lib/fluxter/packet.ex b/lib/fluxter/packet.ex index b746040..240ec03 100644 --- a/lib/fluxter/packet.ex +++ b/lib/fluxter/packet.ex @@ -6,7 +6,9 @@ defmodule Fluxter.Packet do otp_release = :erlang.system_info(:otp_release) @addr_family if(otp_release >= '19', do: [1], else: []) - def header({n1, n2, n3, n4}, port) do + @inet_local [5] + + def header(:inet, {n1, n2, n3, n4}, port) do @addr_family ++ [ band(bsr(port, 8), 0xFF), band(port, 0xFF), @@ -17,6 +19,13 @@ defmodule Fluxter.Packet do ] end + def header(:local, socket_path) do + @inet_local ++ [ + byte_size(socket_path), + socket_path, + ] + end + def build(header, name, tags, fields, nil) do tags = encode_tags(tags) fields = encode_fields(fields)