#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Defines the protocol
"""
import datetime
import collections
import urllib
from kuha_oai_pmh_repo_handler.oai import errors as oaierrors
from kuha_oai_pmh_repo_handler.oai.metadata_formats import (
DCMetadataFormat,
DDIMetadataFormat,
CDCDDI25MetadataFormat,
EAD3MetadataFormat,
)
from kuha_oai_pmh_repo_handler.oai.records import (
OAIHeaders,
)
from kuha_oai_pmh_repo_handler.oai.constants import (
OAI_RESPONSE_LIST_SIZE,
OAI_RESPONSE_TIMESTAMP_FORMAT,
OAI_RESPONSE_DATE_FORMAT,
OAI_REPO_NAME,
OAI_PROTOCOL_VERSION
)
def as_supported_datetime(datetime_str, raise_oai_exc=True):
    """Parse a datestamp string into a :obj:`datetime.datetime`.

    The format used for parsing is chosen by the length of
    `datetime_str`: 10 characters select the date format
    (YYYY-MM-DD) and 20 characters select the timestamp
    format (YYYY-MM-DDThh:mm:ssZ).

    :note: Pass `raise_oai_exc=False` when `datetime_str` does not
           originate from a HTTP-request. A failure is then a
           programming error and the original exception propagates.

    :param datetime_str: datestamp to convert.
    :type datetime_str: str
    :param raise_oai_exc: Re-raise parsing failures as an OAI error.
    :type raise_oai_exc: bool
    :returns: converted datetime.
    :rtype: :obj:`datetime.datetime`
    :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadArgument`
             for an unsupported length or unparsable value when
             `raise_oai_exc` is True. Otherwise :exc:`KeyError`
             (unsupported length) or :exc:`ValueError` (unparsable value).
    """
    formats_by_length = {
        10: OAI_RESPONSE_DATE_FORMAT,
        20: OAI_RESPONSE_TIMESTAMP_FORMAT,
    }
    try:
        return datetime.datetime.strptime(
            datetime_str, formats_by_length[len(datetime_str)]
        )
    except (KeyError, ValueError) as exc:
        if not raise_oai_exc:
            # Caller declared the value trusted, so this is a programming error.
            raise
        # Report to the requester through the OAI error condition.
        raise oaierrors.BadArgument(
            "Invalid datetime format: {}".format(datetime_str)
        ) from exc
def as_supported_datestring(datetime_obj):
    """Render a :obj:`datetime.datetime` as a datestamp string.

    The output always uses the timestamp format
    (documented target: YYYY-MM-DDThh:mm:ssZ).

    :param datetime_obj: datetime to render.
    :type datetime_obj: :obj:`datetime.datetime`
    :returns: string representation of `datetime_obj`.
    :rtype: str
    """
    target_format = OAI_RESPONSE_TIMESTAMP_FORMAT
    return datetime.datetime.strftime(datetime_obj, target_format)
def encode_uri(string):
    """Encode uri string.

    Replace special characters in string using
    :func:`urllib.parse.quote` with its default settings
    (``/`` is left unescaped). Return resulting string.

    :param string: value to encode.
    :type string: str
    :returns: encoded value
    :rtype: str
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # load ``urllib.parse``, so relying on it only works when some other
    # module happens to have imported the submodule already.
    from urllib.parse import quote
    return quote(string)
def decode_uri(uri):
    """Decode uri string.

    Replace uri encoded special characters in string using
    :func:`urllib.parse.unquote`. Return resulting string.

    :param uri: value to decode.
    :type uri: str
    :returns: decoded value
    :rtype: str
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # load ``urllib.parse``, so relying on it only works when some other
    # module happens to have imported the submodule already.
    from urllib.parse import unquote
    return unquote(uri)
def min_increment_step(datetime_str):
    """Count smallest increment step from datetime string.

    The precision is deduced from the length of the string:
    10 characters means day's precision and 20 characters
    means second's precision.

    :param datetime_str: string representation of a datetime.
                         Datetime must be represented either
                         by day's precision or by second's precision.
    :type datetime_str: str
    :returns: smallest increment step.
    :rtype: :obj:`datetime.timedelta`
    :raises: :exc:`ValueError` if string length is invalid.
    """
    length = len(datetime_str)
    if length == 10:
        # day's precision
        return datetime.timedelta(days=1)
    if length == 20:
        # second's precision
        return datetime.timedelta(seconds=1)
    # The exception must actually be raised; merely constructing it would
    # let execution fall through to an unbound result.
    raise ValueError("Invalid datetime string: {}".format(datetime_str))
class ResumptionToken:
    """Class representing OAI-PMH Resumption Token.

    Holds attributes of the resumption token. Creates a new
    resumption token with initial values or takes a
    dictionary of resumption token arguments. Validates the
    token based on records list size. If the list
    size has been changed between requests asserts that the
    token is invalid by raising a
    :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`
    exception.

    :note: Changing the requested set in the middle of a list
           request sequence may result in a falsely valid
           resumption token, since validation is based on the
           complete list size only. Doing so should be seen as bad
           behaviour by the requester/harvester.

    :param cursor: Optional parameter for the current position in list.
    :type cursor: int
    :param from_: Optional parameter for from datestamp.
                  Converted to :obj:`datetime.datetime` on init.
    :type from_: str
    :param until: Optional parameter for until datestamp.
                  Converted to :obj:`datetime.datetime` on init.
                  Defaults to the current UTC time truncated to seconds.
    :type until: str
    :param complete_list_size: Optional parameter for the number of
                               records in the complete list.
    :type complete_list_size: int
    :param metadata_prefix: Optional parameter for the requested metadata prefix.
    :type metadata_prefix: str
    :param set_: Optional parameter containing requested set information.
    :type set_: str
    :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadArgument`
             if `from_` or `until` is not a supported datestamp.
    """

    #: Store ResumptionToken attribute keys and values.
    Attribute = collections.namedtuple('Attribute', ['key', 'value'])
    cursor = Attribute('cursor', 0)
    from_ = Attribute('from', None)
    until = Attribute('until', None)
    set_ = Attribute('set', None)
    complete_list_size = Attribute('completeListSize', None)
    metadata_prefix = Attribute('metadataPrefix', None)
    #: Configurable value for the size of the list response.
    response_list_size = OAI_RESPONSE_LIST_SIZE

    def __init__(self, cursor=0, from_=None, until=None, complete_list_size=None,
                 metadata_prefix=None, set_=None):
        # Each assignment below shadows the class-level default Attribute
        # with an instance-level copy carrying the actual value.
        self.cursor = self.cursor._replace(value=int(cursor))
        if from_:
            # Convert to datetime.datetime
            self.from_ = self.from_._replace(
                value=as_supported_datetime(from_)
            )
        if until:
            # Convert to datetime.datetime
            self.until = self.until._replace(
                value=as_supported_datetime(until)
            )
            increment_step = min_increment_step(until)
        else:
            # No until requested: use current UTC time with second's precision.
            now = datetime.datetime.utcnow()
            self.until = self.until._replace(
                value=now - datetime.timedelta(microseconds=now.microsecond)
            )
            increment_step = datetime.timedelta(seconds=1)
        #: until + smallest increment step, to be used when querying.
        self.query_param_until = self.until.value + increment_step
        if complete_list_size:
            self.complete_list_size = self.complete_list_size._replace(
                value=int(complete_list_size)
            )
        if set_:
            self.set_ = self.set_._replace(value=set_)
        self.metadata_prefix = self.metadata_prefix._replace(
            value=metadata_prefix
        )

    @classmethod
    def set_response_list_size(cls, size):
        """Configure response list size.

        :param size: Number of records in list response.
        :type size: int
        """
        cls.response_list_size = size

    @classmethod
    def _load_dict(cls, resumption_token):
        # resumption_token is a dict containing strings or NoneTypes.
        # The values originate from the requester, so a missing or
        # non-numeric cursor/completeListSize is reported as an invalid
        # token instead of leaking KeyError/TypeError/ValueError.
        try:
            # Advance the cursor to the start of the next partial list.
            cursor = int(resumption_token[cls.cursor.key]) + cls.response_list_size
            complete_list_size = int(resumption_token[cls.complete_list_size.key])
        except (KeyError, TypeError, ValueError) as exc:
            raise oaierrors.BadResumptionToken(
                context="Malformed resumptionToken"
            ) from exc
        from_ = resumption_token.get(cls.from_.key, None)
        until = resumption_token.get(cls.until.key, None)
        metadata_prefix = resumption_token.get(cls.metadata_prefix.key)
        set_ = resumption_token.get(cls.set_.key, None)
        return cls(cursor=cursor, from_=from_,
                   until=until, complete_list_size=complete_list_size,
                   metadata_prefix=metadata_prefix, set_=set_)

    @classmethod
    def load_resumption_token_argument(cls, argument):
        """Create new resumption token from arguments.

        Use to load resumption token from OAI request. The argument
        is expected to consist of ``key:value`` pairs joined with ``&``.
        The literal value ``None`` is converted to :obj:`None`.

        :param argument: Resumption token argument.
                         This comes from HTTP-request.
        :type argument: str
        :returns: New :obj:`ResumptionToken`
        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`
                 if the argument is not a well-formed resumption token.
        """
        result = {}
        for _arg in argument.split('&'):
            # Split on the first colon only; values may contain colons.
            _key, _sep, _val = _arg.partition(':')
            if not _sep:
                # A pair without a separator cannot be interpreted.
                raise oaierrors.BadResumptionToken(
                    context="Malformed resumptionToken"
                )
            result[_key] = None if _val == 'None' else _val
        return cls._load_dict(result)

    def _validate(self, size):
        # If size has changed between requests, the resumption token is no longer valid.
        return self.complete_list_size.value == size

    def _is_final_list(self):
        # True when the current partial list reaches the end of the complete list.
        return self.cursor.value + self.response_list_size >= self.complete_list_size.value

    def set_complete_list_size(self, size):
        """Set the number of records in the complete query response.

        If the token does not yet hold a complete list size, store `size`.
        Otherwise validate the stored size against `size`.

        :note: Resumption token is invalid if the number of records for the complete
               query response has been changed between requests.

        :param size: Number of records for the complete query response.
        :type size: int
        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`
                 if list sizes don't match.
        """
        if not self.complete_list_size.value:
            self.complete_list_size = self.complete_list_size._replace(
                value=int(size)
            )
        else:
            if not self._validate(size):
                _ctx = "Requested completeListSize: {}".format(
                    self.complete_list_size.value
                )
                _ctx += " Actual completeListSize: {}".format(size)
                raise oaierrors.BadResumptionToken(context=_ctx)

    def get_encoded(self):
        """Get encoded Resumption Token.

        Returns uri-encoded representation of the resumption token if the
        list request sequence is ongoing. If the list request sequence is over,
        returns None.

        :note: The complete list size must be set before calling this.

        :returns: uri-encoded representation of the token, or None
        :rtype: str or None
        """
        if self._is_final_list():
            return None
        attrs = []
        for key, value in (self.cursor, self.from_, self.until, self.set_,
                           self.complete_list_size, self.metadata_prefix):
            if value is not None and key in (self.until.key, self.from_.key):
                # NOTE(review): datestamps are always serialised with the
                # timestamp format, so an until given with day's precision is
                # reloaded with second's precision and gets a different
                # increment step on subsequent requests -- confirm intended.
                value = as_supported_datestring(value)
            attrs.append('{}:{}'.format(key, value))
        return encode_uri('&'.join(attrs))
class OAIResponse:
    """Container for everything that ends up in the OAI response.

    The response is accumulated into a nested dictionary with the
    top-level keys ``response_metadata``, ``error`` and ``data``.
    That dictionary is handed to the XML-templates, so its layout
    must stay compatible with them.

    :param request_url: Optional requested url. When omitted the
                        class-level base url is used.
    :type request_url: str or None
    """

    repository_name = OAI_REPO_NAME
    base_url = ''
    protocol_version = OAI_PROTOCOL_VERSION
    admin_email = []

    def __init__(self, request_url=None):
        if request_url is None:
            request_url = self.base_url
        # The same list object is referenced from the response data,
        # so records added later are visible in the response.
        self.records = []
        timestamp = datetime.datetime.utcnow().strftime(OAI_RESPONSE_TIMESTAMP_FORMAT)
        response_metadata = {
            'response_timestamp': timestamp,
            'request_attrs': {},
            'resumption_token': None,
            'request_url': request_url,
        }
        data = {
            'records': self.records,
            'base_url': self.base_url,
            'admin_email': self.admin_email,
            'protocol_version': self.protocol_version,
            'repository_name': self.repository_name,
        }
        self._oai_response = {
            'response_metadata': response_metadata,
            'error': {},
            'data': data,
        }

    @classmethod
    def set_repository_name(cls, name):
        """Configure the repository name for all responses.

        :param name: repository name.
        :type name: str
        """
        cls.repository_name = name

    @classmethod
    def set_base_url(cls, url):
        """Configure the base url for all responses.

        :param url: url.
        :type url: str
        """
        cls.base_url = url

    @classmethod
    def set_admin_email(cls, email):
        """Configure the admin email addresses for all responses.

        :param email: Admin email(s)
        :type email: list
        """
        cls.admin_email = email

    @classmethod
    def set_protocol_version(cls, version):
        """Configure the protocol version for all responses.

        :param version: OAI-PMH protocol version.
        :type version: float
        """
        cls.protocol_version = version

    def _update_response(self, parent_key, key, value, overwrite=False):
        # Store value under parent_key/key. A truthy existing value is
        # protected unless overwrite is requested.
        section = self._oai_response[parent_key]
        current = section.get(key)
        if current and not overwrite:
            raise ValueError(
                "Response has value {}, will not overwrite".format(
                    current)
            )
        section[key] = value

    def _set_response_data_key(self, key):
        # Make sure the data section has a list to collect values into.
        if key in self._oai_response['data']:
            return
        self._update_response_data(key, [])

    def _update_response_error(self, *args, **kwargs):
        self._update_response('error', *args, **kwargs)

    def _update_response_metadata(self, *args, **kwargs):
        self._update_response('response_metadata', *args, **kwargs)

    def _update_response_data(self, *args, **kwargs):
        self._update_response('data', *args, **kwargs)

    def _append_response_data(self, key, value):
        self._set_response_data_key(key)
        target = self._oai_response['data'][key]
        target.append(value)

    def _extend_response_data(self, key, value):
        self._set_response_data_key(key)
        target = self._oai_response['data'][key]
        target.extend(value)

    def add_record(self, record):
        """Append a record to the response.

        :param record: OAIRecord to add.
        :type record: :obj:`kuha_oai_pmh_repo_handler.oai.records.OAIRecord`
        """
        self.records.append(record)

    def has_records(self):
        """Tell whether any records have been added.

        :returns: True if response has records.
        :rtype: bool
        """
        return len(self.records) > 0

    def assert_single_record(self):
        """Assert that the response holds exactly one record.

        :raises: :exc:`AssertionError` if there is more or less than
                 a single record.
        """
        record_count = len(self.records)
        assert record_count == 1

    def set_earliest_datestamp(self, datestamp):
        """Store the earliest datestamp in response data.

        :param datestamp: datestamp in finest granularity ISO8601
        :type datestamp: str
        """
        self._update_response_data('earliest_datestamp', datestamp)

    def set_deleted_records_declaration(self, declaration):
        """Store the deleted records declaration in response data.

        :param declaration: declare support for deleted records
        :type declaration: str
        """
        self._update_response_data('deleted_records', declaration)

    def set_granularity(self, granularity):
        """Store the datestamp granularity in response data.

        :param granularity: datestamp format for finest granularity
                            supported by this repository.
        :type granularity: str
        """
        self._update_response_data('granularity', granularity)

    def set_resumption_token(self, token):
        """Store resumption token details in response metadata.

        :param token: resumption token.
        :type token: :obj:`ResumptionToken`
        """
        # Only these details are picked from the token;
        # they are read in the templates.
        token_dict = {}
        token_dict['encoded'] = token.get_encoded()
        token_dict['complete_list_size'] = token.complete_list_size.value
        token_dict['cursor'] = token.cursor.value
        self._update_response_metadata('resumption_token', token_dict)

    def set_error(self, oai_error):
        """Store an OAI-PMH error in the response.

        :note: These are the errors that are defined in the OAI-protocol.
               Programming errors are handled separately in higher levels.

        :param oai_error: OAI error.
        :type oai_error: Subclass of :obj:`kuha_oai_pmh_repo_handler.oai.errors.OAIError`
        """
        self._update_response_error('code', oai_error.get_code())
        self._update_response_error('msg', oai_error.get_contextual_message())
        attrs_must_be_dropped = isinstance(
            oai_error, (oaierrors.BadArgument, oaierrors.BadVerb))
        if attrs_must_be_dropped:
            # For badVerb and badArgument the repository must return
            # the request only, without attributes.
            self._update_response_metadata(
                'request_attrs', {}, overwrite=True)

    def add_sets_element(self, spec, name):
        """Append a single sets element to response data.

        :param spec: setSpec-subelement value.
        :type spec: str
        :param name: setName-subelement value.
        :type name: str
        """
        element = {'setSpec': spec, 'setName': name}
        self._append_response_data('sets', element)

    def extend_sets_element(self, sets_list):
        """Append multiple sets elements to response data.

        :note: Parameter may come directly from
               :meth:`kuha_oai_pmh_repo_handler.oai.records.Sets.get_sets_list_from_records`

        :param sets_list: list of sets-elements.
        :type sets_list: list
        """
        self._extend_response_data('sets', sets_list)

    def set_request_params(self, oai_request):
        """Gather response parameters from request.

        Stores the metadata format of the request in response data
        and the request attributes in response metadata. The
        ``metadataPrefix`` attribute defaults to the prefix of the
        metadata format; a value present in the request attributes
        takes precedence.

        :note: These are common response parameters that
               can be added to each response.

        :param oai_request: Current OAI-request.
        :type oai_request: :obj:`OAIRequest`
        """
        md_format = oai_request.get_metadata_format()
        self._update_response_data('metadata_format', md_format)
        if hasattr(md_format, 'get_prefix'):
            prefix = md_format.get_prefix()
        else:
            prefix = None
        # request_attrs is used as is in oai_pmh_template.xml
        request_attrs = {'metadataPrefix': prefix, **oai_request.request_attrs}
        self._update_response_metadata('request_attrs', request_attrs)

    def get_response(self):
        """Get dictionary representation of the response.

        The response attributes are gathered in a dictionary that
        is to be parsed in the templates.

        :note: The dictionary will contain python objects, so it is not
               serializable to JSON or arbitrary formats as is.

        :returns: Response ready to pass to templates.
        :rtype: dict
        """
        return self._oai_response
class OAIArguments:
    """Arguments of OAI-protocol.

    Store arguments. Convert datestamps string to datetime objects.
    Validate arguments for each verb.

    :param verb: requested OAI verb.
    :type verb: str
    :param resumption_token: requested resumption token.
    :type resumption_token: str
    :param identifier: requested identifier.
    :type identifier: str
    :param metadata_prefix: requested metadata prefix.
    :type metadata_prefix: str
    :param set_: requested set.
    :type set_: str
    :param from_: requested datestamp for from attribute.
    :type from_: str
    :param until: requested datestamp for until attribute.
    :type until: str
    :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.OAIError` for OAI errors.
    """

    verb_value_identify = 'Identify'
    verb_value_list_sets = 'ListSets'
    verb_value_list_metadata_formats = 'ListMetadataFormats'
    verb_value_list_identifiers = 'ListIdentifiers'
    verb_value_list_records = 'ListRecords'
    verb_value_get_record = 'GetRecord'
    #: Define supported verbs
    supported_verbs = [
        verb_value_identify,
        verb_value_list_sets,
        verb_value_list_metadata_formats,
        verb_value_list_identifiers,
        verb_value_list_records,
        verb_value_get_record
    ]
    #: Define resumption token verbs
    resumable_verbs = [
        verb_value_list_sets,
        verb_value_list_identifiers,
        verb_value_list_records
    ]
    #: Define supported metadata formats
    supported_metadata_formats = [
        DCMetadataFormat,
        DDIMetadataFormat,
        CDCDDI25MetadataFormat,
        EAD3MetadataFormat
    ]

    def __init__(self,
                 verb,
                 resumption_token=None,
                 identifier=None,
                 metadata_prefix=None,
                 set_=None,
                 from_=None,
                 until=None):
        if verb is None:
            raise oaierrors.MissingVerb()
        if verb not in self.supported_verbs:
            raise oaierrors.BadVerb(context=verb)
        self.verb = verb
        self.identifier = identifier
        # The until argument is supplied by the requester, so an invalid
        # value must be reported as BadArgument rather than escaping as
        # KeyError/ValueError.
        self._requested_until = as_supported_datetime(until) if until else None
        # The resumption token must exist before the metadata format is
        # resolved, since resumable verbs read the prefix from the token.
        self._resumption_token = self._init_resumption_token(
            resumption_token, metadata_prefix, from_, until, set_)
        self.metadata_format = self._init_metadata_format(metadata_prefix)
        self._validate_verb_arguments()

    @classmethod
    def iterate_supported_metadata_formats(cls):
        """Iterate over supported metadata formats.

        :returns: iterator yielding the entries of
                  :attr:`supported_metadata_formats`.
        """
        return iter(cls.supported_metadata_formats)

    def _init_metadata_format(self, metadata_prefix_arg):
        # Resolve the metadata format instance for the request. For
        # resumable verbs the prefix stored in the resumption token is used.
        metadata_format = None
        if self.is_verb_resumable():
            metadata_prefix = self._resumption_token.metadata_prefix.value
        else:
            metadata_prefix = metadata_prefix_arg
        if metadata_prefix:
            for _format in self.iterate_supported_metadata_formats():
                if _format.prefix == metadata_prefix:
                    metadata_format = _format()
                    break
        # These verbs require a metadataPrefix that the repository supports.
        if self.verb in [self.verb_value_list_identifiers,
                         self.verb_value_list_records,
                         self.verb_value_get_record]:
            if not metadata_prefix:
                raise oaierrors.BadArgument(
                    msg='Missing argument',
                    context='\"metadataPrefix\"')
            if not metadata_format:
                raise oaierrors.CannotDisseminateFormat()
        return metadata_format

    def _init_resumption_token(self, resumption_token_arg,
                               metadata_prefix_arg,
                               from_arg, until_arg, set_):
        if not self.is_verb_resumable():
            if resumption_token_arg:
                raise oaierrors.BadArgument(
                    msg="Got resumptionToken-parameter for verb which does not support it",
                    context=self.verb
                )
            # Non-resumable verbs get a token with initial values.
            return ResumptionToken()
        if resumption_token_arg:
            resumption_token = ResumptionToken.load_resumption_token_argument(
                resumption_token_arg
            )
            if metadata_prefix_arg is not None and\
               resumption_token.metadata_prefix.value != metadata_prefix_arg:
                raise oaierrors.BadArgument(
                    msg="metadataPrefix in resumptionToken differs from metadataPrefix-parameter.",
                    context="%s != %s" % (resumption_token.metadata_prefix.value,
                                          metadata_prefix_arg)
                )
        else:
            resumption_token = ResumptionToken(from_=from_arg,
                                               until=until_arg,
                                               metadata_prefix=metadata_prefix_arg,
                                               set_=set_)
        return resumption_token

    def is_verb_resumable(self):
        """Is the requested verb a resumable list request?

        :returns: True if verb is resumable False otherwise
        :rtype: bool
        """
        return self.verb in self.resumable_verbs

    def get_verb(self):
        """Get requested OAI-verb.

        :returns: requested OAI-verb.
        :rtype: str
        """
        return self.verb

    def get_metadata_format(self):
        """Get metadata format resolved for the request.

        :returns: metadata format instance, or None if the request
                  did not resolve to a supported metadata format.
        """
        return self.metadata_format

    def get_resumption_token(self):
        """Get resumption token for request.

        The resumption token is either submitted in
        the request or created automatically.

        :returns: resumption token.
        :rtype: :obj:`ResumptionToken`
        """
        return self._resumption_token

    def get_cursor(self):
        """Get resumptionToken cursor

        :returns: cursor value
        :rtype: int
        """
        return self._resumption_token.cursor.value

    def get_from(self):
        """Get from argument.

        :returns: from argument held by the resumption token.
        :rtype: :obj:`datetime.datetime` or None
        """
        return self._resumption_token.from_.value

    def get_until(self):
        """Get until argument.

        :returns: until argument held by the resumption token.
        :rtype: :obj:`datetime.datetime`
        """
        return self._resumption_token.until.value

    def get_query_param_until(self):
        """Get until datestamp for querying.

        :note: This is until + smallest increment step.

        :returns: datestamp of query_param_until attribute.
        :rtype: :obj:`datetime.datetime`
        """
        return self._resumption_token.query_param_until

    def get_complete_list_size(self):
        """Get complete list size held by the resumption token.

        :returns: complete list size.
        :rtype: int or None
        """
        return self._resumption_token.complete_list_size.value

    def get_identifier(self):
        """Get requested identifier.

        :returns: requested identifier if any.
        :rtype: str or None
        """
        return self.identifier

    def get_local_identifier(self):
        """Get requested local identifier.

        Local identifier does not have prefixes for
        oai and namespace. It is used to identify
        records locally.

        :returns: Local identifier if applicable for the request.
        :rtype: str
        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.IdDoesNotExist`
                 for invalid identifier.
        """
        identifier = OAIHeaders.as_local_id(self.get_identifier())
        if identifier is None:
            raise oaierrors.IdDoesNotExist(msg='Invalid identifier structure',
                                           context=self.get_identifier())
        return identifier

    def get_set(self):
        """Get requested set.

        :note: The value is passed through :class:`str`, so a request
               without a set yields the string ``'None'``. Use
               :meth:`has_set` to find out whether a set was requested.

        :returns: requested set.
        :rtype: str
        """
        return str(self._resumption_token.set_.value)

    def is_selective(self):
        """Return True if request is selective.

        Selective refers to selective harvesting supported by OAI-PMH.
        The request is selective if it has a set, a from datestamp
        or an explicitly requested until datestamp.

        :returns: True if selective, False if not.
        :rtype: bool
        """
        return any([self.has_set(),
                    self._resumption_token.from_.value,
                    self._requested_until])

    def has_set(self):
        """Return True if the request contained set.

        :rtype: bool
        """
        return bool(self._resumption_token.set_.value)

    def _validate_verb_arguments(self):
        if self.verb == self.verb_value_get_record and not self.identifier:
            raise oaierrors.BadArgument(
                msg='Missing argument',
                context='\"identifier\"')
        # Compare against the explicitly requested until; the token's until
        # always has a value because it defaults to the current time.
        from_, until = self.get_from(), self._requested_until
        if from_ and until and from_ > until:
            raise oaierrors.BadArgument(
                msg='Invalid date range',
                context='from argument is later than until argument')
class OAIRequest(OAIArguments):
    """Represents the OAI request.

    Subclass of :class:`OAIArguments`.
    Declares the query keys of the OAI arguments and builds
    the arguments out of key-value pairs.
    """

    query_key_verb = 'verb'
    query_key_resumption_token = 'resumptionToken'
    query_key_identifier = 'identifier'
    query_key_metadata_prefix = 'metadataPrefix'
    query_key_set = 'set'
    query_key_from = 'from'
    query_key_until = 'until'

    def __init__(self, args):
        """Create a request-object from requested arguments.

        :param args: List of 2-item tuples [(key, value)] containing
                     request arguments.
        :type args: list
        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadArgument`
                 if the arguments contain unknown or repeated keys.
        """
        args_keys = [pair[0] for pair in args]
        legal_keys = {
            self.query_key_verb,
            self.query_key_resumption_token,
            self.query_key_identifier,
            self.query_key_metadata_prefix,
            self.query_key_set,
            self.query_key_from,
            self.query_key_until,
        }
        # Illegal arguments in the request result in BadArgument.
        unknown_keys = set(args_keys).difference(legal_keys)
        if unknown_keys:
            raise oaierrors.BadArgument(msg="Illegal arguments",
                                        context=', '.join(unknown_keys))
        # Repeated arguments in the request result in BadArgument.
        has_repeated_keys = len(set(args_keys)) != len(args_keys)
        if has_repeated_keys:
            repeated_ctx = ', '.join('{}={}'.format(k, v) for k, v in args)
            raise oaierrors.BadArgument(msg="Repeated arguments",
                                        context=repeated_ctx)
        # Keys are unique at this point, so a dict loses nothing.
        args = dict(args)
        super().__init__(
            verb=args.get(OAIRequest.query_key_verb),
            resumption_token=args.get(OAIRequest.query_key_resumption_token),
            identifier=args.get(OAIRequest.query_key_identifier),
            metadata_prefix=args.get(OAIRequest.query_key_metadata_prefix),
            set_=args.get(OAIRequest.query_key_set),
            from_=args.get(OAIRequest.query_key_from),
            until=args.get(OAIRequest.query_key_until),
        )
        #: Request attributes untouched.
        self.request_attrs = args