Skip to content

Commit

Permalink
Fix transaction filtering by taking into account snapshot's xmax and …
Browse files Browse the repository at this point in the history
…xip_list (#2331)

Electric now stores the full pg snapshot info in the shape snapshot.
Transaction filtering logic is then improved to skip additional
transactions that are already included in the snapshot.

This change is backwards-compatible with existing shapes.

Fixes #2313.
  • Loading branch information
alco authored Feb 19, 2025
1 parent d22e363 commit e6e0eae
Show file tree
Hide file tree
Showing 15 changed files with 280 additions and 101 deletions.
16 changes: 5 additions & 11 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule Electric.ShapeCacheBehaviour do

@type shape_handle :: String.t()
@type shape_def :: Shape.t()
@type xmin :: non_neg_integer()

@callback get_shape(shape_def(), opts :: Access.t()) ::
{shape_handle(), current_snapshot_offset :: LogOffset.t()} | nil
Expand Down Expand Up @@ -234,10 +233,7 @@ defmodule Electric.ShapeCache do
{shape_state, state}
else
{:ok, shape_handle} = shape_status.add_shape(state.shape_status_state, shape)

{:ok, _pid, _snapshot_xmin, latest_offset} =
start_shape(shape_handle, shape, state, otel_ctx)

{:ok, latest_offset} = start_shape(shape_handle, shape, state, otel_ctx)
{{shape_handle, latest_offset}, state}
end

Expand Down Expand Up @@ -299,7 +295,7 @@ defmodule Electric.ShapeCache do
|> state.shape_status.list_shapes()
|> Enum.each(fn {shape_handle, shape} ->
try do
{:ok, _pid, _snapshot_xmin, _latest_offset} = start_shape(shape_handle, shape, state)
{:ok, _latest_offset} = start_shape(shape_handle, shape, state)

# recover publication filter state
publication_manager.recover_shape(shape, publication_manager_opts)
Expand All @@ -315,7 +311,7 @@ defmodule Electric.ShapeCache do
end

defp start_shape(shape_handle, shape, state, otel_ctx \\ nil) do
with {:ok, pid} <-
with {:ok, _pid} <-
Electric.Shapes.DynamicConsumerSupervisor.start_shape_consumer(
state.consumer_supervisor,
stack_id: state.stack_id,
Expand All @@ -334,10 +330,8 @@ defmodule Electric.ShapeCache do
otel_ctx: otel_ctx
) do
consumer = Shapes.Consumer.name(state.stack_id, shape_handle)

{:ok, snapshot_xmin, latest_offset} = Shapes.Consumer.initial_state(consumer)

{:ok, pid, snapshot_xmin, latest_offset}
{:ok, latest_offset} = Shapes.Consumer.initial_state(consumer)
{:ok, latest_offset}
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Electric.ShapeCache.CrashingFileStorage do
defdelegate get_all_stored_shapes(opts), to: FileStorage
defdelegate get_total_disk_usage(opts), to: FileStorage
defdelegate get_current_position(opts), to: FileStorage
defdelegate set_snapshot_xmin(xmin, opts), to: FileStorage
defdelegate set_pg_snapshot(pg_snapshot, opts), to: FileStorage
defdelegate snapshot_started?(opts), to: FileStorage
defdelegate make_new_snapshot!(data_stream, opts), to: FileStorage
defdelegate mark_snapshot_as_started(opts), to: FileStorage
Expand Down
20 changes: 13 additions & 7 deletions packages/sync-service/lib/electric/shape_cache/file_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Electric.ShapeCache.FileStorage do
@shape_definition_file_name "shape_defintion.json"

@xmin_key :snapshot_xmin
@pg_snapshot_key :pg_snapshot
@snapshot_meta_key :snapshot_meta
@snapshot_started_key :snapshot_started
@compaction_info_key :compaction_info
Expand Down Expand Up @@ -112,7 +113,7 @@ defmodule Electric.ShapeCache.FileStorage do
def initialise(%FS{} = opts) do
stored_version = stored_version(opts)

if stored_version != opts.version || snapshot_xmin(opts) == nil ||
if stored_version != opts.version || is_nil(pg_snapshot(opts)) ||
not File.exists?(shape_definition_path(opts)) ||
not CubDB.has_key?(opts.db, @snapshot_meta_key) do
cleanup_internals!(opts)
Expand Down Expand Up @@ -218,7 +219,7 @@ defmodule Electric.ShapeCache.FileStorage do

@impl Electric.ShapeCache.Storage
def get_current_position(%FS{} = opts) do
{:ok, latest_offset(opts), snapshot_xmin(opts)}
{:ok, latest_offset(opts), pg_snapshot(opts)}
end

defp latest_offset(opts) do
Expand All @@ -245,13 +246,17 @@ defmodule Electric.ShapeCache.FileStorage do
end
end

defp snapshot_xmin(opts) do
CubDB.get(opts.db, @xmin_key)
defp pg_snapshot(opts) do
# Temporary fallback to @xmin_key until we do a breaking release that drops that key entirely.
with nil <- CubDB.get(opts.db, @pg_snapshot_key),
xmin when not is_nil(xmin) <- CubDB.get(opts.db, @xmin_key) do
%{xmin: xmin, xmax: xmin + 1, xip_list: [xmin], filter_txns?: true}
end
end

@impl Electric.ShapeCache.Storage
def set_snapshot_xmin(xmin, %FS{} = opts) do
CubDB.put(opts.db, @xmin_key, xmin)
def set_pg_snapshot(pg_snapshot, %FS{} = opts) do
CubDB.put(opts.db, @pg_snapshot_key, pg_snapshot)
end

@impl Electric.ShapeCache.Storage
Expand Down Expand Up @@ -663,8 +668,9 @@ defmodule Electric.ShapeCache.FileStorage do

defp cleanup_internals!(%FS{} = opts) do
[
@snapshot_meta_key,
@xmin_key,
@pg_snapshot_key,
@snapshot_meta_key,
@snapshot_started_key
]
|> Enum.concat(keys_from_range(log_start(), log_end(), opts))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Electric.ShapeCache.InMemoryStorage do

@snapshot_start_index 0
@snapshot_end_index :end
@xmin_key :xmin
@pg_snapshot_key :pg_snapshot

defstruct [
:table_base_name,
Expand Down Expand Up @@ -85,16 +85,13 @@ defmodule Electric.ShapeCache.InMemoryStorage do

@impl Electric.ShapeCache.Storage
def get_current_position(%MS{} = opts) do
{:ok, current_offset(opts), current_xmin(opts)}
{:ok, current_offset(opts), pg_snapshot(opts)}
end

defp current_xmin(opts) do
case :ets.lookup(opts.snapshot_table, @xmin_key) do
[] ->
nil

[{@xmin_key, xmin}] ->
xmin
defp pg_snapshot(opts) do
case :ets.lookup(opts.snapshot_table, @pg_snapshot_key) do
[{@pg_snapshot_key, pg_snapshot}] -> pg_snapshot
[] -> nil
end
end

Expand All @@ -103,8 +100,8 @@ defmodule Electric.ShapeCache.InMemoryStorage do
end

@impl Electric.ShapeCache.Storage
def set_snapshot_xmin(xmin, %MS{} = opts) do
:ets.insert(opts.snapshot_table, {@xmin_key, xmin})
def set_pg_snapshot(pg_snapshot, %MS{} = opts) do
:ets.insert(opts.snapshot_table, {@pg_snapshot_key, pg_snapshot})
:ok
end

Expand Down
17 changes: 12 additions & 5 deletions packages/sync-service/lib/electric/shape_cache/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ defmodule Electric.ShapeCache.Storage do

@type shape_handle :: Electric.ShapeCacheBehaviour.shape_handle()
@type xmin :: Electric.ShapeCacheBehaviour.xmin()
@type pg_snapshot :: %{
xmin: pos_integer(),
xmax: pos_integer(),
xip_list: [pos_integer()],
filter_txns?: boolean()
}
@type offset :: LogOffset.t()

@type compiled_opts :: term()
Expand Down Expand Up @@ -47,13 +53,14 @@ defmodule Electric.ShapeCache.Storage do
@callback get_total_disk_usage(compiled_opts()) :: non_neg_integer()

@doc """
Get the current xmin and offset for the shape storage.
Get the current pg_snapshot and offset for the shape storage.
If the instance is new, then it MUST return `{LogOffset.first(), nil}`.
"""
@callback get_current_position(shape_opts()) :: {:ok, offset(), xmin() | nil} | {:error, term()}
@callback get_current_position(shape_opts()) ::
{:ok, offset(), pg_snapshot() | nil} | {:error, term()}

@callback set_snapshot_xmin(xmin(), shape_opts()) :: :ok
@callback set_pg_snapshot(pg_snapshot(), shape_opts()) :: :ok

@doc "Check if snapshot for a given shape handle already exists"
@callback snapshot_started?(shape_opts()) :: boolean()
Expand Down Expand Up @@ -159,8 +166,8 @@ defmodule Electric.ShapeCache.Storage do
end

@impl __MODULE__
def set_snapshot_xmin(xmin, {mod, shape_opts}) do
mod.set_snapshot_xmin(xmin, shape_opts)
def set_pg_snapshot(pg_snapshot, {mod, shape_opts}) do
mod.set_pg_snapshot(pg_snapshot, shape_opts)
end

@impl __MODULE__
Expand Down
Loading

0 comments on commit e6e0eae

Please sign in to comment.