diff --git a/lib/src/protobuf/protobuf_codec.dart b/lib/src/protobuf/protobuf_codec.dart index 005abd9..da3f747 100644 --- a/lib/src/protobuf/protobuf_codec.dart +++ b/lib/src/protobuf/protobuf_codec.dart @@ -1,3 +1,5 @@ +//ignore_for_file: unintended_html_in_doc_comment + import 'dart:convert'; import 'package:protobuf/protobuf.dart' as pb; diff --git a/lib/src/spinify.dart b/lib/src/spinify.dart index ba12d77..9375243 100644 --- a/lib/src/spinify.dart +++ b/lib/src/spinify.dart @@ -1231,6 +1231,7 @@ final class Spinify implements ISpinify { _setState(SpinifyState$Closed()); } on Object {/* ignore */} finally { if (!force) _mutex.unlock(); + if (!state.isClosed) _setState(SpinifyState$Closed()); _statesController.close().ignore(); _eventController.close().ignore(); _log( diff --git a/test/smoke/smoke_test.dart b/test/smoke/smoke_test.dart index bb9d000..9d9f2f8 100644 --- a/test/smoke/smoke_test.dart +++ b/test/smoke/smoke_test.dart @@ -274,5 +274,43 @@ void main() { expect(client.state, isA<SpinifyState$Closed>()); expect(sub.state.isUnsubscribed, isTrue); }); + + test('History', () async { + final client = $createClient(); + await client.connect($url); + expect(client.state, isA<SpinifyState$Connected>()); + final sub = client.newSubscription('history:index'); + await expectLater(sub.subscribe(), completes); + final result = await sub.history(); + expect( + result, + isA<SpinifyHistory>().having( + (h) => h.publications, + 'publications', + allOf( + [ + isA<List<SpinifyPublication>>(), + isNotEmpty, + hasLength(equals(2)), + containsAll([ + isA<SpinifyPublication>().having( + (p) => utf8.decode(p.data), + 'data', + equals('{"input": "History message 1"}'), + ), + isA<SpinifyPublication>().having( + (p) => utf8.decode(p.data), + 'data', + equals('{"input": "History message 2"}'), + ), + ]), + ], + ), + ), + ); + await expectLater(sub.unsubscribe(), completes); + await client.close(); + expect(client.state, isA<SpinifyState$Closed>()); + }); }); } diff --git a/test/unit/subscription_test.dart b/test/unit/subscription_test.dart index 70dab04..16d6428 100644 --- a/test/unit/subscription_test.dart +++ b/test/unit/subscription_test.dart @@ -739,5 +739,130 @@ void main() { }, ), ); + + test( + 'History', + () => fakeAsync( + (async) { + Timer? pingTimer; + final client = createFakeClient(transport: (_) async { + pingTimer?.cancel(); + late WebSocket$Fake ws; // ignore: close_sinks + return ws = WebSocket$Fake() + ..onAdd = (bytes, sink) { + final command = ProtobufCodec.decode(pb.Command(), bytes); + scheduleMicrotask(() { + if (command.hasConnect()) { + final reply = pb.Reply( + id: command.id, + connect: pb.ConnectResult( + client: 'fake', + version: '0.0.1', + expires: false, + ttl: null, + data: null, + subs: <String, pb.SubscribeResult>{}, + ping: 600, + pong: false, + session: 'fake', + node: 'fake', + ), + ); + sink.add(ProtobufCodec.encode(reply)); + pingTimer = Timer.periodic( + Duration(milliseconds: reply.connect.ping), + (timer) { + if (ws.isClosed) { + timer.cancel(); + return; + } + sink.add(ProtobufCodec.encode(pb.Reply())); + }, + ); + } else if (command.hasSubscribe() && + command.subscribe.channel == 'publications:index') { + final reply = pb.Reply( + id: command.id, + subscribe: pb.SubscribeResult( + data: utf8.encode('publications:index'), + epoch: '...', + offset: Int64.ZERO, + expires: false, + ttl: null, + positioned: false, + publications: <pb.Publication>[], + recoverable: false, + recovered: false, + wasRecovering: false, + ), + ); + sink.add(ProtobufCodec.encode(reply)); + } else if (command.hasHistory() && + command.history.channel == 'publications:index') { + final reply = pb.Reply( + id: command.id, + history: pb.HistoryResult( + epoch: '...', + offset: Int64.ZERO, + publications: <pb.Publication>[ + for (var i = 0; i < 256; i++) + pb.Publication( + offset: Int64(i), + data: <int>[ + for (var j = 0; j < 256; j++) j & 0xFF, + ], + info: pb.ClientInfo( + client: 'fake', + user: 'fake', + ), + tags: const <String, String>{ + 'type': 'notification', + }, + ), + ], + ), + ); + sink.add(ProtobufCodec.encode(reply)); + } else { + debugger(); + } + }); + }; + }) + ..connect(url); + async.elapse(client.config.timeout); + expect(client.state.isConnected, isTrue); + async.elapse(const Duration(days: 1)); + expect(client.state.isConnected, isTrue); + final channel = client.newSubscription('publications:index'); + expect(client.subscriptions.client['publications:index'], isNotNull); + expect(channel.state.isSubscribed, isFalse); + channel.subscribe(); // Start subscription + async.elapse(client.config.timeout); + expect(channel.state.isSubscribed, isTrue); // Subscribed + final history = channel.history(); // Get history + async.elapse(const Duration(seconds: 1)); + expect( + history, + completion( + isA<SpinifyHistory>().having( + (h) => h.publications, + 'publications', + allOf( + isA<List<SpinifyPublication>>(), + hasLength(256), // 256 publications + ), + ), + ), + ); // History received + async.elapse(client.config.timeout); + pingTimer?.cancel(); + client.close(); // Close connection + async.elapse(const Duration(seconds: 10)); + expect(client.state.isClosed, isTrue); + expect(channel.state.isUnsubscribed, isTrue); + }, + ), + ); }); } diff --git a/tool/echo/echo.go b/tool/echo/echo.go index 571318a..489a1f4 100644 --- a/tool/echo/echo.go +++ b/tool/echo/echo.go @@ -50,7 +50,7 @@ func waitExitSignal(n *centrifuge.Node, s *http.Server, sigCh chan os.Signal) { <-done } -var channels = []string{"public:index", "chat:index"} +var channels = []string{"public:index", "chat:index", "history:index"} // Check whether channel is allowed for subscribing. In real case permission // check will probably be more complex than in this example. @@ -178,15 +178,30 @@ func Centrifuge() (*centrifuge.Node, error) { return } - cb(centrifuge.SubscribeReply{ - Options: centrifuge.SubscribeOptions{ - EnableRecovery: true, - EmitPresence: true, - EmitJoinLeave: true, - PushJoinLeave: true, - Data: []byte(`{"msg": "welcome"}`), - }, - }, nil) + if (e.Channel == "history:index") { + cb(centrifuge.SubscribeReply{ + Options: centrifuge.SubscribeOptions{ + EnableRecovery: true, + EmitPresence: true, + EmitJoinLeave: true, + PushJoinLeave: true, + Data: []byte(`{"msg": "welcome"}`), + HistoryMetaTTL: 24 * time.Hour, + ChannelInfo: []byte(`{"channel": "history:index"}`), + ExpireAt: 0, + }, + }, nil) + } else { + cb(centrifuge.SubscribeReply{ + Options: centrifuge.SubscribeOptions{ + EnableRecovery: true, + EmitPresence: true, + EmitJoinLeave: true, + PushJoinLeave: true, + Data: []byte(`{"msg": "welcome"}`), + }, + }, nil) + } }) client.OnMessage(func(e centrifuge.MessageEvent) { @@ -220,6 +235,51 @@ func Centrifuge() (*centrifuge.Node, error) { cb(centrifuge.PublishReply{Result: &result}, err) }) + client.OnHistory(func(e centrifuge.HistoryEvent, cb centrifuge.HistoryCallback) { + log.Printf("[user %s] requests history for channel %s", client.UserID(), e.Channel) + if !client.IsSubscribed(e.Channel) { + log.Printf("[user %s] requests history for channel %s: permission denied", client.UserID(), e.Channel) + cb(centrifuge.HistoryReply{}, centrifuge.ErrorPermissionDenied) + return + } else if (e.Channel == "history:index") { + cb(centrifuge.HistoryReply{ + Result: ¢rifuge.HistoryResult{ + Publications: []*centrifuge.Publication{ + { + Offset: 1, + Data: []byte(`{"input": "History message 1"}`), + Info: ¢rifuge.ClientInfo{ + ClientID: "server", + UserID: "42", + ConnInfo: []byte(`{"user": "42"}`), + ChanInfo: []byte(`{"channel": "history:index"}`), + }, + Tags: map[string]string{ + "key": "value", + }, + }, + { + Offset: 2, + Data: []byte(`{"input": "History message 2"}`), + Info: ¢rifuge.ClientInfo{ + ClientID: "server", + UserID: "42", + ConnInfo: []byte(`{"user": "42"}`), + ChanInfo: []byte(`{"channel": "history:index"}`), + }, + Tags: map[string]string{ + "key": "value", + }, + }, + }, + }, + }, nil) + } else { + cb(centrifuge.HistoryReply{}, nil) + } + }) + + client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) { log.Printf("[user %s] sent RPC, method: %s, data: %s", client.UserID(), e.Method, string(e.Data)) switch e.Method {