Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add history test #17

Merged
merged 4 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/src/protobuf/protobuf_codec.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//ignore_for_file: unintended_html_in_doc_comment

import 'dart:convert';

import 'package:protobuf/protobuf.dart' as pb;
Expand Down
1 change: 1 addition & 0 deletions lib/src/spinify.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,7 @@ final class Spinify implements ISpinify {
_setState(SpinifyState$Closed());
} on Object {/* ignore */} finally {
if (!force) _mutex.unlock();
if (!state.isClosed) _setState(SpinifyState$Closed());
_statesController.close().ignore();
_eventController.close().ignore();
_log(
Expand Down
38 changes: 38 additions & 0 deletions test/smoke/smoke_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -274,5 +274,43 @@ void main() {
expect(client.state, isA<SpinifyState$Closed>());
expect(sub.state.isUnsubscribed, isTrue);
});

test('History', () async {
final client = $createClient();
await client.connect($url);
expect(client.state, isA<SpinifyState$Connected>());
final sub = client.newSubscription('history:index');
await expectLater(sub.subscribe(), completes);
final result = await sub.history();
expect(
result,
isA<SpinifyHistory>().having(
(h) => h.publications,
'publications',
allOf(
[
isA<List<SpinifyPublication>>(),
isNotEmpty,
hasLength(equals(2)),
containsAll([
isA<SpinifyPublication>().having(
(p) => utf8.decode(p.data),
'data',
equals('{"input": "History message 1"}'),
),
isA<SpinifyPublication>().having(
(p) => utf8.decode(p.data),
'data',
equals('{"input": "History message 2"}'),
),
]),
],
),
),
);
await expectLater(sub.unsubscribe(), completes);
await client.close();
expect(client.state, isA<SpinifyState$Closed>());
});
});
}
125 changes: 125 additions & 0 deletions test/unit/subscription_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -739,5 +739,130 @@ void main() {
},
),
);

test(
'History',
() => fakeAsync(
(async) {
Timer? pingTimer;
final client = createFakeClient(transport: (_) async {
pingTimer?.cancel();
late WebSocket$Fake ws; // ignore: close_sinks
return ws = WebSocket$Fake()
..onAdd = (bytes, sink) {
final command = ProtobufCodec.decode(pb.Command(), bytes);
scheduleMicrotask(() {
if (command.hasConnect()) {
final reply = pb.Reply(
id: command.id,
connect: pb.ConnectResult(
client: 'fake',
version: '0.0.1',
expires: false,
ttl: null,
data: null,
subs: <String, pb.SubscribeResult>{},
ping: 600,
pong: false,
session: 'fake',
node: 'fake',
),
);
sink.add(ProtobufCodec.encode(reply));
pingTimer = Timer.periodic(
Duration(milliseconds: reply.connect.ping),
(timer) {
if (ws.isClosed) {
timer.cancel();
return;
}
sink.add(ProtobufCodec.encode(pb.Reply()));
},
);
} else if (command.hasSubscribe() &&
command.subscribe.channel == 'publications:index') {
final reply = pb.Reply(
id: command.id,
subscribe: pb.SubscribeResult(
data: utf8.encode('publications:index'),
epoch: '...',
offset: Int64.ZERO,
expires: false,
ttl: null,
positioned: false,
publications: <pb.Publication>[],
recoverable: false,
recovered: false,
wasRecovering: false,
),
);
sink.add(ProtobufCodec.encode(reply));
} else if (command.hasHistory() &&
command.history.channel == 'publications:index') {
final reply = pb.Reply(
id: command.id,
history: pb.HistoryResult(
epoch: '...',
offset: Int64.ZERO,
publications: <pb.Publication>[
for (var i = 0; i < 256; i++)
pb.Publication(
offset: Int64(i),
data: <int>[
for (var j = 0; j < 256; j++) j & 0xFF,
],
info: pb.ClientInfo(
client: 'fake',
user: 'fake',
),
tags: const <String, String>{
'type': 'notification',
},
),
],
),
);
sink.add(ProtobufCodec.encode(reply));
} else {
debugger();
}
});
};
})
..connect(url);
async.elapse(client.config.timeout);
expect(client.state.isConnected, isTrue);
async.elapse(const Duration(days: 1));
expect(client.state.isConnected, isTrue);
final channel = client.newSubscription('publications:index');
expect(client.subscriptions.client['publications:index'], isNotNull);
expect(channel.state.isSubscribed, isFalse);
channel.subscribe(); // Start subscription
async.elapse(client.config.timeout);
expect(channel.state.isSubscribed, isTrue); // Subscribed
final history = channel.history(); // Get history
async.elapse(const Duration(seconds: 1));
expect(
history,
completion(
isA<SpinifyHistory>().having(
(h) => h.publications,
'publications',
allOf(
isA<List<SpinifyPublication>>(),
hasLength(256), // 256 publications
),
),
),
); // History received
async.elapse(client.config.timeout);
pingTimer?.cancel();
client.close(); // Close connection
async.elapse(const Duration(seconds: 10));
expect(client.state.isClosed, isTrue);
expect(channel.state.isUnsubscribed, isTrue);
},
),
);
});
}
80 changes: 70 additions & 10 deletions tool/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func waitExitSignal(n *centrifuge.Node, s *http.Server, sigCh chan os.Signal) {
<-done
}

