Skip to content

Commit

Permalink
fix(interactive): snapshot id may lag behind in some scenarios (#3576)
Browse files (browse the repository at this point in the history)
  • Loading branch information
siyuan0322 authored Feb 26, 2024
1 parent 76ac755 commit ca87fdb
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ public void onFailure(Exception e) {
}

/**
 * Replays write-ahead-log records by delegating to
 * {@code kafkaAppender.replayDMLRecordsFrom(offset, timestamp)}.
 *
 * <p>The result of the delegate is returned unchanged; this method keeps no
 * intermediate list of its own.
 *
 * @param offset passed through to the appender (presumably the WAL offset to
 *     start replaying from; confirm against the appender)
 * @param timestamp passed through to the appender (presumably a lower time bound
 *     for replayed records; confirm against the appender)
 * @return the list of ids produced by the appender
 * @throws IOException if the appender fails while replaying
 */
public List<Long> replayWALFrom(long offset, long timestamp) throws IOException {
    return kafkaAppender.replayDMLRecordsFrom(offset, timestamp);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,7 @@ private long processTask(LogWriter logWriter, IngestTask task) throws IOExceptio
if (batchSnapshotId == -1L) {
throw new IllegalStateException("invalid ingestSnapshotId [" + batchSnapshotId + "]");
}
logger.debug(
"append batch to WAL. requestId [{}], snapshotId [{}]",
task.requestId,
batchSnapshotId);
logger.debug("append batch to WAL. snapshotId [{}]", batchSnapshotId);
long latestSnapshotId = task.operationBatch.getLatestSnapshotId();
if (latestSnapshotId > 0 && latestSnapshotId < batchSnapshotId) {
throw new IllegalStateException(
Expand All @@ -163,7 +160,7 @@ private long processTask(LogWriter logWriter, IngestTask task) throws IOExceptio
int storeId = entry.getKey();
OperationBatch batch = entry.getValue().build();
// logger.info("Log writer append partitionId [{}]", storeId);
logWriter.append(storeId, new LogEntry(batchSnapshotId, batch));
logWriter.append(storeId, new LogEntry(ingestSnapshotId.get(), batch));
}
} catch (Exception e) {
// write failed, just throw out to fail this task
Expand Down Expand Up @@ -249,8 +246,7 @@ public List<Long> replayDMLRecordsFrom(long offset, long timestamp) throws IOExc
if (batch.getOperationCount() == 0) {
continue;
}
batchSnapshotId = this.ingestSnapshotId.get();
logWriter.appendAsync(storeId, new LogEntry(batchSnapshotId, batch));
logWriter.append(storeId, new LogEntry(ingestSnapshotId.get(), batch));
replayCount++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ public StoreDataBatch poll() throws InterruptedException {
}
} else {
logger.warn(
"Illegal entry polled from queue [{}]. entrySnapshotId [{}] <"
+ " currentSnapshotId [{}]. Ignored entry",
currentPollQueueIdx,
"Illegal entry polled from queue, entrySnapshotId [{}] <"
+ " currentSnapshotId [{}]. Ignored entry {}",
snapshotId,
currentPollSnapshotId);
currentPollSnapshotId,
entry.toProto());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public class WriterAgent implements MetricsAgent {

private volatile boolean shouldStop = true;
private SnapshotSortQueue bufferQueue;
private volatile long lastCommitSnapshotId;
private volatile long consumeSnapshotId;
private volatile long lastCommitSI;
private volatile long consumeSI;
private volatile long consumeDdlSnapshotId;
private final AtomicReference<SnapshotInfo> availSnapshotInfoRef;
private ExecutorService commitExecutor;
Expand Down Expand Up @@ -97,8 +97,8 @@ public void init(long availSnapshotId) {
}

public void start() {
this.lastCommitSnapshotId = -1L;
this.consumeSnapshotId = 0L;
this.lastCommitSI = -1L;
this.consumeSI = 0L;
this.consumeDdlSnapshotId = 0L;

this.shouldStop = false;
Expand Down Expand Up @@ -187,23 +187,24 @@ private void processBatches() {
if (batch == null) {
continue;
}
long batchSnapshotId = batch.getSnapshotId();
logger.debug("polled one batch [" + batchSnapshotId + "]");
long batchSI = batch.getSnapshotId();
logger.debug("polled one batch [" + batchSI + "]");
boolean hasDdl = writeEngineWithRetry(batch);
this.totalWrite += batch.getSize();
if (this.consumeSnapshotId < batchSnapshotId) {
if (this.consumeSI < batchSI) {
SnapshotInfo availSInfo = this.availSnapshotInfoRef.get();
long availSI = Math.max(availSInfo.getSnapshotId(), batchSnapshotId - 1);
long availSI = Math.max(availSInfo.getSnapshotId(), batchSI - 1);
long availDdlSI = Math.max(availSInfo.getDdlSnapshotId(), consumeDdlSnapshotId);
this.consumeSnapshotId = batchSnapshotId;
this.consumeSI = batchSI;
this.availSnapshotInfoRef.set(new SnapshotInfo(availSI, availDdlSI));
this.commitExecutor.execute(this::asyncCommit);
} else {
logger.warn("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI);
}
if (hasDdl) {
this.consumeDdlSnapshotId = batchSnapshotId;
this.consumeDdlSnapshotId = batchSI;
}
// this.consumedQueueOffsets.set(batch.getQueueId(),
// batch.getOffset());
// this.consumedQueueOffsets.set(batch.getQueueId(), batch.getOffset());
this.consumedQueueOffsets.set(0, batch.getOffset());
} catch (InterruptedException e) {
logger.error("processBatches interrupted");
Expand All @@ -215,22 +216,20 @@ private void processBatches() {

private void asyncCommit() {
SnapshotInfo snapshotInfo = this.availSnapshotInfoRef.get();
long availSnapshotId = snapshotInfo.getSnapshotId();
if (availSnapshotId > this.lastCommitSnapshotId) {
long curSI = snapshotInfo.getSnapshotId();
if (curSI > this.lastCommitSI) {
long ddlSnapshotId = snapshotInfo.getDdlSnapshotId();
List<Long> queueOffsets = new ArrayList<>(this.consumedQueueOffsets);
try {
// logger.info("commit SI {}, last DDL SI {}", availSnapshotId, ddlSnapshotId);
this.snapshotCommitter.commitSnapshotId(
storeId, availSnapshotId, ddlSnapshotId, queueOffsets);
this.lastCommitSnapshotId = availSnapshotId;
storeId, curSI, ddlSnapshotId, queueOffsets);
this.lastCommitSI = curSI;
} catch (Exception e) {
logger.warn(
"commit failed. SI {}, offset {}. ignored",
availSnapshotId,
queueOffsets,
e);
logger.warn("commit failed. SI {}, offset {}. ignored", curSI, queueOffsets, e);
}
} else {
logger.warn("curSI {} <= lastCommitSI {}, ignored", curSI, lastCommitSI);
}
}

Expand Down

0 comments on commit ca87fdb

Please sign in to comment.