Skip to content

Commit

Permalink
bugfix: EPQ recheck does not work with ExecScan()
Browse files Browse the repository at this point in the history
ExecScan() fetches the base table's tuple if Scan->scanrelid > 0, so a
CustomScan is expected to return a base tuple. However, this does not
fit our CPU fallback operations, which return a tuple image after
the device projection.
As a result, it leads to incompatible slot references, and then a crash.

issue reported at #690
  • Loading branch information
kaigai committed Dec 11, 2023
1 parent 7efa303 commit 2991ec0
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 38 deletions.
135 changes: 121 additions & 14 deletions src/executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ ExecFallbackRowDataStore(pgstromTaskState *pts,
ItemPointerCopy(&tupitem->htup.t_ctid, &tuple.t_self);
tuple.t_tableOid = kds->table_oid;
tuple.t_data = &tupitem->htup;
pts->cb_cpu_fallback(pts, kds, &tuple);
pts->cb_cpu_fallback(pts, &tuple);
}
}

Expand Down Expand Up @@ -1140,7 +1140,7 @@ ExecFallbackBlockDataStore(pgstromTaskState *pts,
tuple.t_tableOid = kds->table_oid;
tuple.t_data = (HeapTupleHeader)PageGetItem((Page)pg_page, lpp);

pts->cb_cpu_fallback(pts, kds, &tuple);
pts->cb_cpu_fallback(pts, &tuple);
}
}
}
Expand All @@ -1161,7 +1161,7 @@ ExecFallbackColumnDataStore(pgstromTaskState *pts,
bool should_free;

tuple = ExecFetchSlotHeapTuple(pts->base_slot, false, &should_free);
pts->cb_cpu_fallback(pts, kds, tuple);
pts->cb_cpu_fallback(pts, tuple);
if (should_free)
pfree(tuple);
}
Expand All @@ -1185,7 +1185,7 @@ ExecFallbackArrowDataStore(pgstromTaskState *pts,
break;

tuple = ExecFetchSlotHeapTuple(pts->base_slot, false, &should_free);
pts->cb_cpu_fallback(pts, kds, tuple);
pts->cb_cpu_fallback(pts, tuple);
if (should_free)
pfree(tuple);
}
Expand Down Expand Up @@ -1697,14 +1697,83 @@ pgstromExecScanAccess(pgstromTaskState *pts)
/*
* pgstromExecScanReCheck
*/
static bool
pgstromExecScanReCheck(pgstromTaskState *pts, TupleTableSlot *epq_slot)
static TupleTableSlot *
pgstromExecScanReCheck(pgstromTaskState *pts, EPQState *epqstate)
{
/*
* NOTE: Only immutable operators/functions are executable
* on the GPU devices, so its decision will never change.
*/
return true;
Index scanrelid = ((Scan *)pts->css.ss.ps.plan)->scanrelid;

/* see ExecScanFetch */
if (scanrelid == 0)
{
elog(ERROR, "Bug? CustomScan(%s) has scanrelid==0",
pts->css.methods->CustomName);
}
else if (epqstate->relsubs_done[scanrelid-1])
{
return NULL;
}
else if (epqstate->relsubs_slot[scanrelid-1])
{
TupleTableSlot *ss_slot = pts->css.ss.ss_ScanTupleSlot;
TupleTableSlot *epq_slot = epqstate->relsubs_slot[scanrelid-1];
size_t fallback_index_saved = pts->fallback_index;
size_t fallback_usage_saved = pts->fallback_usage;

Assert(epqstate->relsubs_rowmark[scanrelid - 1] == NULL);
/* Mark to remember that we shouldn't return it again */
epqstate->relsubs_done[scanrelid - 1] = true;

/* Return empty slot if we haven't got a test tuple */
if (TupIsNull(epq_slot))
ExecClearTuple(ss_slot);
else
{
HeapTuple epq_tuple;
bool should_free;
#if 0
slot_getallattrs(epq_slot);
for (int j=0; j < epq_slot->tts_nvalid; j++)
{
elog(INFO, "epq_slot[%d] isnull=%s values=0x%lx", j,
epq_slot->tts_isnull[j] ? "true" : "false",
epq_slot->tts_values[j]);
}
#endif
epq_tuple = ExecFetchSlotHeapTuple(epq_slot, false,
&should_free);
if (pts->cb_cpu_fallback(pts, epq_tuple) &&
pts->fallback_tuples != NULL &&
pts->fallback_buffer != NULL &&
pts->fallback_nitems > fallback_index_saved)
{
HeapTupleData htup;
kern_tupitem *titem = (kern_tupitem *)
(pts->fallback_buffer +
pts->fallback_tuples[fallback_index_saved]);

htup.t_len = titem->t_len;
htup.t_data = &titem->htup;
ss_slot = pts->css.ss.ss_ScanTupleSlot;
ExecForceStoreHeapTuple(&htup, ss_slot, false);
}
else
{
ExecClearTuple(ss_slot);
}
/* release fallback tuple & buffer */
if (should_free)
pfree(epq_tuple);
pts->fallback_index = fallback_index_saved;
pts->fallback_usage = fallback_usage_saved;
}
return ss_slot;
}
else if (epqstate->relsubs_rowmark[scanrelid-1])
{
elog(ERROR, "RowMark on CustomScan(%s) is not implemented yet",
pts->css.methods->CustomName);
}
return pgstromExecScanAccess(pts);
}

