Commit

fix for issue #6, Run ZODB cache garbage collection when returning large result sets (#7)

* fix for issue #11, refactor multiple brain creation code blocks

* added unit tests for issue #6, ZODB cache overflow on reading large result sets

* patch for issue #6, cacheGC() when reading long result sets
eprigorodov authored and hannosch committed Jul 17, 2016
1 parent e753360 commit 8ab5ebe
Showing 4 changed files with 188 additions and 53 deletions.
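The core idea of the fix: while a large result set is being read, the catalog periodically asks its ZODB connection to garbage-collect the pickle cache, so already-loaded (non-ghost) objects are ghosted again instead of accumulating far beyond the configured cache size. A minimal sketch of that pattern follows; it is an illustration, not the committed code, and the names THRESHOLD, iterate_with_cache_gc, persistent_obj and records are assumptions.

# Minimal sketch (not the committed code): trim the ZODB pickle cache
# every THRESHOLD records while iterating a large result set.
# `persistent_obj` stands for any persistent object bound to the relevant
# connection (the catalog itself, in this commit); THRESHOLD and
# `records` are illustrative names.
THRESHOLD = 128

def iterate_with_cache_gc(persistent_obj, records):
    connection = persistent_obj._p_jar      # ZODB connection, may be None
    for count, record in enumerate(records, 1):
        yield record
        if connection is not None and count % THRESHOLD == 0:
            # deactivate eligible non-ghost objects, keeping the cache
            # near the size configured via DB.setCacheSize()
            connection.cacheGC()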
62 changes: 33 additions & 29 deletions src/Products/ZCatalog/Catalog.py
@@ -129,20 +129,8 @@ def __getitem__(self, index):
else:
# otherwise no score, set all scores to 1
normalized_score, score, key = (1, 1, index)

data = self.data[key]
klass = self._v_result_class
schema_len = len(klass.__record_schema__)
if schema_len == len(data) + 3:
# if we have complete data, create in a single pass
r = klass(tuple(data) + (key, score, normalized_score))
else:
r = klass(data)
r.data_record_id_ = key
r.data_record_score_ = score
r.data_record_normalized_score_ = normalized_score
r = r.__of__(aq_parent(self))
return r
return self.instantiate((key, self.data[key]),
score_data=(score, normalized_score))

def __setstate__(self, state):
""" initialize your brains. This method is called when the
@@ -429,9 +417,35 @@ def recordify(self, object):
record.append(attr)
return tuple(record)

def instantiate(self, record):
r = self._v_result_class(record[1])
r.data_record_id_ = record[0]
def _maintain_zodb_cache(self):
parent = aq_parent(self)
if hasattr(aq_base(parent), 'maintain_zodb_cache'):
parent.maintain_zodb_cache()

def instantiate(self, record, score_data=None):
""" internal method: create and initialise search result object.
record should be a tuple of (document RID, metadata columns tuple),
score_data can be a tuple of (scode, normalized score) or be omitted"""
self._maintain_zodb_cache()
key, data = record
klass = self._v_result_class
if score_data:
score, normalized_score = score_data
schema_len = len(klass.__record_schema__)
if schema_len == len(data) + 3:
# if we have complete data, create in a single pass
data = tuple(data) + (key, score, normalized_score)
return klass(data).__of__(aq_parent(self))
r = klass(data)
r.data_record_id_ = key
if score_data:
# preserved during refactoring for compatibility reasons:
# this branch can only be reached if score_data is present
# but the schema length is not equal to len(data) + 3;
# no known use cases
r.data_record_score_ = score
r.data_record_normalized_score_ = normalized_score
return r.__of__(aq_parent(self))
return r.__of__(self)

def getMetadataForRID(self, rid):
@@ -678,19 +692,9 @@ def getScoredResult(item, max=max, self=self):
passed into self.useBrains.
"""
score, key = item
data = self.data[key]
klass = self._v_result_class
schema_len = len(klass.__record_schema__)
norm_score = int(100.0 * score / max)
if schema_len == len(data) + 3:
r = klass(tuple(data) + (key, score, norm_score))
else:
r = klass(data)
r.data_record_id_ = key
r.data_record_score_ = score
r.data_record_normalized_score_ = norm_score
r = r.__of__(aq_parent(self))
return r
return self.instantiate((key, self.data[key]),
score_data=(score, norm_score))

sequence, slen = self._limit_sequence(rs, rlen, b_start,
b_size)
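Both result-construction paths in Catalog.py (the unscored __getitem__ lookup and the scored getScoredResult closure) now funnel through instantiate, which also triggers the cache maintenance hook. Below is a hedged sketch of how a caller would use the refactored method; the helper name make_brain and the variables rid, score and max_score are illustrative, not part of the diff.

def make_brain(catalog, rid, score=None, max_score=1):
    # `catalog` is a Products.ZCatalog.Catalog.Catalog instance and
    # `rid` a record id present in catalog.data
    record = (rid, catalog.data[rid])
    if score is None:
        # unscored path, mirroring Catalog.__getitem__ with an integer index
        return catalog.instantiate(record, score_data=(1, 1))
    # scored path, mirroring getScoredResult
    normalized = int(100.0 * score / max_score)
    return catalog.instantiate(record, score_data=(score, normalized))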
50 changes: 28 additions & 22 deletions src/Products/ZCatalog/ZCatalog.py
@@ -463,6 +463,30 @@ def manage_reindexIndex(self, ids=None, REQUEST=None, RESPONSE=None,
'/manage_catalogIndexes'
'?manage_tabs_message=Reindexing%20Performed')

security.declarePrivate('maintain_zodb_cache')
def maintain_zodb_cache(self):
# self.threshold represents the number of times that catalog_object
# needs to be called in order for the catalog to commit
# a subtransaction.
if self.threshold is not None:
# figure out whether or not to commit a subtransaction.
t = id(transaction.get())
if t != self._v_transaction:
self._v_total = 0
self._v_transaction = t
self._v_total = self._v_total + 1
# increment the _v_total counter for this thread only and get a
# reference to the current transaction. The _v_total counter is
# zeroed if we notice that we're in a different transaction than the
# last one that came by. The semantics here mean that we should GC
# the cache if our threshold is exceeded within the boundaries of
# the current transaction.
if self._v_total > self.threshold:
self._p_jar.cacheGC()
self._v_total = 0
return True
return False

security.declareProtected(manage_zcatalog_entries, 'catalog_object')
def catalog_object(self, obj, uid=None, idxs=None, update_metadata=1,
pghandler=None):
@@ -486,28 +510,10 @@ def catalog_object(self, obj, uid=None, idxs=None, update_metadata=1,
# catalogObject (which is a word count), because it's
# worthless to us here.

if self.threshold is not None:
# figure out whether or not to commit a subtransaction.
t = id(transaction.get())
if t != self._v_transaction:
self._v_total = 0
self._v_transaction = t
self._v_total = self._v_total + 1
# increment the _v_total counter for this thread only and get
# a reference to the current transaction.
# the _v_total counter is zeroed if we notice that we're in
# a different transaction than the last one that came by.
# self.threshold represents the number of times that
# catalog_object needs to be called in order for the catalog
# to commit a subtransaction. The semantics here mean that
# we should commit a subtransaction if our threshhold is
# exceeded within the boundaries of the current transaction.
if self._v_total > self.threshold:
transaction.savepoint(optimistic=True)
self._p_jar.cacheGC()
self._v_total = 0
if pghandler:
pghandler.info('committing subtransaction')
if self.maintain_zodb_cache():
transaction.savepoint(optimistic=True)
if pghandler:
pghandler.info('committing subtransaction')

security.declareProtected(manage_zcatalog_entries, 'uncatalog_object')
def uncatalog_object(self, uid):
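With the subtransaction logic factored out into maintain_zodb_cache, setting threshold on a ZCatalog now bounds cache growth both while indexing (catalog_object still commits a savepoint when the threshold is exceeded) and while reading large result sets (via Catalog.instantiate). A usage sketch, assuming an app.Catalog instance and an iterable objects_to_index as illustrative names:

catalog = app.Catalog
catalog.threshold = 128          # maintain the cache roughly every 128 calls

for obj in objects_to_index:
    # catalog_object() calls maintain_zodb_cache(); when the per-transaction
    # counter exceeds the threshold, the ZODB cache is garbage-collected
    # and a savepoint is committed
    catalog.catalog_object(obj, uid=obj.id)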
9 changes: 7 additions & 2 deletions src/Products/ZCatalog/tests/test_catalog.py
@@ -933,8 +933,13 @@ def testEmptyMappingReturnsAll(self):
for x in range(0, 10):
catalog.catalogObject(dummy(x), repr(x))
self.assertEqual(len(catalog), 10)
length = len(catalog({}))
self.assertEqual(length, 10)
all_data = catalog({})
self.assertEqual(len(all_data), 10)
for rec in all_data:
self.assertEqual(rec.aq_parent, catalog)
self.assertNotEqual(rec.data_record_id_, None)
self.assertEqual(rec.data_record_score_, None)
self.assertEqual(rec.data_record_normalized_score_, None)


class TestCatalogSearchArgumentsMap(unittest.TestCase):
120 changes: 120 additions & 0 deletions src/Products/ZCatalog/tests/test_zodb.py
@@ -0,0 +1,120 @@
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Unittests for ZCatalog interaction with ZODB persistency
"""

import ExtensionClass
from OFS.Folder import Folder
from Products.PluginIndexes.FieldIndex.FieldIndex import FieldIndex
from Products.ZCatalog.ZCatalog import ZCatalog
import random
from Testing.makerequest import makerequest
import sys
import transaction
import unittest
import Zope2


class zdummy(ExtensionClass.Base):
meta_type = 'dummy'

def __init__(self, num):
self.id = 'dummy_%d' % (num,)
self.title = 'Dummy %d' % (num,)


class TestPersistentZCatalog(unittest.TestCase):

def setUp(self):
self.app = makerequest(Zope2.app())
self.app._setObject('Catalog', ZCatalog('Catalog'))
self.app.Catalog.addIndex('meta_type', FieldIndex('meta_type'))
self.app.Catalog.addColumn('id')
self.app.Catalog.addColumn('title')
self.app._setObject('Database', Folder('Database'))
# newly added objects have ._p_jar == None, initialize it
transaction.savepoint()

def tearDown(self):
for obj_id in ('Catalog', 'Database'):
self.app._delObject(obj_id, suppress_events=True)

def _make_dummy(self):
num = random.randint(0, sys.maxint)
return zdummy(num)

def _make_persistent_folder(self, obj_id):
self.app.Database._setObject(obj_id, Folder(obj_id))
result = self.app.Database[obj_id]
result.title = 'Folder %s' % (obj_id,)
return result

def _get_zodb_info(self, obj):
conn = obj._p_jar
cache_size_limit = conn.db().getCacheSize()
return conn, cache_size_limit

def _actual_cache_size(self, obj):
return obj._p_jar._cache.cache_non_ghost_count

NUM_RESULTS = 1500
TIMES_MORE = 10

def _fill_catalog(self, catalog, num_objects):
# catalog num_objects "interesting" documents
# and intersperse them with (num_objects * TIMES_MORE) dummy objects,
# making sure that "interesting" objects do not share
# the same metadata bucket (as happens in typical use)
def catalog_dummies(num_dummies):
for j in range(num_dummies):
obj = self._make_dummy()
catalog.catalog_object(obj, uid=obj.id)
for i in range(num_objects):
# catalog average of TIMES_MORE / 2 dummy objects
catalog_dummies(random.randint(1, self.TIMES_MORE))
# catalog normal object
obj_id = 'folder_%i' % (i,)
catalog.catalog_object(self._make_persistent_folder(obj_id))
# catalog another TIMES_MORE / 2 dummy objects
catalog_dummies(random.randint(1, self.TIMES_MORE))
# attach new persistent objects to ZODB connection
transaction.savepoint()

def _test_catalog_search(self, threshold=None):
catalog = self.app.Catalog
self._fill_catalog(catalog, self.NUM_RESULTS)
conn, ignore = self._get_zodb_info(catalog)
conn.cacheGC()
# run large query and read its results
catalog.threshold = threshold
aggregate = 0
for record in catalog(meta_type='Folder'):
aggregate += len(record.title)
return catalog

def test_unmaintained_search(self):
# run large query without cache maintenance
catalog = self._test_catalog_search(threshold=None)
ignore, cache_size_limit = self._get_zodb_info(catalog)
# the ZODB connection cache grows well beyond its size limit and consumes memory
actual_size = self._actual_cache_size(catalog)
self.assertTrue(actual_size > cache_size_limit * 2)

def test_maintained_search(self):
# run big query with cache maintenance
threshold = 128
catalog = self._test_catalog_search(threshold=threshold)
ignore, cache_size_limit = self._get_zodb_info(catalog)
# ZODB connection cache stays within its size limit
actual_size = self._actual_cache_size(catalog)
self.assertTrue(actual_size <= cache_size_limit + threshold)
