From 5647e09bd931d3fff815c2da0bb0db4c6bc93f22 Mon Sep 17 00:00:00 2001 From: Nikolaus Rath Date: Mon, 1 Oct 2018 12:18:19 +0100 Subject: [PATCH] Make delete_multi() optional. If delete_multi() is implemented in terms of repeated calls to delete(), then we should not use it because it completely removes any advantage that we get from multiple removal threads (all the objects can get assigned to one thread, which will then process them sequentially). --- Changes.txt | 4 ++++ src/s3ql/backends/common.py | 19 +++++++---------- src/s3ql/backends/comprenc.py | 5 +++++ src/s3ql/backends/local.py | 16 ++++++++++++++ src/s3ql/backends/s3.py | 5 +++++ src/s3ql/backends/swift.py | 5 +++++ src/s3ql/block_cache.py | 40 ++++++++++++++++++++++++++++------- tests/t1_backends.py | 4 ++++ tests/t2_block_cache.py | 6 +++--- 9 files changed, 82 insertions(+), 22 deletions(-) diff --git a/Changes.txt b/Changes.txt index 72bba368a..35f51b2b9 100644 --- a/Changes.txt +++ b/Changes.txt @@ -1,3 +1,7 @@ +Unreleased Changes + + * Object removal now makes more use of parallelism. + 2018-09-02, S3QL 2.30 * s3qladm has gained a new "recover-key" operation to restore the diff --git a/src/s3ql/backends/common.py b/src/s3ql/backends/common.py index 3d4fe6a62..c888c6262 100644 --- a/src/s3ql/backends/common.py +++ b/src/s3ql/backends/common.py @@ -288,8 +288,15 @@ def iteritems(self): @abstractmethod def has_native_rename(self): '''True if the backend has a native, atomic rename operation''' + pass + @property + def has_delete_multi(self): + '''True if the backend supports `delete_multi`.''' + + return False + def reset(self): '''Reset backend @@ -468,17 +475,7 @@ def delete_multi(self, keys, force=False): error. """ - if not isinstance(keys, list): - raise TypeError('*keys* parameter must be a list') - - for (i, key) in enumerate(keys): - try: - self.delete(key, force=force) - except: - del keys[:i] - raise - - del keys[:] + raise NotImplemented() @abstractmethod def list(self, prefix=''): diff --git a/src/s3ql/backends/comprenc.py b/src/s3ql/backends/comprenc.py index 5e6a54f40..f77d90639 100644 --- a/src/s3ql/backends/comprenc.py +++ b/src/s3ql/backends/comprenc.py @@ -63,6 +63,11 @@ def __init__(self, passphrase, compression, backend): def has_native_rename(self): return self.backend.has_native_rename + @property + @copy_ancestor_docstring + def has_delete_multi(self): + return self.backend.has_delete_multi + @copy_ancestor_docstring def reset(self): self.backend.reset() diff --git a/src/s3ql/backends/local.py b/src/s3ql/backends/local.py index 5b4df417b..ae4c53d55 100644 --- a/src/s3ql/backends/local.py +++ b/src/s3ql/backends/local.py @@ -45,6 +45,11 @@ def __init__(self, options): def has_native_rename(self): return False + @property + @copy_ancestor_docstring + def has_delete_multi(self): + return True + def __str__(self): return 'local directory %s' % self.prefix @@ -123,6 +128,17 @@ def contains(self, key): return False return True + @copy_ancestor_docstring + def delete_multi(self, keys, force=False): + for (i, key) in enumerate(keys): + try: + self.delete(key, force=force) + except: + del keys[:i] + raise + + del keys[:] + @copy_ancestor_docstring def delete(self, key, force=False): path = self._key_to_path(key) diff --git a/src/s3ql/backends/s3.py b/src/s3ql/backends/s3.py index 9d2ec0ca3..27a19a106 100644 --- a/src/s3ql/backends/s3.py +++ b/src/s3ql/backends/s3.py @@ -70,6 +70,11 @@ def _parse_storage_url(self, storage_url, ssl_context): def __str__(self): return 'Amazon S3 bucket %s, prefix %s' % (self.bucket_name, self.prefix) + @property + @copy_ancestor_docstring + def has_delete_multi(self): + return True + @copy_ancestor_docstring def delete_multi(self, keys, force=False): log.debug('started with %s', keys) diff --git a/src/s3ql/backends/swift.py b/src/s3ql/backends/swift.py index cb88d4eec..22b68b285 100644 --- a/src/s3ql/backends/swift.py +++ b/src/s3ql/backends/swift.py @@ -565,6 +565,11 @@ def _delete_multi(self, keys, force=False): raise HTTPError(error_code, error_msg, {}) + @property + @copy_ancestor_docstring + def has_delete_multi(self): + return self.features.has_bulk_delete + @copy_ancestor_docstring def delete_multi(self, keys, force=False): log.debug('started with %s', keys) diff --git a/src/s3ql/block_cache.py b/src/s3ql/block_cache.py index dffc85be8..8f916cb13 100644 --- a/src/s3ql/block_cache.py +++ b/src/s3ql/block_cache.py @@ -297,11 +297,20 @@ def init(self, threads=1): self.upload_threads.append(t) self.to_remove = Queue(1000) - for _ in range(10): - t = threading.Thread(target=self._removal_loop) + with self.backend_pool() as backend: + has_delete_multi = backend.has_delete_multi + + if has_delete_multi: + t = threading.Thread(target=self._removal_loop_multi) t.daemon = True # interruption will do no permanent harm t.start() self.removal_threads.append(t) + else: + for _ in range(20): + t = threading.Thread(target=self._removal_loop_simple) + t.daemon = True # interruption will do no permanent harm + t.start() + self.removal_threads.append(t) def _lock_obj(self, obj_id, release_global=False): '''Acquire lock on *obj*id*''' @@ -689,13 +698,13 @@ def transfer_in_progress(self): return len(self.in_transit) > 0 - def _removal_loop(self): + def _removal_loop_multi(self): '''Process removal queue''' - # This method may look more complicated than necessary, but - # it ensures that we read as many objects from the queue - # as we can without blocking, and then hand them over to - # the backend all at once. + # This method may look more complicated than necessary, but it ensures + # that we read as many objects from the queue as we can without + # blocking, and then hand them over to the backend all at once. + ids = [] while True: try: @@ -704,7 +713,7 @@ def _removal_loop(self): except QueueEmpty: tmp = FlushSentinel - if tmp in (FlushSentinel,QuitSentinel) and ids: + if tmp in (FlushSentinel, QuitSentinel) and ids: log.debug('removing: %s', ids) try: with self.backend_pool() as backend: @@ -719,6 +728,21 @@ def _removal_loop(self): if tmp is QuitSentinel: break + def _removal_loop_simple(self): + '''Process removal queue''' + + while True: + log.debug('reading from queue..') + id_ = self.to_remove.get() + if id_ is QuitSentinel: + break + with self.backend_pool() as backend: + try: + backend.delete('s3ql_data_%d' % id_) + except NoSuchObject: + log.warning('Backend lost object s3ql_data_%d' % id_) + self.fs.failsafe = True + @contextmanager def get(self, inode, blockno): """Get file handle for block `blockno` of `inode` diff --git a/tests/t1_backends.py b/tests/t1_backends.py index 9c05c3d42..c0ea8efe2 100755 --- a/tests/t1_backends.py +++ b/tests/t1_backends.py @@ -176,6 +176,7 @@ def backend(request): backend = ComprencBackend(None, (comprenc_kind, 6), raw_backend) backend.unittest_info = raw_backend.unittest_info + yield backend def yield_local_backend(bi): @@ -467,6 +468,9 @@ def test_delete(backend): # ComprencBackend should just forward this 1:1 to the raw backend. @pytest.mark.with_backend('*/aes') def test_delete_multi(backend): + if not backend.has_delete_multi: + pytest.skip('backend does not support delete_multi') + keys = [ newname() for _ in range(30) ] value = newvalue() diff --git a/tests/t2_block_cache.py b/tests/t2_block_cache.py index ec3a2f145..d837febdf 100755 --- a/tests/t2_block_cache.py +++ b/tests/t2_block_cache.py @@ -50,7 +50,7 @@ def get_nowait(self): def put(self, obj, timeout=None): self.obj = obj - self.cache._removal_loop() + self.cache._removal_loop_simple() return True def get(self, block=True): @@ -140,14 +140,14 @@ def _upload_loop(*a, fn=ctx.cache._upload_loop): except NotADirectoryError: nonlocal upload_exc upload_exc = True - def _removal_loop(*a, fn=ctx.cache._removal_loop): + def _removal_loop_multi(*a, fn=ctx.cache._removal_loop_multi): try: return fn(*a) except NotADirectoryError: nonlocal removal_exc removal_exc = True ctx.cache._upload_loop = _upload_loop - ctx.cache._removal_loop = _removal_loop + ctx.cache._removal_loop_multi = _removal_loop_multi # Start threads ctx.cache.init(threads=3)