Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archetypes for external datasets #353

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kgforge/core/archetypes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
from .mapping import Mapping
from .mapper import Mapper
from .store import Store
from .read_store import ReadStore
crisely09 marked this conversation as resolved.
Show resolved Hide resolved
from .model import Model
from .resolver import Resolver
133 changes: 133 additions & 0 deletions kgforge/core/archetypes/external_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#
# Blue Brain Nexus Forge is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Blue Brain Nexus Forge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
# General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Blue Brain Nexus Forge. If not, see <https://choosealicense.com/licenses/lgpl-3.0/>.

import json
import requests
from abc import abstractmethod

from typing import Optional, Union, List
from kgforge.core import Resource
from kgforge.core.archetypes.read_store import ReadStore
from kgforge.core.archetypes.model import Model
from kgforge.core.commons.exceptions import QueryingError
from kgforge.specializations.stores.bluebrain_nexus import BlueBrainNexus


class ExternalDataset(ReadStore):
    """A class to link to external databases, query and search directly on datasets. """

    def __init__(self, model: Optional[Model] = None,
                 ) -> None:
        super().__init__(model)

    def types(self) -> List[str]:
        """Return the resource types known to this dataset, from the model mappings."""
        # TODO: add other datatypes used, for instance, inside the mappings
        return list(self.model.mappings(self.model.source, False).keys())

    def search(self, resolvers, *filters, **params):
        """Search within the database.

        :param keep_original: bool (popped from `params`, default True) — when False,
            results are mapped to Resources of the type found in the filters.
        """
        keep_original = params.pop('keep_original', True)
        unmapped_resources = self._search(resolvers, *filters, **params)
        # NOTE(review): `self.service` and `self.map` are expected to be provided by
        # the concrete specialization — confirm they exist on every subclass.
        if isinstance(self.service, BlueBrainNexus) or keep_original:
            return unmapped_resources
        # Try to find the type of the resources within the filters
        resource_type = type_from_filters(*filters)
        return self.map(unmapped_resources, type_=resource_type)

    @abstractmethod
    def _search(self, resolvers, *filters, **params):
        # Signature fixed: search() calls this as
        # self._search(resolvers, *filters, **params), so the abstract
        # declaration must accept those arguments.
        ...

    def sparql(self, query: str, debug: bool = False, limit: Optional[int] = None,
               offset: Optional[int] = None, **params) -> Optional[Union[List[Resource], Resource]]:
        """Use SPARQL within the database.

        :param keep_original: bool (popped from `params`, default True) — when False,
            results are mapped to Resources.
        """
        keep_original = params.pop('keep_original', True)
        # NOTE(review): `_sparql` is not declared on this class nor on ReadStore —
        # confirm every subclass that exposes sparql() implements it.
        unmapped_resources = self._sparql(query, debug, limit, offset, **params)
        if keep_original:
            return unmapped_resources
        return self.map(unmapped_resources)


def type_from_filters(*filters) -> Optional[str]:
    """Return the first `type` found in the first filter argument, or None.

    The first positional filter may be a dict (forge dict-filter syntax), a
    single Filter object, or a list/tuple grouping of Filter objects.
    """
    if not filters:
        # No filters given: nothing to inspect (previously raised IndexError).
        return None
    first = filters[0]
    if isinstance(first, dict):
        return first.get('type')
    # Check filters grouping: normalize to a flat list of Filter objects.
    candidates = list(first) if isinstance(first, (list, tuple)) else [first]
    for flt in candidates:
        # Bug fix: compare operator with `==`, not `is` — identity comparison of
        # string literals relies on CPython interning and is not guaranteed.
        if 'type' in flt.path and flt.operator == "__eq__":
            return flt.value
    return None


def resources_from_request(url, headers, **params):
    """Perform an HTTP GET request and build Resources from the JSON response.

    :param url: endpoint to query.
    :param headers: HTTP headers to send with the request.
    :raises QueryingError: when the request fails or returns an error status.

    params:
    -------
    response_loc : str or list[str]
        The nested location of the relevant metadata in the
        response.
        Example: NeuroMorpho uses response["_embedded"]["neuronResources"]
        which should be given as: response_loc = ["_embedded", "neuronResources"]
    """
    response_location = params.pop('response_loc', None)
    try:
        # NOTE(review): verify=False disables TLS certificate validation —
        # confirm this is intentional for the targeted external services.
        response = requests.get(
            url, params=params, headers=headers, verify=False
        )
        response.raise_for_status()
    except Exception as e:
        # Chain the original exception so the root cause stays visible.
        raise QueryingError(e) from e
    data = response.json()
    if response_location:
        # Get the resources directly from a location in the response.
        if isinstance(response_location, str):
            results = data[response_location]
        else:
            # Walk nested keys; previously any non-str, non-list/tuple value
            # left `results` unbound and raised NameError.
            for inner in response_location:
                data = data[inner]
            results = data
        return [Resource(**result) for result in results]
    # Standard response format (SPARQL-style results/bindings).
    results = data["results"]["bindings"]
    return resources_from_results(results)


