From 4633c618b44380135ef638daaafeb1c5e36bade1 Mon Sep 17 00:00:00 2001 From: Ian Patterson Date: Tue, 11 Jun 2024 12:33:18 -0500 Subject: [PATCH] OGC Connected Systems API (#928) * [CSAPI] add systems * [CSAPI] add procedures and deployments * [CSAPI] add properties and samplingFeatures * [CSAPI] add datastreams and observations * [CSAPI] add controlchannels.py and commands.py * [CSAPI] add systemevents.py and systemhistory.py * [CSAPI] consolidate all files into one * [CSAPI] reorganize sections to fit docs * [CSAPI] add query parameters to methods for part 1 and 2 * [CSAPI] update _request method to use headers for PUTs * [CSAPI] add tests for most parts of CSAPI, fix bugs encountered * move repeatedly referenced data to fixtures in test * fix and annotate remaining possible tests * add a response handler option to and default to PUT, POST, and DELETE methods for more positive control over response results * update tests to take advantage to response handler function data where needed * combine system tests into a single one to remove potential ordering issues * update systems, datastreams observations, and system history tests. Add headers to PUT method in ogcapi base class * update sampling features tests and combine to help ensure repeatability * remove unnecessary print statements in test * remove unused import * remove unused default response handlers * remove redundant observation deletion test and move functionality into comprehensive observation test that cleans up after itself * remove intentionally skipped tests * remove more fixture deps for datastream test, use individual instances in utility functions * rename test_connectedsystems_osh.py to test_ogcapi_connectedsystems_osh.py * add noqa to allow "unused" doc local variable same as parent ogcapi files * add Connected Systems API info to the docs * clean up import order, fix small error in doc example * fix a grammar issue * add readonly tests and skip transactional ones --- docs/source/features.rst | 6 + docs/source/usage.rst | 80 ++ owslib/ogcapi/__init__.py | 2 +- owslib/ogcapi/connectedsystems.py | 1245 +++++++++++++++++++++ setup.py | 3 +- tests/test_ogcapi_connectedsystems_osh.py | 428 +++++++ 6 files changed, 1762 insertions(+), 2 deletions(-) create mode 100644 owslib/ogcapi/connectedsystems.py create mode 100644 tests/test_ogcapi_connectedsystems_osh.py diff --git a/docs/source/features.rst b/docs/source/features.rst index b253f10d..c2db2369 100644 --- a/docs/source/features.rst +++ b/docs/source/features.rst @@ -66,6 +66,10 @@ OGC API Support +--------------------------------------------------------------------------------------+------------+ | `OGC API - Processes - Part 1: Core`_ | 1.0 | +--------------------------------------------------------------------------------------+------------+ +| `OGC API - Connected Systems - Part 1: Feature Resources`_ | draft | ++--------------------------------------------------------------------------------------+------------+ +| `OGC API - Connected Systems - Part 2: Dynamic Data`_ | draft | ++--------------------------------------------------------------------------------------+------------+ .. _`OGC WMS`: https://www.opengeospatial.org/standards/wms .. _`OGC WFS`: https://www.opengeospatial.org/standards/wfs @@ -97,4 +101,6 @@ OGC API Support .. _`OGC API - Features - Part 4: Create, Replace, Update and Delete`: https://docs.ogc.org/DRAFTS/20-002.html .. _`OGC API - Coverages - Part 1: Core`: https://docs.ogc.org/DRAFTS/19-087.html .. _`OGC API - Processes - Part 1: Core`: https://docs.ogc.org/is/18-062r2/18-062r2.html +.. _`OGC API - Connected Systems - Part 1: Feature Resources`: https://docs.ogc.org/DRAFTS/23-001r0.html +.. _`OGC API - Connected Systems - Part 2: Dynamic Data`: https://docs.ogc.org/DRAFTS/23-002r0.html .. _`OpenSearch`: https://github.com/dewitt/opensearch diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 24fa631b..00ab05e8 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -363,6 +363,86 @@ OGC API - Environmental Data Retrieval - Part 1: Core 1.0 >>> icoads_sst = m.collection('icoads-sst') >>> data = e.query_data('icoads_sst', 'position', coords='POINT(-75 45)', parameter_names=['SST', 'AIRT']) +OGC API - Connected Systems - Part 1: Feature Resources & Part 2: Dynamic Data +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. note:: + The library covers all of parts 1 and 2, the example below is a short overview of the functionality. + All CRUD operations are performed in a very similar manner. Please see the Connected Systems API docs + for the full lists of properties, encoding requirements and expected responses. + +.. code-block:: python + + >>> from owslib.ogcapi.connectedsystems import Systems, Datastreams, Observations + >>> s = Systems('http://localhost:5000', auth=('user', 'password'), headers={'Content-Type': 'application/sml+json'}) + >>> ds = Datastreams('http://localhost:5000', auth=('user', 'password'), headers={'Content-Type': 'application/json'}) + >>> obs = Observations('http://localhost:5000', auth=('user', 'password'), headers={'Content-Type': 'application/json'}) + # insert a new system, datastream and observation + >>> system_info = { + >>> "type": "SimpleProcess", + >>> "uniqueId": "urn:osh:sensor:testsmlsensor:001", + >>> "label": "Test SML Sensor", + >>> "description": "A Sensor created from an SML document", + >>> "definition": "http://www.w3.org/ns/ssn/Sensor" + >>> } + >>> ds_definition = { + >>> "name": "Test Datastream", + >>> "outputName": "Test Output #1", + >>> "schema": { + >>> "obsFormat": "application/swe+json", + >>> "encoding": { + >>> "type": "JSONEncoding", + >>> "vectorAsArrays": False + >>> }, + >>> "recordSchema": { + >>> "type": "DataRecord", + >>> "label": "Test Datastream Record", + >>> "updatable": False, + >>> "optional": False, + >>> "definition": "http://test.com/Record", + >>> "fields": [ + >>> { + >>> "type": "Time", + >>> "label": "Test Datastream Time", + >>> "updatable": False, + >>> "optional": False, + >>> "definition": "http://test.com/Time", + >>> "name": "timestamp", + >>> "uom": { + >>> "href": "http://test.com/TimeUOM" + >>> } + >>> }, + >>> { + >>> "type": "Boolean", + >>> "label": "Test Datastream Boolean", + >>> "updatable": False, + >>> "optional": False, + >>> "definition": "http://test.com/Boolean", + >>> "name": "testboolean" + >>> } + >>> ] + >>> } + >>> } + >>> } + >>> observation = { + >>> "phenomenonTime": the_time, + >>> "resultTime": the_time, + >>> "result": { + >>> "timestamp": datetime.now().timestamp() * 1000, + >>> "testboolean": True + >>> } + >>> } + >>> s.create_system(system_info) + >>> system_id = s.resource_headers['Location'][0].split('/')[-1] + >>> ds.create_datastream(system_id, ds_definition) + >>> ds_id = ds.resource_headers['Location'][0].split('/')[-1] + >>> obs.create_observation(ds_id, observation) + >>> obs_id = obs.resource_headers['Location'][0].split('/')[-1] + >>> # retrieve the observations of our datastream + >>> observations = obs.get_observations(ds_id)['items'] + >>> + + WCS --- diff --git a/owslib/ogcapi/__init__.py b/owslib/ogcapi/__init__.py index 4e77fc7b..d34785c2 100644 --- a/owslib/ogcapi/__init__.py +++ b/owslib/ogcapi/__init__.py @@ -179,7 +179,7 @@ def _request(self, method: str = 'GET', path: str = None, response = http_post(url, headers=self.headers, request=data, auth=self.auth) elif method == 'PUT': - response = http_put(url, data=data, auth=self.auth) + response = http_put(url, headers=self.headers, data=data, auth=self.auth) elif method == 'DELETE': response = http_delete(url, auth=self.auth) diff --git a/owslib/ogcapi/connectedsystems.py b/owslib/ogcapi/connectedsystems.py new file mode 100644 index 00000000..fe996702 --- /dev/null +++ b/owslib/ogcapi/connectedsystems.py @@ -0,0 +1,1245 @@ +# ============================================================================== +# Copyright (c) 2024 Ian Patterson +# +# Author: Ian Patterson +# +# Contact email: ian@botts-inc.com +# ============================================================================== +import logging + +from owslib.ogcapi import API, Collections +from owslib.util import (Authentication) + +LOGGER = logging.getLogger(__name__) + + +class ConnectedSystems(Collections): + def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, + auth: Authentication = None): + __doc__ = API.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth) + + def _request(self, method: str = 'GET', path: str = None, + data: str = None, as_dict: bool = True, kwargs: dict = {}) -> dict: + res = super()._request(method, path, data, as_dict, kwargs) + return res + + +class Systems(ConnectedSystems): + """Abstraction for OGC API - Connected Systems - Systems""" + + def __init__(self, url: str, json_: str = None, timeout: int = 30, + headers: dict = None, auth: Authentication = None): + __doc__ = Collections.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth) + + def system_collections(self) -> list: + """ + implements /collections filtered on systems + + @returns: `list` of filtered collections object + """ + + systems_ = [] + collections_ = super().collections() + + for c_ in collections_['collections']: + if 'itemType' in c_ and c_['itemType'].lower() == 'system': + systems_.append(c_['id']) + + return systems_ + + def collection_queryables(self, collection_id: str) -> dict: + """ + implements /collections/{collectionId}/queryables + + @type collection_id: string + @param collection_id: id of collection + + @returns: `dict` of system collection queryables + """ + + path = f'collections/{collection_id}/queryables' + return self._request(path=path) + + def collection_items(self, collection_id: str, **kwargs: dict) -> dict: + """ + implements /collection/{collectionId}/items + + @type collection_id: string + @param collection_id: id of collection + + @returns: system collection results + """ + + path = f'collections/{collection_id}/items' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'uid', 'bbox', 'datetime', 'geom', 'q', 'parent', 'procedure', 'foi', 'observedProperty', + 'controlledProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def collection_item(self, collection_id: str, item_id: str) -> dict: + """ + implements /collection/{collectionId}/items/{itemId} + + @type collection_id: string + @param collection_id: id of collection + @type item_id: string + @param item_id: id of item + + @returns: system collection item result + """ + + path = f'collections/{collection_id}/items/{item_id}' + return self._request(path=path) + + def collection_item_create(self, collection_id: str, data: str) -> dict: + """ + implements POST /collection/{collectionId}/items + + @type collection_id: string + @param collection_id: id of collection + @type data: string + @param data: raw representation of data + + @returns: single item result + """ + + path = f'collections/{collection_id}/items' + return self._request(method='POST', path=path, data=data) + + def systems(self, **kwargs) -> dict: + """ + implements /systems + + @returns: `dict` of systems object + """ + + path = 'systems' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'uid', 'bbox', 'datetime', 'geom', 'q', 'parent', 'procedure', 'foi', 'observedProperty', + 'controlledProperty', 'recursive', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def system(self, system_id: str, **kwargs) -> dict: + """ + implements /systems/{systemId} + + @type system_id: string + @param system_id: id of system + + @returns: `dict` of system metadata + """ + + path = f'systems/{system_id}' + query_params = QueryArgs(**kwargs) + p_list = ['datetime'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def system_create(self, data: str) -> dict: + """ + implements /systems/{systemId} + + @type data: string + @param data: system data + + @returns: `dict` of system metadata + """ + + path = 'systems/' + + return self._request(path=path, method='POST', data=data) + + def system_update(self, system_id: str, data: str) -> dict: + """ + implements /systems/{systemId} + + @type system_id: string + @param system_id: id of system + @type data: string + @param data: system data + + @returns: `dict` of system metadata + """ + + path = f'systems/{system_id}' + return self._request(path=path, method='PUT', data=data) + + def system_delete(self, system_id: str) -> dict: + """ + implements /systems/{systemId} + + @type system_id: string + @param system_id: id of system + + @returns: `dict` of deletion result, should be empty + """ + + path = f'systems/{system_id}' + return self._request(path=path, method='DELETE') + + def system_components(self, system_id: str, **kwargs) -> dict: + """ + implements /systems/{systemId}/components + + @type system_id: string + @param system_id: id of system + + @returns: `dict` of system components + """ + + path = f'systems/{system_id}/components' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'bbox', 'datetime', 'geom', 'q', 'parent', 'procedure', 'foi', 'observedProperty', + 'controlledProperty', 'recursive', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def system_components_create(self, system_id: str, data: str) -> dict: + """ + implements POST /systems/{systemId}/components + + @type system_id: string + @param system_id: id of system + @type data: string + @param data: raw representation of data + + @returns: single component result + """ + + path = f'systems/{system_id}/components' + return self._request(method='POST', path=path, data=data) + + def system_deployments(self, system_id: str) -> dict: + """ + implements /systems/{systemId}/deployments + + @type system_id: string + @param system_id: id of system + + @returns: `dict` of system deployments + """ + + path = f'systems/{system_id}/deployments' + return self._request(path=path) + + def system_sampling_features(self, system_id: str) -> dict: + """ + implements /systems/{systemId}/samplingFeatures + + @type system_id: string + @param system_id: id of system + + @returns: `dict` of system sampling features + """ + + path = f'systems/{system_id}/samplingFeatures' + return self._request(path=path) + + +class Procedures(ConnectedSystems): + def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, + auth: Authentication = None): + __doc__ = API.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth) + + def procedures(self, **kwargs) -> dict: + """ + implements /procedures + @returns: `dict` of procedures object + """ + + path = 'procedures' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'datetime', 'q', 'observedProperty', 'controlledProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def procedure(self, procedure_id: str, **kwargs) -> dict: + """ + implements /procedures/{procedureId} + @type procedure_id: string + @param procedure_id: id of procedure + @returns: `dict` of procedure metadata + """ + + path = f'procedures/{procedure_id}' + query_params = QueryArgs(**kwargs) + p_list = ['type', 'id', 'properties', 'geometry', 'bbox', 'links'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def procedure_create(self, data: str) -> dict: + """ + implements /procedures + @type data: dict + @param data: JSON object + @returns: `dict` of procedure metadata + """ + + path = 'procedures' + return self._request(path=path, data=data, method='POST') + + def procedure_update(self, procedure_id: str, data: str) -> dict: + """ + implements /procedures/{procedureId} + @type procedure_id: string + @param procedure_id: id of procedure + @type data: dict + @param data: JSON object + @returns: `dict` of procedure metadata + """ + + path = f'procedures/{procedure_id}' + return self._request(path=path, data=data, method='PUT') + + def procedure_delete(self, procedure_id: str) -> dict: + """ + implements /procedures/{procedureId} + @type procedure_id: string + @param procedure_id: id of procedure + @returns: `dict` of procedure metadata + """ + + path = f'procedures/{procedure_id}' + return self._request(path=path, method='DELETE') + + +class Deployments(ConnectedSystems): + + def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, + auth: Authentication = None): + __doc__ = API.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth) + + def deployments(self, **kwargs) -> dict: + """ implements /deployments + @returns: `dict` of deployments object + """ + path = 'deployments' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'bbox', 'datetime', 'geom', 'q', 'system', 'foi', 'observedProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def deployment(self, deployment_id: str, **kwargs) -> dict: + """ implements /deployments/{deploymentId} + @type deployment_id: string + @param deployment_id: id of deployment + @returns: `dict` of deployment metadata + """ + path = f'deployments/{deployment_id}' + query_params = QueryArgs(**kwargs) + p_list = ['type', 'id', 'properties', 'geometry', 'bbox', 'links'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def deployment_create(self, data: str) -> bool: + """ implements /deployments + @type data: dict + @param data: JSON object + @returns: `dict` of deployment metadata + """ + path = 'deployments' + _ = self._request(path=path, data=data, method='POST') + + return True + + def deployment_update(self, deployment_id: str, data: str) -> bool: + """ implements /deployments/{deploymentId} + @type deployment_id: string + @param deployment_id: id of deployment + @type data: dict + @param data: JSON object + @returns: `dict` of deployment metadata + """ + path = f'deployments/{deployment_id}' + _ = self._request(path=path, data=data, method='PUT') + + return True + + def deployment_delete(self, deployment_id: str) -> bool: + """ implements /deployments/{deploymentId} + @type deployment_id: string + @param deployment_id: id of deployment + @returns: `dict` of deployment metadata + """ + path = f'deployments/{deployment_id}' + _ = self._request(path=path, method='DELETE') + + return True + + def deployment_list_deployed_systems(self, deployment_id: str, **kwargs) -> dict: + """ implements /deployments/{deploymentId}/systems + @type deployment_id: string + @param deployment_id: id of deployment + @returns: `dict` of systems in a particular deployment + """ + path = f'deployments/{deployment_id}/systems' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'bbox', 'datetime', 'geom', 'q', 'procedure', 'foi', 'observedProperty', + 'controlledProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def deployment_add_systems_to_deployment(self, deployment_id: str, data: str, use_member_endpoint=False) -> bool: + """ implements /deployments/{deploymentId}/systems + @type deployment_id: string + @param deployment_id: id of deployment + @type data: dict + @param data: JSON object + @param use_member_endpoint: + @returns: `dict` of systems in a particular deployment + """ + + if use_member_endpoint: + path = f'deployments/{deployment_id}/members' + else: + path = f'deployments/{deployment_id}/systems' + _ = self._request(path=path, data=data, method='POST') + + return True + + def deployment_retrieve_system_from_deployment(self, deployment_id: str, system_id: str, **kwargs) -> dict: + """ implements /deployments/{deploymentId}/systems/{systemId} + @type deployment_id: string + @param deployment_id: id of deployment + @type system_id: string + @param system_id: id of system + @returns: `dict` of system metadata + """ + path = f'deployments/{deployment_id}/systems/{system_id}' + query_params = QueryArgs(**kwargs) + p_list = ['type', 'id', 'properties', 'geometry', 'bbox'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def deployment_update_system_in_deployment(self, deployment_id: str, system_id: str, data: str) -> bool: + """ implements /deployments/{deploymentId}/systems/{systemId} + @type deployment_id: string + @param deployment_id: id of deployment + @type system_id: string + @param system_id: id of system + @type data: dict + @param data: JSON object + @returns: `dict` of system metadata + """ + path = f'deployments/{deployment_id}/systems/{system_id}' + _ = self._request(path=path, data=data, method='PUT') + + return True + + def deployment_delete_system_in_deployment(self, deployment_id: str, system_id: str) -> bool: + """ implements /deployments/{deploymentId}/systems/{systemId} + @type deployment_id: string + @param deployment_id: id of deployment + @type system_id: string + @param system_id: id of system + @returns: `dict` of system metadata + """ + path = f'deployments/{deployment_id}/systems/{system_id}' + _ = self._request(path=path, method='DELETE') + + return True + + def deployment_list_deployments_of_system(self, system_id: str, **kwargs) -> dict: + """ implements /systems/{systemId}/deployments + @type system_id: string + @param system_id: id of system + @returns: `dict` of deployments of a particular system + """ + path = f'systems/{system_id}/deployments' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'bbox', 'datetime', 'geom', 'q', 'foi', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + +class SamplingFeatures(ConnectedSystems): + alternate_sampling_feature_url = None + + def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, + auth: Authentication = None, alternate_sampling_feature_url: str = None): + self.alternate_sampling_feature_url = alternate_sampling_feature_url + __doc__ = API.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth) + + def sampling_features(self, use_fois=False, **kwargs) -> dict: + """ + implements /samplingFeatures + @returns: `dict` of sampling features object + """ + + path = 'samplingFeatures' + if use_fois: + path = 'fois' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'bbox', 'datetime', 'geom', 'q', 'foi', 'observedProperty', + 'controlledProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def sampling_feature(self, sampling_feature_id: str, use_fois=False, **kwargs) -> dict: + """ + implements /samplingFeatures/{samplingFeatureId} + @type sampling_feature_id: string + @param sampling_feature_id: id of sampling feature + @returns: `dict` of sampling feature metadata + """ + + path = f'samplingFeatures/{sampling_feature_id}' + if use_fois: + path = f'fois/{sampling_feature_id}' + query_params = QueryArgs(**kwargs) + p_list = ['geometry', 'type', 'id', 'properties', 'bbox', 'links'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def sampling_features_from_system(self, system_id: str, use_fois=False, **kwargs) -> dict: + """ + implements /systems/{systemId}/samplingFeatures + @type system_id: string + @param system_id: id of system + @type use_fois: bool + @param use_fois: use FOIs instead of sampling features in the path + @returns: `dict` of sampling feature metadata + """ + + path = f'systems/{system_id}/samplingFeatures' + if use_fois: + path = f'systems/{system_id}/fois' + + query_params = QueryArgs(**kwargs) + p_list = ['id', 'bbox', 'datetime', 'geom', 'q', 'foi', 'observedProperty', + 'controlledProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def sampling_feature_create(self, system_id: str, data: str, use_fois: bool = False) -> dict: + """ + implements /systems/{systemId}/samplingFeatures + @type system_id: string + @param system_id: id of system to insert sampling feature into + @type data: dict + @param data: JSON object + @type use_fois: bool + @param use_fois: use FOIs instead of samplingFeatures in the path + @returns: `dict` of sampling feature metadata + """ + + path = f'systems/{system_id}/samplingFeatures' + if use_fois: + path = f'systems/{system_id}/fois' + + return self._request(path=path, data=data, method='POST') + + def sampling_feature_update(self, sampling_feature_id: str, data: str, use_fois=False) -> dict: + """ + implements /samplingFeatures/{samplingFeatureId} + @type sampling_feature_id: string + @param sampling_feature_id: id of sampling feature + @type data: dict + @param data: JSON object + @returns: `dict` of sampling feature metadata + """ + + path = f'samplingFeatures/{sampling_feature_id}' + if use_fois: + path = f'fois/{sampling_feature_id}' + return self._request(path=path, data=data, method='PUT') + + def sampling_feature_delete(self, sampling_feature_id: str, use_fois=False) -> dict: + """ + implements /samplingFeatures/{samplingFeatureId} + @type sampling_feature_id: string + @param sampling_feature_id: id of sampling feature + @returns: `dict` of sampling feature metadata + @param use_fois: + """ + + path = f'samplingFeatures/{sampling_feature_id}' + if use_fois: + path = f'fois/{sampling_feature_id}' + return self._request(path=path, method='DELETE') + + +class Properties(ConnectedSystems): + + def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, + auth: Authentication = None): + __doc__ = API.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth) + + def properties(self, **kwargs) -> dict: + """ + implements /properties + @returns: `dict` of properties object + """ + + path = 'properties' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'q', 'baseProperty', 'objectType', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def property(self, property_id: str, **kwargs) -> dict: + """ + implements /properties/{propertyId} + @type property_id: string + @param property_id: id of property + @returns: `dict` of property metadata + """ + + path = f'properties/{property_id}' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'label', 'description', 'uniqueId', 'baseProperty', 'objectType', 'statistic', 'qualifiers', + 'links'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def property_create(self, data: str) -> dict: + """ + implements /properties + @type data: dict + @param data: JSON object + @returns: `dict` of property metadata + """ + + path = 'properties' + return self._request(path=path, data=data, method='POST') + + def property_update(self, property_id: str, data: str) -> dict: + """ + implements /properties/{propertyId} + @type property_id: string + @param property_id: id of property + @type data: dict + @param data: JSON object + @returns: `dict` of property metadata + """ + + path = f'properties/{property_id}' + return self._request(path=path, data=data, method='PUT') + + def property_delete(self, property_id: str) -> dict: + """ + implements /properties/{propertyId} + @type property_id: string + @param property_id: id of property + @returns: `dict` of property metadata + """ + + path = f'properties/{property_id}' + return self._request(path=path, method='DELETE') + + +class Datastreams(ConnectedSystems): + + def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, + auth: Authentication = None): + __doc__ = Collections.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth) + + def datastreams(self, **kwargs) -> dict: + """ + implements /datastreams + @returns: `dict` of datastreams object + """ + + path = 'datastreams' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'q', 'phenomenonTime', 'resultTime', 'system', 'foi', 'observedProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def datastream(self, datastream_id: str) -> dict: + """ + implements /datastreams/{datastreamId} + @type datastream_id: string + @param datastream_id: id of datastream + @returns: `dict` of datastream metadata + """ + + path = f'datastreams/{datastream_id}' + + return self._request(path=path) + + def datastreams_of_system(self, system_id: str, **kwargs) -> dict: + """ + implements /systems/{systemId}/datastreams + @type system_id: string + @param system_id: id of system + @returns: `dict` of datastream metadata + """ + + path = f'systems/{system_id}/datastreams' + query_params = QueryArgs(**kwargs) + p_list = ['phenomenonTime', 'resultTime', 'q', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def datastream_create_in_system(self, system_id: str, data: str) -> dict: + """ + implements /datastreams + @type system_id: string + @param system_id: id of system + @type data: dict + @param data: JSON object + @returns: `dict` of datastream metadata + """ + + path = f'systems/{system_id}/datastreams' + + return self._request(path=path, data=data, method='POST') + + def datastream_update_description(self, datastream_id: str, data: str) -> dict: + """ + implements /datastreams/{datastreamId} + @type datastream_id: string + @param datastream_id: id of datastream + @type data: dict + @param data: JSON object + @returns: `dict` of datastream metadata + """ + + path = f'datastreams/{datastream_id}' + return self._request(path=path, data=data, method='PUT') + + def datastream_delete(self, datastream_id: str) -> dict: + """ + implements /datastreams/{datastreamId} + @type datastream_id: string + @param datastream_id: id of datastream + @returns: `dict` of datastream metadata + """ + + path = f'datastreams/{datastream_id}' + return self._request(path=path, method='DELETE') + + def datastream_retrieve_schema_for_format(self, datastream_id: str, **kwargs) -> dict: + """ + implements /datastreams/{datastreamId}/schema + @type datastream_id: string + @param datastream_id: id of datastream + @type obs_format: string + @param obs_format: observation format + @type type_: string + @param type_: type of schema (one of: 'view', 'create', 'replace', 'update' + + @returns: `dict` of schema + """ + + path = f'datastreams/{datastream_id}/schema' + query_params = QueryArgs(**kwargs) + p_list = ['obsFormat', 'type'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def datastream_update_schema_for_format(self, datastream_id: str, data: str) -> dict: + """ + implements /datastreams/{datastreamId}/schema + @type datastream_id: string + @param datastream_id: id of datastream + @type data: dict + @param data: JSON object + + @returns: `dict` of schema + """ + + path = f'datastreams/{datastream_id}/schema' + return self._request(path=path, data=data, method='PUT') + + +class Observations(ConnectedSystems): + + def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, + auth: Authentication = None): + __doc__ = Collections.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth) + + def observations(self, **kwargs) -> dict: + """ + implements /observations + @returns: `dict` of observations object + """ + + path = 'observations' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'phenomenonTime', 'resultTime', 'system', 'foi', 'observedProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def observation(self, observation_id: str) -> dict: + """ + implements /observations/{observationId} + @type observation_id: string + @param observation_id: id of observation + @returns: `dict` of observation metadata + """ + + path = f'observations/{observation_id}' + return self._request(path=path) + + def observations_of_datastream(self, datastream_id: str, **kwargs) -> dict: + """ + implements /datastreams/{datastreamId}/observations + @type datastream_id: string + @param datastream_id: id of datastream + @returns: `dict` of observations object + """ + + path = f'datastreams/{datastream_id}/observations' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'phenomenonTime', 'resultTime', 'foi', 'observedProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def observations_create_in_datastream(self, datastream_id: str, data: str) -> dict: + """ + implements /observations + @type datastream_id: string + @param datastream_id: id of datastream + @type data: dict + @param data: JSON object + @returns: `dict` of observation metadata + """ + + path = f'datastreams/{datastream_id}/observations' + return self._request(path=path, data=data, method='POST') + + def observations_update(self, observation_id: str, data: str) -> dict: + """ + implements /observations/{observationId} + @type observation_id: string + @param observation_id: id of observation + @type data: dict + @param data: JSON object + @returns: `dict` of observation metadata + """ + + path = f'observations/{observation_id}' + return self._request(path=path, data=data, method='PUT') + + def observations_delete(self, observation_id: str) -> dict: + """ + implements /observations/{observationId} + @type observation_id: string + @param observation_id: id of observation + @returns: `dict` of observation metadata + """ + path = f'observations/{observation_id}' + return self._request(path=path, method='DELETE') + + +class ControlChannels(ConnectedSystems): + + def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, + auth: Authentication = None): + __doc__ = Collections.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth) + + def controls(self, **kwargs) -> dict: + """ + implements /controls + @returns: `dict` of control channel objects + """ + + path = 'controls' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'q', 'issueTime', 'executionTime', 'system', 'foi', 'controlledProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def control(self, control_id: str) -> dict: + """ + implements /controls/{control_id} + @type control_id: string + @param control_id: id of control channel + @returns: `dict` of control channels + """ + + path = f'controls/{control_id}' + return self._request(path=path) + + def controls_of_system(self, system_id: str, **kwargs) -> dict: + """ + implements /systems/{system_id}/controls + @type system_id: string + @param system_id: id of system + @returns: `dict` of control channels + """ + + path = f'systems/{system_id}/controls' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'q', 'issueTime', 'executionTime', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def control_create_in_system(self, system_id: str, data: str) -> dict: + """ + implements /controls + @type system_id: string + @param system_id: id of system + @type data: dict + @param data: JSON object + @returns: `dict` of control channels + """ + + path = f'systems/{system_id}/controls' + return self._request(path=path, data=data, method='POST') + + def control_update(self, control_id: str, data: str) -> dict: + """ + implements /controls/{control_id} + @type control_id: string + @param control_id: id of control channel + @type data: dict + @param data: JSON object + @returns: `dict` of control channels + """ + + path = f'controls/{control_id}' + return self._request(path=path, data=data, method='PUT') + + def control_delete(self, control_id: str) -> dict: + """ + implements /controls/{control_id} + @type control_id: string + @param control_id: id of control channel + @returns: `dict` of control channels + """ + + path = f'controls/{control_id}' + return self._request(path=path, method='DELETE') + + def control_retrieve_schema(self, control_id: str, **kwargs) -> dict: + """ + implements /controls/{control_id}/schema + @type control_id: string + @param control_id: id of control channel + @returns: `dict` of control channels + """ + + path = f'controls/{control_id}/schema' + query_params = QueryArgs(**kwargs) + p_list = ['cmdFormat', 'type'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def control_update_schema(self, control_id: str, data: str) -> dict: + """ + implements /controls/{control_id}/schema + @type control_id: string + @param control_id: id of control channel + @type data: dict + @param data: JSON object + @returns: `dict` of control channels + """ + + path = f'controls/{control_id}/schema' + return self._request(path=path, data=data, method='PUT') + + +class Commands(ConnectedSystems): + + def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, + auth: Authentication = None): + __doc__ = Collections.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth) + + def commands(self, **kwargs) -> dict: + """ + implements /commands + @returns: `dict` of commands object + """ + + path = 'commands' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'issueTime', 'executionTime', 'system', 'foi', 'controlledProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def command(self, command_id: str) -> dict: + """ + implements /commands/{commandId} + @type command_id: string + @param command_id: id of command + @returns: `dict` of command metadata + """ + + path = f'commands/{command_id}' + return self._request(path=path) + + def commands_of_control_channel(self, control_id: str, **kwargs) -> dict: + """ + implements /controls/{control_id}/commands + @type control_id: string + @param control_id: id of control channel + @returns: `dict` of commands object + """ + + path = f'controls/{control_id}/commands' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'issueTime', 'executionTime', 'foi', 'controlledProperty', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def commands_send_command_in_control_stream(self, control_id: str, data: str) -> dict: + """ + implements /commands + @type control_id: string + @param control_id: id of control channel + @type data: dict + @param data: JSON object + @returns: `dict` of command metadata + """ + + path = f'controls/{control_id}/commands' + return self._request(path=path, data=data, method='POST') + + def commands_delete_command(self, command_id: str) -> dict: + """ + implements /commands/{commandId} + @type command_id: string + @param command_id: id of command + @returns: `dict` of command metadata + """ + + path = f'commands/{command_id}' + return self._request(path=path, method='DELETE') + + def commands_add_status_report(self, command_id: str, data: str) -> dict: + """ + implements /commands/{commandId}/status + @type command_id: string + @param command_id: id of command + @type data: dict + @param data: JSON object + @returns: `dict` of command metadata + """ + + path = f'commands/{command_id}/status' + return self._request(path=path, data=data, method='POST') + + def commands_retrieve_status_report(self, command_id: str, status_id: str, **kwargs) -> dict: + """ + implements /commands/{commandId}/status/{statusId} + @type command_id: string + @param command_id: id of command + @type status_id: string + @param status_id: id of status + @returns: `dict` of command metadata + """ + + path = f'commands/{command_id}/status/{status_id}' + query_params = QueryArgs(**kwargs) + p_list = ['id', 'reportTime', 'executionTime', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def commands_update_status_report(self, command_id: str, status_id: str, data: str) -> dict: + """ + implements /commands/{commandId}/status/{statusId} + @type command_id: string + @param command_id: id of command + @type status_id: string + @param status_id: id of status + @type data: dict + @param data: JSON object + @returns: `dict` of command metadata + """ + + path = f'commands/{command_id}/status/{status_id}' + return self._request(path=path, data=data, method='PUT') + + def commands_delete_status_report(self, command_id: str, status_id: str) -> dict: + """ + implements /commands/{commandId}/status/{statusId} + @type command_id: string + @param command_id: id of command + @type status_id: string + @param status_id: id of status + @returns: `dict` of command metadata + """ + + path = f'commands/{command_id}/status/{status_id}' + return self._request(path=path, method='DELETE') + + +class SystemEvents(ConnectedSystems): + + def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, + auth: Authentication = None): + __doc__ = Collections.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth=auth) + + def system_events(self, **kwargs) -> dict: + """ + implements /systemEvents + @returns: `dict` of system events object + """ + + path = 'systemEvents' + query_params = QueryArgs(**kwargs) + p_list = ['eventTime', 'q', 'limit'] + return self._request(path=path, method='GET', kwargs=query_params.check_params(p_list)) + + def system_events_of_specific_system(self, system_id: str, **kwargs) -> dict: + """ + implements /systems/{systemId}/events + @type system_id: string + @param system_id: id of system + @returns: `dict` of system events object + """ + + path = f'systems/{system_id}/events' + query_params = QueryArgs(**kwargs) + p_list = ['eventTime', 'q', 'limit'] + return self._request(path=path, method='GET', kwargs=query_params.check_params(p_list)) + + def system_event_add_se_to_system(self, system_id: str, data: str) -> dict: + """ + implements /systems/{systemId}/events + @type system_id: string + @param system_id: id of system + @type data: dict + @param data: JSON object + @returns: `dict` of system event metadata + """ + + path = f'systems/{system_id}/events' + return self._request(path=path, data=data, method='POST') + + def system_event(self, system_id: str, event_id: str) -> dict: + """ + implements /systems/{systemId}/events/{event_id} + @type system_id: string + @param system_id: id of system + @type event_id: string + @param event_id: id of system event + @returns: `dict` of system event metadata + """ + + path = f'systems/{system_id}/events/{event_id}' + return self._request(path=path) + + def system_event_update(self, system_id: str, event_id: str, data: str) -> dict: + """ + implements /systems/{systemId}/events/{event_id} + @type system_id: string + @param system_id: id of system + @type event_id: string + @param event_id: id of system event + @type data: dict + @param data: JSON object + @returns: `dict` of system event metadata + """ + + path = f'systems/{system_id}/events/{event_id}' + return self._request(path=path, data=data, method='PUT') + + def system_event_delete(self, system_id: str, event_id: str) -> dict: + """ + implements /systems/{systemId}/events/{event_id} + @type system_id: string + @param system_id: id of system + @type event_id: string + @param event_id: id of system event + @returns: `dict` of system event metadata + """ + + path = f'systems/{system_id}/events/{event_id}' + return self._request(path=path, method='DELETE') + + +class SystemHistory(ConnectedSystems): + + def __init__(self, url: str, json_: str = None, timeout: int = 30, headers: dict = None, + auth: Authentication = None): + __doc__ = Collections.__doc__ # noqa + super().__init__(url, json_, timeout, headers, auth) + + def system_history(self, system_id: str, **kwargs) -> dict: + """ + implements /systems/{system_id}/history + @type system_id: string + @param system_id: id of system + @returns: `dict` of system history + """ + + path = f'systems/{system_id}/history' + query_params = QueryArgs(**kwargs) + p_list = ['validTime', 'q', 'limit'] + return self._request(path=path, kwargs=query_params.check_params(p_list)) + + def system_history_by_id(self, system_id: str, history_id: str) -> dict: + """ + implements /systems/{system_id}/history/{history_id} + @type system_id: string + @param system_id: id of system + @type history_id: string + @param history_id: id of history + @returns: `dict` of system history + """ + + path = f'systems/{system_id}/history/{history_id}' + return self._request(path=path) + + def system_history_update_description(self, system_id: str, history_id: str, data: str) -> dict: + """ + implements /systems/{system_id}/history/{history_id} + @type system_id: string + @param system_id: id of system + @type history_id: string + @param history_id: id of history + @type data: dict + @param data: JSON object + @returns: `dict` of system history + """ + + path = f'systems/{system_id}/history/{history_id}' + return self._request(path=path, data=data, method='PUT') + + def system_history_delete(self, system_id: str, history_id: str) -> dict: + """ + implements /systems/{system_id}/history/{history_id} + @type system_id: string + @param system_id: id of system + @type history_id: string + @param history_id: id of history + @returns: `dict` of system history + """ + + path = f'systems/{system_id}/history/{history_id}' + return self._request(path=path, method='DELETE') + + +class QueryArgs: + + def __init__(self, **kwargs): + self.params = {} + if 'id' in kwargs: + self.params['id'] = kwargs['id'] + if 'uid' in kwargs: + self.params['uid'] = kwargs['uid'] + if 'bbox' in kwargs: + self.params['bbox'] = ','.join(list(map(str, kwargs['bbox']))) + if 'datetime' in kwargs: + self.params['datetime'] = kwargs['datetime'] + if 'geom' in kwargs: + self.params['geom'] = kwargs['geom'] + if 'q' in kwargs: + self.params['q'] = ','.join(list(map(str, kwargs['q']))) + if 'procedure' in kwargs: + self.params['procedure'] = kwargs['procedure'] + if 'parent' in kwargs: + self.params.parent = ','.join(list(map(str, kwargs['parent']))) + if 'foi' in kwargs: + self.params['foi'] = ','.join(list(map(str, kwargs['foi']))) + if 'observedProperty' in kwargs: + self.params['observedProperty'] = ','.join(list(map(str, kwargs['observedProperty']))) + if 'controlledProperty' in kwargs: + self.params['controlledProperty'] = ','.join(list(map(str, kwargs['controlledProperty']))) + if 'recursive' in kwargs: + self.params['recursive'] = kwargs['recursive'] + if 'limit' in kwargs: + self.params['limit'] = kwargs['limit'] + if 'system' in kwargs: + self.params['system'] = ','.join(list(map(str, kwargs['system']))) + + """ + Validation methods for query parameters + """ + + def v_sys_req_params(self): + p_list = ['id', 'bbox', 'datetime', 'geom', 'q', 'parent', 'procedure', 'foi', 'observedProperty', + 'controlledProperty', 'recursive', 'limit', 'system'] + return self.check_params(p_list) + + def v_sys_list_system_deployment_params(self): + p_list = ['id', 'bbox', 'datetime', 'geom', 'q', 'foi', 'limit'] + return self.check_params(p_list) + + def v_sys_list_system_sampling_feature_params(self): + p_list = ['id', 'bbox', 'datetime', 'geom', 'q', 'foi', 'observedProperty', 'controlledProperty', 'limit'] + return self.check_params(p_list) + + def check_params(self, param_list): + q_params = {k: v for k, v in self.params.items() if k in param_list} + return q_params diff --git a/setup.py b/setup.py index ed1d6ecc..5e3200ba 100644 --- a/setup.py +++ b/setup.py @@ -80,7 +80,8 @@ def get_package_version(): 'wcs', 'capabilities', 'metadata', - 'wmts' + 'wmts', + 'connectedsystems' ]), author='Sean Gillies', author_email='sean.gillies@gmail.com', diff --git a/tests/test_ogcapi_connectedsystems_osh.py b/tests/test_ogcapi_connectedsystems_osh.py new file mode 100644 index 00000000..e2442844 --- /dev/null +++ b/tests/test_ogcapi_connectedsystems_osh.py @@ -0,0 +1,428 @@ +# ============================================================================== +# Copyright (c) 2024 Ian Patterson +# +# Author: Ian Patterson +# +# Contact email: ian@botts-inc.com +# ============================================================================== + +from datetime import datetime +import json + +import pytest + +from owslib.ogcapi.connectedsystems import Commands, ControlChannels, Datastreams, Deployments, Observations, \ + Properties, SamplingFeatures, SystemEvents, SystemHistory, Systems +from owslib.util import Authentication + + +class OSHFixtures: + NODE_TEST_OK_URL = 'http://34.67.197.57:8585/sensorhub/test' + # Directs to OSH hosted test server + TEST_URL = 'http://34.67.197.57:8585/sensorhub/api/' + auth = Authentication('auto_test', 'automated_tester24') + sml_headers = {'Content-Type': 'application/sml+json'} + json_headers = {'Content-Type': 'application/json'} + geojson_headers = {'Content-Type': 'application/geo+json'} + omjson_headers = {'Content-Type': 'application/om+json'} + + system_definitions = [ + { + "type": "SimpleProcess", + "uniqueId": "urn:osh:sensor:testsmlsensor:001", + "label": "Test SML Sensor", + "description": "A Sensor created from an SML document", + "definition": "http://www.w3.org/ns/ssn/Sensor" + }, + { + "type": "SimpleProcess", + "uniqueId": "urn:osh:sensor:testsmlsensor:002", + "label": "Test SML Sensor #2", + "description": "A Sensor created from an SML document", + "definition": "http://www.w3.org/ns/ssn/Sensor" + } + ] + + sys_sml_to_update = { + "type": "SimpleProcess", + "uniqueId": "urn:osh:sensor:testsmlsensor:001", + "label": "Test SML Sensor", + "description": "A Sensor created from an SML document", + "definition": "http://www.w3.org/ns/ssn/Sensor" + } + + sys_sml_def = { + "type": "SimpleProcess", + "uniqueId": "urn:osh:sensor:testsmlsensor:solo", + "label": "Test SML Sensor - Created on its own", + "description": "A Sensor created from an SML document", + "definition": "http://www.w3.org/ns/ssn/Sensor" + } + + sml_component = { + "type": "SimpleProcess", + "uniqueId": "urn:osh:sensor:testcomponent:001", + "label": "Test Component", + "description": "Test Component Description", + "definition": "http://www.w3.org/ns/ssn/Sensor" + } + + sml_procedure_test_system = {"type": "SimpleProcess", + "uniqueId": "urn:osh:sensor:testsensorwithcomponents:001", + "label": "Test Process/Datastream Sensor", + "description": "A Sensor created to test procedure/datastream creation", + "definition": "http://www.w3.org/ns/ssn/Sensor"} + sml_procedure = { + "type": "SimpleProcess", + "id": "123456789", + "description": "Test Procedure inserted via OWSLib", + "uniqueId": "urn:osh:sensor:testprocedureows:001", + "label": "Test Procedure - OWSLib", + "definition": "http://www.w3.org/ns/sosa/Procedure" + } + + deployment_definition = { + "type": "Feature", + "properties": { + "featureType": "http://www.w3.org/ns/sosa/Deployment", + "uid": "urn:osh:sensor:testdeployment:001", + "name": "Test Deployment 001", + "description": "A test deployment", + "validTime": ["2024-01-01T00:00:00Z", "2024-12-31T23:59:59Z"] + }, + # "geometry": "POINT(-80.0 35.0)" + } + system_id = 'blid74chqmses' + deployment_expected_id = "vssamsrio5eb2" + weatherstation_id = '0s2lbn2n1bnc8' + datastream_id = 'etbrve0msmrre' + + feature_def = { + "geometry": { + "type": "Point", + "coordinates": [-80.0, 35.0] + }, + "type": "Feature", + "properties": { + "featureType": "http://www.w3.org/ns/sosa/Station", + "uid": "urn:osh:sensor:teststation:001", + "name": "Test Station 001", + "description": "A test station", + "parentSystem@link": {"href": "http://localhost:8585/sensorhub/api/systems/blid74chqmses"}, + "sampledFeature@link": { + "href": "https://data.example.com/link/to/resource", + "rel": "alternate", + "type": "application/json", + "hreflang": "en-US", + "title": "Resource Name", + "uid": "urn:x-org:resourceType:0001", + "rt": "http://www.example.org/uri/of/concept", + "if": "http://www.opengis.net/spec/spec-id/version"} + } + } + + ds_definition = { + "name": "Test Datastream", + "outputName": "Test Output #1", + "schema": { + "obsFormat": "application/swe+json", + "encoding": { + "type": "JSONEncoding", + "vectorAsArrays": False + }, + "recordSchema": { + "type": "DataRecord", + "label": "Test Datastream Record", + "updatable": False, + "optional": False, + "definition": "http://test.com/Record", + "fields": [ + { + "type": "Time", + "label": "Test Datastream Time", + "updatable": False, + "optional": False, + "definition": "http://test.com/Time", + "name": "timestamp", + "uom": { + "href": "http://test.com/TimeUOM" + } + }, + { + "type": "Boolean", + "label": "Test Datastream Boolean", + "updatable": False, + "optional": False, + "definition": "http://test.com/Boolean", + "name": "testboolean" + } + ] + } + } + } + + systems_api = Systems(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) + procedure_api = Systems(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) + deployment_api = Deployments(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) + sampling_feature_api = SamplingFeatures(TEST_URL, auth=auth, headers=geojson_headers, + alternate_sampling_feature_url='featuresOfInterest') + properties_api = Properties(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) + datastream_api = Datastreams(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) + observations_api = Observations(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) + control_channels_api = ControlChannels(TEST_URL, auth=auth, + headers={'Content-Type': 'application/json'}) + commands_api = Commands(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) + system_events_api = SystemEvents(TEST_URL, auth=auth, headers=omjson_headers) + system_history_api = SystemHistory(TEST_URL, auth=auth, headers={'Content-Type': 'application/json'}) + + def update_dsid(self, ds_id): + self.datastream_id = ds_id + + +class TestSystems: + fixtures = OSHFixtures() + + def test_system_readonly(self): + # get all systems + res = self.fixtures.systems_api.systems() + assert len(res['items']) > 0 + check_ids = ["0s2lbn2n1bnc8", "94n1f19ld7tlc"] + assert [any(sys_id == item['id'] for item in res['items']) for sys_id in check_ids] + + # get a single system + res = self.fixtures.systems_api.system(check_ids[0]) + assert res is not None + assert res['id'] == check_ids[0] + + @pytest.mark.skip(reason="Skip transactional test") + def test_system_functions(self): + # insertion of systems + self.fixtures.systems_api.headers = self.fixtures.sml_headers + sys_create_res = self.fixtures.systems_api.system_create(json.dumps(self.fixtures.system_definitions)) + assert sys_create_res is not None + + # update of system and retrieval + sml_desc_copy = self.fixtures.sys_sml_to_update.copy() + sml_desc_copy['description'] = 'Updated Description' + sml_str = json.dumps(sml_desc_copy) + post_systems = self.fixtures.systems_api.system_update('blid74chqmses', sml_str) + + check_result = self.fixtures.systems_api.system('blid74chqmses') + assert check_result['properties']['description'] == 'Updated Description' + + # deletion of system + all_systems = self.fixtures.systems_api.systems() + + # clear datastreams + delete_all_datastreams() + + for system in all_systems['items']: + res = self.fixtures.systems_api.system_delete(system['id']) + assert res == {} + + +class TestDeployments: + fixtures = OSHFixtures() + + @pytest.mark.skip(reason="Skip transactional test") + def test_deployment_create(self): + res1 = self.fixtures.deployment_api.deployment_create(json.dumps(self.fixtures.deployment_definition)) + assert res1 + res2 = self.fixtures.deployment_api.deployments() + assert self.fixtures.deployment_expected_id in [x['id'] for x in res2['items']] + res3 = self.fixtures.deployment_api.deployment(self.fixtures.deployment_expected_id) + assert res3['properties']['name'] == 'Test Deployment 001' and res3[ + 'id'] == self.fixtures.deployment_expected_id + + @pytest.mark.skip(reason="Skip transactional test") + def test_deployment_update(self): + self.fixtures.deployment_definition['properties']['description'] = 'Updated Description of Deployment 001' + res = self.fixtures.deployment_api.deployment_update(self.fixtures.deployment_expected_id, + json.dumps(self.fixtures.deployment_definition)) + assert res is not None + + @pytest.mark.skip(reason="Skip transactional test") + def test_deployment_delete(self): + res = self.fixtures.deployment_api.deployment_delete(self.fixtures.deployment_expected_id) + assert res is not None + + +class TestSamplingFeatures: + fixtures = OSHFixtures() + + def test_sampling_features_readonly(self): + all_features = self.fixtures.sampling_feature_api.sampling_features(use_fois=True) + assert len(all_features['items']) == 51 + + feature_id = "c4nce3peo8hvc" + feature = self.fixtures.sampling_feature_api.sampling_feature(feature_id, use_fois=True) + assert feature['id'] == feature_id + assert feature['properties']['name'] == 'Station WS013' + + @pytest.mark.skip(reason="Skip transactional test") + def test_sampling_features_all(self): + # setup + delete_all_systems() + system_id = create_single_system() + + # create a sampling feature + self.fixtures.sampling_feature_api.headers = self.fixtures.geojson_headers + res = self.fixtures.sampling_feature_api.sampling_feature_create(system_id, + json.dumps(self.fixtures.feature_def), True) + assert self.fixtures.sampling_feature_api.response_headers['Location'] is not None + sampling_feature_id = self.fixtures.sampling_feature_api.response_headers['Location'].split('/')[-1] + + # get all sampling features + res = self.fixtures.sampling_feature_api.sampling_features(use_fois=True) + assert len(res['items']) > 0 + assert any(x['id'] == sampling_feature_id for x in res['items']) + + # get the sampling feature we created + res = self.fixtures.sampling_feature_api.sampling_feature(sampling_feature_id, use_fois=True) + assert res['properties']['name'] == 'Test Station 001' + assert res['properties']['featureType'] == 'http://www.w3.org/ns/sosa/Station' + + # get sampling features from a system + res = self.fixtures.sampling_feature_api.sampling_features_from_system(system_id, use_fois=True) + assert len(res['items']) > 0 + assert any(x['id'] == sampling_feature_id for x in res['items']) + + # delete the sampling feature + res = self.fixtures.sampling_feature_api.sampling_feature_delete(sampling_feature_id, use_fois=True) + res = self.fixtures.sampling_feature_api.sampling_features(use_fois=True) + assert res == {'items': []} + + +class TestDatastreams: + fixtures = OSHFixtures() + + def test_datastreams_readonly(self): + ds_id = 'kjg2qrcm40rfk' + datastreams = self.fixtures.datastream_api.datastreams() + assert len(datastreams['items']) > 0 + assert any(x['id'] == ds_id for x in datastreams['items']) + + datastream = self.fixtures.datastream_api.datastream(ds_id) + assert datastream['id'] == ds_id + assert datastream['name'] == "Simulated Weather Station Network - weather" + + @pytest.mark.skip(reason="Skip transactional test") + def test_all_ds_functions(self): + # preflight cleanup + delete_all_systems() + # setup systems needed + self.fixtures.systems_api.headers = self.fixtures.sml_headers + # systems = self.fixtures.systems_api.system_create(json.dumps(self.fixtures.system_definitions)) + system = create_single_system() + + # insert a datastream + ds_def_str = json.dumps(self.fixtures.ds_definition) + ds_api = Datastreams(self.fixtures.TEST_URL, auth=self.fixtures.auth, headers=self.fixtures.json_headers) + datastream_create = ds_api.datastream_create_in_system(system, ds_def_str) + + # get the datastream id from Location header + ds_id = ds_api.response_headers['Location'].split('/')[-1] + ds = ds_api.datastream(ds_id) + ds2 = ds_api.datastreams_of_system(system) + assert ds['id'] == ds_id + assert any(x['id'] == ds_id for x in ds2['items']) + + # update the datastream omitted due to server error + # update schema has a similar server side issue + + # retrieve the schema for the datastream + res = ds_api.datastream_retrieve_schema_for_format(ds_id) + assert res is not None and len(res) > 0 + + # delete the datastream + ds_delete = ds_api.datastream_delete(ds_id) + assert ds_delete == {} + + +class TestObservations: + fixtures = OSHFixtures() + + def test_observations_readonly(self): + ds_id = 'kjg2qrcm40rfk' + observations = self.fixtures.observations_api.observations_of_datastream(ds_id) + assert len(observations['items']) > 0 + assert 'result' in observations['items'][0] + + observation_of_ds = self.fixtures.observations_api.observations_of_datastream(ds_id) + assert observation_of_ds['items'][0]['result']['stationID'] == "WS013" + keys = ['stationID', 'temperature', 'pressure', 'humidity', 'windSpeed', 'windDirection'] + assert [key in observation_of_ds['items'][0]['result'] for key in keys] + + @pytest.mark.skip(reason="Skip transactional test") + def test_observations(self): + # setup + delete_all_systems() + system = create_single_system() + ds = create_single_datastream(system) + the_time = datetime.utcnow().isoformat() + 'Z' + + observation = { + "phenomenonTime": the_time, + "resultTime": the_time, + "result": { + "timestamp": datetime.now().timestamp() * 1000, + "testboolean": True + } + } + self.fixtures.observations_api.headers = {'Content-Type': 'application/om+json'} + res = self.fixtures.observations_api.observations_create_in_datastream(ds, json.dumps(observation)) + obs = self.fixtures.observations_api.observations_of_datastream(ds) + assert obs['items'][0]['phenomenonTime'] == the_time + obs_id = obs['items'][0]['id'] + res = self.fixtures.observations_api.observations_delete(obs_id) + obs = self.fixtures.observations_api.observations_of_datastream(ds) + assert obs['items'] == [] + delete_all_systems() + + +class TestSystemHistory: + fixtures = OSHFixtures() + + def test_system_history(self): + sys_id = '0s2lbn2n1bnc8' + res = self.fixtures.system_history_api.system_history(sys_id) + assert len(res['items']) > 0 + history_id = res['items'][0]['properties']['validTime'][0] + res = self.fixtures.system_history_api.system_history_by_id(system_id=sys_id, history_id=history_id) + assert res['id'] == sys_id + + +def create_single_system(): + sys_api = Systems(OSHFixtures.TEST_URL, auth=OSHFixtures.auth, headers=OSHFixtures.sml_headers) + sys_create_res = sys_api.system_create(json.dumps(OSHFixtures.system_definitions[0])) + sys_id = sys_api.response_headers['Location'].split('/')[-1] + return sys_id + + +def create_single_datastream(system_id: str): + ds_api = Datastreams(OSHFixtures.TEST_URL, auth=OSHFixtures.auth, headers=OSHFixtures.json_headers) + result = ds_api.datastream_create_in_system(system_id, json.dumps(OSHFixtures.ds_definition)) + ds_id = ds_api.response_headers['Location'].split('/')[-1] + return ds_id + + +def delete_all_systems(): + # delete datastreams first + delete_all_datastreams() + delete_all_sampling_features() + sys_api = Systems(OSHFixtures.TEST_URL, auth=OSHFixtures.auth, headers=OSHFixtures.sml_headers) + systems = sys_api.systems() + for system in systems['items']: + OSHFixtures.systems_api.system_delete(system['id']) + + +def delete_all_datastreams(): + datastreams = OSHFixtures.datastream_api.datastreams() + for ds in datastreams['items']: + OSHFixtures.datastream_api.datastream_delete(ds['id']) + + +def delete_all_sampling_features(): + sampling_features = OSHFixtures.sampling_feature_api.sampling_features(use_fois=True) + for sf in sampling_features['items']: + OSHFixtures.sampling_feature_api.sampling_feature_delete(sf['id'], use_fois=True)