Skip to content

Commit

Permalink
+ fastlane broadcasts
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Jan 8, 2025
1 parent c118761 commit 10e1a80
Show file tree
Hide file tree
Showing 12 changed files with 301 additions and 4 deletions.
46 changes: 46 additions & 0 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: Benchmark

on:
push:
branches:
- main
pull_request:
workflow_dispatch:

jobs:
benchmark:
if: ${{ !contains(github.event.head_commit.message, '[ci skip]') }}
runs-on: ubuntu-latest
env:
BUNDLE_JOBS: 4
BUNDLE_RETRY: 3
CI: true
RAILS_VERSION: ${{ matrix.rails_version }}
DATABASE_URL: postgresql://postgres:postgres@localhost:5432
services:
redis:
image: redis:7.0-alpine
ports: ["6379:6379"]
options: --health-cmd="redis-cli ping" --health-interval 1s --health-timeout 3s --health-retries 30
strategy:
fail-fast: false
matrix:
rails_version: ["~> 7.0", "~> 8.0.0"]
adapter: ["redis", "async"]
features: [""]
include:
- rails_version: "~> 8.0.0"
adapter: "redis"
features: "FASTLANE_BROADCASTS=1"
steps:
- uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
with:
ruby-version: 3.3
bundler-cache: true
- name: Run benchmarks
env:
ACTION_CABLE_ADAPTER: ${{ matrix.adapter }}
MODE: smoke
run: |
${{ matrix.features }} bundle exec ruby --yjit benchmarks/broadcasting.rb
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## main

- Add `config.fastlane_broadcasts_enabled` to opt-in for optimized broadcasts (no double JSON encoding).

## 0.1.1

- Added RSpec patch.
Expand Down
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ gem "cgi", ">= 0.3.6", require: false
# Workaround until all supported Ruby versions ship with uri version 0.13.1 or higher.
gem "uri", ">= 0.13.1", require: false

# For benchmarks and profiling
gem "benchmark-ips", require: false
gem "vernier", require: false

group :rubocop do
gem "rubocop", ">= 1.25.1", require: false
gem "rubocop-minitest", require: false
Expand Down
203 changes: 203 additions & 0 deletions benchmarks/broadcasting.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
# frozen_string_literal: true

require "bundler/setup"

require "benchmark/ips"
require "action_cable"

ActionCable.server.config.cable = { "adapter" => ENV.fetch("ACTION_CABLE_ADAPTER", "async") }
ActionCable.server.config.logger = ENV["LOG"] == "1" ? Logger.new(STDOUT) : Logger.new(nil)
ActionCable.server.config.fastlane_broadcasts_enabled = true if %w[1 t true].include?(ENV["FASTLANE_BROADCASTS"])
ActionCable.server.config.worker_pool_size = ENV.fetch("WORKER_POOL_SIZE", 4).to_i

# Number of clients
N = ENV.fetch("N", 10).to_i

# Number of messages to broadcast per run
B = ENV.fetch("M", N * 10).to_i

# Benchmark mode
mode = ENV.fetch("MODE", "benchmark")

$stdout.puts "Running #{mode} with N=#{N}, M=#{B}, adapter=#{ActionCable.server.config.cable["adapter"]}, fastlane_broadcasts_enabled=#{ActionCable.server.config.fastlane_broadcasts_enabled}, worker_pool_size=#{ActionCable.server.config.worker_pool_size}"

module ApplicationCable
class Connection < ActionCable::Connection::Base
end

class Channel < ActionCable::Channel::Base
end
end

class TestChannel < ApplicationCable::Channel
def subscribed
stream_from "all"
end
end

class TestSocket
attr_reader :server
delegate :pubsub, :config, :executor, :logger, :worker_pool, to: :server

def initialize(server, sync_queue: nil)
@coder = ActiveSupport::JSON
@sync_queue = sync_queue
@server = server
@transmissions = []
end

def perform_work(receiver, method, *args)
worker_pool.async_invoke(receiver, method, *args, connection: self)
end

def transmit(cable_message)
encode(cable_message).then do
raw_transmit(_1)
end
end

def raw_transmit(msg)
@transmissions << msg
@sync_queue&.push(msg)
end

def close(...)
end

def encode(cable_message)
@coder.encode cable_message
end
end

sync_queue = Queue.new
clients = N.times.map do
ApplicationCable::Connection.new(ActionCable.server, TestSocket.new(ActionCable.server, sync_queue:))
end

