Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fastlane broadcasts #10

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: Benchmark

on:
push:
branches:
- main
pull_request:
workflow_dispatch:

jobs:
benchmark:
if: ${{ !contains(github.event.head_commit.message, '[ci skip]') }}
runs-on: ubuntu-latest
env:
BUNDLE_JOBS: 4
BUNDLE_RETRY: 3
CI: true
RAILS_VERSION: ${{ matrix.rails_version }}
DATABASE_URL: postgresql://postgres:postgres@localhost:5432
services:
redis:
image: redis:7.0-alpine
ports: ["6379:6379"]
options: --health-cmd="redis-cli ping" --health-interval 1s --health-timeout 3s --health-retries 30
strategy:
fail-fast: false
matrix:
rails_version: ["~> 7.0", "~> 8.0.0"]
adapter: ["redis", "async"]
features: [""]
include:
- rails_version: "~> 8.0.0"
adapter: "redis"
features: "FASTLANE_BROADCASTS=1"
steps:
- uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
with:
ruby-version: 3.3
bundler-cache: true
- name: Run benchmarks
env:
ACTION_CABLE_ADAPTER: ${{ matrix.adapter }}
MODE: smoke
run: |
${{ matrix.features }} bundle exec ruby --yjit benchmarks/broadcasting.rb
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## main

- Add `config.fastlane_broadcasts_enabled` to opt-in for optimized broadcasts (no double JSON encoding).

## 0.1.1

- Added RSpec patch.
Expand Down
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ gem "cgi", ">= 0.3.6", require: false
# Workaround until all supported Ruby versions ship with uri version 0.13.1 or higher.
gem "uri", ">= 0.13.1", require: false

# For benchmarks and profiling
gem "benchmark-ips", require: false
gem "vernier", require: false

group :rubocop do
gem "rubocop", ">= 1.25.1", require: false
gem "rubocop-minitest", require: false
Expand Down
203 changes: 203 additions & 0 deletions benchmarks/broadcasting.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
# frozen_string_literal: true

require "bundler/setup"

require "benchmark/ips"
require "action_cable"

ActionCable.server.config.cable = { "adapter" => ENV.fetch("ACTION_CABLE_ADAPTER", "async") }
ActionCable.server.config.logger = ENV["LOG"] == "1" ? Logger.new(STDOUT) : Logger.new(nil)
ActionCable.server.config.fastlane_broadcasts_enabled = true if %w[1 t true].include?(ENV["FASTLANE_BROADCASTS"])
ActionCable.server.config.worker_pool_size = ENV.fetch("WORKER_POOL_SIZE", 4).to_i

# Number of clients
N = ENV.fetch("N", 10).to_i

# Number of messages to broadcast per run
B = ENV.fetch("M", N * 10).to_i

# Benchmark mode
mode = ENV.fetch("MODE", "benchmark")

$stdout.puts "Running #{mode} with N=#{N}, M=#{B}, adapter=#{ActionCable.server.config.cable["adapter"]}, fastlane_broadcasts_enabled=#{ActionCable.server.config.fastlane_broadcasts_enabled}, worker_pool_size=#{ActionCable.server.config.worker_pool_size}"

module ApplicationCable
class Connection < ActionCable::Connection::Base
end

class Channel < ActionCable::Channel::Base
end
end

class TestChannel < ApplicationCable::Channel
def subscribed
stream_from "all"
end
end

class TestSocket
attr_reader :server
delegate :pubsub, :config, :executor, :logger, :worker_pool, to: :server

def initialize(server, sync_queue: nil)
@coder = ActiveSupport::JSON
@sync_queue = sync_queue
@server = server
@transmissions = []
end

def perform_work(receiver, method, *args)
worker_pool.async_invoke(receiver, method, *args, connection: self)
end

def transmit(cable_message)
encode(cable_message).then do
raw_transmit(_1)
end
end

def raw_transmit(msg)
@transmissions << msg
@sync_queue&.push(msg)
end

def close(...)
end

def encode(cable_message)
@coder.encode cable_message
end
end

sync_queue = Queue.new
clients = N.times.map do
ApplicationCable::Connection.new(ActionCable.server, TestSocket.new(ActionCable.server, sync_queue:))
end

