Skip to content

Commit

Permalink
feat: Fallback to include whole table in replication if where clauses…
Browse files Browse the repository at this point in the history
… unsupported (#2367)

Fixes #2360

This will make the running Electric fallback to replicating whole tables
if it receives any shapes with unsupported where clauses (e.g. enums,
varchar with `IN` checks, user-defined data types in general, and who
knows what else).

There is no "recovery" mechanism to return to row-filtering as the
Postgres error does not allow for an easy way to check which where
clause caused the issue - once we go to relation-only filtering we stay
there, like we would if an active shape had no where clause or if we
were in PG14.

Ideally we would detect where clauses that are unsupported at the
relation filter processing level, so we can fine tune that, but until
then this fallback makes sure that Electric works even if an unsupported
where clause is provided.

As discussed in [this Discord
thread](https://discord.com/channels/933657521581858818/1341967559758581921),
we could also have a configuration flag and better errors to avoid this
sort of radical fallback, but we opted for an "always works" approach
here. IIRC some benchmarking had shown that our filtering is fast enough
that the PG level filtering might not be as important anyway, although
the limiting of transmitted data is definitely nice (despite several
issues we have with not being able to limit columns replicated etc).

This is related with
#1778 , and I'm also
referencing #1831 as we
had encountered many limitations to row filtering which has led to this
proposed change.

We can definitely improve this by detecting unsupported where clauses,
checking filter diffs to know what caused the issue and reverting back
after, periodically attempting to revert back to row-filtering, and an
array of different approaches, but this allows all where clauses to be
accepted and Electric to adjust accordingly.
  • Loading branch information
msfstef authored Feb 25, 2025
1 parent 7e860a6 commit f92d4b3
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 9 deletions.
5 changes: 5 additions & 0 deletions .changeset/lazy-cameras-clap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Fallback to replicating whole relations if publication row filtering cannot support given where clauses.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Electric.Replication.PublicationManager do
:relation_filter_counters,
:prepared_relation_filters,
:committed_relation_filters,
:row_filtering_enabled,
:update_debounce_timeout,
:scheduled_updated_ref,
:retries,
Expand All @@ -30,6 +31,7 @@ defmodule Electric.Replication.PublicationManager do
relation_filter_counters: %{Electric.relation() => map()},
prepared_relation_filters: %{Electric.relation() => __MODULE__.RelationFilter.t()},
committed_relation_filters: %{Electric.relation() => __MODULE__.RelationFilter.t()},
row_filtering_enabled: boolean(),
update_debounce_timeout: timeout(),
scheduled_updated_ref: nil | reference(),
waiters: list(GenServer.from()),
Expand All @@ -43,6 +45,9 @@ defmodule Electric.Replication.PublicationManager do
defmodule RelationFilter do
defstruct [:relation, :where_clauses, :selected_columns]

def relation_only(%__MODULE__{relation: relation} = _filter),
do: %__MODULE__{relation: relation}

@type t :: %__MODULE__{
relation: Electric.relation(),
where_clauses: [Electric.Replication.Eval.Expr.t()] | nil,
Expand Down Expand Up @@ -149,6 +154,7 @@ defmodule Electric.Replication.PublicationManager do
relation_filter_counters: %{},
prepared_relation_filters: %{},
committed_relation_filters: %{},
row_filtering_enabled: true,
scheduled_updated_ref: nil,
retries: 0,
waiters: [],
Expand Down Expand Up @@ -204,7 +210,7 @@ defmodule Electric.Replication.PublicationManager do
state = %{state | scheduled_updated_ref: nil, retries: 0}

case update_publication(state) do
:ok ->
{:ok, state} ->
state = reply_to_waiters(:ok, state)
{:noreply, %{state | committed_relation_filters: relation_filters}}

Expand All @@ -229,24 +235,58 @@ defmodule Electric.Replication.PublicationManager do
defp schedule_update_publication(_timeout, %__MODULE__{scheduled_updated_ref: _} = state),
do: state

@spec update_publication(state()) :: :ok | {:error, term()}
@spec update_publication(state()) :: {:ok, state()} | {:error, term()}
defp update_publication(
%__MODULE__{
committed_relation_filters: committed_filters,
prepared_relation_filters: current_filters
} = _state
} = state
)
when current_filters == committed_filters,
do: :ok
do: {:ok, state}

defp update_publication(
%__MODULE__{
committed_relation_filters: committed_filters,
prepared_relation_filters: current_filters,
row_filtering_enabled: false,
publication_name: publication_name,
db_pool: db_pool,
pg_version: pg_version,
configure_tables_for_replication_fn: configure_tables_for_replication_fn
} = state
) do
# If row filtering is disabled, we only care about changes in actual relations
# included in the publication
if Map.keys(current_filters) == Map.keys(committed_filters) do
{:ok, state}
else
try do
configure_tables_for_replication_fn.(
db_pool,
Map.new(current_filters, fn {rel, filter} ->
{rel, RelationFilter.relation_only(filter)}
end),
pg_version,
publication_name
)

{:ok, state}
rescue
err -> {:error, err}
end
end
end

defp update_publication(
%__MODULE__{
prepared_relation_filters: relation_filters,
row_filtering_enabled: true,
publication_name: publication_name,
db_pool: db_pool,
pg_version: pg_version,
configure_tables_for_replication_fn: configure_tables_for_replication_fn
} = _state
} = state
) do
configure_tables_for_replication_fn.(
db_pool,
Expand All @@ -255,9 +295,28 @@ defmodule Electric.Replication.PublicationManager do
publication_name
)

:ok
{:ok, state}
rescue
err -> {:error, err}
# if we are unable to do row filtering for whatever reason, fall back to doing only
# relation-based filtering - this is a fallback for unsupported where clauses that we
# do not detect when composing relation filters
err ->
case err do
%Postgrex.Error{postgres: %{code: :feature_not_supported}} ->
Logger.warning(
"Row filtering is not supported, falling back to relation-based filtering"
)

update_publication(%__MODULE__{
state
| # disable row filtering and reset committed filters
row_filtering_enabled: false,
committed_relation_filters: %{}
})

_ ->
{:error, err}
end
end

defp get_pg_version(%{pg_version: pg_version} = state) when not is_nil(pg_version), do: state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ defmodule Electric.Postgres.ConfigurationTest do
"""
CREATE TABLE items (
id UUID PRIMARY KEY,
value TEXT NOT NULL
value TEXT NOT NULL,
value_c VARCHAR(255)
)
""",
[]
Expand Down Expand Up @@ -291,6 +292,48 @@ defmodule Electric.Postgres.ConfigurationTest do
)
end

