"""
This module contains base classes for data connectors.
Data connectors are the component of PEDASI which interacts directly with data provider APIs.
"""
import abc
from collections import abc as collections_abc
from collections import OrderedDict
import enum
import typing
import requests
import requests.auth
from core import plugin
class DatasetNotFoundError(Exception):
    """
    Signals that a requested dataset could not be found within a data source.
    """
@enum.unique
class AuthMethod(enum.IntEnum):
    """
    Authentication scheme applied when sending a request to the external API.
    """
    #: No authentication is required
    NONE = -1

    #: Not yet determined - requests are sent without authentication
    UNKNOWN = 0

    #: HTTP basic authentication (``HTTPBasicAuth`` from Requests)
    BASIC = 1

    #: As ``BASIC``, but the key has already been base64 encoded
    HEADER = 2

    @classmethod
    def choices(cls):
        """
        List every member as a ``(value, name)`` pair, in definition order.

        :return: Tuple of ``(value, name)`` tuples, one per member
        """
        # ``enum.unique`` guarantees there are no aliases, so ``__members__``
        # holds exactly the members yielded by iterating the class
        return tuple(
            (member.value, name)
            for name, member in cls.__members__.items()
        )
#: Maps each :class:`AuthMethod` to the callable used to build the ``auth``
#: argument of a request, or ``None`` when the request is sent without one.
#: ``BaseDataConnector.determine_auth_method`` tries the entries in this order.
REQUEST_AUTH_FUNCTIONS = OrderedDict(
    (
        (AuthMethod.NONE, None),
        (AuthMethod.UNKNOWN, None),
        (AuthMethod.BASIC, requests.auth.HTTPBasicAuth),
        (AuthMethod.HEADER, HttpHeaderAuth),
    )
)
class RequestCounter:
    """
    Mutable integer counter which can be incremented in place with ``+=``.
    """
    def __init__(self, count: int = 0):
        """
        :param count: Initial value of the counter
        """
        self._count = count

    def __iadd__(self, other: int):
        """
        Increase the counter by ``other`` and keep using the same object.

        :param other: Amount to add to the counter
        :return: This counter, so ``counter += n`` rebinds to the same instance
        """
        self._count = self._count + other
        return self

    def count(self):
        """
        :return: Current value of the counter
        """
        return self._count
class BaseDataConnector(metaclass=plugin.Plugin):
    """
    Base class of data connectors which provide access to data / metadata via an external API.

    DataConnectors may be defined for sources which provide:

    * A single dataset
    * A data catalogue - a collection of datasets
    """
    #: Does this data connector represent a data catalogue containing multiple datasets?
    is_catalogue = None

    #: Help string to be shown when a data provider is choosing a connector
    description = None

    def __init__(self, location: str,
                 api_key: typing.Optional[str] = None,
                 auth: typing.Optional[typing.Callable] = None,
                 **kwargs):
        """
        :param location: URL which is requested by :meth:`get_response`
        :param api_key: Key handed to ``auth`` as its first argument
        :param auth: Callable invoked as ``auth(api_key, '')`` to build the ``auth``
                     argument of each request - if None, requests are sent without one
        :param kwargs: Accepted but not used by this class
        """
        self._request_counter = RequestCounter()

        self.location = location
        self.api_key = api_key
        self.auth = auth

    @property
    def request_count(self):
        """
        Number of requests sent through this connector so far.
        """
        return self._request_counter.count()

    # TODO make normal method
    @staticmethod
    def determine_auth_method(url: str, api_key: str) -> AuthMethod:
        """
        Work out which authentication method gives access to the data source.

        Each entry of ``REQUEST_AUTH_FUNCTIONS`` is tried in order and the first one
        whose response does not raise an HTTP error is returned.

        :param url: URL to authenticate against
        :param api_key: API key to use for authentication
        :return: First successful authentication method
        :raises requests.exceptions.ConnectionError: No authentication method succeeded
        """
        # Without an API key there is nothing to authenticate with
        if not api_key:
            return AuthMethod.NONE

        for auth_method_id, auth_function in REQUEST_AUTH_FUNCTIONS.items():
            try:
                # Only pass ``auth`` when this method actually has an auth callable
                if auth_function is None:
                    request_kwargs = {}
                else:
                    request_kwargs = {'auth': auth_function(api_key, '')}

                response = requests.get(url, **request_kwargs)
                response.raise_for_status()

            except requests.exceptions.HTTPError:
                # Rejected with this method - move on to the next one
                continue

            return auth_method_id

        # Every candidate was rejected
        raise requests.exceptions.ConnectionError('Could not authenticate against external API')

    def get_response(self,
                     params: typing.Optional[typing.Mapping[str, str]] = None):
        """
        Request this connector's location and hand back the response untouched.

        :param params: Optional query parameter filters
        :return: Requested data / metadata - response is passed transparently
        """
        return self._get_auth_request(self.location, params=params)

    def _get_auth_request(self, url, **kwargs):
        """
        Send a GET request, attaching authentication if this connector has any.

        Every call is recorded in the request counter before the request is sent.

        :param url: URL to request
        :param kwargs: Extra keyword arguments passed through to ``requests.get``
        :return: Response from ``requests.get``
        """
        self._request_counter += 1

        if self.auth is not None:
            credentials = self.auth(self.api_key, '')
            return requests.get(url, auth=credentials, **kwargs)

        return requests.get(url, **kwargs)