# Subscribe all clients to the same channel
subscribe_cmd = { "command" => "subscribe", "identifier" => { "channel" => "TestChannel" }.to_json }
clients.each { _1.handle_channel_command(subscribe_cmd) }

received = 0
clients.size.times do
raise "Expected to receive #{clients.size} confirmations, got only #{received}" if sync_queue.pop(timeout: 5.0).nil?
received += 1
end

SMALL_JSON = { "message" => "hello" }.freeze

require "active_support/time_with_zone"
LARGE_JSON = ActiveSupport::TimeZone.all.take(5).to_json.then { JSON.parse(_1) }.freeze

TURBO_STREAM = <<~HTML
<turbo-stream action="append" target="comments-container">
<template>
<div class="flex space-x-4 p-4 bg-white rounded-lg shadow-sm border border-gray-200 mb-4" id="comment-123">
<div class="flex-shrink-0">
<img class="h-10 w-10 rounded-full object-cover"
src="https://avatars.githubusercontent.com/u/12345678"
alt="User avatar">
</div>
<div class="flex-grow">
<div class="flex items-center justify-between mb-2">
<div>
<h4 class="text-sm font-medium text-gray-900">Sarah Johnson</h4>
<p class="text-xs text-gray-500">Posted 2 minutes ago</p>
</div>
<div class="flex items-center space-x-2">
<button class="text-gray-400 hover:text-gray-600">
<svg class="h-4 w-4" fill="currentColor" viewBox="0 0 20 20">
<path d="M6 10a2 2 0 11-4 0 2 2 0 014 0zM12 10a2 2 0 11-4 0 2 2 0 014 0zM16 12a2 2 0 100-4 2 2 0 000 4z" />
</svg>
</button>
</div>
</div>
<div class="prose prose-sm text-gray-700">
<p>This is a really insightful article! I especially appreciated the section about implementing Hotwire in Ruby on Rails applications. The examples were clear and helped me understand the concepts better.</p>
</div>
<div class="mt-3 flex items-center space-x-4">
<button class="flex items-center text-sm text-gray-500 hover:text-gray-700">
<svg class="h-4 w-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4.318 6.318a4.5 4.5 0 000 6.364L12 20.364l7.682-7.682a4.5 4.5 0 00-6.364-6.364L12 7.636l-1.318-1.318a4.5 4.5 0 00-6.364 0z" />
</svg>
<span>12 likes</span>
</button>
<button class="flex items-center text-sm text-gray-500 hover:text-gray-700">
<svg class="h-4 w-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8 12h.01M12 12h.01M16 12h.01M21 12c0 4.418-4.03 8-9 8a9.863 9.863 0 01-4.255-.949L3 20l1.395-3.72C3.512 15.042 3 13.574 3 12c0-4.418 4.03-8 9-8s9 3.582 9 8z" />
</svg>
<span>Reply</span>
</button>
</div>
</div>
</div>
</template>
</turbo-stream>
HTML

HOTWIRE = { "html" => TURBO_STREAM }.freeze

SCENARIOS = {
"small json" => SMALL_JSON,
"large json" => LARGE_JSON,
"turbo stream" => HOTWIRE
}

if mode == "benchmark"
$stdout.puts "Running benchmarks..."

Benchmark.ips do |x|
x.config(warmup: 5, time: 10)

SCENARIOS.each do |name, payload|
x.report("#{name} (#{ActiveSupport::NumberHelper.number_to_human_size(payload.to_json.size)})") do
Thread.new do
B.times do
ActionCable.server.broadcast("all", payload)
end
end

(B * N).times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) }
end
end
end
elsif mode == "profile"
$stdout.puts "Profiling..."

require "vernier"

scenario = ENV.fetch("SCENARIO", "small json")
payload = SCENARIOS.fetch(scenario)
path = File.join(__dir__, "../tmp/broadcasting_#{[scenario, N, B, ActionCable.server.config.fastlane_broadcasts_enabled ? "fastlane" : nil].compact.join("-").parameterize}.json")

# warmup
th = Thread.new do
10.times do
ActionCable.server.broadcast("all", payload)
end
end

(10 * N).times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) }

Vernier.trace(out: path) do
th = Thread.new do
B.times do
ActionCable.server.broadcast("all", payload)
end
end

(B * N).times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) }
th.join
end

$stdout.puts "Profiling done: #{path}"
elsif mode == "smoke"
ActionCable.server.broadcast("all", { message: "hello" })
N.times { raise "No broadcast message received" unless sync_queue.pop(timeout: 5.0) }
puts "All good"
else
raise "Unknown mode: #{mode}"
end
13 changes: 12 additions & 1 deletion lib/action_cable/channel/streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,21 @@ def streams
# Always wrap the outermost handler to invoke the user handler on the worker
# pool rather than blocking the event loop.
def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
if !user_handler && !coder && connection.config.fastlane_broadcasts_enabled
return opt_worker_pool_stream_handler(broadcasting)
end

handler = stream_handler(broadcasting, user_handler, coder: coder)

-> message do
connection.perform_work handler, :call, message
connection.perform_work handler, :call, message.data
end
end

# Optimized stream handler that avoids double JSON encoding/decoding on broadcast
def opt_worker_pool_stream_handler(broadcasting)
-> (message) do
connection.raw_transmit message.encoded_for(@identifier)
end
end

Expand Down
4 changes: 4 additions & 0 deletions lib/action_cable/connection/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ def transmit(data) # :nodoc:
socket.transmit(data)
end

def raw_transmit(data) # :nodoc:
socket.raw_transmit(data)
end

# Close the connection.
def close(reason: nil, reconnect: true)
transmit(
Expand Down
2 changes: 1 addition & 1 deletion lib/action_cable/connection/internal_channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def internal_channel

def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message ActiveSupport::JSON.decode(message) }
callback = -> (message) { process_internal_message ActiveSupport::JSON.decode(message.data) }
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]

Expand Down
3 changes: 3 additions & 0 deletions lib/action_cable/server/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Configuration
attr_accessor :precompile_assets
attr_accessor :health_check_path, :health_check_application
attr_writer :pubsub_adapter
attr_accessor :fastlane_broadcasts_enabled

def initialize
@log_tags = []
Expand All @@ -31,6 +32,8 @@ def initialize
@allow_same_origin_as_host = true
@filter_parameters = []

@fastlane_broadcasts_enabled = false

@health_check_application = ->(env) {
[200, { Rack::CONTENT_TYPE => "text/html", "date" => Time.now.httpdate }, []]
}
Expand Down
6 changes: 6 additions & 0 deletions lib/action_cable/server/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ def transmit(cable_message)
websocket.transmit encode(cable_message)
end

def raw_transmit(message)
return unless websocket.alive?

websocket.transmit message
end

# Close the WebSocket connection.
def close(...)
websocket.close(...) if websocket.alive?
Expand Down
17 changes: 16 additions & 1 deletion lib/action_cable/subscription_adapter/subscriber_map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@
module ActionCable
module SubscriptionAdapter
class SubscriberMap
class Message < Struct.new(:data)
def initialize(...)
super
@cache = Concurrent::Map.new
end

def encoded_for(identifier)
@cache.compute_if_absent(identifier) do
ActiveSupport::JSON.encode({ identifier: identifier, message: ActiveSupport::JSON.decode(data) })
end
end
end

def initialize
@subscribers = Hash.new { |h, k| h[k] = [] }
@sync = Mutex.new
Expand Down Expand Up @@ -41,8 +54,10 @@ def broadcast(channel, message)
@subscribers[channel].dup
end

msg = Message.new(message)

list.each do |subscriber|
invoke_callback(subscriber, message)
invoke_callback(subscriber, msg)
end
end

Expand Down
3 changes: 3 additions & 0 deletions test/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ def setup

# and now the "real" setup for our test:
server.config.disable_request_forgery_protection = true

# enable fastlane broadcasts (if env variable provided)
server.config.fastlane_broadcasts_enabled = true if %w[1 t true].include?(ENV["ACTION_CABLE_FASTLANE_BROADCASTS"])
end

def with_puma_server(rack_app = ActionCable.server, port = 3099)
Expand Down
2 changes: 1 addition & 1 deletion test/subscription_adapter/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def teardown
def subscribe_as_queue(channel, adapter = @rx_adapter)
queue = Queue.new

callback = -> data { queue << data }
callback = -> msg { queue << msg.data }
subscribed = Concurrent::Event.new
adapter.subscribe(channel, callback, Proc.new { subscribed.set })
subscribed.wait(WAIT_WHEN_EXPECTING_EVENT)
Expand Down
Loading