Source code for kuha_oai_pmh_repo_handler.oai.protocol

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Defines the protocol
"""

import datetime
import collections
import urllib

from kuha_oai_pmh_repo_handler.oai import errors as oaierrors
from kuha_oai_pmh_repo_handler.oai.metadata_formats import (
    DCMetadataFormat,
    DDIMetadataFormat,
    CDCDDI25MetadataFormat,
    EAD3MetadataFormat,
)
from kuha_oai_pmh_repo_handler.oai.records import (
    OAIHeaders,
)
from kuha_oai_pmh_repo_handler.oai.constants import (
    OAI_RESPONSE_LIST_SIZE,
    OAI_RESPONSE_TIMESTAMP_FORMAT,
    OAI_RESPONSE_DATE_FORMAT,
    OAI_REPO_NAME,
    OAI_PROTOCOL_VERSION
)


def as_supported_datetime(datetime_str, raise_oai_exc=True):
    """Parse a datestamp string into a :obj:`datetime.datetime`.

    The format is picked by the length of `datetime_str`: 10 characters
    select ``OAI_RESPONSE_DATE_FORMAT`` (YYYY-MM-DD) and 20 characters
    select ``OAI_RESPONSE_TIMESTAMP_FORMAT`` (YYYY-MM-DDThh:mm:ssZ).

    :note: If the `datetime_str` does not come from HTTP-Request,
           set `raise_oai_exc` to False.

    :param datetime_str: datetime to convert
    :type datetime_str: str
    :param raise_oai_exc: Catch lookup/parse errors and reraise them as
                          an OAI error.
    :type raise_oai_exc: bool
    :returns: converted datetime.
    :rtype: :obj:`datetime.datetime`
    :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadArgument`
             for invalid format if `raise_oai_exc` is True. Otherwise the
             original :exc:`KeyError` (unsupported length) or
             :exc:`ValueError` (unparsable value) propagates.
    """
    formats_by_length = {10: OAI_RESPONSE_DATE_FORMAT,
                         20: OAI_RESPONSE_TIMESTAMP_FORMAT}
    try:
        return datetime.datetime.strptime(
            datetime_str, formats_by_length[len(datetime_str)]
        )
    except (KeyError, ValueError) as exc:
        if not raise_oai_exc:
            # Value did not come from a requester: treat it as a
            # programming error and let the original exception surface.
            raise
        # Mask as BadArgument to notify the requester through an
        # OAI error condition.
        raise oaierrors.BadArgument(
            "Invalid datetime format: {}".format(datetime_str)
        ) from exc
def as_supported_datestring(datetime_obj):
    """Render a :obj:`datetime.datetime` as a datestamp string.

    The output is formatted with ``OAI_RESPONSE_TIMESTAMP_FORMAT``
    (YYYY-MM-DDThh:mm:ssZ).

    :param datetime_obj: datetime to convert.
    :type datetime_obj: :obj:`datetime.datetime`
    :returns: string representation of datetime_obj.
    :rtype: str
    """
    timestamp_format = OAI_RESPONSE_TIMESTAMP_FORMAT
    formatted = datetime.datetime.strftime(datetime_obj, timestamp_format)
    return formatted
def encode_uri(string):
    """Percent-encode a string for use in a URI.

    Special characters are replaced using :func:`urllib.parse.quote`
    with its default arguments.

    :param string: value to encode.
    :type string: str
    :returns: encoded value
    :rtype: str
    """
    # NOTE(review): this relies on the ``urllib.parse`` submodule already
    # being loaded; confirm the module imports it explicitly.
    encoded = urllib.parse.quote(string)
    return encoded
def decode_uri(uri):
    """Decode a percent-encoded URI string.

    Percent-encoded sequences are replaced using
    :func:`urllib.parse.unquote` with its default arguments.

    :param uri: value to decode.
    :type uri: str
    :returns: decoded value
    :rtype: str
    """
    # NOTE(review): this relies on the ``urllib.parse`` submodule already
    # being loaded; confirm the module imports it explicitly.
    decoded = urllib.parse.unquote(uri)
    return decoded
def min_increment_step(datetime_str):
    """Count smallest increment step from datetime string.

    The precision is deduced from the length of the string: 10
    characters means day's precision (YYYY-MM-DD) and 20 characters
    means second's precision (YYYY-MM-DDThh:mm:ssZ).

    :param datetime_str: string representation of a datetime.
                         Datetime must be represented either by
                         day's precision or by second's precision.
    :type datetime_str: str
    :returns: smallest increment step.
    :rtype: :obj:`datetime.timedelta`
    :raises: :exc:`ValueError` if string length is invalid.
    """
    length = len(datetime_str)
    if length == 10:
        # day's precision
        return datetime.timedelta(days=1)
    if length == 20:
        # second's precision
        return datetime.timedelta(seconds=1)
    # The exception must actually be raised; merely constructing it
    # would fall through to an unbound local variable.
    raise ValueError("Invalid datetime string: {}".format(datetime_str))
class ResumptionToken:
    """Class representing OAI-PMH Resumption Token.

    Holds attributes of the resumption token. Creates a new resumption
    token with initial values or takes a dictionary of resumption token
    arguments. Validates the token based on records list size. If the
    list size has been changed between requests asserts that the token
    is invalid by raising a
    :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`
    exception.

    :note: Since :attr:`OAIArgument.set_` is not supported by resumption
           token, changing the requested set may result in falsely valid
           resumption token. But changing the requested set in the
           middle of a list request sequence should be seen as bad
           behaviour by the requester/harvester.

    :param cursor: Optional parameter for the current position in list.
    :type cursor: int
    :param from_: Optional parameter for from datestamp. Converted to
                  :obj:`datetime.datetime` on init.
    :type from_: str
    :param until: Optional parameter for until datestamp. Converted to
                  :obj:`datetime.datetime` on init. Defaults to the
                  current UTC time truncated to full seconds.
    :type until: str
    :param complete_list_size: Optional parameter for the number of
                               records in the complete list.
    :type complete_list_size: int
    :param metadata_prefix: Optional parameter for the requested
                            metadata prefix.
    :type metadata_prefix: str
    :param set_: Optional parameter containing requested set information.
    :type set_: str
    """

    #: Store ResumptionToken attribute keys and values.
    Attribute = collections.namedtuple('Attribute', ['key', 'value'])

    cursor = Attribute('cursor', 0)
    from_ = Attribute('from', None)
    until = Attribute('until', None)
    set_ = Attribute('set', None)
    complete_list_size = Attribute('completeListSize', None)
    metadata_prefix = Attribute('metadataPrefix', None)

    #: Configurable value for the size of the list response.
    response_list_size = OAI_RESPONSE_LIST_SIZE

    def __init__(self, cursor=0, from_=None, until=None,
                 complete_list_size=None, metadata_prefix=None, set_=None):
        self.cursor = self.cursor._replace(value=int(cursor))
        if from_:
            # Convert to datetime.datetime
            self.from_ = self.from_._replace(
                value=as_supported_datetime(from_)
            )
        if until:
            # Convert to datetime.datetime
            self.until = self.until._replace(
                value=as_supported_datetime(until)
            )
            increment_step = min_increment_step(until)
        else:
            # Naive UTC "now" truncated to full seconds. Equivalent to the
            # deprecated datetime.utcnow() minus its microseconds.
            now = datetime.datetime.now(datetime.timezone.utc).replace(
                tzinfo=None, microsecond=0
            )
            self.until = self.until._replace(value=now)
            increment_step = datetime.timedelta(seconds=1)
        # Upper bound used for querying: until + smallest increment step.
        self.query_param_until = self.until.value + increment_step
        if complete_list_size:
            self.complete_list_size = self.complete_list_size._replace(
                value=int(complete_list_size)
            )
        if set_:
            self.set_ = self.set_._replace(value=set_)
        self.metadata_prefix = self.metadata_prefix._replace(
            value=metadata_prefix
        )

    @classmethod
    def set_response_list_size(cls, size):
        """Configure response list size.

        :param size: Number of records in list response.
        :type size: int
        """
        cls.response_list_size = size

    @classmethod
    def _load_dict(cls, resumption_token):
        # resumption_token is a dict containing strings or NoneTypes.
        # Its content originates from the HTTP request, so a missing or
        # non-integer cursor/completeListSize is reported as an invalid
        # resumption token instead of leaking a KeyError/ValueError.
        try:
            requested_cursor = int(resumption_token[cls.cursor.key])
            complete_list_size = int(
                resumption_token[cls.complete_list_size.key]
            )
        except (KeyError, TypeError, ValueError) as exc:
            raise oaierrors.BadResumptionToken(
                context="Malformed resumptionToken"
            ) from exc
        # The token carries the cursor of the previous response;
        # advance it by one response list.
        cursor = requested_cursor + cls.response_list_size
        from_ = resumption_token.get(cls.from_.key, None)
        until = resumption_token.get(cls.until.key, None)
        metadata_prefix = resumption_token.get(cls.metadata_prefix.key)
        set_ = resumption_token.get(cls.set_.key, None)
        return cls(cursor=cursor,
                   from_=from_,
                   until=until,
                   complete_list_size=complete_list_size,
                   metadata_prefix=metadata_prefix,
                   set_=set_)

    @classmethod
    def load_resumption_token_argument(cls, argument):
        """Create new resumption token from arguments.

        Use to load resumption token from OAI request. The argument is
        expected to consist of ``key:value`` pairs joined with ``&``.
        The literal value ``None`` is converted to :obj:`None`.

        :param argument: Resumption token argument. This comes from
                         HTTP-request.
        :type argument: str
        :returns: New :obj:`ResumptionToken`
        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`
                 if the argument is malformed.
        """
        result = {}
        for _arg in argument.split('&'):
            # Split on the first colon only; datestamp values contain
            # colons themselves.
            _key, _sep, _val = _arg.partition(':')
            if not _sep:
                raise oaierrors.BadResumptionToken(
                    context="Malformed resumptionToken"
                )
            result[_key] = None if _val == 'None' else _val
        return cls._load_dict(result)

    def _validate(self, size):
        # If size has changed between requests, the resumption token
        # is no longer valid.
        return self.complete_list_size.value == size

    def _is_final_list(self):
        # True when the current response reaches the end of the
        # complete list.
        return (self.cursor.value + self.response_list_size
                >= self.complete_list_size.value)

    def set_complete_list_size(self, size):
        """Set the number of records in the complete query response.

        :note: Resumption token is invalid if the number of
               records for the complete query response
               has been changed between requests.

        :param size: Number of records for the complete query response.
        :type size: int
        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`
                 if list sizes don't match.
        """
        if not self.complete_list_size.value:
            self.complete_list_size = self.complete_list_size._replace(
                value=int(size)
            )
        else:
            if not self._validate(size):
                _ctx = "Requested completeListSize: {}".format(
                    self.complete_list_size.value
                )
                _ctx += " Actual completeListSize: {}".format(size)
                raise oaierrors.BadResumptionToken(context=_ctx)

    def get_encoded(self):
        """Get encoded Resumption Token.

        Returns uri-encoded representation of the resumption token if
        the list request sequence is ongoing. If the list request
        sequence is over, returns None.

        :returns: uri-encoded representation of the token, or None
        :rtype: str or None
        """
        if self._is_final_list():
            return None
        attrs = []
        for attr in [self.cursor, self.from_, self.until, self.set_,
                     self.complete_list_size, self.metadata_prefix]:
            key, value = attr
            if value is not None and key in (self.until.key, self.from_.key):
                # NOTE(review): datestamps are always encoded at second's
                # precision, so a day-precision `until` gets a one-second
                # increment step after resuming - confirm this is intended.
                value = as_supported_datestring(value)
            attrs.append('{}:{}'.format(key, value))
        _str = '&'.join(attrs)
        return encode_uri(_str)
class OAIResponse:
    """Represents the response.

    The response is stored in a dictionary which then gets
    submitted to XML-templates. Thus it is required that the
    dictionary built within this class is supported by the templates.

    :param request_url: Optional requested url.
                        Leave empty to use base url.
    :type request_url: str or None
    """

    repository_name = OAI_REPO_NAME
    base_url = ''
    protocol_version = OAI_PROTOCOL_VERSION
    admin_email = []

    def __init__(self, request_url=None):
        if request_url is None:
            request_url = self.base_url
        self.records = []
        # Naive UTC timestamp; equivalent to the deprecated
        # datetime.utcnow().
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        self._oai_response = {
            'response_metadata': {
                'response_timestamp': now.strftime(OAI_RESPONSE_TIMESTAMP_FORMAT),
                'request_attrs': {},
                'resumption_token': None,
                'request_url': request_url
            },
            'error': {},
            'data': {'records': self.records,
                     'base_url': self.base_url,
                     'admin_email': self.admin_email,
                     'protocol_version': self.protocol_version,
                     'repository_name': self.repository_name}
        }

    @classmethod
    def set_repository_name(cls, name):
        """Set repository name.

        :param name: repository name.
        :type name: str
        """
        cls.repository_name = name

    @classmethod
    def set_base_url(cls, url):
        """Set base url

        :param url: url.
        :type url: str
        """
        cls.base_url = url

    @classmethod
    def set_admin_email(cls, email):
        """Set admin email address.

        :param email: Admin email(s)
        :type email: list
        """
        cls.admin_email = email

    @classmethod
    def set_protocol_version(cls, version):
        """Set protocol version

        :param version: OAI-PMH protocol version.
        :type version: float
        """
        cls.protocol_version = version

    def _update_response(self, parent_key, key, value, overwrite=False):
        # Store value under self._oai_response[parent_key][key]. Refuse to
        # replace an existing truthy value unless overwrite is requested.
        existing_value = self._oai_response[parent_key].get(key)
        if existing_value and not overwrite:
            raise ValueError(
                "Response has value {}, will not overwrite".format(
                    existing_value)
            )
        self._oai_response[parent_key][key] = value

    def _set_response_data_key(self, key):
        # Make sure the data-dictionary has a list under key.
        if key not in self._oai_response['data']:
            self._update_response_data(key, [])

    def _update_response_error(self, *args, **kwargs):
        self._update_response('error', *args, **kwargs)

    def _update_response_metadata(self, *args, **kwargs):
        self._update_response('response_metadata', *args, **kwargs)

    def _update_response_data(self, *args, **kwargs):
        self._update_response('data', *args, **kwargs)

    def _append_response_data(self, key, value):
        self._set_response_data_key(key)
        self._oai_response['data'][key].append(value)

    def _extend_response_data(self, key, value):
        self._set_response_data_key(key)
        self._oai_response['data'][key].extend(value)

    def add_record(self, record):
        """Add record to response

        :param record: OAIRecord to add.
        :type record: :obj:`kuha_oai_pmh_repo_handler.oai.records.OAIRecord`
        """
        self.records.append(record)

    def has_records(self):
        """Return True if response has records.

        :rtype: bool
        """
        return bool(self.records)

    def assert_single_record(self):
        """Assert the response has a single record.

        :raises: :exc:`AssertionError` if there is more or
                 less than a single record.
        """
        # Raise explicitly: a bare assert statement is removed when
        # Python runs with optimizations (-O).
        record_count = len(self.records)
        if record_count != 1:
            raise AssertionError(
                "Expected a single record, got {}".format(record_count)
            )

    def set_earliest_datestamp(self, datestamp):
        """Set earliest datestamp.

        :param datestamp: datestamp in finest granularity ISO8601
        :type datestamp: str
        """
        self._update_response_data('earliest_datestamp', datestamp)

    def set_deleted_records_declaration(self, declaration):
        """Set deleted records declaration.

        :param declaration: declare support for deleted records
        :type declaration: str
        """
        self._update_response_data('deleted_records', declaration)

    def set_granularity(self, granularity):
        """Set datestamp granularity.

        :param granularity: datestamp format for finest granularity
                            supported by this repository.
        :type granularity: str
        """
        self._update_response_data('granularity', granularity)

    def set_metadata_formats(self, metadata_formats):
        """Set supported metadata formats.

        :param metadata_formats: supported metadata formats
        :type metadata_formats: list
        """
        self._update_response_data('metadata_formats', metadata_formats)

    def set_resumption_token(self, token):
        """Set resumption token.

        :param token: resumption token.
        :type token: :obj:`ResumptionToken`
        """
        # Get necessary details from token.
        # This gets read in templates.
        token_dict = {'encoded': token.get_encoded(),
                      'complete_list_size': token.complete_list_size.value,
                      'cursor': token.cursor.value}
        self._update_response_metadata('resumption_token', token_dict)

    def set_error(self, oai_error):
        """Set OAI-PMH error.

        :note: These are the errors that are defined in the OAI-protocol.
               Programming errors are handled separately in higher levels.

        :param oai_error: OAI error.
        :type oai_error: Subclass of
                         :obj:`kuha_oai_pmh_repo_handler.oai.errors.OAIError`
        """
        self._update_response_error('code', oai_error.get_code())
        self._update_response_error('msg', oai_error.get_contextual_message())
        if isinstance(oai_error, (oaierrors.BadArgument, oaierrors.BadVerb)):
            # When the response resulted in badVerb or badArgument, the
            # repository must return the request only, no attributes.
            self._update_response_metadata(
                'request_attrs', {}, overwrite=True)

    def add_sets_element(self, spec, name):
        """Add new sets element.

        :param spec: setSpec-subelement value.
        :type spec: str
        :param name: setName-subelement value.
        :type name: str
        """
        self._append_response_data('sets', {'setSpec': spec, 'setName': name})

    def extend_sets_element(self, sets_list):
        """Add multiple sets elements

        :note: Parameter may come directly from
               :meth:`kuha_oai_pmh_repo_handler.oai.records.Sets.get_sets_list_from_records`

        :param sets_list: list of sets-elements.
        :type sets_list: list
        """
        self._extend_response_data('sets', sets_list)

    def set_request_params(self, oai_request):
        """Gather response parameters from request.

        :note: These are common response parameters that
               can be added to each response.

        :param oai_request: Current OAI-request.
        :type oai_request: :obj:`OAIRequest`
        """
        md_format = oai_request.get_metadata_format()
        self._update_response_data(
            'metadata_format', md_format
        )
        # Keys in the raw request attributes take precedence over the
        # derived metadataPrefix.
        request_attrs = {
            **{'metadataPrefix': md_format.get_prefix()
               if hasattr(md_format, 'get_prefix') else None},
            **oai_request.request_attrs
        }
        # request_attrs is used as is in oai_pmh_template.xml
        self._update_response_metadata(
            'request_attrs', request_attrs
        )

    def get_response(self):
        """Get dictionary representation of the response.

        The response attributes are gathered in a dictionary
        that is to be parsed in the templates.

        :note: The dictionary will contain python objects, so it is not
               serializable to JSON or arbitrary formats as is.

        :returns: Response ready to pass to templates.
        :rtype: dict
        """
        return self._oai_response
class OAIArguments:
    """Arguments of OAI-protocol.

    Store arguments. Convert datestamps string to
    datetime objects. Validate arguments for each verb.

    :param verb: requested OAI verb.
    :type verb: str
    :param resumption_token: requested resumption token.
    :type resumption_token: str
    :param identifier: requested identifier.
    :type identifier: str
    :param metadata_prefix: requested metadata prefix.
    :type metadata_prefix: str
    :param set_: requested set.
    :type set_: str
    :param from_: requested datestamp for from attribute.
    :type from_: str
    :param until: requested datestamp for until attribute.
    :type until: str
    :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.OAIError`
             for OAI errors.
    """

    verb_value_identify = 'Identify'
    verb_value_list_sets = 'ListSets'
    verb_value_list_metadata_formats = 'ListMetadataFormats'
    verb_value_list_identifiers = 'ListIdentifiers'
    verb_value_list_records = 'ListRecords'
    verb_value_get_record = 'GetRecord'

    #: Define supported verbs
    supported_verbs = [
        verb_value_identify,
        verb_value_list_sets,
        verb_value_list_metadata_formats,
        verb_value_list_identifiers,
        verb_value_list_records,
        verb_value_get_record
    ]

    #: Define resumption token verbs
    resumable_verbs = [
        verb_value_list_sets,
        verb_value_list_identifiers,
        verb_value_list_records
    ]

    #: Define supported metadata formats
    supported_metadata_formats = [
        DCMetadataFormat,
        DDIMetadataFormat,
        CDCDDI25MetadataFormat,
        EAD3MetadataFormat
    ]

    def __init__(self, verb, resumption_token=None, identifier=None,
                 metadata_prefix=None, set_=None, from_=None, until=None):
        if verb is None:
            raise oaierrors.MissingVerb()
        if verb not in self.supported_verbs:
            raise oaierrors.BadVerb(context=verb)
        self.verb = verb
        self.identifier = identifier
        # `until` is a requested argument, so an invalid value is reported
        # to the requester as BadArgument instead of surfacing as a raw
        # KeyError/ValueError.
        self._requested_until = as_supported_datetime(until) if until \
            else None
        self._resumption_token = self._init_resumption_token(
            resumption_token, metadata_prefix, from_, until, set_)
        self.metadata_format = self._init_metadata_format(metadata_prefix)
        self._validate_verb_arguments()

    def _init_metadata_format(self, metadata_prefix_arg):
        # Resolve the metadata format instance for the request. Resumable
        # verbs take the prefix from the resumption token, others from
        # the request argument.
        metadata_format = None
        if self.is_verb_resumable():
            metadata_prefix = self._resumption_token.metadata_prefix.value
        else:
            metadata_prefix = metadata_prefix_arg
        if metadata_prefix:
            for _format in self.iterate_supported_metadata_formats():
                if _format.prefix == metadata_prefix:
                    metadata_format = _format()
                    break
        # These verbs require a metadataPrefix that the repository can
        # disseminate.
        if self.verb in [self.verb_value_list_identifiers,
                         self.verb_value_list_records,
                         self.verb_value_get_record]:
            if not metadata_prefix:
                raise oaierrors.BadArgument(
                    msg='Missing argument', context='\"metadataPrefix\"')
            if not metadata_format:
                raise oaierrors.CannotDisseminateFormat()
        return metadata_format

    def _init_resumption_token(self, resumption_token_arg,
                               metadata_prefix_arg, from_arg, until_arg, set_):
        # Load the requested resumption token or create a fresh one.
        if not self.is_verb_resumable():
            if resumption_token_arg:
                raise oaierrors.BadArgument(
                    msg="Got resumptionToken-parameter for verb which does not support it",
                    context=self.verb
                )
            # Non-resumable verbs get a token with default values.
            return ResumptionToken()
        if resumption_token_arg:
            resumption_token = ResumptionToken.load_resumption_token_argument(
                resumption_token_arg
            )
            if metadata_prefix_arg is not None and \
               resumption_token.metadata_prefix.value != metadata_prefix_arg:
                raise oaierrors.BadArgument(
                    msg="metadataPrefix in resumptionToken differs from metadataPrefix-parameter.",
                    context="%s != %s" % (resumption_token.metadata_prefix.value,
                                          metadata_prefix_arg)
                )
        else:
            resumption_token = ResumptionToken(from_=from_arg,
                                               until=until_arg,
                                               metadata_prefix=metadata_prefix_arg,
                                               set_=set_)
        return resumption_token

    def is_verb_resumable(self):
        """Is the requested verb a resumable list request?

        :returns: True if verb is resumable False otherwise
        :rtype: bool
        """
        return self.verb in self.resumable_verbs

    def get_verb(self):
        """Get requested OAI-verb.

        :returns: requested OAI-verb.
        :rtype: str
        """
        return self.verb

    def get_resumption_token(self):
        """Get resumption token for request.

        The resumption token is either submitted in the request
        or created automatically.

        :returns: resumption token.
        :rtype: :obj:`ResumptionToken`
        """
        return self._resumption_token

    def get_cursor(self):
        """Get resumptionToken cursor

        :returns: cursor value
        :rtype: int
        """
        return self._resumption_token.cursor.value

    def get_from(self):
        """Get from argument.

        :returns: from attribute of the resumption token.
        :rtype: :obj:`datetime.datetime` or None
        """
        return self._resumption_token.from_.value

    def get_until(self):
        """Get until argument.

        :returns: until attribute of the resumption token.
        :rtype: :obj:`datetime.datetime`
        """
        return self._resumption_token.until.value

    def get_query_param_until(self):
        """Get until datestamp for querying.

        :note: This is until + smallest increment step.

        :returns: datestamp of query_param_until attribute.
        :rtype: :obj:`datetime.datetime`
        """
        return self._resumption_token.query_param_until

    def get_complete_list_size(self):
        """Get complete list size stored in the resumption token.

        :returns: complete list size if any.
        :rtype: int or None
        """
        return self._resumption_token.complete_list_size.value

    def get_identifier(self):
        """Get requested identifier.

        :returns: requested identifier if any.
        :rtype: str or None
        """
        return self.identifier

    def get_local_identifier(self):
        """Get requested local identifier.

        Local identifier does not have prefixes for oai and namespace.
        It is used to identify records locally.

        :returns: Local identifier if applicable for the request.
        :rtype: str
        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.IdDoesNotExist`
                 for invalid identifier.
        """
        identifier = OAIHeaders.as_local_id(self.get_identifier())
        if identifier is None:
            raise oaierrors.IdDoesNotExist(msg='Invalid identifier structure',
                                           context=self.get_identifier())
        return identifier

    def get_metadata_format(self):
        """Get requested metadata format.

        This is one of the supported metadata formats defined in
        :attr:`OAIArguments.supported_metadata_formats`

        :returns: requested metadata format if any.
        :rtype: Subclass of
                :obj:`kuha_oai_pmh_repo_handler.oai.metadata_formats.MetadataFormatBase`
                or None
        """
        return self.metadata_format

    def get_set(self):
        """Get requested set.

        :note: The stored value is passed through :func:`str`, so a
               request without set yields the string ``'None'``.

        :returns: requested set.
        :rtype: str
        """
        return str(self._resumption_token.set_.value)

    def is_selective(self):
        """Return True if request is selective.

        Selective refers to selective harvesting supported by OAI-PMH.
        The request is selective when it has a set, a from datestamp
        or a requested until datestamp.

        :returns: True if selective, False if not.
        :rtype: bool
        """
        return any([self.has_set(),
                    self._resumption_token.from_.value,
                    self._requested_until])

    def has_set(self):
        """Return True if the request contained set.

        :rtype: bool
        """
        return bool(self._resumption_token.set_.value)

    def _validate_verb_arguments(self):
        # GetRecord requires an identifier.
        if self.verb == self.verb_value_get_record and not self.identifier:
            raise oaierrors.BadArgument(
                msg='Missing argument', context='\"identifier\"')
        # Compare against the requested until; the resumption token's
        # until falls back to the current time when not requested.
        from_, until = self.get_from(), self._requested_until
        if from_ and until and from_ > until:
            raise oaierrors.BadArgument(
                msg='Invalid date range',
                context='from argument is later than until argument')

    def iterate_supported_metadata_formats(self):
        """Generator for iterating through supported metadata formats.

        :returns: Generator object for iterating supported
                  metadata formats.
        """
        yield from self.supported_metadata_formats
class OAIRequest(OAIArguments):
    """Represents the OAI request.

    Subclass of :class:`OAIArguments`. Defines the query keys of the
    OAI arguments and builds the arguments from a list of key-value
    pairs.
    """

    query_key_verb = 'verb'
    query_key_resumption_token = 'resumptionToken'
    query_key_identifier = 'identifier'
    query_key_metadata_prefix = 'metadataPrefix'
    query_key_set = 'set'
    query_key_from = 'from'
    query_key_until = 'until'

    def __init__(self, args):
        """Create a request-object from requested arguments.

        :param args: List of 2-item tuples [(key, value)] containing
                     request arguments.
        :type args: list
        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadArgument`
                 if the arguments contain illegal or repeated keys.
        """
        requested_keys = [pair[0] for pair in args]
        legal_keys = {self.query_key_verb,
                      self.query_key_resumption_token,
                      self.query_key_identifier,
                      self.query_key_metadata_prefix,
                      self.query_key_set,
                      self.query_key_from,
                      self.query_key_until}
        # Illegal arguments in the request result in BadArgument.
        illegal_keys = set(requested_keys) - legal_keys
        if illegal_keys:
            raise oaierrors.BadArgument(msg="Illegal arguments",
                                        context=', '.join(illegal_keys))
        # Repeated arguments in the request result in BadArgument.
        if len(requested_keys) != len(set(requested_keys)):
            repeated_context = ', '.join(
                '{}={}'.format(k, v) for k, v in args
            )
            raise oaierrors.BadArgument(msg="Repeated arguments",
                                        context=repeated_context)
        # Keys are unique at this point, so a dict loses nothing.
        arg_map = dict(args)
        super().__init__(
            verb=arg_map.get(OAIRequest.query_key_verb),
            resumption_token=arg_map.get(OAIRequest.query_key_resumption_token),
            identifier=arg_map.get(OAIRequest.query_key_identifier),
            metadata_prefix=arg_map.get(OAIRequest.query_key_metadata_prefix),
            set_=arg_map.get(OAIRequest.query_key_set),
            from_=arg_map.get(OAIRequest.query_key_from),
            until=arg_map.get(OAIRequest.query_key_until),
        )
        #: Request attributes untouched.
        self.request_attrs = arg_map