Skip to content

Commit

Permalink
fix: Rewrite xid comparison to use proper modulo-2^32 arithmetic (#2320)
Browse files Browse the repository at this point in the history
The logic implemented in
#2217 was wrong. This PR
fixes the comparison by porting the logic from Postgres' C
implementation to Elixir's bitstring syntax.

Fixes #2312.
  • Loading branch information
alco authored Feb 19, 2025
1 parent b535c61 commit d22e363
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 65 deletions.
5 changes: 5 additions & 0 deletions .changeset/short-clouds-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Fix transaction ID comparison logic to use correct modulo-2^32 arithmetic.
2 changes: 1 addition & 1 deletion packages/sync-service/.formatter.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
import_deps: [:plug]
import_deps: [:plug, :stream_data]
]
159 changes: 110 additions & 49 deletions packages/sync-service/lib/electric/postgres/xid.ex
Original file line number Diff line number Diff line change
@@ -1,66 +1,127 @@
defmodule Electric.Postgres.Xid do
import Bitwise
@uint32_max 0xFFFFFFFF
@uint32_half_max 0x80000000

@int32_max 0xFFFFFFFF
@int32_half_max 0x7FFFFFFF
# A 64-bit XID with an arbitrary epoch that is equal to @uint32_half_max when truncated to 32
# bits.
@uint64_xid 0x1080000000

@type anyxid :: pos_integer
@type cmp_result :: :lt | :eq | :gt

defguardp int32?(int) when abs(int) <= @int32_max
# We don't include 0 in the definition of uint32 because it is not a valid transaction ID.
defguardp uint32?(num) when num > 0 and num <= @uint32_max

# This is a specialized guard that specifically determines whether the 32-bit first
# argument is less than the xid8 argument. For the general principle this is based on, look
# at the implementation of `compare/2` below.
defguard xid_lt_xid8(xid, xid8)
when int32?(xid) and
((xid - band(xid8, @int32_max) <= @int32_half_max and
xid < band(xid8, @int32_max)) or
(xid - band(xid8, @int32_max) > @int32_half_max and
xid > band(xid8, @int32_max)))
@doc """
In Postgres, any 32-bit xid has ~2 billion values preceding it and ~2 billion values following it.
Regular autovacuuming maintains this invariant. When we see a difference between two
xids that is larger than 2^31, we know there's been at least one transaction ID wraparound.
Given the invariant mentioned earlier, we assume there's been only one wraparound and so the xid
whose value is larger precedes the other one (or, equivalently, the smaller xid belongs to a
more recent transaction).
@spec compare(anyxid, anyxid) :: cmp_result
For 64-bit xids (Postgres type `xid8`), the regular integer comparison is used because those
xids include the epoch number that tracks the number of xid wraparounds that have happened.
def compare(xid, xid), do: :eq

# When both arguments are 32-bit integers or both have values that don't fit in 32 bits, use the
# direct comparison.
def compare(xid_l, xid_r)
when (int32?(xid_l) and int32?(xid_r)) or not (int32?(xid_l) or int32?(xid_r)),
do: direct_cmp(xid_l, xid_r)

# When one of the arguments is 32-bit and the other one has a value that doesn't fit in 32 bits,
# perform the comparison on masked values.
#
# In Postgres, any xid has ~2 billion values preceding it and ~2 billion values following it.
# Regular autovacuuming maintains this invariant. So when we see a difference between two
# xids that is larger than 2^31, it means the 32-bit argument is a wrapped value, so it
# must be the most recent one.
def compare(xid8, xid) when int32?(xid) do
compare(xid, xid8)
|> reverse_cmp_result()
end
If either or both arguments are 32-bit xids, the comparison is performed modulo-2^32, the same way it's done in Postgres:
https://github.com/postgres/postgres/blob/302cf15759233e654512979286ce1a5c3b36625f/src/backend/access/transam/transam.c#L276-L293
def compare(xid, xid8) when int32?(xid) do
xid8_masked = band(xid8, @int32_max)
## Tests
diff = xid - xid8_masked
wrapped? = diff > @int32_half_max
iex> compare(3, 3)
:eq
diff_to_cmp_result(wrapped?, diff)
end
iex> compare(2, 1)
:gt
iex> compare(2, 2)
:eq
iex> compare(2, 3)
:lt
iex> compare(#{@uint32_max}, #{@uint32_max})
:eq
iex> compare(1, #{@uint32_half_max})
:lt
iex> compare(1, #{@uint32_half_max + 1})
:lt
iex> compare(1, #{@uint32_half_max + 2})
:gt
iex> compare(1, #{@uint32_max})
:gt
iex> compare(#{@uint32_max}, 1)
:lt
iex> compare(#{@uint32_half_max}, 1)
:gt
iex> compare(#{@uint32_half_max + 1}, 1)
:lt
iex> compare(#{@uint32_half_max}, #{@uint32_max})
:lt
@spec diff_to_cmp_result(wrapped? :: boolean, diff :: integer) :: cmp_result
defp diff_to_cmp_result(false, diff) when diff > 0, do: :gt
defp diff_to_cmp_result(false, diff) when diff < 0, do: :lt
defp diff_to_cmp_result(true, diff) when diff > 0, do: :lt
defp diff_to_cmp_result(true, diff) when diff < 0, do: :gt
iex> compare(#{@uint32_half_max - 1}, #{@uint32_max})
:lt
###
iex> compare(#{@uint32_half_max - 2}, #{@uint32_max})
:gt
defp direct_cmp(xid_l, xid_r) when xid_l < xid_r, do: :lt
defp direct_cmp(xid_l, xid_r) when xid_l > xid_r, do: :gt
Either of the two arguments can be 64-bit; the order doesn't matter:
iex> compare(1, #{@uint64_xid})
:lt
iex> compare(1, #{@uint64_xid + 1})
:lt
iex> compare(1, #{@uint64_xid + 2})
:gt
iex> compare(#{@uint64_xid}, 1)
:gt
iex> compare(#{@uint64_xid + 1}, 1)
:lt
When both numbers are 64-bit, regular comparison rules apply:
iex> compare(#{@uint64_xid + 2}, #{@uint64_xid + 1})
:gt
iex> compare(#{@uint64_xid}, #{@uint64_xid + @uint32_half_max + 2})
:lt
"""
@spec compare(anyxid, anyxid) :: cmp_result

