diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java index 267c6e3e49fd..909769ac7f07 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java @@ -196,7 +196,6 @@ public void onFailure(Exception e) { } public List replayWALFrom(long offset, long timestamp) throws IOException { - List allIds = new ArrayList<>(); return kafkaAppender.replayDMLRecordsFrom(offset, timestamp); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java index 28499a7390c9..beb9391f3146 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/KafkaAppender.java @@ -143,10 +143,7 @@ private long processTask(LogWriter logWriter, IngestTask task) throws IOExceptio if (batchSnapshotId == -1L) { throw new IllegalStateException("invalid ingestSnapshotId [" + batchSnapshotId + "]"); } - logger.debug( - "append batch to WAL. requestId [{}], snapshotId [{}]", - task.requestId, - batchSnapshotId); + logger.debug("append batch to WAL. snapshotId [{}]", batchSnapshotId); long latestSnapshotId = task.operationBatch.getLatestSnapshotId(); if (latestSnapshotId > 0 && latestSnapshotId < batchSnapshotId) { throw new IllegalStateException( @@ -163,7 +160,7 @@ private long processTask(LogWriter logWriter, IngestTask task) throws IOExceptio int storeId = entry.getKey(); OperationBatch batch = entry.getValue().build(); // logger.info("Log writer append partitionId [{}]", storeId); - logWriter.append(storeId, new LogEntry(batchSnapshotId, batch)); + logWriter.append(storeId, new LogEntry(ingestSnapshotId.get(), batch)); } } catch (Exception e) { // write failed, just throw out to fail this task @@ -249,8 +246,7 @@ public List replayDMLRecordsFrom(long offset, long timestamp) throws IOExc if (batch.getOperationCount() == 0) { continue; } - batchSnapshotId = this.ingestSnapshotId.get(); - logWriter.appendAsync(storeId, new LogEntry(batchSnapshotId, batch)); + logWriter.append(storeId, new LogEntry(ingestSnapshotId.get(), batch)); replayCount++; } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java index 408316a39784..bb3a48961f38 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/SnapshotSortQueue.java @@ -120,11 +120,11 @@ public StoreDataBatch poll() throws InterruptedException { } } else { logger.warn( - "Illegal entry polled from queue [{}]. entrySnapshotId [{}] <" - + " currentSnapshotId [{}]. Ignored entry", - currentPollQueueIdx, + "Illegal entry polled from queue, entrySnapshotId [{}] <" + + " currentSnapshotId [{}]. Ignored entry {}", snapshotId, - currentPollSnapshotId); + currentPollSnapshotId, + entry.toProto()); } } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java index 2454021ba908..b869914961bc 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java @@ -56,8 +56,8 @@ public class WriterAgent implements MetricsAgent { private volatile boolean shouldStop = true; private SnapshotSortQueue bufferQueue; - private volatile long lastCommitSnapshotId; - private volatile long consumeSnapshotId; + private volatile long lastCommitSI; + private volatile long consumeSI; private volatile long consumeDdlSnapshotId; private final AtomicReference availSnapshotInfoRef; private ExecutorService commitExecutor; @@ -97,8 +97,8 @@ public void init(long availSnapshotId) { } public void start() { - this.lastCommitSnapshotId = -1L; - this.consumeSnapshotId = 0L; + this.lastCommitSI = -1L; + this.consumeSI = 0L; this.consumeDdlSnapshotId = 0L; this.shouldStop = false; @@ -187,23 +187,24 @@ private void processBatches() { if (batch == null) { continue; } - long batchSnapshotId = batch.getSnapshotId(); - logger.debug("polled one batch [" + batchSnapshotId + "]"); + long batchSI = batch.getSnapshotId(); + logger.debug("polled one batch [" + batchSI + "]"); boolean hasDdl = writeEngineWithRetry(batch); this.totalWrite += batch.getSize(); - if (this.consumeSnapshotId < batchSnapshotId) { + if (this.consumeSI < batchSI) { SnapshotInfo availSInfo = this.availSnapshotInfoRef.get(); - long availSI = Math.max(availSInfo.getSnapshotId(), batchSnapshotId - 1); + long availSI = Math.max(availSInfo.getSnapshotId(), batchSI - 1); long availDdlSI = Math.max(availSInfo.getDdlSnapshotId(), consumeDdlSnapshotId); - this.consumeSnapshotId = batchSnapshotId; + this.consumeSI = batchSI; this.availSnapshotInfoRef.set(new SnapshotInfo(availSI, availDdlSI)); this.commitExecutor.execute(this::asyncCommit); + } else { + logger.warn("consumedSI {} >= batchSI {}, ignored", consumeSI, batchSI); } if (hasDdl) { - this.consumeDdlSnapshotId = batchSnapshotId; + this.consumeDdlSnapshotId = batchSI; } - // this.consumedQueueOffsets.set(batch.getQueueId(), - // batch.getOffset()); + // this.consumedQueueOffsets.set(batch.getQueueId(), batch.getOffset()); this.consumedQueueOffsets.set(0, batch.getOffset()); } catch (InterruptedException e) { logger.error("processBatches interrupted"); @@ -215,22 +216,20 @@ private void processBatches() { private void asyncCommit() { SnapshotInfo snapshotInfo = this.availSnapshotInfoRef.get(); - long availSnapshotId = snapshotInfo.getSnapshotId(); - if (availSnapshotId > this.lastCommitSnapshotId) { + long curSI = snapshotInfo.getSnapshotId(); + if (curSI > this.lastCommitSI) { long ddlSnapshotId = snapshotInfo.getDdlSnapshotId(); List queueOffsets = new ArrayList<>(this.consumedQueueOffsets); try { // logger.info("commit SI {}, last DDL SI {}", availSnapshotId, ddlSnapshotId); this.snapshotCommitter.commitSnapshotId( - storeId, availSnapshotId, ddlSnapshotId, queueOffsets); - this.lastCommitSnapshotId = availSnapshotId; + storeId, curSI, ddlSnapshotId, queueOffsets); + this.lastCommitSI = curSI; } catch (Exception e) { - logger.warn( - "commit failed. SI {}, offset {}. ignored", - availSnapshotId, - queueOffsets, - e); + logger.warn("commit failed. SI {}, offset {}. ignored", curSI, queueOffsets, e); } + } else { + logger.warn("curSI {} <= lastCommitSI {}, ignored", curSI, lastCommitSI); } }