Skip to content

Commit

Permalink
combine system tests into a single one to remove potential ordering i…
Browse files Browse the repository at this point in the history
…ssues
  • Loading branch information
ChainReaction31 committed Jun 6, 2024
1 parent 4fcbf3c commit f00cccb
Showing 1 changed file with 62 additions and 59 deletions.
121 changes: 62 additions & 59 deletions tests/test_connectedsystems_osh.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,55 @@
from time import sleep

import pytest
import requests

from owslib.ogcapi.connectedsystems import Systems, Deployments, Datastreams, Observations, ControlChannels, Commands, \
SystemEvents, SystemHistory, SamplingFeatures, Properties
from owslib.util import Authentication
from tests.utils import service_ok


class OSHFixtures:
NODE_TEST_OK_URL = 'http://localhost:8585/sensorhub/test'
TEST_URL = 'http://localhost:8585/sensorhub/api/'
auth = Authentication('admin', 'admin')
NODE_TEST_OK_URL = 'http://34.67.197.57:8585/sensorhub/test'
# Directs to OSH hosted test server
TEST_URL = 'http://34.67.197.57:8585/sensorhub/api/'
auth = Authentication('auto_test', 'automated_tester24')
sml_headers = {'Content-Type': 'application/sml+json'}
json_headers = {'Content-Type': 'application/json'}
geojson_headers = {'Content-Type': 'application/geo+json'}
omjson_headers = {'Content-Type': 'application/om+json'}

system_definitions = [
{"type": "Feature",
"properties": {
"featureType": "http://www.w3.org/ns/sosa/Sensor",
"uid": "urn:osh:sensor:testcase:001",
"name": "Test Sensor 001"},
"geometry": None},
{"type": "Feature",
"properties": {
"featureType": "http://www.w3.org/ns/sosa/Sensor",
"uid": "urn:osh:sensor:testcase:002",
"name": "Test Sensor 002"},
"geometry": None}
{
"type": "SimpleProcess",
"uniqueId": "urn:osh:sensor:testsmlsensor:001",
"label": "Test SML Sensor",
"description": "A Sensor created from an SML document",
"definition": "http://www.w3.org/ns/ssn/Sensor"
},
{
"type": "SimpleProcess",
"uniqueId": "urn:osh:sensor:testsmlsensor:002",
"label": "Test SML Sensor #2",
"description": "A Sensor created from an SML document",
"definition": "http://www.w3.org/ns/ssn/Sensor"
}
]
sys_sml_def = {

sys_sml_to_update = {
"type": "SimpleProcess",
"uniqueId": "urn:osh:sensor:testsmlsensor:001",
"label": "Test SML Sensor",
"description": "A Sensor created from an SML document",
"definition": "http://www.w3.org/ns/ssn/Sensor"
}

sys_sml_def = {
"type": "SimpleProcess",
"uniqueId": "urn:osh:sensor:testsmlsensor:solo",
"label": "Test SML Sensor - Created on its own",
"description": "A Sensor created from an SML document",
"definition": "http://www.w3.org/ns/ssn/Sensor"
}

sml_component = {
"type": "SimpleProcess",
"uniqueId": "urn:osh:sensor:testcomponent:001",
Expand Down Expand Up @@ -193,57 +203,36 @@ def test_collection_item(self):
def test_collection_item_create(self):
assert False

def test_systems(self):
fixtures = OSHFixtures()
list_systems = fixtures.systems_api.systems()
assert list_systems is not None

def test_system(self):
list_sys_by_id = self.fixtures.systems_api.system('94n1f19ld7tlc')
assert list_sys_by_id is not None

def test_system_create(self):
self.fixtures.systems_api.headers = {'Content-Type': 'application/sml+json'}
sml_str = json.dumps(self.fixtures.sys_sml_def)
post_systems = self.fixtures.systems_api.system_create(sml_str)

assert post_systems is not None
def test_system_functions(self):
# insertion of systems
self.fixtures.systems_api.headers = self.fixtures.sml_headers
sys_create_res = self.fixtures.systems_api.system_create(json.dumps(self.fixtures.system_definitions))
assert sys_create_res is not None

def test_system_update(self):
sml_desc_copy = self.fixtures.sys_sml_def.copy()
# update of system and retrieval
sml_desc_copy = self.fixtures.sys_sml_to_update.copy()
sml_desc_copy['description'] = 'Updated Description'
sml_str = json.dumps(sml_desc_copy)
self.fixtures.systems_api.headers = {'Content-Type': 'application/sml+json'}
print(sml_str)
post_systems = self.fixtures.systems_api.system_update('blid74chqmses', sml_str)
print(post_systems)

check_result = self.fixtures.systems_api.system('blid74chqmses')
print(check_result)
assert check_result['properties']['description'] == 'Updated Description'

def test_preconfiguredsystem_update(self):
updated_def = {
"type": "PhysicalSystem",
"id": "0s2lbn2n1bnc8",
"uniqueId": "urn:osh:sensor:simweathernetwork:001",
"definition": "http://www.w3.org/ns/sosa/Sensor",
"label": "Simulated Weather Station Network",
"description": "Updated description to include more data!!!",
"validTime": [
"2024-04-29T02:30:07.961Z",
"now"
]
}
self.fixtures.systems_api.headers = self.fixtures.sml_headers
res = self.fixtures.systems_api.system_update('0s2lbn2n1bnc8',
json.dumps(updated_def))
print(res)
assert res is not None
# deletion of system
all_systems = self.fixtures.systems_api.systems()
print(all_systems)

# @pytest.mark.xfail(response="Response doesn't include information to test against")
def test_system_delete(self):
res = self.fixtures.systems_api.system_delete('blid74chqmses')
print(f'res: {res}')
assert res['code'] == 204
# clear datastreams
delete_all_datastreams()

for system in all_systems['items']:
print(f'System: {system}')
res = self.fixtures.systems_api.system_delete(system['id'])
print(f'res: {res}')
assert res['code'] == 204


@pytest.mark.skip("Not implemented by server")
Expand Down Expand Up @@ -688,3 +677,17 @@ def test_system_history_update_description(self):
@pytest.mark.skip(reason="Will break test server")
def test_system_history_delete(self):
assert False


def delete_all_systems():
# delete datastreams first
delete_all_datastreams()
systems = OSHFixtures.systems_api.systems()
for system in systems['items']:
OSHFixtures.systems_api.system_delete(system['id'])


def delete_all_datastreams():
datastreams = OSHFixtures.datastream_api.datastreams()
for ds in datastreams['items']:
OSHFixtures.datastream_api.datastream_delete(ds['id'])

0 comments on commit f00cccb

Please sign in to comment.