# Neither number fits into 32 bits, so both are of type xid8 and ordinary integer comparison
# is used.
def compare(xid8_l, xid8_r)
    when xid8_l > 0 and xid8_r > 0 and not (uint32?(xid8_l) or uint32?(xid8_r)) do
  cmp(xid8_l, xid8_r)
end

# At least one of the numbers is a 32-bit unsigned integer, so the two numbers are compared
# using modulo-2^32 arithmetic.
def compare(xid_l, xid_r) when xid_l > 0 and xid_r > 0 and (uint32?(xid_l) or uint32?(xid_r)) do
  # The two steps below mirror this C code:
  #
  #     uint32 xid_l, xid_r;
  #     int32 signed_diff = (int32)(xid_l - xid_r);
  #
  # First keep only the low 32 bits of the difference...
  diff_bits = <<(xid_l - xid_r)::unsigned-size(32)>>

  # ...then reinterpret those same bits as a signed 32-bit integer.
  <<signed_diff::signed-size(32)>> = diff_bits

  # A negative signed_diff means xid_l precedes xid_r.
  cmp(signed_diff, 0)
end

defp reverse_cmp_result(:lt), do: :gt
defp reverse_cmp_result(:gt), do: :lt
# Three-way comparison of two terms: returns :eq, :lt or :gt.
defp cmp(lhs, rhs) do
  cond do
    lhs == rhs -> :eq
    lhs < rhs -> :lt
    lhs > rhs -> :gt
  end
end
end
25 changes: 11 additions & 14 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ defmodule Electric.Shapes.Consumer do
restart: :temporary,
significant: true

import Electric.Postgres.Xid, only: [xid_lt_xid8: 2]

alias Electric.LogItems
alias Electric.Postgres.Inspector
alias Electric.Replication.Changes
Expand Down Expand Up @@ -243,19 +241,18 @@ defmodule Electric.Shapes.Consumer do
end
end

defp handle_txn(%Transaction{xid: xid}, %{snapshot_xmin: xmin} = state)
when xid_lt_xid8(xid, xmin) do
{:cont, state}
end

defp handle_txn(%Transaction{} = txn, state) do
ot_attrs =
[xid: txn.xid, total_num_changes: txn.num_changes] ++
shape_attrs(state.shape_handle, state.shape)
defp handle_txn(%Transaction{xid: xid} = txn, %{snapshot_xmin: xmin} = state) do
if Electric.Postgres.Xid.compare(xid, xmin) == :lt do
{:cont, state}
else
ot_attrs =
[xid: txn.xid, total_num_changes: txn.num_changes] ++
shape_attrs(state.shape_handle, state.shape)

