diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml new file mode 100644 index 0000000..9f89fb9 --- /dev/null +++ b/.github/workflows/benchmark.yml @@ -0,0 +1,46 @@ +name: Benchmark + +on: + push: + branches: + - main + pull_request: + workflow_dispatch: + +jobs: + benchmark: + if: ${{ !contains(github.event.head_commit.message, '[ci skip]') }} + runs-on: ubuntu-latest + env: + BUNDLE_JOBS: 4 + BUNDLE_RETRY: 3 + CI: true + RAILS_VERSION: ${{ matrix.rails_version }} + DATABASE_URL: postgresql://postgres:postgres@localhost:5432 + services: + redis: + image: redis:7.0-alpine + ports: ["6379:6379"] + options: --health-cmd="redis-cli ping" --health-interval 1s --health-timeout 3s --health-retries 30 + strategy: + fail-fast: false + matrix: + rails_version: ["~> 7.0", "~> 8.0.0"] + adapter: ["redis", "async"] + features: [""] + include: + - rails_version: "~> 8.0.0" + adapter: "redis" + features: "FASTLANE_BROADCASTS=1" + steps: + - uses: actions/checkout@v4 + - uses: ruby/setup-ruby@v1 + with: + ruby-version: 3.3 + bundler-cache: true + - name: Run benchmarks + env: + ACTION_CABLE_ADAPTER: ${{ matrix.adapter }} + MODE: smoke + run: | + ${{ matrix.features }} bundle exec ruby --yjit benchmarks/broadcasting.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 74360cc..a88dd50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## main +- Add `config.fastlane_broadcasts_enabled` to opt-in for optimized broadcasts (no double JSON encoding). + ## 0.1.2 - Added a hack to prevent third-party extensions from changing the methods visibility. diff --git a/Gemfile b/Gemfile index 29fbd7c..9319bb1 100644 --- a/Gemfile +++ b/Gemfile @@ -17,6 +17,10 @@ gem "cgi", ">= 0.3.6", require: false # Workaround until all supported Ruby versions ship with uri version 0.13.1 or higher. gem "uri", ">= 0.13.1", require: false +# For benchmarks and profiling +gem "benchmark-ips", require: false +gem "vernier", require: false + group :rubocop do gem "rubocop", ">= 1.25.1", require: false gem "rubocop-minitest", require: false diff --git a/benchmarks/broadcasting.rb b/benchmarks/broadcasting.rb new file mode 100644 index 0000000..35544f3 --- /dev/null +++ b/benchmarks/broadcasting.rb @@ -0,0 +1,203 @@ +# frozen_string_literal: true + +require "bundler/setup" + +require "benchmark/ips" +require "action_cable" + +ActionCable.server.config.cable = { "adapter" => ENV.fetch("ACTION_CABLE_ADAPTER", "async") } +ActionCable.server.config.logger = ENV["LOG"] == "1" ? Logger.new(STDOUT) : Logger.new(nil) +ActionCable.server.config.fastlane_broadcasts_enabled = true if %w[1 t true].include?(ENV["FASTLANE_BROADCASTS"]) +ActionCable.server.config.worker_pool_size = ENV.fetch("WORKER_POOL_SIZE", 4).to_i + +# Number of clients +N = ENV.fetch("N", 10).to_i + +# Number of messages to broadcast per run +B = ENV.fetch("M", N * 10).to_i + +# Benchmark mode +mode = ENV.fetch("MODE", "benchmark") + +$stdout.puts "Running #{mode} with N=#{N}, M=#{B}, adapter=#{ActionCable.server.config.cable["adapter"]}, fastlane_broadcasts_enabled=#{ActionCable.server.config.fastlane_broadcasts_enabled}, worker_pool_size=#{ActionCable.server.config.worker_pool_size}" + +module ApplicationCable + class Connection < ActionCable::Connection::Base + end + + class Channel < ActionCable::Channel::Base + end +end + +class TestChannel < ApplicationCable::Channel + def subscribed + stream_from "all" + end +end + +class TestSocket + attr_reader :server + delegate :pubsub, :config, :executor, :logger, :worker_pool, to: :server + + def initialize(server, sync_queue: nil) + @coder = ActiveSupport::JSON + @sync_queue = sync_queue + @server = server + @transmissions = [] + end + + def perform_work(receiver, method, *args) + worker_pool.async_invoke(receiver, method, *args, connection: self) + end + + def transmit(cable_message) + encode(cable_message).then do + raw_transmit(_1) + end + end + + def raw_transmit(msg) + @transmissions << msg + @sync_queue&.push(msg) + end + + def close(...) + end + + def encode(cable_message) + @coder.encode cable_message + end +end + +sync_queue = Queue.new +clients = N.times.map do + ApplicationCable::Connection.new(ActionCable.server, TestSocket.new(ActionCable.server, sync_queue:)) +end + +# Subscribe all clients to the same channel +subscribe_cmd = { "command" => "subscribe", "identifier" => { "channel" => "TestChannel" }.to_json } +clients.each { _1.handle_channel_command(subscribe_cmd) } + +received = 0 +clients.size.times do + raise "Expected to receive #{clients.size} confirmations, got only #{received}" if sync_queue.pop(timeout: 5.0).nil? + received += 1 +end + +SMALL_JSON = { "message" => "hello" }.freeze + +require "active_support/time_with_zone" +LARGE_JSON = ActiveSupport::TimeZone.all.take(5).to_json.then { JSON.parse(_1) }.freeze + +TURBO_STREAM = <<~HTML + + + +HTML + +HOTWIRE = { "html" => TURBO_STREAM }.freeze + +SCENARIOS = { + "small json" => SMALL_JSON, + "large json" => LARGE_JSON, + "turbo stream" => HOTWIRE +} + +if mode == "benchmark" + $stdout.puts "Running benchmarks..." + + Benchmark.ips do |x| + x.config(warmup: 5, time: 10) + + SCENARIOS.each do |name, payload| + x.report("#{name} (#{ActiveSupport::NumberHelper.number_to_human_size(payload.to_json.size)})") do + Thread.new do + B.times do + ActionCable.server.broadcast("all", payload) + end + end + + (B * N).times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) } + end + end + end +elsif mode == "profile" + $stdout.puts "Profiling..." + + require "vernier" + + scenario = ENV.fetch("SCENARIO", "small json") + payload = SCENARIOS.fetch(scenario) + path = File.join(__dir__, "../tmp/broadcasting_#{[scenario, N, B, ActionCable.server.config.fastlane_broadcasts_enabled ? "fastlane" : nil].compact.join("-").parameterize}.json") + + # warmup + th = Thread.new do + 10.times do + ActionCable.server.broadcast("all", payload) + end + end + + (10 * N).times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) } + + Vernier.trace(out: path) do + th = Thread.new do + B.times do + ActionCable.server.broadcast("all", payload) + end + end + + (B * N).times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) } + th.join + end + + $stdout.puts "Profiling done: #{path}" +elsif mode == "smoke" + ActionCable.server.broadcast("all", { message: "hello" }) + N.times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) } + puts "All good" +else + raise "Unknown mode: #{mode}" +end diff --git a/lib/action_cable.rb b/lib/action_cable.rb index 0183ecc..0efc7bf 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -23,6 +23,7 @@ # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. #++ +require "logger" require "active_support" require "active_support/rails" require "zeitwerk" diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb index 8a15559..62d679b 100644 --- a/lib/action_cable/channel/streams.rb +++ b/lib/action_cable/channel/streams.rb @@ -157,10 +157,21 @@ def streams # Always wrap the outermost handler to invoke the user handler on the worker # pool rather than blocking the event loop. def worker_pool_stream_handler(broadcasting, user_handler, coder: nil) + if !user_handler && !coder && connection.config.fastlane_broadcasts_enabled + return opt_worker_pool_stream_handler(broadcasting) + end + handler = stream_handler(broadcasting, user_handler, coder: coder) -> message do - connection.perform_work handler, :call, message + connection.perform_work handler, :call, message.data + end + end + + # Optimized stream handler that avoids double JSON encoding/decoding on broadcast + def opt_worker_pool_stream_handler(broadcasting) + -> (message) do + connection.raw_transmit message.encoded_for(@identifier) end end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 598ee44..b0bef21 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -117,6 +117,10 @@ def transmit(data) # :nodoc: socket.transmit(data) end + def raw_transmit(data) # :nodoc: + socket.raw_transmit(data) + end + # Close the connection. def close(reason: nil, reconnect: true) transmit( diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb index 2c0c2b4..1f293e8 100644 --- a/lib/action_cable/connection/internal_channel.rb +++ b/lib/action_cable/connection/internal_channel.rb @@ -18,7 +18,7 @@ def internal_channel def subscribe_to_internal_channel if connection_identifier.present? - callback = -> (message) { process_internal_message ActiveSupport::JSON.decode(message) } + callback = -> (message) { process_internal_message ActiveSupport::JSON.decode(message.data) } @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] diff --git a/lib/action_cable/server/configuration.rb b/lib/action_cable/server/configuration.rb index b5bf431..7e4cbe2 100644 --- a/lib/action_cable/server/configuration.rb +++ b/lib/action_cable/server/configuration.rb @@ -19,6 +19,7 @@ class Configuration attr_accessor :precompile_assets attr_accessor :health_check_path, :health_check_application attr_writer :pubsub_adapter + attr_accessor :fastlane_broadcasts_enabled def initialize @log_tags = [] @@ -31,6 +32,8 @@ def initialize @allow_same_origin_as_host = true @filter_parameters = [] + @fastlane_broadcasts_enabled = false + @health_check_application = ->(env) { [200, { Rack::CONTENT_TYPE => "text/html", "date" => Time.now.httpdate }, []] } diff --git a/lib/action_cable/server/socket.rb b/lib/action_cable/server/socket.rb index 5dcbf66..af7489f 100644 --- a/lib/action_cable/server/socket.rb +++ b/lib/action_cable/server/socket.rb @@ -47,6 +47,12 @@ def transmit(cable_message) websocket.transmit encode(cable_message) end + def raw_transmit(message) + return unless websocket.alive? + + websocket.transmit message + end + # Close the WebSocket connection. def close(...) websocket.close(...) if websocket.alive? diff --git a/lib/action_cable/subscription_adapter/subscriber_map.rb b/lib/action_cable/subscription_adapter/subscriber_map.rb index 612b3fa..5b62468 100644 --- a/lib/action_cable/subscription_adapter/subscriber_map.rb +++ b/lib/action_cable/subscription_adapter/subscriber_map.rb @@ -5,6 +5,19 @@ module ActionCable module SubscriptionAdapter class SubscriberMap + class Message < Struct.new(:data) + def initialize(...) + super + @cache = Concurrent::Map.new + end + + def encoded_for(identifier) + @cache.compute_if_absent(identifier) do + ActiveSupport::JSON.encode({ identifier: identifier, message: ActiveSupport::JSON.decode(data) }) + end + end + end + def initialize @subscribers = Hash.new { |h, k| h[k] = [] } @sync = Mutex.new @@ -41,8 +54,10 @@ def broadcast(channel, message) @subscribers[channel].dup end + msg = Message.new(message) + list.each do |subscriber| - invoke_callback(subscriber, message) + invoke_callback(subscriber, msg) end end diff --git a/test/client_test.rb b/test/client_test.rb index 8ae0258..28e8214 100644 --- a/test/client_test.rb +++ b/test/client_test.rb @@ -91,6 +91,9 @@ def setup # and now the "real" setup for our test: server.config.disable_request_forgery_protection = true + + # enable fastlane broadcasts (if env variable provided) + server.config.fastlane_broadcasts_enabled = true if %w[1 t true].include?(ENV["ACTION_CABLE_FASTLANE_BROADCASTS"]) end def with_puma_server(rack_app = ActionCable.server, port = 3099) diff --git a/test/subscription_adapter/common.rb b/test/subscription_adapter/common.rb index e7008b8..2f8500a 100644 --- a/test/subscription_adapter/common.rb +++ b/test/subscription_adapter/common.rb @@ -27,7 +27,7 @@ def teardown def subscribe_as_queue(channel, adapter = @rx_adapter) queue = Queue.new - callback = -> data { queue << data } + callback = -> msg { queue << msg.data } subscribed = Concurrent::Event.new adapter.subscribe(channel, callback, Proc.new { subscribed.set }) subscribed.wait(WAIT_WHEN_EXPECTING_EVENT)