Skip to content

Commit

Permalink
Make delete_multi() optional.
Browse files Browse the repository at this point in the history
If delete_multi() is implemented in terms of repeated calls to
delete(), then we should not use it because it completely removes
any advantage that we get from multiple removal threads (all the
objects can get assigned to one thread, which will then process them
sequentially).
  • Loading branch information
Nikratio committed Oct 1, 2018
1 parent 3fd3070 commit 5647e09
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 22 deletions.
4 changes: 4 additions & 0 deletions Changes.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Unreleased Changes

* Object removal now makes more use of parallelism.

2018-09-02, S3QL 2.30

* s3qladm has gained a new "recover-key" operation to restore the
Expand Down
19 changes: 8 additions & 11 deletions src/s3ql/backends/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,15 @@ def iteritems(self):
@abstractmethod
def has_native_rename(self):
'''True if the backend has a native, atomic rename operation'''

pass

@property
def has_delete_multi(self):
    '''True if the backend supports `delete_multi`.

    Defaults to False.  Backends with a genuinely native bulk-delete
    operation override this to return True, so callers (e.g. the block
    cache) can choose between one bulk-removal thread and several
    single-delete threads.
    '''

    return False

def reset(self):
'''Reset backend
Expand Down Expand Up @@ -468,17 +475,7 @@ def delete_multi(self, keys, force=False):
error.
"""

if not isinstance(keys, list):
raise TypeError('*keys* parameter must be a list')

for (i, key) in enumerate(keys):
try:
self.delete(key, force=force)
except:
del keys[:i]
raise

del keys[:]
raise NotImplementedError()

@abstractmethod
def list(self, prefix=''):
Expand Down
5 changes: 5 additions & 0 deletions src/s3ql/backends/comprenc.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ def __init__(self, passphrase, compression, backend):
def has_native_rename(self):
return self.backend.has_native_rename

@property
@copy_ancestor_docstring
def has_delete_multi(self):
    # ComprencBackend is a transparent wrapper, so capability detection
    # is delegated to the underlying storage backend.
    return self.backend.has_delete_multi

@copy_ancestor_docstring
def reset(self):
self.backend.reset()
Expand Down
16 changes: 16 additions & 0 deletions src/s3ql/backends/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ def __init__(self, options):
def has_native_rename(self):
return False

@property
@copy_ancestor_docstring
def has_delete_multi(self):
    # The local backend provides its own delete_multi() implementation,
    # so it advertises support.
    return True

def __str__(self):
    # Short human-readable description of this backend instance.
    return 'local directory %s' % self.prefix

Expand Down Expand Up @@ -123,6 +128,17 @@ def contains(self, key):
return False
return True

@copy_ancestor_docstring
def delete_multi(self, keys, force=False):
    # Remove each object in turn, dropping a key from *keys* only after
    # its object has actually been deleted.  If a deletion raises, *keys*
    # is left holding exactly the keys that were not removed (the failing
    # key first) and the exception propagates to the caller.  On success,
    # *keys* ends up empty.
    while keys:
        self.delete(keys[0], force=force)
        del keys[0]

@copy_ancestor_docstring
def delete(self, key, force=False):
path = self._key_to_path(key)
Expand Down
5 changes: 5 additions & 0 deletions src/s3ql/backends/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ def _parse_storage_url(self, storage_url, ssl_context):
def __str__(self):
    # Short human-readable description of this backend instance.
    return 'Amazon S3 bucket %s, prefix %s' % (self.bucket_name, self.prefix)

@property
@copy_ancestor_docstring
def has_delete_multi(self):
    # The S3 backend implements a native delete_multi() (defined below),
    # so it advertises support.
    return True

@copy_ancestor_docstring
def delete_multi(self, keys, force=False):
log.debug('started with %s', keys)
Expand Down
5 changes: 5 additions & 0 deletions src/s3ql/backends/swift.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,11 @@ def _delete_multi(self, keys, force=False):

raise HTTPError(error_code, error_msg, {})

@property
@copy_ancestor_docstring
def has_delete_multi(self):
    # Supported only when the Swift server advertises the bulk-delete
    # capability in its feature set.
    return self.features.has_bulk_delete

@copy_ancestor_docstring
def delete_multi(self, keys, force=False):
log.debug('started with %s', keys)
Expand Down
40 changes: 32 additions & 8 deletions src/s3ql/block_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,20 @@ def init(self, threads=1):
self.upload_threads.append(t)

self.to_remove = Queue(1000)
for _ in range(10):
t = threading.Thread(target=self._removal_loop)
with self.backend_pool() as backend:
has_delete_multi = backend.has_delete_multi

if has_delete_multi:
t = threading.Thread(target=self._removal_loop_multi)
t.daemon = True # interruption will do no permanent harm
t.start()
self.removal_threads.append(t)
else:
for _ in range(20):
t = threading.Thread(target=self._removal_loop_simple)
t.daemon = True # interruption will do no permanent harm
t.start()
self.removal_threads.append(t)

def _lock_obj(self, obj_id, release_global=False):
'''Acquire lock on *obj*id*'''
Expand Down Expand Up @@ -689,13 +698,13 @@ def transfer_in_progress(self):

return len(self.in_transit) > 0

def _removal_loop(self):
def _removal_loop_multi(self):
'''Process removal queue'''

# This method may look more complicated than necessary, but
# it ensures that we read as many objects from the queue
# as we can without blocking, and then hand them over to
# the backend all at once.
# This method may look more complicated than necessary, but it ensures
# that we read as many objects from the queue as we can without
# blocking, and then hand them over to the backend all at once.

ids = []
while True:
try:
Expand All @@ -704,7 +713,7 @@ def _removal_loop(self):
except QueueEmpty:
tmp = FlushSentinel

if tmp in (FlushSentinel,QuitSentinel) and ids:
if tmp in (FlushSentinel, QuitSentinel) and ids:
log.debug('removing: %s', ids)
try:
with self.backend_pool() as backend:
Expand All @@ -719,6 +728,21 @@ def _removal_loop(self):
if tmp is QuitSentinel:
break

def _removal_loop_simple(self):
    '''Process removal queue

    Remove one object per queue entry.  Used when the backend has no
    native `delete_multi`, so that several removal threads can each
    delete objects in parallel (cf. `_removal_loop_multi`).  Terminates
    when `QuitSentinel` is read from the queue.
    '''

    while True:
        log.debug('reading from queue..')
        id_ = self.to_remove.get()
        if id_ is QuitSentinel:
            break
        with self.backend_pool() as backend:
            try:
                backend.delete('s3ql_data_%d' % id_)
            except NoSuchObject:
                # The object vanished from the backend: warn and put the
                # file system into failsafe mode.  Use lazy %-style args
                # so the message is only formatted if actually emitted.
                log.warning('Backend lost object s3ql_data_%d', id_)
                self.fs.failsafe = True

@contextmanager
def get(self, inode, blockno):
"""Get file handle for block `blockno` of `inode`
Expand Down
4 changes: 4 additions & 0 deletions tests/t1_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def backend(request):
backend = ComprencBackend(None, (comprenc_kind, 6), raw_backend)

backend.unittest_info = raw_backend.unittest_info

yield backend

def yield_local_backend(bi):
Expand Down Expand Up @@ -467,6 +468,9 @@ def test_delete(backend):
# ComprencBackend should just forward this 1:1 to the raw backend.
@pytest.mark.with_backend('*/aes')
def test_delete_multi(backend):
if not backend.has_delete_multi:
pytest.skip('backend does not support delete_multi')

keys = [ newname() for _ in range(30) ]
value = newvalue()

Expand Down
6 changes: 3 additions & 3 deletions tests/t2_block_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def get_nowait(self):

def put(self, obj, timeout=None):
self.obj = obj
self.cache._removal_loop()
self.cache._removal_loop_simple()
return True

def get(self, block=True):
Expand Down Expand Up @@ -140,14 +140,14 @@ def _upload_loop(*a, fn=ctx.cache._upload_loop):
except NotADirectoryError:
nonlocal upload_exc
upload_exc = True
def _removal_loop(*a, fn=ctx.cache._removal_loop):
def _removal_loop_multi(*a, fn=ctx.cache._removal_loop_multi):
try:
return fn(*a)
except NotADirectoryError:
nonlocal removal_exc
removal_exc = True
ctx.cache._upload_loop = _upload_loop
ctx.cache._removal_loop = _removal_loop
ctx.cache._removal_loop_multi = _removal_loop_multi

# Start threads
ctx.cache.init(threads=3)
Expand Down

0 comments on commit 5647e09

Please sign in to comment.