Skip to content

Commit

Permalink
Add support for pagination cursors
Browse files Browse the repository at this point in the history
Not tested live yet
  • Loading branch information
progval committed Jun 2, 2023
1 parent 7d583b2 commit ce16589
Show file tree
Hide file tree
Showing 4 changed files with 279 additions and 28 deletions.
150 changes: 126 additions & 24 deletions lib/matrix_client/chat_history.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,36 @@ defmodule M51.MatrixClient.ChatHistory do
def after_(sup_pid, room_id, anchor, limit) do
client = M51.IrcConn.Supervisor.matrix_client(sup_pid)

case parse_anchor(anchor) do
{:ok, event_id} ->
case parse_anchor(anchor, true) do
{:ok, :msgid, event_id} ->
case M51.MatrixClient.Client.get_event_context(
client,
room_id,
event_id,
limit * 2
) do
{:ok, events} -> {:ok, process_events(sup_pid, room_id, events["events_after"])}
{:error, message} -> {:error, Kernel.inspect(message)}
{:ok, events} ->
{:ok,
process_events(sup_pid, room_id, events["events_after"], Map.get(events, "end"), nil)}

{:error, message} ->
{:error, Kernel.inspect(message)}
end

{:ok, :cursor, cursor} ->
case M51.MatrixClient.Client.get_events_from_cursor(
client,
room_id,
"f",
cursor,
limit
) do
{:ok, events} ->
{:ok,
process_events(sup_pid, room_id, events["chunk"], Map.get(events, "end"), nil)}

{:error, message} ->
{:error, Kernel.inspect(message)}
end

{:error, message} ->
Expand All @@ -42,9 +62,9 @@ defmodule M51.MatrixClient.ChatHistory do
def around(sup_pid, room_id, anchor, limit) do
client = M51.IrcConn.Supervisor.matrix_client(sup_pid)

case parse_anchor(anchor) do
{:ok, event_id} ->
case M51.MatrixClient.Client.get_event_context(client, room_id, event_id, limit) do
case parse_anchor(anchor, false) do
{:ok, :msgid, event_id} ->
case M51.MatrixClient.Client.get_event_context(client, room_id, event_id, limit - 1) do
{:ok, events} ->
# TODO: if there aren't enough events after (resp. before), allow more
# events before (resp. after) than half the limit.
Expand All @@ -53,9 +73,16 @@ defmodule M51.MatrixClient.ChatHistory do

events_before = events["events_before"] |> Enum.slice(0, nb_before) |> Enum.reverse()
events_after = events["events_after"] |> Enum.slice(0, nb_after)
events = Enum.concat([events_before, [events["event"]], events_after])
events_list = Enum.concat([events_before, [events["event"]], events_after])

{:ok, process_events(sup_pid, room_id, events)}
{:ok,
process_events(
sup_pid,
room_id,
events_list,
Map.get(events, "end"),
Map.get(events, "start")
)}

{:error, message} ->
{:error, Kernel.inspect(message)}
Expand All @@ -69,16 +96,45 @@ defmodule M51.MatrixClient.ChatHistory do
def before(sup_pid, room_id, anchor, limit) do
client = M51.IrcConn.Supervisor.matrix_client(sup_pid)

case parse_anchor(anchor) do
{:ok, event_id} ->
case parse_anchor(anchor, true) do
{:ok, :msgid, event_id} ->
case M51.MatrixClient.Client.get_event_context(
client,
room_id,
event_id,
limit * 2
) do
{:ok, events} ->
{:ok, process_events(sup_pid, room_id, Enum.reverse(events["events_before"]))}
{:ok,
process_events(
sup_pid,
room_id,
Enum.reverse(events["events_before"]),
Map.get(events, "start"),
nil
)}

{:error, message} ->
{:error, Kernel.inspect(message)}
end

{:ok, :cursor, cursor} ->
case M51.MatrixClient.Client.get_events_from_cursor(
client,
room_id,
"b",
cursor,
limit
) do
{:ok, events} ->
{:ok,
process_events(
sup_pid,
room_id,
Enum.reverse(events["chunk"]),
Map.get(events, "end"),
nil
)}

{:error, message} ->
{:error, Kernel.inspect(message)}
Expand All @@ -98,28 +154,41 @@ defmodule M51.MatrixClient.ChatHistory do
limit
) do
{:ok, events} ->
{:ok, process_events(sup_pid, room_id, Enum.reverse(events["chunk"]))}
{:ok,
process_events(
sup_pid,
room_id,
Enum.reverse(events["chunk"]),
Map.get(events, "end"),
nil
)}