test "fails with invalid where clause error when unsupported clause provided",
%{pool: conn, publication_name: publication, pg_version: pg_version} do
if pg_version >= @pg_15 do
error =
assert_raise Postgrex.Error, fn ->
Configuration.configure_tables_for_replication!(
conn,
%{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value_c in ('a','b'))"}]
}
},
pg_version,
publication
)
end

assert %Postgrex.Error{
postgres: %{
code: :feature_not_supported,
detail:
"Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed."
}
} = error
else
# pg versions without row filtering should just accept this
assert _ =
Configuration.configure_tables_for_replication!(
conn,
%{
{"public", "items"} => %RelationFilter{
relation: {"public", "items"},
where_clauses: [%Eval.Expr{query: "(value_c in ('a','b'))"}]
}
},
pg_version,
publication
)
end
end

test "fails when a publication doesn't exist", %{pool: conn, pg_version: pg_version} do
assert_raise Postgrex.Error, ~r/undefined_object/, fn ->
Configuration.configure_tables_for_replication!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ defmodule Electric.Replication.PublicationManagerTest do
configure_tables_for_replication_fn: configure_tables_fn
})

%{opts: publication_manager_opts}
%{opts: publication_manager_opts, ctx: ctx}
end

describe "add_shape/2" do
Expand Down Expand Up @@ -220,6 +220,55 @@ defmodule Electric.Replication.PublicationManagerTest do

refute_receive {:filters, _}, 500
end

test "should fallback to relation-only filtering if we cannot do row filtering", %{
ctx: ctx,
opts: opts
} do
GenServer.stop(opts[:server])

test_id = self()

configure_tables_fn = fn _, filters, _, _ ->
if filters |> Map.values() |> Enum.any?(&(&1.where_clauses != nil)) do
send(test_id, {:got_filters, :with_where_clauses})
raise %Postgrex.Error{postgres: %{code: :feature_not_supported}}
end

send(test_id, {:got_filters, :without_where_clauses})
end

%{publication_manager: {_, publication_manager_opts}} =
with_publication_manager(%{
module: ctx.module,
test: ctx.test,
stack_id: ctx.stack_id,
update_debounce_timeout: Access.get(ctx, :update_debounce_timeout, 0),
publication_name: "pub_#{ctx.stack_id}",
pool: :no_pool,
pg_version: 150_001,
configure_tables_for_replication_fn: configure_tables_fn
})

shape1 = generate_shape({"public", "items"}, @where_clause_1)
shape2 = generate_shape({"public", "items"}, @where_clause_2)
shape3 = generate_shape({"public", "items_other"}, @where_clause_2)

# should fall back to relation-only filtering
assert :ok == PublicationManager.add_shape(shape1, publication_manager_opts)
assert_receive {:got_filters, :with_where_clauses}
assert_receive {:got_filters, :without_where_clauses}
refute_receive {:got_filters, _}, 50

# should remain in relation-only filtering mode after that, which
# only updates the publication if the tracked relations change
assert :ok == PublicationManager.add_shape(shape2, publication_manager_opts)
refute_receive {:got_filters, _}, 50

assert :ok == PublicationManager.add_shape(shape3, publication_manager_opts)
assert_receive {:got_filters, :without_where_clauses}
refute_receive {:got_filters, _}, 50
end
end

describe "remove_shape/2" do
Expand Down

0 comments on commit f92d4b3

Please sign in to comment.