# Source code for kuha_oai_pmh_repo_handler.oai.protocol

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Defines the protocol
"""

import logging
import re
import datetime
import collections
from urllib.parse import quote

from kuha_oai_pmh_repo_handler.oai import errors as oaierrors
from kuha_oai_pmh_repo_handler.oai.constants import (
    OAI_RESPONSE_DATETIME_FORMAT,
    OAI_RESPONSE_DATE_FORMAT,
    OAI_REPO_NAME,
    OAI_PROTOCOL_VERSION,
    OAI_REC_NAMESPACE_IDENTIFIER,
    OAI_REC_IDENTIFIER_PREFIX,
    OAI_DATESTAMP_GRANULARITY_DATE,
    OAI_DATESTAMP_GRANULARITY_DATETIME,
    OAI_DEL_RECORDS_DECL_NO,
    OAI_DEL_RECORDS_DECL_TRANSIENT,
    OAI_DEL_RECORDS_DECL_PERSISTENT,
    REGEX_OAI_IDENTIFIER,
    REGEX_LOCAL_IDENTIFIER,
    REGEX_SETSPEC
)


# Module-level logger named after this module.
_logger = logging.getLogger(__name__)

#: Compiled validation regex for setSpec values, built from REGEX_SETSPEC.
REGEX_VALID_SETSPEC = re.compile(REGEX_SETSPEC)


def _validate_with_regex(regex, candidate):
    """Check that `candidate` matches `regex` in its entirety.

    :param regex: Compiled regular expression to validate against.
    :param str candidate: Value to validate.
    :returns: True if the whole candidate matches, False if not.
    :rtype: bool
    :raises: :exc:`TypeError` if ``fullmatch()`` rejects the type of
             `candidate`.
    """
    try:
        match = regex.fullmatch(candidate)
    except TypeError as exc:
        # The TypeError raised by fullmatch() only states what it expects
        # ("expected string or bytes-like object"). Re-raise with the
        # offending type included in the message.
        raise TypeError("Invalid type: '%s'. Was expecting a string."
                        % type(candidate,)) from exc
    return match is not None


def is_valid_setspec(candidate):
    """Tell whether `candidate` is an acceptable setSpec value.

    The candidate must match :data:`REGEX_VALID_SETSPEC` in full.

    :param str candidate: setSpec value to validate.
    :returns: True if valid, False if not.
    :rtype: bool
    """
    is_valid = _validate_with_regex(REGEX_VALID_SETSPEC, candidate)
    return is_valid
def as_supported_datetime(datetime_str, raise_oai_exc=True):
    """Convert string representation of datetime to :obj:`datetime`.

    The format is chosen by the length of the string: 10 characters
    selects the date format, 20 characters the datetime format.

    :note: If the `datetime_str` does not come from HTTP-Request, set
           `raise_oai_exc` to False.
    :note: The legitimate formats are YYYY-MM-DD and YYYY-MM-DDThh:mm:ssZ.

    :param datetime_str: datetime to convert
    :type datetime_str: str
    :param raise_oai_exc: Catch lookup/strptime errors and reraise them as
                          an OAI error.
    :type raise_oai_exc: bool
    :returns: converted datetime.
    :rtype: :obj:`datetime`
    :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadArgument` for
             invalid format if `raise_oai_exc` is True. Otherwise the
             original :exc:`KeyError` or :exc:`ValueError` propagates.
    """
    formats_by_length = {10: OAI_RESPONSE_DATE_FORMAT,
                         20: OAI_RESPONSE_DATETIME_FORMAT}
    try:
        return datetime.datetime.strptime(
            datetime_str, formats_by_length[len(datetime_str)])
    except (KeyError, ValueError) as exc:
        if not raise_oai_exc:
            # Not request input, so this is a programming error.
            raise
        # Mask as BadArgument to notify the requester through an
        # OAI error condition.
        raise oaierrors.BadArgument(
            "Invalid datetime format: {}".format(datetime_str)
        ) from exc
def as_supported_datestring(datetime_obj, fmt=OAI_RESPONSE_DATETIME_FORMAT):
    """Render a :obj:`datetime` as a string.

    :param datetime_obj: datetime to convert.
    :type datetime_obj: :obj:`datetime`
    :param str fmt: strftime format to use. Defaults to
                    OAI_RESPONSE_DATETIME_FORMAT.
    :returns: string representation of datetime_obj.
    :rtype: str
    """
    datestring = datetime_obj.strftime(fmt)
    return datestring
class Headers:
    """Header information of a single record.

    Stores the record's identifier, datestamp, deleted-value and setSpecs.
    If :attr:`namespace_identifier` is set, the local identifier given to
    the constructor is expanded to an OAI-Identifier.

    :param str identifier: Record's local identifier.
    :param datestamp: Record's datestamp, stored as given.
    :param deleted: Record's deleted-value, stored as given.
    :raises: :exc:`ValueError` if the identifier fails validation.
    """

    #: Namespace identifier used to construct an OAI-Identifier
    #: Use None if wish to use local identifiers in OAI-responses.
    namespace_identifier = OAI_REC_NAMESPACE_IDENTIFIER
    #: Prefix for all identifiers when constructing an OAI-Identifier.
    identifier_oai_prefix = OAI_REC_IDENTIFIER_PREFIX
    #: Separator between the parts of an OAI-Identifier.
    identifier_separator = ':'
    #: Validation regex for OAI-Identifier
    valid_oai_identifier = re.compile(REGEX_OAI_IDENTIFIER)
    #: Validation regex for local identifier (a subset of oai-identifier)
    valid_identifier = re.compile(REGEX_LOCAL_IDENTIFIER)

    def __init__(self, identifier, datestamp, deleted):
        self.identifier = self._create_identifier(identifier)
        self.datestamp = datestamp
        self.deleted = deleted
        # Maps a setSpec key to the list of values added for it.
        self.set_specs = {}

    @classmethod
    def set_namespace_identifier(cls, ns_id):
        """Set namespace identifier for all instances.

        :param ns_id: namespace identifier
        :type ns_id: str
        """
        cls.namespace_identifier = ns_id

    @classmethod
    def _is_valid_oai_identifier(cls, candidate):
        """Return True if `candidate` fully matches the OAI-Identifier regex."""
        return _validate_with_regex(cls.valid_oai_identifier, candidate)

    @classmethod
    def _is_valid_identifier(cls, candidate):
        """Return True if `candidate` fully matches the local identifier regex."""
        return _validate_with_regex(cls.valid_identifier, candidate)

    def _create_identifier(self, identifier):
        """Build and validate the identifier to store.

        With a namespace identifier configured, the result is
        ``<prefix><sep><namespace><sep><identifier>`` validated as an
        OAI-Identifier. Without one, the local identifier is validated
        and returned unchanged.

        :param str identifier: Record's local identifier.
        :returns: Validated identifier.
        :rtype: str
        :raises: :exc:`ValueError` if validation fails.
        """
        if not self.namespace_identifier:
            if self._is_valid_identifier(identifier):
                return identifier
            raise ValueError("Invalid identifier: '{}'".format(identifier))
        oai_identifier = self.identifier_separator.join(
            [self.identifier_oai_prefix, self.namespace_identifier, identifier])
        if self._is_valid_oai_identifier(oai_identifier):
            return oai_identifier
        raise ValueError("Invalid OAI-Identifier: '{}'".format(oai_identifier))

    def iterate_set_specs(self):
        """Iterate over setSpec key-value pairs.

        :returns: Generator object yielding (key, value) tuples, one for
                  every value stored under every key.
        :rtype: Generator
        """
        for set_key, set_values in self.set_specs.items():
            yield from ((set_key, set_value) for set_value in set_values)

    def add_set_spec(self, key, value=None):
        """Add a setSpec value under `key`.

        A value that is not None and fails setSpec validation is logged
        and discarded. None is stored without validation.

        :param key: setSpec key.
        :param value: setSpec value, or None.
        :type value: str or None
        """
        if not (value is None or is_valid_setspec(value)):
            _logger.warning("Discarding invalid setSpec value: '%s'", value)
            return
        self.set_specs.setdefault(key, []).append(value)

    @classmethod
    def as_local_id(cls, identifier):
        """Get local identifier part of OAI-Identifier.

        With a namespace identifier configured, the first occurrence of
        ``<prefix><sep><namespace><sep>`` is removed from a valid
        OAI-Identifier. Without one, a valid local identifier is returned
        unchanged.

        :param identifier: records identifier.
        :type identifier: str
        :returns: local identifier or None for invalid identifier.
        :rtype: str or None
        """
        if not cls.namespace_identifier:
            return identifier if cls._is_valid_identifier(identifier) else None
        if not cls._is_valid_oai_identifier(identifier):
            return None
        # Joining with a trailing empty item yields the trailing separator.
        discard = cls.identifier_separator.join(
            [cls.identifier_oai_prefix, cls.namespace_identifier, ''])
        return identifier.replace(discard, '', 1)
class Response:
    """Represents the response.

    The response is stored in a dictionary (:attr:`context`) which then
    gets submitted to XML-templates. Thus it is required that the
    dictionary built within this class is supported by the templates.

    :param str request_url: Requested url. Falls back to
                            :attr:`base_url` when not given.
    """

    repository_name = OAI_REPO_NAME
    base_url = ''
    protocol_version = OAI_PROTOCOL_VERSION
    # Class-level default shared by all instances until replaced with
    # set_admin_email().
    admin_email = []

    def __init__(self, request_url=None):
        request_url = request_url or self.base_url
        self._resumption_token = None
        self.records = []
        # Naive UTC timestamp. datetime.utcnow() is deprecated since
        # Python 3.12, so take an aware "now" and drop the tzinfo to get
        # the same value.
        utc_now = datetime.datetime.now(
            datetime.timezone.utc).replace(tzinfo=None)
        self.context = {
            'response_metadata': {
                'response_timestamp': utc_now.strftime(
                    OAI_RESPONSE_DATETIME_FORMAT),
                'request_attrs': {},
                'resumption_token': None,
                'request_url': request_url
            },
            'error': {},
            'data': {'base_url': self.base_url,
                     'admin_email': self.admin_email,
                     'protocol_version': self.protocol_version,
                     'repository_name': self.repository_name},
            'metadata': {'formats': []}
        }

    @classmethod
    def set_repository_name(cls, name):
        """Set repository name.

        :param name: repository name.
        :type name: str
        """
        cls.repository_name = name

    @classmethod
    def set_base_url(cls, url):
        """Set base url

        :param url: url.
        :type url: str
        """
        cls.base_url = url

    @classmethod
    def set_admin_email(cls, email):
        """Set admin email address.

        :param email: Admin email(s)
        :type email: list
        """
        cls.admin_email = email

    @classmethod
    def set_protocol_version(cls, version):
        """Set protocol version

        :param version: OAI-PMH protocol version.
        :type version: float
        """
        cls.protocol_version = version

    def set_resumption_token(self, token):
        """Attach a resumption token to the response.

        :param token: Resumption token object.
        :raises: :exc:`ValueError` if a token has already been set.
        """
        if self._resumption_token:
            raise ValueError("ResumptionToken already set")
        self._resumption_token = token

    def _update_response(self, parent_key, key, value, overwrite=False):
        """Store `value` at ``context[parent_key][key]``.

        :param str parent_key: Top-level key of the context.
        :param str key: Key under `parent_key`.
        :param value: Value to store.
        :param bool overwrite: Allow replacing an existing truthy value.
        :raises: :exc:`ValueError` if a truthy value already exists and
                 `overwrite` is False.
        """
        section = self.context[parent_key]
        existing_value = section.get(key)
        if existing_value and not overwrite:
            raise ValueError(
                "Response has value {}, will not overwrite".format(
                    existing_value)
            )
        section[key] = value

    def _update_response_metadata(self, key, value, overwrite=False):
        self._update_response('response_metadata', key, value,
                              overwrite=overwrite)

    def _update_data(self, key, value):
        self._update_response('data', key, value)

    def _update_metadata(self, key, value):
        self._update_response('metadata', key, value)

    def _load_resumption_token(self):
        """Copy the resumption token's attributes to response metadata."""
        token = self._resumption_token
        token_dict = {'encoded': token.encoded,
                      'complete_list_size': token.complete_list_size,
                      'cursor': token.cursor}
        self._update_response_metadata('resumption_token', token_dict)

    async def identify_response(self, earliest_datestamp=None,
                                deleted_records=OAI_DEL_RECORDS_DECL_NO,
                                granularity=OAI_DATESTAMP_GRANULARITY_DATETIME):
        """Prepare and return context for OAI verb Identify.

        :param :obj:`datetime.datetime` earliest_datestamp: Repository
            earliest datestamp. None if docstore contains no records.
        :param str deleted_records: Repository support for deleted
            records. Must be one of the OAI_DEL_RECORDS_DECL_* constants.
            Defaults to OAI_DEL_RECORDS_DECL_NO.
        :param str granularity: Datestamp granularity. Must be one of the
            OAI_DATESTAMP_GRANULARITY_* constants. Defaults to
            OAI_DATESTAMP_GRANULARITY_DATETIME.
        :returns: Response context
        :rtype: dict
        :raises: :exc:`ValueError` for an unsupported `deleted_records`
                 or `granularity` value.
        """
        valid_del_decls = [OAI_DEL_RECORDS_DECL_NO,
                           OAI_DEL_RECORDS_DECL_TRANSIENT,
                           OAI_DEL_RECORDS_DECL_PERSISTENT]
        granularity_fmts = {
            OAI_DATESTAMP_GRANULARITY_DATE: OAI_RESPONSE_DATE_FORMAT,
            OAI_DATESTAMP_GRANULARITY_DATETIME: OAI_RESPONSE_DATETIME_FORMAT}
        if deleted_records not in valid_del_decls:
            raise ValueError(
                "Invalid deleted records declaration '%s'. Supported values are %s"
                % (deleted_records,
                   ', '.join("'%s'" % (x,) for x in valid_del_decls)))
        if granularity not in granularity_fmts:
            raise ValueError(
                "Invalid datestamp granularity '%s'. Supported values are %s"
                % (granularity,
                   ', '.join("'%s'" % (x,) for x in granularity_fmts)))
        if earliest_datestamp is not None:
            earliest_datestamp = as_supported_datestring(
                earliest_datestamp, fmt=granularity_fmts[granularity])
        self._update_data('earliest_datestamp', earliest_datestamp)
        self._update_data('deleted_records', deleted_records)
        self._update_data('granularity', granularity)
        return self.context

    async def get_record_response(self):
        """Prepare and return context for OAI verb GetRecord.

        Expects exactly one record in :attr:`records`; it is moved to
        the context's metadata.

        :returns: Response context
        :rtype: dict
        """
        # Internal invariant, not request validation.
        assert len(self.records) == 1
        self._update_metadata('record', self.records.pop())
        return self.context

    async def list_records_response(self):
        """Prepare and return context for OAI verb ListRecords.

        :returns: Response context
        :rtype: dict
        """
        self._update_metadata('records', self.records)
        self._load_resumption_token()
        return self.context

    async def list_identifiers_response(self):
        """Prepare and return context for OAI verb ListIdentifiers.

        :returns: Response context
        :rtype: dict
        """
        self._update_metadata('records', self.records)
        self._load_resumption_token()
        return self.context

    async def list_sets_response(self):
        """Return context for OAI verb ListSets.

        Expects sets to have been added with :meth:`add_sets_element`.

        :returns: Response context
        :rtype: dict
        """
        # Internal invariant, not request validation.
        assert 'sets' in self.context['data']
        return self.context

    async def set_metadata_format(self, schema, namespace):
        """Set schema and namespace of the response's metadata format.

        :param schema: Metadata schema.
        :param namespace: Metadata namespace.
        """
        self._update_metadata('schema', schema)
        self._update_metadata('namespace', namespace)

    async def add_available_metadata_format(self, prefix, schema, namespace):
        """Add a supported metadata format to the context.

        :param prefix: Metadata prefix.
        :param schema: Metadata schema.
        :param namespace: Metadata namespace.
        """
        self.context['metadata']['formats'].append({
            'prefix': prefix,
            'schema': schema,
            'namespace': namespace})

    def set_error(self, oai_error):
        """Set OAI-PMH error.

        :note: These are the errors that are defined in the OAI-protocol.
               Programming errors are handled separately in higher levels.

        :param oai_error: OAI error.
        :type oai_error: Subclass of
                         :obj:`kuha_oai_pmh_repo_handler.oai.errors.OAIError`
        """
        self._update_response('error', 'code', oai_error.get_code())
        self._update_response('error', 'msg',
                              oai_error.get_contextual_message())
        if isinstance(oai_error, (oaierrors.BadArgument, oaierrors.BadVerb)):
            # When the response resulted in badVerb or badArgument, the
            # repository must return the request only, no attributes.
            self._update_response_metadata(
                'request_attrs', {}, overwrite=True)

    async def add_sets_element(self, spec, name=None, description=None):
        """Add sets elements.

        An invalid `spec` is logged and discarded.

        :param str spec: setSpec-subelement value.
        :param str or None name: setName-subelement value.
        :param str or None description: setDescription-subelement value.
        """
        if not is_valid_setspec(spec):
            _logger.warning("Discarding invalid setSpec value: '%s'", spec)
            return
        self.context['data'].setdefault('sets', []).append({
            'setSpec': spec,
            'setName': name,
            'setDescription': description})

    def set_request_args(self, args):
        """Store request arguments into response metadata.

        Request arguments are added to each successful OAI response.
        :meth:`set_error` clears them for badVerb and badArgument errors.

        :param list args: List of 2-item tuples [(key, value)] containing
                          request arguments.
        """
        self._update_response_metadata('request_attrs', dict(args))
class ResumptionToken:
    """Class representing OAI-PMH Resumption Token.

    Holds attributes of the resumption token. Creates a new resumption
    token with initial values or, via :meth:`load_arg`, from a token
    string received in a request.

    Validates the token based on records list size. If the list size has
    been changed between requests asserts that the token is invalid by
    raising a
    :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`
    exception.

    :param int cursor: Optional parameter for the current position in list.
    :param str from_: Optional parameter for from datestamp.
                      Converted to :obj:`datetime.datetime` on init.
    :param str until: Optional parameter for until datestamp.
                      Converted to :obj:`datetime.datetime` on init.
                      Defaults to current UTC time without microseconds.
    :param int complete_list_size: Optional parameter for the number of
                                   records in the complete list.
    :param str metadata_prefix: Optional parameter for the requested
                                metadata prefix.
    :param str set_: Optional parameter containing requested set
                     information.
    :param bool from_req: True if the token is created from a token
                          received in a request.
    """

    #: Store ResumptionToken attribute keys and values.
    Attribute = collections.namedtuple('Attribute', ['key', 'value'])

    # Class-level defaults; instances shadow these with _replace()d copies.
    _cursor = Attribute('cursor', 0)
    _from = Attribute('from', None)
    _until = Attribute('until', None)
    _set = Attribute('set', None)
    _complete_list_size = Attribute('completeListSize', None)
    _metadata_prefix = Attribute('metadataPrefix', None)

    def __init__(self, cursor=0, from_=None, until=None,
                 complete_list_size=None, metadata_prefix=None,
                 set_=None, from_req=False):
        self._cursor = self._cursor._replace(value=int(cursor))
        self.from_str = None
        if from_:
            # Convert to datetime.datetime
            self._from = self._from._replace(
                value=as_supported_datetime(from_))
            self.from_str = from_
        if until:
            # Convert to datetime.datetime
            self._until = self._until._replace(
                value=as_supported_datetime(until))
            self.until_str = until
        else:
            # Naive UTC "now" without microseconds. datetime.utcnow() is
            # deprecated since Python 3.12.
            until = datetime.datetime.now(datetime.timezone.utc).replace(
                tzinfo=None, microsecond=0)
            self._until = self._until._replace(value=until)
            self.until_str = as_supported_datestring(until)
        if complete_list_size:
            self._complete_list_size = self._complete_list_size._replace(
                value=int(complete_list_size))
        if set_:
            self._set = self._set._replace(value=set_)
        self._metadata_prefix = self._metadata_prefix._replace(
            value=metadata_prefix)
        # True if the resumption token was created from a HTTP request.
        self._from_req = from_req
        # Response list size is used to calculate the final response
        # and the cursor position.
        self._response_list_size = None

    def _is_final_list(self):
        """Return True if this response completes the list sequence.

        :raises: :exc:`ValueError` if response_list_size has not been set.
        """
        if self.response_list_size is None:
            raise ValueError(
                "Unable to determine final list: response_list_size is None")
        return (self._cursor.value + self.response_list_size
                >= self._complete_list_size.value)

    @classmethod
    def _load_dict(cls, dct):
        """Create a token from a dict of token attributes.

        :param dict dct: Token attribute keys mapped to strings or None.
        :returns: New :obj:`ResumptionToken` with from_req set.
        :raises:
            :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`
            if cursor or completeListSize is missing or not an integer.
        """
        try:
            cursor = int(dct[cls._cursor.key])
            complete_list_size = int(dct[cls._complete_list_size.key])
        except (KeyError, TypeError, ValueError) as exc:
            # The dict originates from the request; report a broken token
            # as an OAI error instead of a programming error.
            raise oaierrors.BadResumptionToken(
                context="Invalid cursor or completeListSize") from exc
        return cls(cursor=cursor,
                   from_=dct.get(cls._from.key),
                   until=dct.get(cls._until.key),
                   complete_list_size=complete_list_size,
                   metadata_prefix=dct.get(cls._metadata_prefix.key),
                   set_=dct.get(cls._set.key),
                   from_req=True)

    @classmethod
    def load_arg(cls, argument):
        """Create new resumption token from request arguments.

        Use to load resumption token from OAI request. The argument is
        expected in the form produced by :attr:`encoded` after
        uri-decoding: ``key:value`` pairs joined with ``&``. The literal
        value ``None`` is converted to None.

        :param str argument: Resumption token argument. This comes from
                             HTTP-request.
        :returns: New :obj:`ResumptionToken`
        :raises:
            :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`
            if the argument is malformed.
        """
        result = {}
        try:
            for pair in argument.split('&'):
                # Split on the first colon only; values may contain colons.
                key, val = pair.split(':', 1)
                result[key] = None if val == 'None' else val
        except ValueError as exc:
            # A pair without a colon cannot be unpacked.
            raise oaierrors.BadResumptionToken(
                context="Malformed resumptionToken") from exc
        return cls._load_dict(result)

    @property
    def response_list_size(self):
        """Number of records in the current response, None if not set."""
        return self._response_list_size

    @response_list_size.setter
    def response_list_size(self, size):
        # For a token loaded from a request, the cursor is advanced by
        # the size of the response.
        if self._from_req:
            self._cursor = self._cursor._replace(
                value=int(self._cursor.value + size))
        self._response_list_size = size

    @property
    def cursor(self):
        """Current cursor position.

        :raises: :exc:`ValueError` if the token was loaded from a request
                 and response_list_size has not been set.
        """
        if self._from_req and self.response_list_size is None:
            raise ValueError(
                "Unable to determine cursor: response_list_size is None")
        return self._cursor.value

    @property
    def from_(self):
        """From datestamp as :obj:`datetime.datetime`, or None."""
        return self._from.value

    @property
    def until(self):
        """Until datestamp as :obj:`datetime.datetime`."""
        return self._until.value

    @property
    def set_(self):
        """Requested set, or None."""
        return self._set.value

    @property
    def metadata_prefix(self):
        """Requested metadata prefix, or None."""
        return self._metadata_prefix.value

    @property
    def complete_list_size(self):
        """Number of records in the complete query response.

        Setting a value when none is stored stores it. Setting a value
        that differs from the stored one raises
        :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`.
        """
        return self._complete_list_size.value

    @complete_list_size.setter
    def complete_list_size(self, size):
        current = self._complete_list_size.value
        if not current:
            self._complete_list_size = self._complete_list_size._replace(
                value=int(size))
        elif current != size:
            # If size has changed between requests, the resumption token
            # is no longer valid.
            _ctx = "Requested completeListSize: {}".format(current)
            _ctx += " Actual completeListSize: {}".format(size)
            raise oaierrors.BadResumptionToken(context=_ctx)

    @property
    def encoded(self):
        """Get encoded Resumption Token.

        Returns uri-encoded representation of the resumption token if
        the list request sequence is ongoing. If the list request
        sequence is over, returns None.

        :returns: uri-encoded representation of the token, or None
        :rtype: str or None
        """
        if self._is_final_list():
            return None
        datestamp_keys = (self._until.key, self._from.key)
        attrs = []
        for key, value in (self._cursor, self._from, self._until, self._set,
                           self._complete_list_size, self._metadata_prefix):
            if value is not None and key in datestamp_keys:
                value = as_supported_datestring(value)
            attrs.append('{}:{}'.format(key, value))
        return quote('&'.join(attrs))
class Arguments:
    """Arguments of OAI-protocol.

    Store arguments. Convert datestamps string to datetime objects.
    Validate arguments for each verb.

    For resumable verbs a :class:`ResumptionToken` is always created:
    loaded from the `resumption_token` argument if given, otherwise built
    from `from_`, `until`, `metadata_prefix` and `set_`.

    :param str verb: requested OAI verb.
    :param str resumption_token: requested resumption token.
    :param str identifier: requested identifier.
    :param str metadata_prefix: requested metadata prefix.
    :param str set_: requested set.
    :param str from_: requested datestamp for from attribute.
    :param str until: requested datestamp for until attribute.
    :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.OAIError` for
             OAI errors.
    """

    verb_value_identify = 'Identify'
    verb_value_list_sets = 'ListSets'
    verb_value_list_metadata_formats = 'ListMetadataFormats'
    verb_value_list_identifiers = 'ListIdentifiers'
    verb_value_list_records = 'ListRecords'
    verb_value_get_record = 'GetRecord'

    query_key_verb = 'verb'
    query_key_resumption_token = 'resumptionToken'
    query_key_identifier = 'identifier'
    query_key_metadata_prefix = 'metadataPrefix'
    query_key_set = 'set'
    query_key_from = 'from'
    query_key_until = 'until'

    #: Define supported verbs
    supported_verbs = [
        verb_value_identify,
        verb_value_list_sets,
        verb_value_list_metadata_formats,
        verb_value_list_identifiers,
        verb_value_list_records,
        verb_value_get_record
    ]

    #: Define resumption token verbs
    resumable_verbs = [
        verb_value_list_sets,
        verb_value_list_identifiers,
        verb_value_list_records
    ]

    def __init__(self, verb, resumption_token=None, identifier=None,
                 metadata_prefix=None, set_=None, from_=None, until=None):
        if verb is None:
            raise oaierrors.MissingVerb()
        if verb not in self.supported_verbs:
            raise oaierrors.BadVerb(context=verb)
        self.verb = verb
        self.identifier = identifier
        # `until` originates from the request, so an invalid value must be
        # reported as an OAI BadArgument instead of a programming error.
        self._requested_until = as_supported_datetime(until) if until else None
        self.resumption_token = self._init_resumption_token(
            resumption_token, metadata_prefix, from_, until, set_)
        self.metadata_prefix = self._get_metadata_prefix(metadata_prefix)
        self._validate_verb_arguments()

    @classmethod
    def _load_dict(cls, dct):
        """Create :obj:`Arguments` from a dict keyed by query keys."""
        return cls(dct.get(cls.query_key_verb),
                   resumption_token=dct.get(cls.query_key_resumption_token),
                   identifier=dct.get(cls.query_key_identifier),
                   metadata_prefix=dct.get(cls.query_key_metadata_prefix),
                   set_=dct.get(cls.query_key_set),
                   from_=dct.get(cls.query_key_from),
                   until=dct.get(cls.query_key_until))

    @classmethod
    def load_args(cls, args):
        """Create :obj:`Arguments` from request arguments.

        :param list args: List of 2-item tuples [(key, value)] containing
                          request arguments.
        :returns: New :obj:`Arguments`
        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadArgument`
                 for illegal or repeated arguments.
        """
        args_keys = [item[0] for item in args]
        # BadArgument should be raised if the request includes illegal
        # arguments.
        unknown_keys = set(args_keys).difference({
            Arguments.query_key_verb,
            Arguments.query_key_resumption_token,
            Arguments.query_key_identifier,
            Arguments.query_key_metadata_prefix,
            Arguments.query_key_set,
            Arguments.query_key_from,
            Arguments.query_key_until})
        if unknown_keys:
            # Sorted to keep the message deterministic.
            raise oaierrors.BadArgument(
                msg="Illegal arguments",
                context=', '.join(sorted(unknown_keys)))
        # BadArgument should be raised if the request contains repeated
        # arguments.
        if len(set(args_keys)) != len(args_keys):
            raise oaierrors.BadArgument(
                msg="Repeated arguments",
                context=', '.join('{}={}'.format(k, v) for k, v in args))
        # Convert to dict for convenience
        return cls._load_dict(dict(args))

    def _get_metadata_prefix(self, metadata_prefix_arg):
        """Resolve the effective metadata prefix.

        For resumable verbs the prefix is taken from the resumption
        token, otherwise from the request argument.

        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadArgument`
                 if the verb requires a prefix and none is available.
        """
        if self.is_verb_resumable():
            metadata_prefix = self.resumption_token.metadata_prefix
        else:
            metadata_prefix = metadata_prefix_arg
        prefix_required = self.verb in [self.verb_value_list_identifiers,
                                        self.verb_value_list_records,
                                        self.verb_value_get_record]
        if prefix_required and not metadata_prefix:
            raise oaierrors.BadArgument(
                msg='Missing argument', context='\"metadataPrefix\"')
        return metadata_prefix

    def _init_resumption_token(self, resumption_token_arg,
                               metadata_prefix_arg, from_arg, until_arg,
                               set_):
        """Load or create the resumption token for resumable verbs.

        :returns: :obj:`ResumptionToken`, or None if the verb is not
                  resumable.
        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadArgument`
                 if a token is given for a non-resumable verb, or if the
                 token's metadata prefix differs from the request's.
        """
        if not self.is_verb_resumable():
            if resumption_token_arg:
                raise oaierrors.BadArgument(
                    msg="Got resumptionToken-parameter for verb which does not support it",
                    context=self.verb
                )
            return None
        if not resumption_token_arg:
            return ResumptionToken(from_=from_arg,
                                   until=until_arg,
                                   metadata_prefix=metadata_prefix_arg,
                                   set_=set_)
        resumption_token = ResumptionToken.load_arg(resumption_token_arg)
        if metadata_prefix_arg is not None and \
           resumption_token.metadata_prefix != metadata_prefix_arg:
            raise oaierrors.BadArgument(
                msg="metadataPrefix in resumptionToken differs from metadataPrefix-parameter.",
                context="%s != %s" % (resumption_token.metadata_prefix,
                                      metadata_prefix_arg)
            )
        return resumption_token

    def is_verb_resumable(self):
        """Is the requested verb a resumable list request?

        :returns: True if verb is resumable False otherwise
        :rtype: bool
        """
        return self.verb in self.resumable_verbs

    def get_local_identifier(self):
        """Get requested local identifier.

        Local identifier does not have prefixes for oai and namespace.
        It is used to identify records locally.

        :returns: Local identifier if applicable for the request.
        :rtype: str
        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.IdDoesNotExist`
                 for invalid identifier.
        """
        identifier = Headers.as_local_id(self.identifier)
        if identifier is None:
            raise oaierrors.IdDoesNotExist(
                msg='Invalid identifier structure', context=self.identifier)
        return identifier

    def is_selective(self):
        """Return True if request is selective.

        Selective refers to selective harvesting supported by OAI-PMH.
        The request is selective if the resumption token has a set or a
        from datestamp, or if until was given in the request.

        :returns: True if selective, False if not.
        :rtype: bool
        """
        return any([self.resumption_token.set_,
                    self.resumption_token.from_,
                    self._requested_until])

    def _validate_verb_arguments(self):
        """Validate verb-specific arguments.

        :raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadArgument`
                 if GetRecord lacks an identifier or if from is later
                 than until.
        """
        if self.verb == self.verb_value_get_record and not self.identifier:
            raise oaierrors.BadArgument(
                msg='Missing argument', context='\"identifier\"')
        from_ = self.resumption_token.from_ if self.is_verb_resumable() \
            else None
        until = self._requested_until
        if from_ and until and from_ > until:
            raise oaierrors.BadArgument(
                msg='Invalid date range',
                context='from argument is later than until argument')