_XSD_BOOLEAN = 'http://www.w3.org/2001/XMLSchema#boolean'
_XSD_INTEGER = 'http://www.w3.org/2001/XMLSchema#integer'


def _parse_binding_value(binding):
    """Convert one SPARQL result binding to a Python value (bool, int or str)."""
    value = binding["value"]
    if binding['type'] == 'literal':
        datatype = binding.get('datatype')
        if datatype == _XSD_BOOLEAN:
            # "True"/"true"/"FALSE" etc. -> lowercase -> parsed by json as bool.
            return json.loads(str(value).lower())
        if datatype == _XSD_INTEGER:
            return int(value)
    return value


def resources_from_results(results):
    """Returns Resources from standard response bindings."""
    return [
        Resource(**{k: _parse_binding_value(v) for k, v in x.items()})
        for x in results
    ]
223 changes: 223 additions & 0 deletions kgforge/core/archetypes/read_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
#
# Blue Brain Nexus Forge is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Blue Brain Nexus Forge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
# General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Blue Brain Nexus Forge. If not, see <https://choosealicense.com/licenses/lgpl-3.0/>.
import re
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

from kgforge.core import Resource
from kgforge.core.archetypes.model import Model
from kgforge.core.archetypes.resolver import Resolver
from kgforge.core.commons.attributes import repr_class
from kgforge.core.commons.context import Context
from kgforge.core.commons.exceptions import (
DownloadingError,
)
from kgforge.core.commons.execution import not_supported
from kgforge.core.reshaping import collect_values
from kgforge.core.wrappings.dict import DictWrapper


class ReadStore(ABC):
    """Abstract archetype for read-only access to a store: retrieve, download, query."""

    # See demo_store.py in kgforge/specializations/stores/ for a reference implementation.

    # POLICY Methods of archetypes, except __init__, should not have optional arguments.

    # POLICY Implementations should be declared in kgforge/specializations/stores/__init__.py.
    # POLICY Implementations should not add methods but private functions in the file.
    # TODO Move from BDD to classical testing to have a more parameterizable test suite. DKE-135.
    # POLICY Implementations should pass tests/specializations/stores/demo_store.feature tests.

    def __init__(
        self,
        model: Optional[Model] = None,
    ) -> None:
        self.model: Optional[Model] = model
        # Only ask the model for a context when it actually provides one.
        self.model_context: Optional[Context] = (
            self.model.context() if hasattr(self.model, 'context') else None
        )

    def __repr__(self) -> str:
        return repr_class(self)

    # C[R]UD.

    @abstractmethod
    def retrieve(
        self, id_: str, version: Optional[Union[int, str]], cross_bucket: bool, **params
    ) -> Resource:
        # POLICY Should notify of failures with exception RetrievalError including a message.
        # POLICY Resource _store_metadata should be set using wrappers.dict.wrap_dict().
        # POLICY Resource _synchronized should be set to True.
        # TODO These two operations might be abstracted here when other stores will be implemented.
        pass

    def _retrieve_filename(self, id_: str) -> Tuple[str, str]:
        # Parameter renamed from `id` to `id_`: avoids shadowing the builtin and
        # matches the naming convention used by retrieve().
        # TODO This operation might be adapted if other file metadata are needed.
        not_supported()

    def _prepare_download_one(
        self,
        url: str,
        store_metadata: Optional[DictWrapper],
        cross_bucket: bool
    ) -> Tuple[str, str]:
        # Prepare download url and download bucket.
        not_supported()

    def download(
        self,
        data: Union[Resource, List[Resource]],
        follow: str,
        path: str,
        overwrite: bool,
        cross_bucket: bool,
        content_type: str = None
    ) -> None:
        """Download the files reachable via `follow` in `data` into directory `path`.

        :param data: a Resource or list of Resources to follow.
        :param follow: resource path leading to the downloadable URLs.
        :param path: DirPath — target directory, created if missing.
        :param overwrite: when False and the file exists, a timestamp suffix is appended.
        :param cross_bucket: whether the download may cross bucket boundaries.
        :param content_type: when given, only files with this content type are downloaded.
        :raises DownloadingError: when `follow` matches nothing, or no file
            matches `content_type`.
        """
        urls = []
        store_metadata = []
        to_download = [data] if isinstance(data, Resource) else data
        for d in to_download:
            collected_values = collect_values(d, follow, DownloadingError)
            urls.extend(collected_values)
            # One metadata entry per collected URL keeps the two lists index-aligned.
            store_metadata.extend(
                [d._store_metadata for _ in range(len(collected_values))]
            )
        if len(urls) == 0:
            raise DownloadingError(
                f"path to follow '{follow}' was not found in any provided resource."
            )
        dirpath = Path(path)
        dirpath.mkdir(parents=True, exist_ok=True)
        timestamp = time.strftime("%Y%m%d%H%M%S")
        filepaths = []
        buckets = []
        download_urls = []
        download_store_metadata = []
        for i, x in enumerate(urls):
            x_download_url, x_bucket = self._prepare_download_one(x, store_metadata[i],
                                                                  cross_bucket)
            filename, store_content_type = self._retrieve_filename(x_download_url)
            # Simplified: the redundant `(content_type and ...)` guard was removed;
            # the first disjunct already covers a falsy content_type.
            if not content_type or store_content_type == content_type:
                filepath = dirpath / filename
                if not overwrite and filepath.exists():
                    filepaths.append(f"{filepath}.{timestamp}")
                else:
                    filepaths.append(str(filepath))
                download_urls.append(x_download_url)
                buckets.append(x_bucket)
                download_store_metadata.append(store_metadata[i])
        if len(download_urls) > 1:
            self._download_many(download_urls, filepaths, download_store_metadata, cross_bucket,
                                content_type, buckets)
        elif len(download_urls) == 1:
            self._download_one(download_urls[0], filepaths[0], download_store_metadata[0],
                               cross_bucket, content_type, buckets[0])
        else:
            raise DownloadingError(
                f"No resource with content_type {content_type} was found when following the resource path '{follow}'."
            )

    def _download_many(
        self,
        urls: List[str],
        paths: List[str],
        store_metadata: Optional[List[DictWrapper]],
        cross_bucket: bool,
        content_type: str,
        buckets: List[str]
    ) -> None:
        # paths: List[FilePath].
        # Bulk downloading could be optimized by overriding this method in the specialization.
        # POLICY Should follow self._download_one() policies.
        for url, path, store_m, bucket in zip(urls, paths, store_metadata, buckets):
            self._download_one(url, path, store_m, cross_bucket, content_type, bucket)

    def _download_one(
        self,
        url: str,
        path: str,
        store_metadata: Optional[DictWrapper],
        cross_bucket: bool,
        content_type: str,
        bucket: str
    ) -> None:
        # path: FilePath.
        # POLICY Should notify of failures with exception DownloadingError including a message.
        not_supported()

    # Querying.

    @abstractmethod
    def search(
        self, resolvers: Optional[List[Resolver]], *filters, **params
    ) -> List[Resource]:

        # Positional arguments in 'filters' are instances of type Filter from wrappings/paths.py
        # A dictionary can be provided for filters:
        # - {'key1': 'val', 'key2': {'key3': 'val'}} will be translated to
        # - [Filter(operator='__eq__', path=['key1'], value='val'), Filter(operator='__eq__', path=['key2', 'key3'], value='val')]
        # Keyword arguments in 'params' could be:
        # - debug: bool,
        # - limit: int,
        # - offset: int,
        # - deprecated: bool,
        # - resolving: str, with values in ('exact', 'fuzzy'),
        # - lookup: str, with values in ('current', 'children').
        # POLICY Should use sparql() when 'sparql' is chosen as value for the param 'search_endpoint'.
        # POLICY Should use elastic() when 'elastic' is chosen as value for the param 'search_endpoint'.
        # POLICY Given parameters for limit and offset override the input query.
        # POLICY Should notify of failures with exception QueryingError including a message.
        # POLICY Resource _store_metadata should be set using wrappers.dict.wrap_dict().
        # POLICY Resource _synchronized should be set to True.
        # TODO These two operations might be abstracted here when other stores will be implemented.
        ...

    @abstractmethod
    def sparql(self, query: str, debug: bool = False, limit: Optional[int] = None,
               offset: Optional[int] = None, **params) -> Optional[Union[List[Resource], Resource]]:
        ...

    # Versioning.

    @abstractmethod
    def _initialize_service(
        self,
        endpoint: Optional[str],
        bucket: Optional[str],
        token: Optional[str],
        searchendpoints: Optional[Dict] = None,
        **store_config,
    ) -> Any:
        # POLICY Should initialize the access to the store according to its configuration.
        ...

    @staticmethod
    def _debug_query(query) -> None:
        # Pretty-print a submitted query (dict or multi-line string) for debugging.
        # Use the concrete `dict` rather than typing.Dict in isinstance checks.
        if isinstance(query, dict):
            print("Submitted query:", query)
        else:
            print(*["Submitted query:", *query.splitlines()], sep="\n ")
        print()

    def rewrite_uri(self, uri: str, context: Context, **kwargs) -> str:
        """Rewrite a given uri using the store Context
        :param uri: a URI to rewrite.
        :param context: a Store Context object
        :return: str

        Default implementation is a no-op (returns None); specializations
        are expected to override it.
        """
        pass
Loading