Skip to content

Commit

Permalink
Support Unix domain socket
Browse files Browse the repository at this point in the history
  • Loading branch information
qcam committed Oct 25, 2017
1 parent 2cb0943 commit 260b256
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 12 deletions.
26 changes: 19 additions & 7 deletions lib/fluxter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,14 @@ defmodule Fluxter do
def start_link(options \\ []) do
import Supervisor.Spec

{host, port, prefix} = Fluxter.load_config(__MODULE__, options)
conn = Fluxter.Conn.new(host, port)
conn = %{conn | header: [conn.header | prefix]}
conn =
case Fluxter.load_config(__MODULE__, options) do
{:inet, {host, port, prefix}} ->
conn = Fluxter.Conn.new(host, port)
conn = %{conn | header: [conn.header | prefix]}
{:local, socket_path} ->
Fluxter.Conn.new(:local, socket_path)
end

Enum.map(@worker_names, &worker(Fluxter.Conn, [conn, &1], id: &1))
|> Supervisor.start_link(strategy: :one_for_one)
Expand Down Expand Up @@ -362,11 +367,18 @@ defmodule Fluxter do
Application.get_all_env(:fluxter)
|> Keyword.pop(module, [])

host = options[:host] || loc_env[:host] || glob_env[:host]
port = options[:port] || loc_env[:port] || glob_env[:port]
prefix = build_prefix(glob_env[:prefix], loc_env[:prefix], options[:prefix])
local? = options[:local] || loc_env[:local] || glob_env[:local]

{host, port, prefix}
if local? do
socket_path = options[:socket_path] || loc_env[:socket_path] || glob_env[:socket_path]
{:local, socket_path}
else
host = options[:host] || loc_env[:host] || glob_env[:host]
port = options[:port] || loc_env[:port] || glob_env[:port]
prefix = build_prefix(glob_env[:prefix], loc_env[:prefix], options[:prefix])

{:inet, {host, port, prefix}}
end
end

defp build_prefix(part1, part2, part3) do
Expand Down
18 changes: 14 additions & 4 deletions lib/fluxter/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@ defmodule Fluxter.Conn do

require Logger

defstruct [:sock, :header]
defstruct [:sock, :header, :type]

def new(host, port) when is_binary(host) do
new(string_to_charlist(host), port)
end

def new(host, port) when is_list(host) or is_tuple(host) do
{:ok, addr} = :inet.getaddr(host, :inet)
header = Packet.header(addr, port)
%__MODULE__{header: header}
header = Packet.header(:inet, addr, port)
%__MODULE__{header: header, type: :inet}
end

def new(:local, socket_path) do
header = Packet.header(:local, socket_path)
%__MODULE__{header: header, type: :local}
end

def start_link(%__MODULE__{} = conn, worker) do
Expand All @@ -33,11 +38,16 @@ defmodule Fluxter.Conn do
end
end

def init(conn) do
def init(%__MODULE__{type: :inet} = conn) do
{:ok, sock} = :gen_udp.open(0, [active: false])
{:ok, %{conn | sock: sock}}
end

def init(%__MODULE__{type: :local} = conn) do
{:ok, sock} = :gen_udp.open(0, [:local, active: false])
{:ok, %{conn | sock: sock}}
end

def handle_cast({:write, name, tags, fields}, conn) do
packet = Packet.build(conn.header, name, tags, fields, nil)
send(conn.sock, {self(), {:command, packet}})
Expand Down
11 changes: 10 additions & 1 deletion lib/fluxter/packet.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ defmodule Fluxter.Packet do
otp_release = :erlang.system_info(:otp_release)
@addr_family if(otp_release >= '19', do: [1], else: [])

def header({n1, n2, n3, n4}, port) do
@inet_local [5]

def header(:inet, {n1, n2, n3, n4}, port) do
@addr_family ++ [
band(bsr(port, 8), 0xFF),
band(port, 0xFF),
Expand All @@ -17,6 +19,13 @@ defmodule Fluxter.Packet do
]
end

def header(:local, socket_path) do
@inet_local ++ [
byte_size(socket_path),
socket_path,
]
end

def build(header, name, tags, fields, nil) do
tags = encode_tags(tags)
fields = encode_fields(fields)
Expand Down

0 comments on commit 260b256

Please sign in to comment.