OpenTelemetry.with_span("shape_write.consumer.handle_txn", ot_attrs, state.stack_id, fn ->
do_handle_txn(txn, state)
end)
OpenTelemetry.with_span("shape_write.consumer.handle_txn", ot_attrs, state.stack_id, fn ->
do_handle_txn(txn, state)
end)
end
end

defp do_handle_txn(%Transaction{} = txn, state) do
Expand Down
3 changes: 2 additions & 1 deletion packages/sync-service/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ defmodule Electric.MixProject do
{:dialyxir, "~> 1.4", only: [:test], runtime: false},
{:excoveralls, "~> 0.18", only: [:test], runtime: false},
{:mox, "~> 1.1", only: [:test]},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false}
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:stream_data, "~> 1.0", only: [:test]}
]
end

Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"retry": {:hex, :retry, "0.18.0", "dc58ebe22c95aa00bc2459f9e0c5400e6005541cf8539925af0aa027dc860543", [:mix], [], "hexpm", "9483959cc7bf69c9e576d9dfb2b678b71c045d3e6f39ab7c9aa1489df4492d73"},
"sentry": {:hex, :sentry, "10.8.0", "1e8cc0ef21401e5914e6fc2f37489d6c685d31a0556dbd8ab4709cc1587a7232", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_ownership, "~> 0.3.0 or ~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "92549e7ba776b7ccfed4e74d58987272d37d99606b130e4141bc015a1a8e4235"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"stream_data": {:hex, :stream_data, "1.1.3", "15fdb14c64e84437901258bb56fc7d80aaf6ceaf85b9324f359e219241353bfb", [:mix], [], "hexpm", "859eb2be72d74be26c1c4f272905667672a52e44f743839c57c7ee73a1a66420"},
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
"telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"},
"telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.2.1", "c9755987d7b959b557084e6990990cb96a50d6482c683fb9622a63837f3cd3d8", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "5e2c599da4983c4f88a33e9571f1458bf98b0cf6ba930f1dc3a6e8cf45d5afb6"},
Expand Down
49 changes: 49 additions & 0 deletions packages/sync-service/test/electric/postgres/xid_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
defmodule Electric.Postgres.XidTest do
  @moduledoc false

  use ExUnit.Case, async: true
  use ExUnitProperties

  import Electric.Postgres.Xid

  doctest Electric.Postgres.Xid, import: true

  @uint32_max 0xFFFFFFFF
  @uint64_max 0xFFFFFFFFFFFFFFFF

  # 2^31
  @half_modulo 0x80000000

  property "compare/2 compares 64-bit xids like ordinary integers" do
    check all xid_l <- xid64_gen(), xid_r <- xid64_gen(), max_runs: 1_000_000, max_run_time: 600 do
      expected =
        cond do
          xid_l == xid_r -> :eq
          xid_l < xid_r -> :lt
          xid_l > xid_r -> :gt
        end

      assert compare(xid_l, xid_r) == expected
    end
  end

  property "compare/2 treats larger xids as preceding the smaller ones when the difference is > ~2 billion" do
    check all xid1 <- xid32_gen(),
              xid2 <- StreamData.one_of([xid32_gen(), xid64_gen()]),
              # Randomize the order of arguments passed to `compare/2`. The order is drawn from
              # a generator (rather than `Enum.shuffle/1`) so that it is driven by the
              # StreamData seed and failing runs can be reproduced.
              [xid_l, xid_r] <- StreamData.member_of([[xid1, xid2], [xid2, xid1]]),
              max_runs: 1_000_000,
              max_run_time: 600 do
      # The expected result is derived from the plain (non-wrapping) difference of the low
      # 32 bits, so this oracle does not reuse the bitstring truncation that `compare/2`
      # itself is built on. raw_diff lies strictly between -2^32 and 2^32.
      raw_diff = low32(xid_l) - low32(xid_r)

      expected =
        cond do
          raw_diff == 0 -> :eq
          # Wraparound: the numerically larger xid is at least 2^31 ahead, so it precedes.
          raw_diff >= @half_modulo -> :lt
          # Wraparound: the numerically smaller xid is more than 2^31 behind, so it follows.
          raw_diff < -@half_modulo -> :gt
          # No wraparound: ordinary ordering.
          raw_diff < 0 -> :lt
          raw_diff > 0 -> :gt
        end

      assert compare(xid_l, xid_r) == expected
    end
  end

  # Truncates a positive xid to its low 32 bits.
  defp low32(xid), do: rem(xid, @uint32_max + 1)

  defp xid32_gen, do: StreamData.integer(1..@uint32_max)
  defp xid64_gen, do: StreamData.integer((@uint32_max + 1)..@uint64_max)
end

0 comments on commit d22e363

Please sign in to comment.