class ReadOnlyInternalDataConnector(abc.ABC):
    """
    Abstract mixin for connectors to internally hosted data sources which are read only.
    """
    @abc.abstractmethod
    def clean_data(self, **kwargs):
        """
        Clean, validate or otherwise structure the data held by this data source.

        :param kwargs: Options defined by the implementing class
        :raises NotImplementedError: Always - implementations must override this method
        """
        raise NotImplementedError()
class InternalDataConnector(ReadOnlyInternalDataConnector):
    """
    Abstract mixin for connectors to internally hosted data sources which can also be written to.
    """
    @abc.abstractmethod
    def clear_data(self):
        """
        Remove all data from this data source.

        :raises NotImplementedError: Always - implementations must override this method
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def post_data(
            self,
            data: typing.Union[typing.MutableMapping[str, str],
                               typing.List[typing.MutableMapping[str, str]]]
    ):
        """
        Add data to this data source.

        :param data: Data to add - either a single mapping or a list of mappings
        :raises NotImplementedError: Always - implementations must override this method
        """
        raise NotImplementedError()
class DataCatalogueConnector(BaseDataConnector, collections_abc.Mapping):
    """
    Base class of data connectors which provide access to a data catalogue.

    A catalogue behaves as a read only mapping from dataset id to a data connector
    for that dataset.
    """
    #: Does this data connector represent a data catalogue containing multiple datasets?
    is_catalogue = True

    def get_data(self,
                 params: typing.Optional[typing.Mapping[str, str]] = None):
        """
        Data catalogues hold no data themselves, so this always fails.

        :param params: Ignored
        :raises TypeError: Always - a dataset must be selected from the catalogue first
        """
        raise TypeError('Data catalogues contain only metadata. You must select a dataset.')

    @abc.abstractmethod
    def get_datasets(self,
                     params: typing.Optional[typing.Mapping[str, str]] = None) -> typing.List[str]:
        """
        Get the list of datasets provided by this catalogue.

        :param params: Query parameters to pass to data source API
        :return: List of datasets provided by this catalogue
        """
        raise NotImplementedError

    @abc.abstractmethod
    def __getitem__(self, item: str) -> BaseDataConnector:
        """
        Get a data connector for a single dataset within this catalogue.

        :param item: Dataset id
        :return: Data connector for dataset
        :raises DatasetNotFoundError: Requested dataset cannot be found
        """
        raise NotImplementedError

    def __contains__(self, item) -> bool:
        """
        Test whether a dataset is available from this catalogue.

        The ``Mapping`` default only treats ``KeyError`` as "missing", but
        :meth:`__getitem__` reports a missing dataset with ``DatasetNotFoundError``,
        so both are handled here.

        :param item: Dataset id
        :return: True if a connector could be retrieved for the dataset
        """
        try:
            self[item]
        except (KeyError, DatasetNotFoundError):
            return False
        return True

    def get(self, key, default=None):
        """
        Get a data connector for a dataset, or ``default`` if it cannot be found.

        Overridden for the same reason as :meth:`__contains__` - the ``Mapping``
        default would let ``DatasetNotFoundError`` escape.

        :param key: Dataset id
        :param default: Value returned when the dataset cannot be found
        :return: Data connector for dataset, or ``default``
        """
        try:
            return self[key]
        except (KeyError, DatasetNotFoundError):
            return default

    def __iter__(self):
        """
        Iterate over the datasets reported by :meth:`get_datasets`.
        """
        return iter(self.get_datasets())

    def __len__(self):
        """
        Number of datasets reported by :meth:`get_datasets`.
        """
        return len(self.get_datasets())
class DataSetConnector(BaseDataConnector):
    """
    Base class of data connectors which provide access to a single dataset.

    Metadata may be passed to the constructor if it has been collected from a previous source
    otherwise attempting to retrieve metadata will raise NotImplementedError.

    If you wish to connect to a source that provides metadata itself, you must create a new
    connector class which inherits from this one.
    """
    # NOTE(review): no metadata accessor is defined in this class as shown here -
    # confirm where the NotImplementedError described above is raised.

    #: Does this data connector represent a data catalogue containing multiple datasets?
    is_catalogue = False

    #: Help string to be shown when a data provider is choosing a connector
    description = (
        'This connector is the default option and should be used when accessing an API at a single endpoint '
        'which may or may not accept query parameters.'
    )

    def __init__(self, location: str,
                 api_key: typing.Optional[str] = None,
                 auth: typing.Optional[typing.Callable] = None,
                 metadata: typing.Optional[typing.Any] = None):
        """
        :param location: URL of the data source
        :param api_key: API key passed on to the base connector
        :param auth: Authentication callable passed on to the base connector
        :param metadata: Previously collected metadata, stored on the instance as given
        """
        super().__init__(location, api_key, auth=auth)

        self._metadata = metadata

    def get_data(self,
                 params: typing.Optional[typing.Mapping[str, str]] = None):
        """
        Retrieve the data from this source.

        If the data is JSON formatted it will be parsed into a dictionary - otherwise it will
        be passed as plain text.

        :param params: Query parameters to be passed through to the data source API
        :return: Data source data
        """
        response = self.get_response(params)

        # A response is not guaranteed to carry a Content-Type header - treat a
        # missing header as "not JSON" rather than failing with KeyError.
        # Media types are case insensitive, so compare in lower case.
        content_type = response.headers.get('Content-Type', '')
        if 'json' in content_type.lower():
            return response.json()

        return response.text