Added a timestamp-aware id list splitter to splitIdList for the imgs workflow (#84)
Added an option to the id list splitter that compares timestamps and only adds an id to the list if its target file is missing or older than the entry's last-modified date in the holdings file.
trumbullm authored Feb 4, 2025
1 parent 498942b commit 2d5a9d3
Showing 6 changed files with 191 additions and 4 deletions.
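The incremental mode introduced here keeps an entry id only when no previously generated target file exists, or when that file is older than the entry's last-modified date in the holdings file. A minimal sketch of that selection rule (hypothetical helper and paths; the actual logic is in RepoLoadWorkflow.getTimeStampCheck further down this diff):

import datetime
import os
from pathlib import Path

def needsUpdate(pdbId, lastModified, targetFileDir, targetFileSuffix="_model-1.jpg"):
    """Return True when the rendered file for pdbId is missing or older than the holdings date."""
    target = Path(os.path.join(targetFileDir, pdbId + targetFileSuffix))
    if not target.exists():
        return True
    holdingsTs = datetime.datetime.strptime(lastModified, "%Y-%m-%dT%H:%M:%S%z").timestamp()
    return target.stat().st_mtime <= holdingsTs

# Toy data: keep only ids whose image is stale relative to the holdings entry
holdings = {"1abc": "2025-01-15T09:30:00+0000"}
idsToLoad = [k.upper() for k, v in holdings.items() if needsUpdate(k, v, "/tmp/imgs")]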
3 changes: 2 additions & 1 deletion HISTORY.txt
@@ -364,4 +364,5 @@
Add CLI support for performing final sanity check for ExDB loading and holdings in etl.load_ex.DbLoadingWorkflow task;
Update CI/CD testing to use python 3.10
23-Dec-2024 V1.726 Skip integers that exceed max int32 in DataTransformFactory
7-Jan-2025 V1.727 Handle "None" values in vrpt data
22-Jan-2025 V1.728 Add Imgs format option (for jpg/svg generation) to splitIdList()
8 changes: 8 additions & 0 deletions rcsb/db/cli/RepoLoadExec.py
@@ -25,6 +25,7 @@
# Add arguments and logic to support CLI usage from weekly-update workflow;
# Add support for logging output to a specific file
# 25-Apr-2024 - dwp Add support for remote config file loading; use underscores instead of hyphens for arg choices
# 22-Jan-2025 - mjt Add Imgs format option flags
##
__docformat__ = "restructuredtext en"
__author__ = "John Westbrook"
@@ -116,6 +117,10 @@ def main():
help="Compare the number of loaded entries with the number expected by the holdings (for op 'pdbx_loader_check')"
)
parser.add_argument("--log_file_path", default=None, help="Path to runtime log file output.")
# args for imgs workflow format
parser.add_argument("--incremental_update", default=False, action="store_true", help="Whether the process should look at timestamps (see --target_file_dir and --target_file_suffix) to find a delta list of ids to update. Default is a full update.")
parser.add_argument("--target_file_dir", default=None, help="Location of files for timestamp comparisons.")
parser.add_argument("--target_file_suffix", default="", help="Suffix attached to pdb id for timestamp comparison file.")
#
args = parser.parse_args()
#
@@ -262,6 +267,9 @@ def processArguments(args):
"forceReload": args.force_reload,
"minNpiValidationCount": int(args.min_npi_validation_count) if args.min_npi_validation_count else None,
"checkLoadWithHoldings": args.check_load_with_holdings,
"incrementalUpdate": args.incremental_update,
"targetFileDir": args.target_file_dir,
"targetFileSuffix": args.target_file_suffix,
}

return op, commonD, loadD
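The three new arguments pass straight through processArguments() into the loadD dictionary consumed by RepoLoadWorkflow.splitIdList(). A sketch of the equivalent programmatic call, with placeholder paths and assuming a no-argument RepoLoadWorkflow construction as used in the new test below:

from rcsb.db.wf.RepoLoadWorkflow import RepoLoadWorkflow

rlWf = RepoLoadWorkflow()
ok = rlWf.splitIdList(
    "pdbx_id_list_splitter",
    databaseName="pdbx_core",
    holdingsFilePath="/data/holdings/released_structures_last_modified_dates.json.gz",  # placeholder path
    loadFileListDir="/data/id-lists",      # placeholder path
    numSublistFiles=3,
    incrementalUpdate=True,                # from --incremental_update
    targetFileDir="/data/images",          # from --target_file_dir
    targetFileSuffix="_model-1.jpg",       # from --target_file_suffix
)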
2 changes: 1 addition & 1 deletion rcsb/db/cli/__init__.py
@@ -2,4 +2,4 @@
__author__ = "John Westbrook"
__email__ = "[email protected]"
__license__ = "Apache 2.0"
__version__ = "1.727"
__version__ = "1.728"
143 changes: 143 additions & 0 deletions rcsb/db/tests/testPdbCsmImageSplitter.py
@@ -0,0 +1,143 @@
##
# File: testPdbCsmImageSplitter.py
# Author: Michael Trumbull
# Date: 12-Dec-2024
# Version: 0.01
#
# Updates:
#
##

__docformat__ = "google en"
__author__ = "Michael Trumbull"
__email__ = "[email protected]"
__license__ = "Apache 2.0"

import logging
import os
import platform
import resource
import time
import unittest
from pathlib import Path

from rcsb.db.wf.RepoLoadWorkflow import RepoLoadWorkflow

HERE = os.path.abspath(os.path.dirname(__file__))
TOPDIR = os.path.dirname(os.path.dirname(os.path.dirname(HERE)))

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s]-%(module)s.%(funcName)s: %(message)s")
logger = logging.getLogger(__name__)


class TestPdbCsmImagesSplitter(unittest.TestCase):
def setUp(self) -> None:
self.__startTime = time.time()
# self.__cachePath = os.path.join(HERE, "test-data")
self.__workPath = os.path.join(HERE, "test-output")
self.mockdataDir = os.path.join(TOPDIR, "rcsb", "mock-data", "MOCK_IMGS_WF_BCIF_DATA")
logger.info("Starting %s at %s", self.id(), time.strftime("%Y %m %d %H:%M:%S", time.localtime()))

def tearDown(self) -> None:
unitS = "MB" if platform.system() == "Darwin" else "GB"
rusageMax = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
logger.info("Maximum resident memory size %.4f %s", rusageMax / 10 ** 6, unitS)
endTime = time.time()
logger.info("Completed %s at %s (%.4f seconds)", self.id(), time.strftime("%Y %m %d %H:%M:%S", time.localtime()), endTime - self.__startTime)

def testIdListGeneration(self) -> None:
"""Test id list file generation ..."""
try:
logger.info("creating object")
rlWf = RepoLoadWorkflow()
logger.info("Generating 3 id lists to run through.")
logger.info("mockdataDir %s", self.mockdataDir)
logger.info("workpath %s", self.__workPath)

ok = rlWf.splitIdList(
"pdbx_id_list_splitter",
databaseName="pdbx_core",
holdingsFilePath=os.path.join(self.mockdataDir, "holdings/released_structures_last_modified_dates.json.gz"),
loadFileListDir=self.__workPath,
numSublistFiles=3,
incrementalUpdate=True,
useTrippleFormat=True,
noBcifSubdirs=True,
targetFileDir=self.__workPath,
targetFileSuffix="_model-1.jpg",
)

self.assertTrue(ok)
ok1 = self.checkList(os.path.join(self.__workPath, "pdbx_core_ids-1.txt"))
if not ok1:
logger.error("idList_0.txt failed")
self.assertTrue(ok1)
ok2 = self.checkList(os.path.join(self.__workPath, "pdbx_core_ids-2.txt"))
if not ok2:
logger.error("idList_1.txt failed")
self.assertTrue(ok2)
ok3 = self.checkList(os.path.join(self.__workPath, "pdbx_core_ids-3.txt"))
if not ok3:
logger.error("idList_2.txt failed")
self.assertTrue(ok3)
ok = rlWf.splitIdList(
"pdbx_id_list_splitter",
databaseName="pdbx_comp_model_core",
holdingsFilePath=os.path.join(self.mockdataDir, "holdings/computed-models-holdings-list.json"),
loadFileListDir=self.__workPath,
numSublistFiles=3,
incrementalUpdate=True,
useTrippleFormat=True,
noBcifSubdirs=True,
targetFileDir=self.__workPath,
targetFileSuffix="_model-1.jpg",
)
self.assertTrue(ok)
ok1 = self.checkList(os.path.join(self.__workPath, "pdbx_comp_model_core_ids-1.txt"))
if not ok1:
logger.error("idList_0.txt failed")
self.assertTrue(ok1)
ok2 = self.checkList(os.path.join(self.__workPath, "pdbx_comp_model_core_ids-2.txt"))
if not ok2:
logger.error("idList_1.txt failed")
self.assertTrue(ok2)
ok3 = self.checkList(os.path.join(self.__workPath, "pdbx_comp_model_core_ids-3.txt"))
if not ok3:
logger.error("idList_2.txt failed")
self.assertTrue(ok3)

logger.info("Reading generated lists and checking for format.")

except Exception as e:
logger.exception("Failing with %s", str(e))
self.fail("Failed to build idLists")

def checkList(self, ids: str) -> bool:

try:
logger.info('ids path for checkList %s', ids)
allDataPresent = True
with Path(ids).open("r", encoding="utf-8") as file:
idList = [line.rstrip("\n") for line in file]
for line in idList:
logger.info('line from file is: %s', line)
fileId = line.split()
if (len(fileId) == 0):
logger.error("Found pdbid with length zero.")
allDataPresent = False
logger.info('End of a single checkList. Returning a value of %s', allDataPresent)
return allDataPresent
except Exception:
logger.exception("Failed to find created file %s", ids)
return False


def suiteFileGeneration():
suiteSelect = unittest.TestSuite()
suiteSelect.addTest(TestPdbCsmImagesSplitter("testIdListGeneration"))
return suiteSelect


if __name__ == "__main__":
mySuite = suiteFileGeneration()
unittest.TextTestRunner(verbosity=2).run(mySuite)
37 changes: 36 additions & 1 deletion rcsb/db/wf/RepoLoadWorkflow.py
@@ -11,6 +11,7 @@
# 26-Apr-2023 dwp Add regexPurge flag to control running regexp purge step during document load (with default set to False)
# 7-Nov-2023 dwp Add maxStepLength parameter
# 26-Mar-2024 dwp Add arguments and methods to support CLI usage from weekly-update workflow
# 22-Jan-2025 mjt Add Imgs format option (for jpg/svg generation) to splitIdList()
#
##
__docformat__ = "restructuredtext en"
@@ -22,6 +23,8 @@
import os
import random
import math
import datetime
from pathlib import Path

from rcsb.db.cli.RepoHoldingsEtlWorker import RepoHoldingsEtlWorker
from rcsb.db.cli.SequenceClustersEtlWorker import SequenceClustersEtlWorker
@@ -286,7 +289,11 @@ def splitIdList(self, op, **kwargs):
holdingsFilePath = kwargs.get("holdingsFilePath", None) # For CSMs: http://computed-models-internal-%s.rcsb.org/staging/holdings/computed-models-holdings-list.json
loadFileListDir = kwargs.get("loadFileListDir") # ExchangeDbConfig().loadFileListsDir
loadFileListPrefix = databaseName + "_ids" # pdbx_core_ids or pdbx_comp_model_core_ids
numSublistFiles = kwargs.get("numSublistFiles") # ExchangeDbConfig().pdbxCoreNumberSublistFiles
numSublistFiles = kwargs.get("numSublistFiles", 1) # ExchangeDbConfig().pdbxCoreNumberSublistFiles

incrementalUpdate = kwargs.get("incrementalUpdate", False)
targetFileDir = kwargs.get("targetFileDir", "")
targetFileSuffix = kwargs.get("targetFileSuffix", "_model-1.jpg")
#
mU = MarshalUtil(workPath=self.__cachePath)
#
@@ -295,7 +302,12 @@
if not holdingsFilePath:
holdingsFilePath = os.path.join(self.__cfgOb.getPath("PDB_REPO_URL", sectionName=self.__configName), "pdb/holdings/released_structures_last_modified_dates.json.gz")
holdingsFileD = mU.doImport(holdingsFilePath, fmt="json")

if incrementalUpdate:
holdingsFileD = self.getTimeStampCheck(holdingsFileD, targetFileDir, targetFileSuffix)

idL = [k.upper() for k in holdingsFileD]

logger.info("Total number of entries to load: %d (obtained from file: %s)", len(idL), holdingsFilePath)
random.shuffle(idL) # randomize the order to reduce the chance of consecutive large structures occurring (which may cause memory spikes)
filePathMappingD = self.splitIdListAndWriteToFiles(idL, numSublistFiles, loadFileListDir, loadFileListPrefix, holdingsFilePath)
@@ -309,10 +321,15 @@
holdingsFileBaseDir = self.__cfgOb.getPath("PDBX_COMP_MODEL_REPO_PATH", sectionName=self.__configName)
holdingsFileD = mU.doImport(holdingsFilePath, fmt="json")
#

if len(holdingsFileD) == 1:
# Split up single holdings file into multiple sub-lists
holdingsFile = os.path.join(holdingsFileBaseDir, list(holdingsFileD.keys())[0])
hD = mU.doImport(holdingsFile, fmt="json")

if incrementalUpdate:
hD = self.getTimeStampCheck(hD, targetFileDir, targetFileSuffix)

idL = [k.upper() for k in hD]
logger.info("Total number of entries to load for holdingsFile %s: %d", holdingsFile, len(idL))
filePathMappingD = self.splitIdListAndWriteToFiles(idL, numSublistFiles, loadFileListDir, loadFileListPrefix, holdingsFile)
@@ -324,6 +341,8 @@
for hF, count in holdingsFileD.items():
holdingsFile = os.path.join(holdingsFileBaseDir, hF)
hD = mU.doImport(holdingsFile, fmt="json")
if incrementalUpdate:
hD = self.getTimeStampCheck(hD, targetFileDir, targetFileSuffix)
idL = [k.upper() for k in hD]
logger.info("Total number of entries to load for holdingsFile %s: %d", holdingsFile, len(idL))
#
@@ -351,6 +370,22 @@

return ok

def getTimeStampCheck(self, hD, targetFileDir, targetFileSuffix):
res = hD.copy()
for pdbid, value in hD.items():
pathToItem = os.path.join(targetFileDir, pdbid + targetFileSuffix)
if isinstance(value, dict):
timeStamp = value["lastModifiedDate"]
value["modelPath"].lower()
else:
timeStamp = value
if Path(pathToItem).exists():
t1 = Path(pathToItem).stat().st_mtime
t2 = datetime.datetime.strptime(timeStamp, "%Y-%m-%dT%H:%M:%S%z").timestamp()
if t1 > t2:
res.pop(pdbid)
return res
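A toy sketch of how this filter behaves, using a temporary directory and made-up ids (the no-argument construction follows the new test above; real callers pass the image output directory and suffix from the new CLI flags):

import tempfile
from pathlib import Path

from rcsb.db.wf.RepoLoadWorkflow import RepoLoadWorkflow

wf = RepoLoadWorkflow()
with tempfile.TemporaryDirectory() as d:
    # Simulate an already-rendered image for 1abc, created just now
    Path(d, "1abc_model-1.jpg").touch()
    holdings = {
        "1abc": "2020-01-01T00:00:00+0000",  # older than the fresh jpg -> dropped
        "2xyz": "2020-01-01T00:00:00+0000",  # no jpg on disk -> kept
    }
    kept = wf.getTimeStampCheck(holdings, d, "_model-1.jpg")
    assert list(kept) == ["2xyz"]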

def splitIdListAndWriteToFiles(self, inputList, nFiles, outfileDir, outfilePrefix, sourceFile):
"""Split input ID list into equally distributed sublists of size nFiles.
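The remainder of this method is collapsed in the diff view; the new test exercises it through output files named <prefix>-1.txt through <prefix>-N.txt (for example pdbx_core_ids-1.txt). One simple way to produce near-equal sublists, shown only as an illustrative sketch rather than the repository's implementation:

def splitEvenly(inputList, nFiles):
    """Distribute items round-robin into nFiles sublists of near-equal size."""
    sublists = [[] for _ in range(nFiles)]
    for i, item in enumerate(inputList):
        sublists[i % nFiles].append(item)
    return sublists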