var channels = []string{"public:index", "chat:index"}
var channels = []string{"public:index", "chat:index", "history:index"}

// Check whether channel is allowed for subscribing. In real case permission
// check will probably be more complex than in this example.
Expand Down Expand Up @@ -178,15 +178,30 @@ func Centrifuge() (*centrifuge.Node, error) {
return
}

cb(centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
EnableRecovery: true,
EmitPresence: true,
EmitJoinLeave: true,
PushJoinLeave: true,
Data: []byte(`{"msg": "welcome"}`),
},
}, nil)
if (e.Channel == "history:index") {
cb(centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
EnableRecovery: true,
EmitPresence: true,
EmitJoinLeave: true,
PushJoinLeave: true,
Data: []byte(`{"msg": "welcome"}`),
HistoryMetaTTL: 24 * time.Hour,
ChannelInfo: []byte(`{"channel": "history:index"}`),
ExpireAt: 0,
},
}, nil)
} else {
cb(centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
EnableRecovery: true,
EmitPresence: true,
EmitJoinLeave: true,
PushJoinLeave: true,
Data: []byte(`{"msg": "welcome"}`),
},
}, nil)
}
})

client.OnMessage(func(e centrifuge.MessageEvent) {
Expand Down Expand Up @@ -220,6 +235,51 @@ func Centrifuge() (*centrifuge.Node, error) {
cb(centrifuge.PublishReply{Result: &result}, err)
})

client.OnHistory(func(e centrifuge.HistoryEvent, cb centrifuge.HistoryCallback) {
log.Printf("[user %s] requests history for channel %s", client.UserID(), e.Channel)
if !client.IsSubscribed(e.Channel) {
log.Printf("[user %s] requests history for channel %s: permission denied", client.UserID(), e.Channel)
cb(centrifuge.HistoryReply{}, centrifuge.ErrorPermissionDenied)
return
} else if (e.Channel == "history:index") {
cb(centrifuge.HistoryReply{
Result: &centrifuge.HistoryResult{
Publications: []*centrifuge.Publication{
{
Offset: 1,
Data: []byte(`{"input": "History message 1"}`),
Info: &centrifuge.ClientInfo{
ClientID: "server",
UserID: "42",
ConnInfo: []byte(`{"user": "42"}`),
ChanInfo: []byte(`{"channel": "history:index"}`),
},
Tags: map[string]string{
"key": "value",
},
},
{
Offset: 2,
Data: []byte(`{"input": "History message 2"}`),
Info: &centrifuge.ClientInfo{
ClientID: "server",
UserID: "42",
ConnInfo: []byte(`{"user": "42"}`),
ChanInfo: []byte(`{"channel": "history:index"}`),
},
Tags: map[string]string{
"key": "value",
},
},
},
},
}, nil)
} else {
cb(centrifuge.HistoryReply{}, nil)
}
})


client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) {
log.Printf("[user %s] sent RPC, method: %s, data: %s", client.UserID(), e.Method, string(e.Data))
switch e.Method {
Expand Down
Loading