/*
Expand Down Expand Up @@ -1762,16 +1831,54 @@ TupleTableSlot *
pgstromExecTaskState(CustomScanState *node)
{
pgstromTaskState *pts = (pgstromTaskState *)node;
EState *estate = pts->css.ss.ps.state;
ExprState *host_quals = node->ss.ps.qual;
ExprContext *econtext = node->ss.ps.ps_ExprContext;
ProjectionInfo *proj_info = node->ss.ps.ps_ProjInfo;
TupleTableSlot *slot;

if (!pts->conn)
{
if (!__pgstromExecTaskOpenConnection(pts))
return NULL;
Assert(pts->conn);
}
return ExecScan(&pts->css.ss,
(ExecScanAccessMtd) pgstromExecScanAccess,
(ExecScanRecheckMtd) pgstromExecScanReCheck);

/*
* see, ExecScan() - it assumes CustomScan with scanrelid > 0 returns
* tuples identical with table definition, thus these tuples are not
* suitable for input of ExecProjection().
*/
if (estate->es_epq_active)
{
slot = pgstromExecScanReCheck(pts, estate->es_epq_active);
if (TupIsNull(slot))
return NULL;
ResetExprContext(econtext);
econtext->ecxt_scantuple = slot;
if (proj_info)
return ExecProject(proj_info);
return slot;
}

for (;;)
{
slot = pgstromExecScanAccess(pts);
if (TupIsNull(slot))
break;
/* check whether the current tuple satisfies the qual-clause */
ResetExprContext(econtext);
econtext->ecxt_scantuple = slot;

if (!host_quals || ExecQual(host_quals, econtext))
{
if (proj_info)
return ExecProject(proj_info);
return slot;
}
InstrCountFiltered1(pts, 1);
}
return NULL;
}

/*
Expand Down
12 changes: 6 additions & 6 deletions src/gpu_join.c
Original file line number Diff line number Diff line change
Expand Up @@ -2254,15 +2254,14 @@ __execFallbackCpuJoinOneDepth(pgstromTaskState *pts, int depth)
}
}

void
ExecFallbackCpuJoin(pgstromTaskState *pts,
kern_data_store *kds,
HeapTuple tuple)
bool
ExecFallbackCpuJoin(pgstromTaskState *pts, HeapTuple tuple)
{
pgstromPlanInfo *pp_info = pts->pp_info;
ExprContext *econtext = pts->css.ss.ps.ps_ExprContext;
TupleTableSlot *base_slot = pts->base_slot;
TupleTableSlot *fallback_slot = pts->fallback_slot;
size_t fallback_index_saved = pts->fallback_index;
ListCell *lc;

ExecForceStoreHeapTuple(tuple, base_slot, false);
Expand All @@ -2272,7 +2271,7 @@ ExecFallbackCpuJoin(pgstromTaskState *pts,
{
ResetExprContext(econtext);
if (!ExecQual(pts->base_quals, econtext))
return;
return 0;
}

/*
Expand All @@ -2292,7 +2291,7 @@ ExecFallbackCpuJoin(pgstromTaskState *pts,
pgstromStoreFallbackTuple(pts, proj_htup);
if (should_free)
pfree(proj_htup);
return;
return 1;
}

/* Load the base tuple (depth-0) to the fallback slot */
Expand All @@ -2318,6 +2317,7 @@ ExecFallbackCpuJoin(pgstromTaskState *pts,
/* Run JOIN, if any */
Assert(pts->h_kmrels);
__execFallbackCpuJoinOneDepth(pts, 1);
return (pts->fallback_index - fallback_index_saved > 0);
}

