Skip to content

Commit

Permalink
Merge pull request #113 from fjall-rs/fix/journal-truncation
Browse files Browse the repository at this point in the history
Fix: journal truncation
  • Loading branch information
marvin-j97 authored Dec 19, 2024
2 parents c186528 + 182be17 commit 4e19e24
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 111 deletions.
4 changes: 2 additions & 2 deletions src/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ impl Batch {
}
}

drop(journal_writer);

// NOTE: Fully (write) lock, so the batch can be committed atomically
log::trace!("batch: Acquiring partitions lock");
let partitions = self.keyspace.partitions.write().expect("lock is poisoned");
Expand Down Expand Up @@ -174,6 +172,8 @@ impl Batch {
partitions_with_possible_stall.insert(partition.clone());
}

drop(journal_writer);

drop(locked_memtables);
drop(partitions);

Expand Down
42 changes: 2 additions & 40 deletions src/journal/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@

use super::writer::Writer;
use crate::PartitionHandle;
use lsm_tree::{AbstractTree, Memtable, SeqNo};
use std::{
path::PathBuf,
sync::{Arc, MutexGuard},
};
use lsm_tree::{AbstractTree, SeqNo};
use std::{path::PathBuf, sync::MutexGuard};

/// Stores the highest seqno of a partition found in a journal.
#[derive(Clone)]
Expand Down Expand Up @@ -100,41 +97,6 @@ impl JournalManager {
self.disk_space_in_bytes
}

/// Rotates & lists partitions to be flushed so that the oldest journal can be safely evicted
///
/// Only the first entry of `self.items` is inspected. For every watermark
/// recorded on it, the partition's memtable is rotated when the tree has not
/// yet persisted data up to the watermark's `lsn` (or has persisted nothing
/// at all).
///
/// Returns one tuple per successfully rotated memtable:
/// `(partition, new eviction watermark, yanked memtable id, yanked memtable)`.
/// The new watermark's `lsn` is the highest seqno of the yanked memtable.
/// The result is empty if there is no journal item or nothing needed rotating.
///
/// # Panics
///
/// Panics if a rotated memtable reports no highest seqno.
pub(crate) fn rotate_partitions_to_flush_for_oldest_journal_eviction(
    &self,
) -> Vec<(PartitionHandle, EvictionWatermark, u64, Arc<Memtable>)> {
    let mut rotated = Vec::new();

    let oldest_journal = match self.items.first() {
        Some(journal) => journal,
        None => return rotated,
    };

    for watermark in &oldest_journal.watermarks {
        // A partition needs flushing if it never persisted anything, or if
        // what it persisted is still behind this journal's watermark.
        let needs_flush = match watermark.partition.tree.get_highest_persisted_seqno() {
            Some(persisted_seqno) => persisted_seqno < watermark.lsn,
            None => true,
        };

        if !needs_flush {
            continue;
        }

        // `rotate_memtable` yielding `None` means nothing was yanked, so
        // there is nothing to flush for this partition.
        if let Some((yanked_id, yanked_memtable)) = watermark.partition.tree.rotate_memtable() {
            let lsn = yanked_memtable
                .get_highest_seqno()
                .expect("memtable should not be empty");

            rotated.push((
                watermark.partition.clone(),
                EvictionWatermark {
                    partition: watermark.partition.clone(),
                    lsn,
                },
                yanked_id,
                yanked_memtable,
            ));
        }
    }

    rotated
}

/// Performs maintenance, maybe deleting some old journals
pub(crate) fn maintenance(&mut self) -> crate::Result<()> {
loop {
Expand Down
99 changes: 34 additions & 65 deletions src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,12 @@
// (found in the LICENSE-* files in the repository)

use crate::{
config::Config as KeyspaceConfig,
flush::manager::{FlushManager, Task as FlushTask},
journal::{manager::JournalManager, Journal},
keyspace::Partitions,
snapshot_tracker::SnapshotTracker,
write_buffer_manager::WriteBufferManager,
Keyspace,
config::Config as KeyspaceConfig, flush::manager::FlushManager,
journal::manager::JournalManager, keyspace::Partitions, snapshot_tracker::SnapshotTracker,
write_buffer_manager::WriteBufferManager, Keyspace,
};
use lsm_tree::{AbstractTree, SequenceNumberCounter};
use std::sync::{Arc, RwLock};
use std_semaphore::Semaphore;
use std::sync::{atomic::AtomicBool, Arc, RwLock};

/// Monitors write buffer size & journal size
pub struct Monitor {
Expand All @@ -22,10 +17,9 @@ pub struct Monitor {
pub(crate) journal_manager: Arc<RwLock<JournalManager>>,
pub(crate) write_buffer_manager: WriteBufferManager,
pub(crate) partitions: Arc<RwLock<Partitions>>,
pub(crate) journal: Arc<Journal>,
pub(crate) flush_semaphore: Arc<Semaphore>,
pub(crate) seqno: SequenceNumberCounter,
pub(crate) snapshot_tracker: SnapshotTracker,
pub(crate) keyspace_poison: Arc<AtomicBool>,
}

impl Drop for Monitor {
Expand All @@ -48,10 +42,9 @@ impl Monitor {
keyspace_config: keyspace.config.clone(),
write_buffer_manager: keyspace.write_buffer_manager.clone(),
partitions: keyspace.partitions.clone(),
journal: keyspace.journal.clone(),
flush_semaphore: keyspace.flush_semaphore.clone(),
seqno: keyspace.seqno.clone(),
snapshot_tracker: keyspace.snapshot_tracker.clone(),
keyspace_poison: keyspace.is_poisoned.clone(),
}
}

Expand All @@ -60,70 +53,43 @@ impl Monitor {
"monitor: try flushing affected partitions because journals have passed 50% of threshold"
);

let mut journal_writer = self.journal.get_writer();
let mut journal_manager = self.journal_manager.write().expect("lock is poisoned");
let partitions = self.partitions.read().expect("lock is poisoned");

let seqno_map = journal_manager.rotate_partitions_to_flush_for_oldest_journal_eviction();

if seqno_map.is_empty() {
self.flush_semaphore.release();
// TODO: this may not scale well for many partitions
let lowest_persisted_partition = partitions
.values()
.min_by(|a, b| {
a.tree
.get_highest_persisted_seqno()
.cmp(&b.tree.get_highest_persisted_seqno())
})
.cloned();

if let Err(e) = journal_manager.maintenance() {
log::error!("journal GC failed: {e:?}");
};
} else {
log::debug!(
"monitor: need to flush {} partitions to evict oldest journal",
seqno_map.len()
);
drop(partitions);

if let Some(lowest_persisted_partition) = lowest_persisted_partition {
let partitions_names_with_queued_tasks = self
.flush_manager
.read()
.expect("lock is poisoned")
.get_partitions_with_tasks();

let actual_seqno_map = seqno_map
.iter()
.map(|(_, seqno, _, _)| seqno)
.cloned()
.collect::<Vec<_>>();

#[allow(clippy::collapsible_if)]
if actual_seqno_map
.iter()
.any(|x| !partitions_names_with_queued_tasks.contains(&x.partition.name))
{
if journal_manager
.rotate_journal(&mut journal_writer, actual_seqno_map)
.is_ok()
{
let mut flush_manager = self.flush_manager.write().expect("lock is poisoned");

for (partition, _, yanked_id, yanked_memtable) in seqno_map {
flush_manager.enqueue_task(
partition.name.clone(),
FlushTask {
id: yanked_id,
partition,
sealed_memtable: yanked_memtable,
},
);
}

self.flush_semaphore.release();
if partitions_names_with_queued_tasks.contains(&lowest_persisted_partition.name) {
return;
}

drop(flush_manager);
drop(journal_manager);
drop(journal_writer);
}
} else {
self.flush_semaphore.release();
match lowest_persisted_partition.rotate_memtable() {
Ok(_) => {}
Err(e) => {
log::error!(
"monitor: memtable rotation failed for {:?}: {e:?}",
lowest_persisted_partition.name
);

if let Err(e) = journal_manager.maintenance() {
log::error!("journal GC failed: {e:?}");
self.keyspace_poison
.store(true, std::sync::atomic::Ordering::Release);
}
}
};
}
}

Expand Down Expand Up @@ -170,6 +136,9 @@ impl Monitor {
"monitor: memtable rotation failed for {:?}: {e:?}",
partition.name
);

self.keyspace_poison
.store(true, std::sync::atomic::Ordering::Release);
}
};
}
Expand Down
8 changes: 4 additions & 4 deletions src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,10 +893,10 @@ impl PartitionHandle {
})?;
}

drop(journal_writer);

let (item_size, memtable_size) = self.tree.insert(key, value, seqno);

drop(journal_writer);

let write_buffer_size = self.write_buffer_manager.allocate(u64::from(item_size));

self.check_memtable_overflow(memtable_size)?;
Expand Down Expand Up @@ -967,10 +967,10 @@ impl PartitionHandle {
})?;
}

drop(journal_writer);

let (item_size, memtable_size) = self.tree.remove(key, seqno);

drop(journal_writer);

let write_buffer_size = self.write_buffer_manager.allocate(u64::from(item_size));

self.check_memtable_overflow(memtable_size)?;
Expand Down

0 comments on commit 4e19e24

Please sign in to comment.