{:error, message} ->
{:error, Kernel.inspect(message)}
end
end

defp parse_anchor(anchor) do
defp parse_anchor(anchor, allow_cursor) do
case String.split(anchor, "=", parts: 2) do
["msgid", msgid] ->
{:ok, msgid}
{:ok, :msgid, msgid}

["cursor", cursor] when allow_cursor ->
{:ok, :cursor, cursor}

["cursor", _] ->
{:error, "Invalid anchor: '#{anchor}', it should start with 'msgid='."}

["timestamp", _] ->
{:error,
"CHATHISTORY with timestamps is not supported. See https://github.com/progval/matrix2051/issues/1"}

_ ->
{:error, "Invalid anchor: '#{anchor}', it should start with 'msgid='."}
{:error, "Invalid anchor: '#{anchor}', it should start with 'msgid=' or 'cursor='."}
end
end

defp process_events(sup_pid, room_id, events) do
defp process_events(sup_pid, room_id, events, next, prev) do
pid = self()
write = fn cmd -> send(pid, {:command, cmd}) end

Expand Down Expand Up @@ -154,12 +223,45 @@ defmodule M51.MatrixClient.ChatHistory do
|> Task.await()

# Collect all commands
Stream.unfold(nil, fn _ ->
receive do
{:command, cmd} -> {cmd, nil}
{:finished_processing} -> nil
end
end)
|> Enum.to_list()
batch_content =
Stream.unfold(nil, fn _ ->
receive do
{:command, cmd} -> {cmd, nil}
{:finished_processing} -> nil
end
end)
|> Enum.to_list()

# Prepend cursors, if any
case {next, prev} do
{nil, nil} ->
batch_content

{next, nil} ->
cursors = %M51.Irc.Command{
command: "CHATHISTORY",
params: ["CURSORS", room_id, next]
}

[cursors | batch_content]

{nil, prev} ->
# what do we do here?
# https://github.com/ircv3/ircv3-specifications/pull/525/files#r1214764104
cursors = %M51.Irc.Command{
command: "CHATHISTORY",
params: ["CURSORS", room_id, "*", prev]
}

[cursors | batch_content]

{next, prev} ->
cursors = %M51.Irc.Command{
command: "CHATHISTORY",
params: ["CURSORS", room_id, next, prev]
}

[cursors | batch_content]
end
end
end
40 changes: 40 additions & 0 deletions lib/matrix_client/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,36 @@ defmodule M51.MatrixClient.Client do
{:reply, reply, state}
end

@impl true
def handle_call({:get_events_from_cursor, channel, dir, cursor, limit}, _from, state) do
%M51.MatrixClient.Client{
state: :connected,
irc_pid: irc_pid,
raw_client: raw_client
} = state

matrix_state = M51.IrcConn.Supervisor.matrix_state(irc_pid)

reply =
case M51.MatrixClient.State.room_from_irc_channel(matrix_state, channel) do
nil ->
{:error, {:room_not_found, channel}}

{room_id, _room} ->
path =
"/_matrix/client/v3/rooms/#{urlquote(room_id)}/messages?" <>
URI.encode_query(%{"limit" => limit, "dir" => dir, "from" => cursor})

case M51.Matrix.RawClient.get(raw_client, path) do
{:ok, events} -> {:ok, events}
{:error, error} -> {:error, error}
{:error, nil, error} -> {:error, error}
end
end

{:reply, reply, state}
end

@impl true
def handle_call({:get_latest_events, channel, limit}, _from, state) do
%M51.MatrixClient.Client{
Expand Down Expand Up @@ -559,6 +589,16 @@ defmodule M51.MatrixClient.Client do
GenServer.call(pid, {:get_event_context, channel, event_id, limit}, @timeout)
end

@doc """
Returns a page of events just before/after those returned by a previous call
to get_event_context/4 or get_events_from_cursor/5
https://matrix.org/docs/spec/client_server/r0.6.1#id131
"""
def get_events_from_cursor(pid, channel, dir, cursor, limit) do
GenServer.call(pid, {:get_events_from_cursor, channel, dir, cursor, limit}, @timeout)
end

@doc """
Returns latest events of a room
Expand Down
Loading

0 comments on commit ce16589

Please sign in to comment.