static void
Expand Down
7 changes: 3 additions & 4 deletions src/gpu_preagg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1713,16 +1713,15 @@ CreateDpuPreAggScanState(CustomScan *cscan)
/*
* ExecFallbackCpuPreAgg
*/
void
ExecFallbackCpuPreAgg(pgstromTaskState *pts,
kern_data_store *kds,
HeapTuple tuple)
bool
ExecFallbackCpuPreAgg(pgstromTaskState *pts, HeapTuple tuple)
{
/*
 * CPU fallback entry point for GpuPreAgg (cb_cpu_fallback callback).
 *
 * CPU fallback is deliberately unsupported here: as the errdetail below
 * explains, by the time a fallback would be attempted the final
 * aggregation buffer may already be partially updated ("polluted") by
 * the device, so re-running the source chunk on the CPU cannot restore
 * a consistent state. We therefore raise an error unconditionally.
 */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CPU Fallback of GpuPreAgg is not implemented yet"),
errhint("'pg_strom.enable_gpupreagg' configuration can turn off GpuPreAgg only"),
errdetail("GpuPreAgg often detects uncontinuable errors during update of the final aggregation buffer. Even if we re-run the source data chunk, the final aggregation buffer is already polluted, so we have no reasonable way to recover right now.")));
/*
 * Never reached: ereport(ERROR, ...) performs a non-local exit.
 * The return statement only satisfies the bool signature shared with
 * the other cb_cpu_fallback implementations.
 */
return false;
}

/*
Expand Down
9 changes: 4 additions & 5 deletions src/gpu_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -822,10 +822,8 @@ CreateDpuScanState(CustomScan *cscan)
/*
* ExecFallbackCpuScan
*/
void
ExecFallbackCpuScan(pgstromTaskState *pts,
kern_data_store *kds,
HeapTuple tuple)
bool
ExecFallbackCpuScan(pgstromTaskState *pts, HeapTuple tuple)
{
ExprContext *econtext = pts->css.ss.ps.ps_ExprContext;
bool should_free;
Expand All @@ -838,7 +836,7 @@ ExecFallbackCpuScan(pgstromTaskState *pts,
{
ResetExprContext(econtext);
if (!ExecQual(pts->base_quals, econtext))
return;
return false;
}
Assert(!pts->fallback_slot);

Expand All @@ -857,6 +855,7 @@ ExecFallbackCpuScan(pgstromTaskState *pts,
pgstromStoreFallbackTuple(pts, tuple);
if (should_free)
pfree(tuple);
return true;
}

/*
Expand Down
12 changes: 4 additions & 8 deletions src/pg_strom.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,7 @@ struct pgstromTaskState
XpuCommand *(*cb_final_chunk)(struct pgstromTaskState *pts,
kern_final_task *fin,
struct iovec *xcmd_iov, int *xcmd_iovcnt);
void (*cb_cpu_fallback)(struct pgstromTaskState *pts,
struct kern_data_store *kds,
bool (*cb_cpu_fallback)(struct pgstromTaskState *pts,
HeapTuple htuple);
/* inner relations state (if JOIN) */
int num_rels;
Expand Down Expand Up @@ -819,8 +818,7 @@ extern pgstromPlanInfo *buildOuterScanPlanInfo(PlannerInfo *root,
bool allow_host_quals,
bool allow_no_device_quals,
ParamPathInfo **p_param_info);
extern void ExecFallbackCpuScan(pgstromTaskState *pts,
kern_data_store *kds,
extern bool ExecFallbackCpuScan(pgstromTaskState *pts,
HeapTuple tuple);
extern void gpuservHandleGpuScanExec(gpuClient *gclient, XpuCommand *xcmd);
extern void pgstrom_init_gpu_scan(void);
Expand All @@ -844,8 +842,7 @@ extern CustomScan *PlanXpuJoinPathCommon(PlannerInfo *root,
pgstromPlanInfo *pp_info,
const CustomScanMethods *methods);
extern uint32_t GpuJoinInnerPreload(pgstromTaskState *pts);
extern void ExecFallbackCpuJoin(pgstromTaskState *pts,
kern_data_store *kds,
extern bool ExecFallbackCpuJoin(pgstromTaskState *pts,
HeapTuple tuple);
extern void ExecFallbackCpuJoinRightOuter(pgstromTaskState *pts);
extern void ExecFallbackCpuJoinOuterJoinMap(pgstromTaskState *pts,
Expand All @@ -863,8 +860,7 @@ extern void xpupreagg_add_custompath(PlannerInfo *root,
void *extra,
uint32_t task_kind,
const CustomPathMethods *methods);
extern void ExecFallbackCpuPreAgg(pgstromTaskState *pts,
kern_data_store *kds,
extern bool ExecFallbackCpuPreAgg(pgstromTaskState *pts,
HeapTuple tuple);
extern void pgstrom_init_gpu_preagg(void);
extern void pgstrom_init_dpu_preagg(void);
Expand Down
2 changes: 1 addition & 1 deletion src/relscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ __relScanDirectFallbackBlock(pgstromTaskState *pts,
HeapCheckForSerializableConflictOut(valid, relation, &htup,
buffer, snapshot);
if (valid)
pts->cb_cpu_fallback(pts, kds, &htup);
pts->cb_cpu_fallback(pts, &htup);
}
UnlockReleaseBuffer(buffer);
pg_atomic_fetch_add_u64(&ps_state->npages_buffer_read, PAGES_PER_BLOCK);
Expand Down

0 comments on commit 2991ec0

Please sign in to comment.