diff --git a/Changes.txt b/Changes.txt index 72bba368a..35f51b2b9 100644 --- a/Changes.txt +++ b/Changes.txt @@ -1,3 +1,7 @@ +Unreleased Changes + + * Object removal now makes more use of parallelism. + 2018-09-02, S3QL 2.30 * s3qladm has gained a new "recover-key" operation to restore the diff --git a/src/s3ql/backends/common.py b/src/s3ql/backends/common.py index 3d4fe6a62..c888c6262 100644 --- a/src/s3ql/backends/common.py +++ b/src/s3ql/backends/common.py @@ -288,8 +288,15 @@ def iteritems(self): @abstractmethod def has_native_rename(self): '''True if the backend has a native, atomic rename operation''' + pass + @property + def has_delete_multi(self): + '''True if the backend supports `delete_multi`.''' + + return False + def reset(self): '''Reset backend @@ -468,17 +475,7 @@ def delete_multi(self, keys, force=False): error. """ - if not isinstance(keys, list): - raise TypeError('*keys* parameter must be a list') - - for (i, key) in enumerate(keys): - try: - self.delete(key, force=force) - except: - del keys[:i] - raise - - del keys[:] + raise NotImplementedError() @abstractmethod def list(self, prefix=''): diff --git a/src/s3ql/backends/comprenc.py b/src/s3ql/backends/comprenc.py index 5e6a54f40..f77d90639 100644 --- a/src/s3ql/backends/comprenc.py +++ b/src/s3ql/backends/comprenc.py @@ -63,6 +63,11 @@ def __init__(self, passphrase, compression, backend): def has_native_rename(self): return self.backend.has_native_rename + @property + @copy_ancestor_docstring + def has_delete_multi(self): + return self.backend.has_delete_multi + @copy_ancestor_docstring def reset(self): self.backend.reset() diff --git a/src/s3ql/backends/local.py b/src/s3ql/backends/local.py index 5b4df417b..ae4c53d55 100644 --- a/src/s3ql/backends/local.py +++ b/src/s3ql/backends/local.py @@ -45,6 +45,11 @@ def __init__(self, options): def has_native_rename(self): return False + @property + @copy_ancestor_docstring + def has_delete_multi(self): + return True + def __str__(self): return 'local 
directory %s' % self.prefix @@ -123,6 +128,17 @@ def contains(self, key): return False return True + @copy_ancestor_docstring + def delete_multi(self, keys, force=False): + for (i, key) in enumerate(keys): + try: + self.delete(key, force=force) + except: + del keys[:i] + raise + + del keys[:] + @copy_ancestor_docstring def delete(self, key, force=False): path = self._key_to_path(key) diff --git a/src/s3ql/backends/s3.py b/src/s3ql/backends/s3.py index 9d2ec0ca3..27a19a106 100644 --- a/src/s3ql/backends/s3.py +++ b/src/s3ql/backends/s3.py @@ -70,6 +70,11 @@ def _parse_storage_url(self, storage_url, ssl_context): def __str__(self): return 'Amazon S3 bucket %s, prefix %s' % (self.bucket_name, self.prefix) + @property + @copy_ancestor_docstring + def has_delete_multi(self): + return True + @copy_ancestor_docstring def delete_multi(self, keys, force=False): log.debug('started with %s', keys) diff --git a/src/s3ql/backends/swift.py b/src/s3ql/backends/swift.py index cb88d4eec..22b68b285 100644 --- a/src/s3ql/backends/swift.py +++ b/src/s3ql/backends/swift.py @@ -565,6 +565,11 @@ def _delete_multi(self, keys, force=False): raise HTTPError(error_code, error_msg, {}) + @property + @copy_ancestor_docstring + def has_delete_multi(self): + return self.features.has_bulk_delete + @copy_ancestor_docstring def delete_multi(self, keys, force=False): log.debug('started with %s', keys) diff --git a/src/s3ql/block_cache.py b/src/s3ql/block_cache.py index dffc85be8..8f916cb13 100644 --- a/src/s3ql/block_cache.py +++ b/src/s3ql/block_cache.py @@ -297,11 +297,20 @@ def init(self, threads=1): self.upload_threads.append(t) self.to_remove = Queue(1000) - for _ in range(10): - t = threading.Thread(target=self._removal_loop) + with self.backend_pool() as backend: + has_delete_multi = backend.has_delete_multi + + if has_delete_multi: + t = threading.Thread(target=self._removal_loop_multi) t.daemon = True # interruption will do no permanent harm t.start() self.removal_threads.append(t) + else: 
+ for _ in range(20): + t = threading.Thread(target=self._removal_loop_simple) + t.daemon = True # interruption will do no permanent harm + t.start() + self.removal_threads.append(t) def _lock_obj(self, obj_id, release_global=False): '''Acquire lock on *obj*id*''' @@ -689,13 +698,13 @@ def transfer_in_progress(self): return len(self.in_transit) > 0 - def _removal_loop(self): + def _removal_loop_multi(self): '''Process removal queue''' - # This method may look more complicated than necessary, but - # it ensures that we read as many objects from the queue - # as we can without blocking, and then hand them over to - # the backend all at once. + # This method may look more complicated than necessary, but it ensures + # that we read as many objects from the queue as we can without + # blocking, and then hand them over to the backend all at once. + ids = [] while True: try: @@ -704,7 +713,7 @@ def _removal_loop(self): except QueueEmpty: tmp = FlushSentinel - if tmp in (FlushSentinel,QuitSentinel) and ids: + if tmp in (FlushSentinel, QuitSentinel) and ids: log.debug('removing: %s', ids) try: with self.backend_pool() as backend: @@ -719,6 +728,21 @@ def _removal_loop(self): if tmp is QuitSentinel: break + def _removal_loop_simple(self): + '''Process removal queue''' + + while True: + log.debug('reading from queue..') + id_ = self.to_remove.get() + if id_ is QuitSentinel: + break + with self.backend_pool() as backend: + try: + backend.delete('s3ql_data_%d' % id_) + except NoSuchObject: + log.warning('Backend lost object s3ql_data_%d' % id_) + self.fs.failsafe = True + @contextmanager def get(self, inode, blockno): """Get file handle for block `blockno` of `inode` diff --git a/tests/t1_backends.py b/tests/t1_backends.py index 9c05c3d42..c0ea8efe2 100755 --- a/tests/t1_backends.py +++ b/tests/t1_backends.py @@ -176,6 +176,7 @@ def backend(request): backend = ComprencBackend(None, (comprenc_kind, 6), raw_backend) backend.unittest_info = raw_backend.unittest_info + yield 
backend def yield_local_backend(bi): @@ -467,6 +468,9 @@ def test_delete(backend): # ComprencBackend should just forward this 1:1 to the raw backend. @pytest.mark.with_backend('*/aes') def test_delete_multi(backend): + if not backend.has_delete_multi: + pytest.skip('backend does not support delete_multi') + keys = [ newname() for _ in range(30) ] value = newvalue() diff --git a/tests/t2_block_cache.py b/tests/t2_block_cache.py index ec3a2f145..d837febdf 100755 --- a/tests/t2_block_cache.py +++ b/tests/t2_block_cache.py @@ -50,7 +50,7 @@ def get_nowait(self): def put(self, obj, timeout=None): self.obj = obj - self.cache._removal_loop() + self.cache._removal_loop_simple() return True def get(self, block=True): @@ -140,14 +140,14 @@ def _upload_loop(*a, fn=ctx.cache._upload_loop): except NotADirectoryError: nonlocal upload_exc upload_exc = True - def _removal_loop(*a, fn=ctx.cache._removal_loop): + def _removal_loop_multi(*a, fn=ctx.cache._removal_loop_multi): try: return fn(*a) except NotADirectoryError: nonlocal removal_exc removal_exc = True ctx.cache._upload_loop = _upload_loop - ctx.cache._removal_loop = _removal_loop + ctx.cache._removal_loop_multi = _removal_loop_multi # Start threads ctx.cache.init(threads=3)