Commit 98557ef

Support persistent cache.

Fixes issue s3ql#17.

Nikratio committed Nov 6, 2018
1 parent 7dac0a6 commit 98557ef

Showing 8 changed files with 293 additions and 58 deletions.
6 changes: 6 additions & 0 deletions Changes.txt
@@ -9,6 +9,12 @@ UNRELEASED CHANGES
 * Fixed occasional crashes with a "dugong.StateError" exception.
   Thanks to Roger Gammans for extensive testing!
 
+* mount.s3ql and fsck.s3ql got a new `--keep-cache` option. If this
+  is specified, locally cached data will not be removed on unmount
+  (or after fsck) so that it can be re-used if the file system is
+  mounted again on the same system without having been mounted
+  elsewhere.
+
 2018-10-08, S3QL 2.31
 
 * Object removal now makes more use of parallelism.
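A hypothetical round trip with the new flag (the storage URL, mount point,
and cache location below are made-up examples, not taken from this commit;
the cache may only be re-used if the file system was not mounted elsewhere
in between):

    mount.s3ql --keep-cache s3://mybucket/myfs /mnt/data
    # ... work with the mounted file system ...
    umount.s3ql /mnt/data
    mount.s3ql --keep-cache s3://mybucket/myfs /mnt/data   # cache is re-used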
50 changes: 40 additions & 10 deletions src/s3ql/block_cache.py
@@ -20,6 +20,7 @@
 import shutil
 import threading
 import time
+import re
 import sys
 
 # standard logger for this module
@@ -161,7 +162,7 @@ def __init__(self, inode, blockno, filename, mode='w+b'):
         self.inode = inode
         self.blockno = blockno
         self.last_access = 0
-        self.pos = 0
+        self.pos = self.fh.tell()
         self.size = os.fstat(self.fh.fileno()).st_size
 
     def read(self, size=None):
@@ -223,13 +224,14 @@ def __init__(self, max_size, max_entries):
         self.max_entries = max_entries
         self.size = 0
 
-    def remove(self, key):
+    def remove(self, key, unlink=True):
         '''Remove *key* from disk and cache, update size'''
 
         el = self.pop(key)
         el.close()
-        el.unlink()
         self.size -= el.size
+        if unlink:
+            el.unlink()
 
     def is_full(self):
         return (self.size > self.max_size
@@ -277,12 +279,30 @@ def __init__(self, backend_pool, db, cachedir, max_size, max_entries=768):
         self.to_upload = None
         self.to_remove = None
 
-        if not os.path.exists(self.path):
+        if os.path.exists(self.path):
+            self.load_cache()
+            log.info('Loaded %d entries from cache', len(self.cache))
+        else:
             os.mkdir(self.path)
 
         # Initialized from the outside to prevent cyclic dependency
         self.fs = None
 
+    def load_cache(self):
+        '''Initialize cache from disk'''
+
+        for filename in os.listdir(self.path):
+            match = re.match('^(\\d+)-(\\d+)$', filename)
+            if not match:
+                continue
+            inode = int(match.group(1))
+            blockno = int(match.group(2))
+
+            el = CacheEntry(inode, blockno,
+                            os.path.join(self.path, filename), mode='r+b')
+            self.cache[(inode, blockno)] = el
+            self.cache.size += el.size
+
     def __len__(self):
         '''Get number of objects in cache'''
         return len(self.cache)
@@ -349,18 +369,24 @@ def _unlock_entry(self, inode, blockno, release_global=False,
         else:
             self.mlock.release((inode, blockno), noerror=noerror)
 
-    def destroy(self):
+    def destroy(self, keep_cache=False):
         '''Clean up and stop worker threads
 
         This method should be called without the global lock held.
         '''
 
-        log.debug('Dropping cache...')
+        log.debug('Flushing cache...')
         try:
             with lock:
-                self.drop()
+                if keep_cache:
+                    self.flush()  # releases global lock
+                    for el in self.cache.values():
+                        assert not el.dirty
+                        el.close()
+                else:
+                    self.drop()
         except NoWorkerThreads:
-            log.error('Unable to drop cache, no upload threads left alive')
+            log.error('Unable to flush cache, no upload threads left alive')
 
         # Signal termination to worker threads. If some of them
         # terminated prematurely, continue gracefully.
@@ -401,7 +427,8 @@ def destroy(self):
         self.upload_threads = None
         self.removal_threads = None
 
-        os.rmdir(self.path)
+        if not keep_cache:
+            os.rmdir(self.path)
 
         log.debug('cleanup done.')

@@ -1039,7 +1066,10 @@ def __del__(self):
         # break reference loop
         self.fs = None
 
-        if len(self.cache) == 0:
+        for el in self.cache.values():
+            if el.dirty:
+                break
+        else:
             return
 
         # Force execution of sys.excepthook (exceptions raised
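The new `__del__` logic above relies on Python's `for ... else`: the `else`
branch runs only when the loop finishes without hitting `break`, so the
method now returns early exactly when no cached entry is dirty. A minimal
standalone sketch of the same pattern (class and function names here are
illustrative, not from the commit):

    class _Entry:
        def __init__(self, dirty):
            self.dirty = dirty

    def has_dirty(entries):
        for el in entries:
            if el.dirty:
                break          # found a dirty entry, skip the else branch
        else:
            return False       # loop ran to completion: all entries clean
        return True

    assert has_dirty([_Entry(False), _Entry(True)])
    assert not has_dirty([_Entry(False)])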
69 changes: 47 additions & 22 deletions src/s3ql/fsck.py
@@ -57,9 +57,13 @@ def __init__(self, cachedir_, backend_, param, conn):
         # don't move them there repeatedly)
         self.moved_inodes = set()
 
-    def check(self):
+    def check(self, check_cache=True):
         """Check file system
 
+        If *check_cache* is False, assume that all cache files are clean (aka
+        have been uploaded to the backend). If *check_cache* is ``keep``, do not
+        remove cache files on exit.
+
         Sets instance variable `found_errors`.
         """

@@ -75,7 +79,8 @@ def check(self):
         try:
             self.check_lof()
             self.check_uploads()
-            self.check_cache()
+            if check_cache:
+                self.check_cache(check_cache == 'keep')
             self.check_names_refcount()
 
             self.check_contents_name()
@@ -166,7 +171,7 @@ def check_uploads(self):
         self.conn.execute('DELETE FROM objects WHERE size = -1')
 
 
-    def check_cache(self):
+    def check_cache(self, keep_cache=False):
         """Commit uncommitted cache files"""
 
         log.info("Checking for dirty cache objects...")
@@ -212,7 +217,8 @@ def check_cache(self):
             log.debug('Inode %d, block %d has checksum %s', inode, blockno,
                       hash_is)
             if hash_should == hash_is:
-                os.unlink(os.path.join(self.cachedir, filename))
+                if not keep_cache:
+                    os.unlink(os.path.join(self.cachedir, filename))
                 continue
 
             self.found_errors = True
@@ -253,7 +259,8 @@ def do_write(obj_fh):
             self.conn.execute('UPDATE blocks SET refcount=refcount-1 WHERE id=?', (old_block_id,))
             self.unlinked_blocks.add(old_block_id)
 
-            os.unlink(os.path.join(self.cachedir, filename))
+            if not keep_cache:
+                os.unlink(os.path.join(self.cachedir, filename))
 
 
     def check_lof(self):
@@ -1097,6 +1104,8 @@ def parse_args(args):
     parser.add_version()
     parser.add_storage_url()
 
+    parser.add_argument("--keep-cache", action="store_true", default=False,
+                        help="Do not purge locally cached files on exit.")
     parser.add_argument("--batch", action="store_true", default=False,
                         help="If user input is required, exit without prompting.")
     parser.add_argument("--force", action="store_true", default=False,
@@ -1145,7 +1154,6 @@ def main(args=None):
     else:
         log.info('Using cached metadata.')
         db = Connection(cachepath + '.db')
-        assert not os.path.exists(cachepath + '-cache') or param['needs_fsck']
 
         if param['seq_no'] > seq_no:
             log.warning('File system has not been unmounted cleanly.')
@@ -1157,7 +1165,6 @@
 
     else:
         param = backend.lookup(meta_obj_name)
-        assert not os.path.exists(cachepath + '-cache')
         # .db might exist if mount.s3ql is killed at exactly the right instant
         # and should just be ignored.
 
@@ -1195,17 +1202,6 @@ def main(args=None):
         param['seq_no'] = seq_no
         param['needs_fsck'] = True
 
-    if not db and os.path.exists(cachepath + '-cache'):
-        for i in itertools.count():
-            bak_name = '%s-cache.bak%d' % (cachepath, i)
-            if not os.path.exists(bak_name):
-                break
-        log.warning('Found outdated cache directory (%s), renaming to .bak%d',
-                    cachepath + '-cache', i)
-        log.warning('You should delete this directory once you are sure that '
-                    'everything is in order.')
-        os.rename(cachepath + '-cache', bak_name)
-
     if (not param['needs_fsck']
         and param['max_inode'] < 2 ** 31
         and (time.time() - param['last_fsck'])
@@ -1216,6 +1212,31 @@
         log.info('File system is marked as clean. Use --force to force checking.')
         return
 
+    # When using remote metadata, get rid of outdated local cache (so that we
+    # don't accidentally upload it)
+    outdated_cachedir = False
+    if not db:
+        try:
+            for name_ in os.listdir(cachepath + '-cache'):
+                if name_ not in ('.', '..'):
+                    outdated_cachedir = True
+                    break
+        except FileNotFoundError:
+            pass
+    if outdated_cachedir and param['needs_fsck']:
+        for i in itertools.count():
+            bak_name = '%s-cache.bak%d' % (cachepath, i)
+            if not os.path.exists(bak_name):
+                break
+        log.warning('Renaming outdated cache directory %s to .bak%d',
+                    cachepath + '-cache', i)
+        log.warning('You should delete this directory once you are sure that '
+                    'everything is in order.')
+        os.rename(cachepath + '-cache', bak_name)
+    elif outdated_cachedir:
+        log.info("Flushing outdated local cache...")
+        shutil.rmtree(cachepath + '-cache')
+
     # If using local metadata, check consistency
     if db:
         log.info('Checking DB integrity...')
@@ -1233,22 +1254,26 @@
     else:
         db = download_metadata(backend, cachepath + '.db')
 
+    # We only read cache files if the filesystem was not
+    # unmounted cleanly. On a clean unmount, the cache files can
+    # not be dirty.
+    check_cache = param['needs_fsck']
+    if check_cache and options.keep_cache:
+        check_cache = 'keep'
+
     # Increase metadata sequence no
     param['seq_no'] += 1
     param['needs_fsck'] = True
     backend['s3ql_seq_no_%d' % param['seq_no']] = b'Empty'
     save_params(cachepath, param)
 
     fsck = Fsck(cachepath + '-cache', backend, param, db)
-    fsck.check()
+    fsck.check(check_cache)
     param['max_inode'] = db.get_val('SELECT MAX(id) FROM inodes')
 
     if fsck.uncorrectable_errors:
         raise QuietError("Uncorrectable errors found, aborting.", exitcode=44+128)
 
-    if os.path.exists(cachepath + '-cache'):
-        os.rmdir(cachepath + '-cache')
-
     if param['max_inode'] >= 2 ** 31:
         renumber_inodes(db)
         param['inode_gen'] += 1
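The fsck changes above collapse two inputs into one tri-state *check_cache*
argument: skip the cache check after a clean unmount, check and unlink by
default, or check and keep the files when `--keep-cache` was given. A
standalone restatement of that dispatch (`resolve_check_cache` is a made-up
helper; `needs_fsck`/`keep_cache` stand in for `param['needs_fsck']` and
`options.keep_cache`):

    def resolve_check_cache(needs_fsck, keep_cache):
        # Clean unmount -> cache files cannot be dirty, skip the check.
        check_cache = needs_fsck
        if check_cache and keep_cache:
            check_cache = 'keep'   # commit dirty blocks but keep the files
        return check_cache

    assert resolve_check_cache(False, True) is False   # nothing to check
    assert resolve_check_cache(True, False) is True    # check, then unlink
    assert resolve_check_cache(True, True) == 'keep'   # check, keep files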
2 changes: 2 additions & 0 deletions src/s3ql/mkfs.py
@@ -166,6 +166,8 @@ def main(args=None):
     backend.store('s3ql_seq_no_%d' % param['seq_no'], b'Empty')
     with open(cachepath + '.params', 'wb') as fh:
         fh.write(freeze_basic_mapping(param))
+    if os.path.exists(cachepath + '-cache'):
+        shutil.rmtree(cachepath + '-cache')
 
     if data_pw is not None:
         print('Please store the following master key in a safe location. It allows ',
21 changes: 8 additions & 13 deletions src/s3ql/mount.py
@@ -24,7 +24,6 @@
 import argparse
 import faulthandler
 import llfuse
-import itertools
 import os
 import platform
 import subprocess
@@ -194,7 +193,7 @@ def unmount():
     mark_metadata_dirty(backend, cachepath, param)
 
     block_cache.init(options.threads)
-    cm.callback(block_cache.destroy)
+    cm.callback(block_cache.destroy, options.keep_cache)
 
     metadata_upload_thread.start()
     cm.callback(metadata_upload_thread.join)
@@ -415,20 +414,14 @@ def get_metadata(backend, cachepath):
     elif param['max_inode'] > 2 ** 31:
         log.warning('Few free inodes remaining, running fsck is recommended')
 
-    if os.path.exists(cachepath + '-cache'):
-        for i in itertools.count():
-            bak_name = '%s-cache.bak%d' % (cachepath, i)
-            if not os.path.exists(bak_name):
-                break
-        log.warning('Found outdated cache directory (%s), renaming to .bak%d',
-                    cachepath + '-cache', i)
-        log.warning('You should delete this directory once you are sure that '
-                    'everything is in order.')
-        os.rename(cachepath + '-cache', bak_name)
-
     # Download metadata
     if not db:
         db = download_metadata(backend, cachepath + '.db')
 
+    # Drop cache
+    if os.path.exists(cachepath + '-cache'):
+        shutil.rmtree(cachepath + '-cache')
+
     save_params(cachepath, param)
 
     return (param, db)
@@ -530,6 +523,8 @@ def compression_type(s):
         'this number you have to make sure that your process file descriptor '
         'limit (as set with `ulimit -n`) is high enough (at least the number '
         'of cache entries + 100).')
+    parser.add_argument("--keep-cache", action="store_true", default=False,
+                        help="Do not purge locally cached files on exit.")
     parser.add_argument("--allow-other", action="store_true", default=False, help=
         'Normally, only the user who called `mount.s3ql` can access the mount '
         'point. This user then also has full access to it, independent of '
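The one-line change to `cm.callback(...)` above is what threads the option
through to `BlockCache.destroy()` at unmount time. A minimal sketch of the
mechanism, assuming `cm` is a `contextlib.ExitStack` (the call shape matches
that API, but that is an inference, and `DemoCache` is a stand-in, not the
real class):

    from contextlib import ExitStack

    class DemoCache:
        # stand-in for BlockCache; only the destroy() signature matters here
        def destroy(self, keep_cache=False):
            print('destroy called with keep_cache=%r' % keep_cache)

    with ExitStack() as cm:
        cache = DemoCache()
        # Extra positional arguments are passed through when the stack
        # unwinds, so destroy(True) runs at the end of the with-block.
        cm.callback(cache.destroy, True)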
8 changes: 3 additions & 5 deletions tests/t2_block_cache.py
@@ -171,11 +171,9 @@ def _removal_loop_multi(*a, fn=ctx.cache._removal_loop_multi):
 
     try:
         # Try to clean-up (implicitly calls expire)
-        with assert_logs('Unable to drop cache, no upload threads left alive',
-                         level=logging.ERROR, count=1):
-            with pytest.raises(OSError) as exc_info:
-                ctx.cache.destroy()
-        assert exc_info.value.errno == errno.ENOTEMPTY
+        with assert_logs('Unable to flush cache, no upload threads left alive',
+                         level=logging.ERROR, count=1):
+            ctx.cache.destroy(keep_cache=True)
         assert upload_exc
         assert removal_exc
     finally: