diff --git a/apps/examples/test/examples/log_trade/advisor_test.exs b/apps/examples/test/examples/log_trade/advisor_test.exs new file mode 100644 index 000000000..6a227956e --- /dev/null +++ b/apps/examples/test/examples/log_trade/advisor_test.exs @@ -0,0 +1,28 @@ +defmodule Examples.LogTrade.AdvisorTest do + use Tai.TestSupport.E2ECase, async: false + + @scenario :log_trade + @venue :test_exchange_a + @product :btc_usd + + def before_start_app, do: seed_mock_responses(@scenario) + + def after_start_app, do: seed_venues(@scenario) + + def after_boot_app do + start_venue(@venue) + configure_fleet(@scenario) + start_advisors(where: [fleet_id: @scenario]) + end + + test "logs the bid/ask spread via a custom event" do + push_stream_market_data({@scenario, :snapshot, @venue, @product}) + + assert_receive {TaiEvents.Event, %Examples.LogSpread.Events.Spread{} = event, _} + assert event.venue_id == @venue + assert event.product_symbol == @product + assert event.bid_price == Decimal.new("6500.1") + assert event.ask_price == Decimal.new("6500.11") + assert event.spread == Decimal.new("0.01") + end +end diff --git a/apps/tai/lib/tai/system_bus.ex b/apps/tai/lib/tai/system_bus.ex index e0e215178..41d42ba5e 100644 --- a/apps/tai/lib/tai/system_bus.ex +++ b/apps/tai/lib/tai/system_bus.ex @@ -1,5 +1,16 @@ defmodule Tai.SystemBus do + @moduledoc """ + A core pubsub bus for async message communication. The following subsystems depend on this module. + + - boot + - metrics + + NOTE: This module should not be used as a public API as it is subject to change + """ + @type partitions :: pos_integer + @type topic :: atom | tuple + @type topics :: [topic] @spec child_spec(opts :: term) :: Supervisor.child_spec() def child_spec(opts) do @@ -21,6 +32,9 @@ defmodule Tai.SystemBus do ) end + @spec subscribe(topic | topics) :: :ok + def subscribe(topics) + def subscribe([]), do: :ok def subscribe([topic | tail]) do @@ -34,6 +48,9 @@ defmodule Tai.SystemBus do |> subscribe end + @spec unsubscribe(topic | topics) :: :ok + def unsubscribe(topics) + def unsubscribe([]), do: :ok def unsubscribe([topic | tail]) do @@ -47,6 +64,7 @@ defmodule Tai.SystemBus do |> unsubscribe end + @spec broadcast(topic, term) :: :ok def broadcast(topic, message) do Registry.dispatch(__MODULE__, topic, fn entries -> for {pid, _} <- entries, do: send(pid, message)