# Subscribe all clients to the same channel
subscribe_cmd = { "command" => "subscribe", "identifier" => { "channel" => "TestChannel" }.to_json }
clients.each { _1.handle_channel_command(subscribe_cmd) }

received = 0
clients.size.times do
raise "Expected to receive #{clients.size} confirmations, got only #{received}" if sync_queue.pop(timeout: 5.0).nil?
received += 1
end

SMALL_JSON = { "message" => "hello" }.freeze

require "active_support/time_with_zone"
LARGE_JSON = ActiveSupport::TimeZone.all.take(5).to_json.then { JSON.parse(_1) }.freeze

TURBO_STREAM = <<~HTML
<turbo-stream action="append" target="comments-container">
<template>
<div class="flex space-x-4 p-4 bg-white rounded-lg shadow-sm border border-gray-200 mb-4" id="comment-123">
<div class="flex-shrink-0">
<img class="h-10 w-10 rounded-full object-cover"
src="https://avatars.githubusercontent.com/u/12345678"
alt="User avatar">
</div>
<div class="flex-grow">
<div class="flex items-center justify-between mb-2">
<div>
<h4 class="text-sm font-medium text-gray-900">Sarah Johnson</h4>
<p class="text-xs text-gray-500">Posted 2 minutes ago</p>
</div>
<div class="flex items-center space-x-2">
<button class="text-gray-400 hover:text-gray-600">
<svg class="h-4 w-4" fill="currentColor" viewBox="0 0 20 20">
<path d="M6 10a2 2 0 11-4 0 2 2 0 014 0zM12 10a2 2 0 11-4 0 2 2 0 014 0zM16 12a2 2 0 100-4 2 2 0 000 4z" />
</svg>
</button>
</div>
</div>
<div class="prose prose-sm text-gray-700">
<p>This is a really insightful article! I especially appreciated the section about implementing Hotwire in Ruby on Rails applications. The examples were clear and helped me understand the concepts better.</p>
</div>
<div class="mt-3 flex items-center space-x-4">
<button class="flex items-center text-sm text-gray-500 hover:text-gray-700">
<svg class="h-4 w-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4.318 6.318a4.5 4.5 0 000 6.364L12 20.364l7.682-7.682a4.5 4.5 0 00-6.364-6.364L12 7.636l-1.318-1.318a4.5 4.5 0 00-6.364 0z" />
</svg>
<span>12 likes</span>
</button>
<button class="flex items-center text-sm text-gray-500 hover:text-gray-700">
<svg class="h-4 w-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8 12h.01M12 12h.01M16 12h.01M21 12c0 4.418-4.03 8-9 8a9.863 9.863 0 01-4.255-.949L3 20l1.395-3.72C3.512 15.042 3 13.574 3 12c0-4.418 4.03-8 9-8s9 3.582 9 8z" />
</svg>
<span>Reply</span>
</button>
</div>
</div>
</div>
</template>
</turbo-stream>
HTML

HOTWIRE = { "html" => TURBO_STREAM }.freeze

SCENARIOS = {
"small json" => SMALL_JSON,
"large json" => LARGE_JSON,
"turbo stream" => HOTWIRE
}

if mode == "benchmark"
$stdout.puts "Running benchmarks..."

Benchmark.ips do |x|
x.config(warmup: 5, time: 10)

SCENARIOS.each do |name, payload|
x.report("#{name} (#{ActiveSupport::NumberHelper.number_to_human_size(payload.to_json.size)})") do
Thread.new do
B.times do
ActionCable.server.broadcast("all", payload)
end
end

(B * N).times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) }
end
end
end
elsif mode == "profile"
$stdout.puts "Profiling..."

require "vernier"

scenario = ENV.fetch("SCENARIO", "small json")
payload = SCENARIOS.fetch(scenario)
path = File.join(__dir__, "../tmp/broadcasting_#{[scenario, N, B, ActionCable.server.config.fastlane_broadcasts_enabled ? "fastlane" : nil].compact.join("-").parameterize}.json")

# warmup
th = Thread.new do
10.times do
ActionCable.server.broadcast("all", payload)
end
end

(10 * N).times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) }

Vernier.trace(out: path) do
th = Thread.new do
B.times do
ActionCable.server.broadcast("all", payload)
end
end

(B * N).times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) }
th.join
end

