Skip to content

Commit

Permalink
Add functional test exercising coverage collection input
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Feb 1, 2025
1 parent af69bb1 commit ea3a462
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 14 deletions.
40 changes: 27 additions & 13 deletions tests/functional/test_wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -2502,13 +2502,12 @@ def test_execute_job_with_collection_input_stac_items(self):
])
def test_execute_job_with_collection_input_coverages_netcdf(self, coverage_parameters, coverage_request):
# type: (JSON, str) -> None
###def test_execute_job_with_collection_input_coverages_netcdf(self, coverage_parameters={"subset": "Lat(10:20),Lon(30:40)", "datetime": "2025-01-01/2025-01-02"}, coverage_request="?subset=Lat(10:20),Lon(30:40)&datetime=2025-01-01/2025-01-02"):
name = "DockerNetCDF2Text"
body = self.retrieve_payload(name, "deploy", local=True)
cwl = self.retrieve_payload(name, "package", local=True)
proc_name = "DockerNetCDF2Text"
body = self.retrieve_payload(proc_name, "deploy", local=True)
cwl = self.retrieve_payload(proc_name, "package", local=True)
body["executionUnit"] = [{"unit": cwl}]
proc = self.fully_qualified_test_name(self._testMethodName)
self.deploy_process(body, describe_schema=ProcessSchema.OGC, process_id=proc)
proc_id = self.fully_qualified_test_name(self._testMethodName)
self.deploy_process(body, describe_schema=ProcessSchema.OGC, process_id=proc_id)

with contextlib.ExitStack() as stack:
tmp_host = "https://mocked-file-server.com" # must match collection prefix hostnames
Expand Down Expand Up @@ -2537,7 +2536,7 @@ def test_execute_job_with_collection_input_coverages_netcdf(self, coverage_param

for mock_exec in mocked_execute_celery():
stack.enter_context(mock_exec)
proc_url = f"/processes/{proc}/execution"
proc_url = f"/processes/{proc_id}/execution"
resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5,
data=col_exec_body, headers=self.json_headers, only_local=True)
assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}"
Expand All @@ -2547,16 +2546,31 @@ def test_execute_job_with_collection_input_coverages_netcdf(self, coverage_param
assert "output_txt" in results

job_id = status_url.rsplit("/", 1)[-1]
log_url = f"{status_url}/logs"
log_txt = self.app.get(log_url, headers={"Accept": ContentType.TEXT_PLAIN}).text
cov_col = "coverage.nc" # file name applied by 'collection_processor' (resolved by 'format' + 'type' extension)
cov_out = "coverage.txt" # extension modified by invoked process from input file name, literal copy of NetCDF
assert cov_col in log_txt, "Resolved NetCDF file from collection handler should have been logged."
assert cov_out in log_txt, "Chained NetCDF copied by the process as text should have been logged."

wps_dir = get_wps_output_dir(self.settings)
job_dir = os.path.join(wps_dir, job_id)
cov_fn = "coverage.nc" # file name applied by 'collection_processor' (resolved by 'format' + 'type' extension)
cov_out = "coverage.txt" # extension modified by invoked process from input file name
job_out = os.path.join(job_dir, "output_txt", cov_out)
assert os.path.isfile(job_out), f"Invalid output file not found: [{job_out}]"
with open(job_out, mode="r", encoding="utf-8") as out_fd:
out_data = out_fd.read()
assert cov_fn in out_data
assert cov_out in out_data
with open(job_out, mode="rb") as out_fd: # output, although ".txt" is actually a copy of the submitted NetCDF
out_data = out_fd.read(3)
assert out_data == b"CDF", "Output file from (collection + process) chain should contain the NetCDF header."

for file_path in [
os.path.join(job_dir, cov_col),
os.path.join(job_dir, "inputs", cov_col),
os.path.join(job_dir, "output_txt", cov_col),
os.path.join(job_out, "inputs", cov_col),
os.path.join(job_out, "output_txt", cov_col),
]:
assert not os.path.exists(file_path), (
f"Intermediate collection coverage file should not exist: [{file_path}]"
)

def test_execute_job_with_context_output_dir(self):
cwl = {
Expand Down
24 changes: 23 additions & 1 deletion weaver/processes/builtin/collection_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from weaver.wps_restapi import swagger_definitions as sd # isort:skip # noqa: E402

if TYPE_CHECKING:
from typing import List
from typing import Dict, List

from pystac import Asset

Expand Down Expand Up @@ -133,6 +133,7 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO
for arg in list(col_args):
if "-" in arg:
col_args[arg.replace("-", "_")] = col_args.pop(arg)
col_args = parse_collection_parameters(col_args)

logger.log( # pylint: disable=E1205 # false positive
logging.INFO,
Expand Down Expand Up @@ -274,6 +275,27 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO
return resolved_files


def parse_collection_parameters(parameters):
    # type: (Dict[str, JSON]) -> Dict[str, JSON]
    """
    Applies any relevant conversions of known parameters between allowed request format and expected utilities.

    Currently, only the ``subset`` parameter is converted: a compact string such as
    ``"Lat(10:20),Lon(30:40)"`` is expanded into a dimension mapping
    ``{"Lat": ["10", "20"], "Lon": ["30", "40"]}``. A ``subset`` that is already a
    mapping (or any other parameter) is returned unchanged.

    :param parameters: Collection parameters as submitted in the request (modified in place).
    :return: Converted parameters, or an empty mapping when none were provided.
    """
    if not parameters:
        return {}

    subset = parameters.get("subset")
    if subset and isinstance(subset, str):
        subset_dims = {}
        # each comma-separated item has the form "<dimension>(<lower>:<upper>)"
        for item in subset.split(","):
            dim, span = item.split("(", 1)
            span = span.split(")", 1)[0]  # drop the trailing ")" and anything after it
            # 'split' already yields the list of range bounds - no extra 'list()' copy needed
            subset_dims[dim] = span.split(":")
        parameters["subset"] = subset_dims

    return parameters


def process_cwl(collection_input, input_definition, output_dir):
# type: (JobValueCollection, ProcessInputOutputItem, Path) -> CWL_IO_ValueMap
files = process_collection(collection_input, input_definition, output_dir)
Expand Down

0 comments on commit ea3a462

Please sign in to comment.