$stdout.puts "Profiling done: #{path}"
elsif mode == "smoke"
ActionCable.server.broadcast("all", { message: "hello" })
N.times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) }
puts "All good"
else
raise "Unknown mode: #{mode}"
end
13 changes: 12 additions & 1 deletion lib/action_cable/channel/streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,21 @@ def streams
# Always wrap the outermost handler to invoke the user handler on the worker
# pool rather than blocking the event loop.
def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
if !user_handler && !coder && connection.config.fastlane_broadcasts_enabled
return opt_worker_pool_stream_handler(broadcasting)
end

handler = stream_handler(broadcasting, user_handler, coder: coder)

-> message do
connection.perform_work handler, :call, message
connection.perform_work handler, :call, message.data
end
end

# Optimized stream handler that avoids double JSON encoding/decoding on broadcast
def opt_worker_pool_stream_handler(broadcasting)
-> (message) do
connection.raw_transmit message.encoded_for(@identifier)
end
end

Expand Down
4 changes: 4 additions & 0 deletions lib/action_cable/connection/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ def transmit(data) # :nodoc:
socket.transmit(data)
end

def raw_transmit(data) # :nodoc:
socket.raw_transmit(data)
end

# Close the connection.
def close(reason: nil, reconnect: true)
transmit(
Expand Down
2 changes: 1 addition & 1 deletion lib/action_cable/connection/internal_channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def internal_channel

def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message ActiveSupport::JSON.decode(message) }
callback = -> (message) { process_internal_message ActiveSupport::JSON.decode(message.data) }
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]

Expand Down
3 changes: 3 additions & 0 deletions lib/action_cable/server/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Configuration
attr_accessor :precompile_assets
attr_accessor :health_check_path, :health_check_application
attr_writer :pubsub_adapter
attr_accessor :fastlane_broadcasts_enabled

def initialize
@log_tags = []
Expand All @@ -31,6 +32,8 @@ def initialize
@allow_same_origin_as_host = true
@filter_parameters = []

@fastlane_broadcasts_enabled = false

@health_check_application = ->(env) {
[200, { Rack::CONTENT_TYPE => "text/html", "date" => Time.now.httpdate }, []]
}
Expand Down
6 changes: 6 additions & 0 deletions lib/action_cable/server/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ def transmit(cable_message)
websocket.transmit encode(cable_message)
end

def raw_transmit(message)
return unless websocket.alive?

websocket.transmit message
end

# Close the WebSocket connection.
def close(...)
websocket.close(...) if websocket.alive?
Expand Down
17 changes: 16 additions & 1 deletion lib/action_cable/subscription_adapter/subscriber_map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@
module ActionCable
module SubscriptionAdapter
class SubscriberMap
class Message < Struct.new(:data)
def initialize(...)
super
@cache = Concurrent::Map.new
end

def encoded_for(identifier)
@cache.compute_if_absent(identifier) do
ActiveSupport::JSON.encode({ identifier: identifier, message: ActiveSupport::JSON.decode(data) })
end
end
end

def initialize
@subscribers = Hash.new { |h, k| h[k] = [] }
@sync = Mutex.new
Expand Down Expand Up @@ -41,8 +54,10 @@ def broadcast(channel, message)
@subscribers[channel].dup
end

msg = Message.new(message)

list.each do |subscriber|
invoke_callback(subscriber, message)
invoke_callback(subscriber, msg)
end
end

Expand Down
3 changes: 3 additions & 0 deletions test/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ def setup

# and now the "real" setup for our test:
server.config.disable_request_forgery_protection = true

# enable fastlane broadcasts (if env variable provided)
server.config.fastlane_broadcasts_enabled = true if %w[1 t true].include?(ENV["ACTION_CABLE_FASTLANE_BROADCASTS"])
end

def with_puma_server(rack_app = ActionCable.server, port = 3099)
Expand Down
2 changes: 1 addition & 1 deletion test/subscription_adapter/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def teardown
def subscribe_as_queue(channel, adapter = @rx_adapter)
queue = Queue.new

callback = -> data { queue << data }
callback = -> msg { queue << msg.data }
subscribed = Concurrent::Event.new
adapter.subscribe(channel, callback, Proc.new { subscribed.set })
subscribed.wait(WAIT_WHEN_EXPECTING_EVENT)
Expand Down

0 comments on commit 10e1a80

Please sign in to comment.