#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Defines the protocol
"""
import logging
import re
import datetime
import collections
from urllib.parse import quote
from kuha_oai_pmh_repo_handler.oai import errors as oaierrors
from kuha_oai_pmh_repo_handler.oai.constants import (
OAI_RESPONSE_DATETIME_FORMAT,
OAI_RESPONSE_DATE_FORMAT,
OAI_REPO_NAME,
OAI_PROTOCOL_VERSION,
OAI_REC_NAMESPACE_IDENTIFIER,
OAI_REC_IDENTIFIER_PREFIX,
OAI_DATESTAMP_GRANULARITY_DATE,
OAI_DATESTAMP_GRANULARITY_DATETIME,
OAI_DEL_RECORDS_DECL_NO,
OAI_DEL_RECORDS_DECL_TRANSIENT,
OAI_DEL_RECORDS_DECL_PERSISTENT,
REGEX_OAI_IDENTIFIER,
REGEX_LOCAL_IDENTIFIER,
REGEX_SETSPEC
)
_logger = logging.getLogger(__name__)
#: Validation regex for setspec
REGEX_VALID_SETSPEC = re.compile(REGEX_SETSPEC)
EXC_MSG_INVALID_RESUMPTIONTOKEN = "Invalid resumptionToken"
def _validate_with_regex(regex, candidate):
try:
return regex.fullmatch(candidate) is not None
except TypeError as exc:
# fullmatch() TypeError only tells what it expects:
# "expected string or bytes-like object". Get more information
# about the invalid type
raise TypeError("Invalid type: '%s'. Was expecting a string."
% type(candidate,)) from exc
[docs]def is_valid_setspec(candidate):
"""Validates setSpec value.
:param str candidate: setSpec value to validate.
:returns: True if valid, False if not.
:rtype: bool
"""
return _validate_with_regex(REGEX_VALID_SETSPEC, candidate)
[docs]def as_supported_datetime(datetime_str, raise_oai_exc=True):
"""Convert string representation of datetime to :obj:`datetime`.
:note: If the `datetime_str` does not come from HTTP-Request,
set `raise_oai_exc` to False.
:note: The legitimate formats are YYYY-MM-DD and YYYY-MM-DDThh:mm:ssZ.
:param datetime_str: datetime to convert
:type datetime_str: str
:param raise_oai_exc: Catch datetime.strptime errors and reraise as oai-error.
:type raise_oai_exc: bool
:returns: converted datetime.
:rtype: :obj:`datetime`
:raises: :exc:`kuha_oai_pmh_repo_hander.oai.errors.BadArgument` for invalid format if
`raise_oai_exc` is True.
"""
try:
fmt = {10: OAI_RESPONSE_DATE_FORMAT,
20: OAI_RESPONSE_DATETIME_FORMAT}[len(datetime_str)]
return datetime.datetime.strptime(datetime_str, fmt)
except (KeyError, ValueError) as exc:
if raise_oai_exc:
# Mask as BadArgument to notify requester throught OAI error condition.
raise oaierrors.BadArgument(
"Invalid datetime format: {}".format(datetime_str)
) from exc
# otherwise its a programming error.
raise
[docs]def as_supported_datestring(datetime_obj, fmt=OAI_RESPONSE_DATETIME_FORMAT):
"""Convert :obj:`datetime` to string representation.
The target format is YYYY-MM-DDThh:mm:ssZ
:param datetime_obj: datetime to convert.
:type datetime_obj: :obj:`datetime`
:returns: string representation of datetime_obj.
:rtype: str
"""
return datetime_obj.strftime(fmt)
class Headers:
#: Namespace identifier used to construct an OAI-Identifier
#: Use None if wish to use local identifiers in OAI-responses.
namespace_identifier = OAI_REC_NAMESPACE_IDENTIFIER
#: Prefix for all identifiers when constructing an OAI-Identifier.
identifier_oai_prefix = OAI_REC_IDENTIFIER_PREFIX
identifier_separator = ':'
#: Validation regex for OAI-Identifier
valid_oai_identifier = re.compile(REGEX_OAI_IDENTIFIER)
#: Validation regex for local identifier (a subset of oai-identifier)
valid_identifier = re.compile(REGEX_LOCAL_IDENTIFIER)
def __init__(self, identifier, datestamp, deleted):
self.identifier = self._create_identifier(identifier)
self.datestamp = datestamp
self.deleted = deleted
self.set_specs = {}
@classmethod
def set_namespace_identifier(cls, ns_id):
"""Set namespace identifier for all instances.
:param ns_id: namespace identifier
:type ns_id: str
"""
cls.namespace_identifier = ns_id
@classmethod
def _is_valid_oai_identifier(cls, candidate):
return _validate_with_regex(cls.valid_oai_identifier, candidate)
@classmethod
def _is_valid_identifier(cls, candidate):
return _validate_with_regex(cls.valid_identifier, candidate)
def _create_identifier(self, identifier):
"""Set identifier.
If namespace_identifier is not None, will build an OAI-Identifier.
The identifier will be validated and :exc:`ValueError` will be raised
if the validation fails.
:param str identifier: Record's local identifier.
:raises: :exc:`ValueError` if validation fails.
"""
if self.namespace_identifier:
candidate = self.identifier_separator.join([self.identifier_oai_prefix,
self.namespace_identifier,
identifier])
if not self._is_valid_oai_identifier(candidate):
raise ValueError("Invalid OAI-Identifier: '{}'".format(candidate))
return candidate
if not self._is_valid_identifier(identifier):
raise ValueError("Invalid identifier: '{}'".format(identifier))
return identifier
def iterate_set_specs(self):
"""Iterate over setSpec key-value pairs.
:returns: Generator object for iterating over setSpec
key-value pairs.
:rtype: Generator
"""
for spec_key, specs in self.set_specs.items():
for spec in specs:
yield spec_key, spec
def add_set_spec(self, key, value=None):
if value is not None and not is_valid_setspec(value):
_logger.warning("Discarding invalid setSpec value: '%s'", value)
return
if key not in self.set_specs:
self.set_specs[key] = [value]
return
self.set_specs[key].append(value)
@classmethod
def as_local_id(cls, identifier):
"""Get local identifier part of OAI-Identifier.
:param identifier: records identifier.
:type identifier: str
:returns: local identifier or None for invalid identifier.
:rtype: str or None
"""
if cls.namespace_identifier:
if not cls._is_valid_oai_identifier(identifier):
return None
discard = cls.identifier_oai_prefix +\
cls.identifier_separator +\
cls.namespace_identifier +\
cls.identifier_separator
return identifier.replace(discard, '', 1)
if not cls._is_valid_identifier(identifier):
return None
return identifier
[docs]class Response:
"""Represents the response.
The response is stored in a dictionary which then
gets submitted to XML-templates. Thus it is required that
the dictionary built within this class is supported
by the templates.
:param str request_url: Requested url.
"""
repository_name = OAI_REPO_NAME
base_url = ''
protocol_version = OAI_PROTOCOL_VERSION
admin_email = []
def __init__(self, request_url=None):
request_url = request_url or self.base_url
self._resumption_token = None
self.records = []
self.context = {
'response_metadata': {
'response_timestamp': datetime.datetime.now(datetime.timezone.utc).strftime(
OAI_RESPONSE_DATETIME_FORMAT),
'request_attrs': {},
'resumption_token': None,
'request_url': request_url
},
'error': {},
'data': {'base_url': self.base_url,
'admin_email': self.admin_email,
'protocol_version': self.protocol_version,
'repository_name': self.repository_name},
'metadata': {'formats': []}
}
[docs] @classmethod
def set_repository_name(cls, name):
"""Set repository name.
:param name: repository name.
:type name: str
"""
cls.repository_name = name
[docs] @classmethod
def set_base_url(cls, url):
"""Set base url
:param url: url.
:type url: str
"""
cls.base_url = url
[docs] @classmethod
def set_admin_email(cls, email):
"""Set admin email address.
:param email: Admin email(s)
:type email: list
"""
cls.admin_email = email
[docs] @classmethod
def set_protocol_version(cls, version):
"""Set protocol version
:param version: OAI-PMH protocol version.
:type version: float
"""
cls.protocol_version = version
def set_resumption_token(self, token):
if self._resumption_token:
raise ValueError("ResumptionToken already set")
self._resumption_token = token
def _update_response(self, parent_key, key, value, overwrite=False):
existing_value = self.context[parent_key].get(key)
if existing_value and not overwrite:
raise ValueError(
"Response has value {}, will not overwrite".format(
existing_value)
)
self.context[parent_key][key] = value
def _update_response_metadata(self, key, value, overwrite=False):
self._update_response('response_metadata', key, value, overwrite=overwrite)
def _update_data(self, key, value):
self._update_response('data', key, value)
def _update_metadata(self, key, value):
self._update_response('metadata', key, value)
def _load_resumption_token(self):
"""Set resumption token.
"""
token_dict = {'encoded': self._resumption_token.encoded,
'complete_list_size': self._resumption_token.complete_list_size,
'cursor': self._resumption_token.cursor}
self._update_response_metadata('resumption_token', token_dict)
[docs] async def identify_response(self, earliest_datestamp=None,
deleted_records=OAI_DEL_RECORDS_DECL_NO,
granularity=OAI_DATESTAMP_GRANULARITY_DATETIME):
"""Prepare and return context for OAI verb Identify.
:param :obj:`datetime.datetime` earliest_datestamp: Repository earliest datestamp.
None if docstore contains no records.
:param str deleted_records: Repository support for deleter records. Must be one of 'no', 'transient',
'persistent'. Defaults to 'no'.
:param str granularity: Datestamp granularity. Must be 'YYYY-MM-DD' or 'YYYY-MM-DDThh:mm:ssZ'.
Defaults to 'YYYY-MM-DDThh:mm:ssZ'
:returns: Response context
:rtype: dict
"""
_valid_del_decls = [OAI_DEL_RECORDS_DECL_NO, OAI_DEL_RECORDS_DECL_TRANSIENT, OAI_DEL_RECORDS_DECL_PERSISTENT]
granularitys_fmts = {OAI_DATESTAMP_GRANULARITY_DATE: OAI_RESPONSE_DATE_FORMAT,
OAI_DATESTAMP_GRANULARITY_DATETIME: OAI_RESPONSE_DATETIME_FORMAT}
_valid_granularitys = granularitys_fmts.keys()
if deleted_records not in _valid_del_decls:
raise ValueError("Invalid deleted records declaration '%s'. Supported values are %s"
% (deleted_records, ', '.join(("'%s'" % (x,) for x in _valid_del_decls))))
if granularity not in _valid_granularitys:
raise ValueError("Invalid datestamp granularity '%s'. Supported values are %s"
% (granularity, ', '.join(("'%s'" % (x,) for x in _valid_granularitys))))
fmt = granularitys_fmts[granularity]
if earliest_datestamp is not None:
earliest_datestamp = as_supported_datestring(earliest_datestamp, fmt=fmt)
self._update_data('earliest_datestamp', earliest_datestamp)
self._update_data('deleted_records', deleted_records)
self._update_data('granularity', granularity)
return self.context
async def get_record_response(self):
assert len(self.records) == 1
self._update_metadata('record', self.records.pop())
return self.context
async def list_records_response(self):
self._update_metadata('records', self.records)
self._load_resumption_token()
return self.context
async def list_identifiers_response(self):
self._update_metadata('records', self.records)
self._load_resumption_token()
return self.context
async def list_sets_response(self):
assert 'sets' in self.context['data']
return self.context
async def set_metadata_format(self, schema, namespace):
self._update_metadata('schema', schema)
self._update_metadata('namespace', namespace)
[docs] def set_error(self, oai_error):
"""Set OAI-PMH error.
:note: These are the errors that are defined in the OAI-protocol.
Programming errors are handled separately in higher levels.
:param oai_error: OAI error.
:type oai_error: Subclass of :obj:`kuha_oai_pmh_repo_handler.oai.errors.OAIError`
"""
self._update_response('error', 'code', oai_error.get_code())
self._update_response('error', 'msg', oai_error.get_contextual_message())
if isinstance(oai_error, (oaierrors.BadArgument, oaierrors.BadVerb)):
# When the response resulted in badVerb or badArgument, the
# repository must return the request only, no attributes.
self._update_response_metadata(
'request_attrs', {}, overwrite=True)
[docs] async def add_sets_element(self, spec, name=None, description=None):
"""Add sets elements.
:param str spec: setSpec-sublement value.
:param str or None name: setName-sublement value.
:param str or None description: setDescription-subelement value.
"""
if not is_valid_setspec(spec):
_logger.warning("Discarding invalid setSpec value: '%s'", spec)
return
if 'sets' in self.context['data']:
self.context['data']['sets'].append({
'setSpec': spec,
'setName': name,
'setDescription': description})
else:
self._update_data('sets', [{
'setSpec': spec,
'setName': name,
'setDescription': description}])
[docs] def set_request_args(self, args):
"""Request arguments are added to each succesfull OAI response.
If a request would result in OAI Error, these are not added to response.
These are read in oai_pmh_template.xml.
:param list args: List of 2-item tuples [(key, value]] containing
request arguments.
"""
self._update_response_metadata('request_attrs', dict(args))
[docs]class ResumptionToken:
"""Class representing OAI-PMH Resumption Token.
Holds attributes of the resumption token. Creates a new
resumption token with initial values or takes a
dictionary of resumption token arguments. Validates the
token based on records list size. If the list
size has been changed between requests asserts that the
token is invalid by raising a :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`
exception.
:note: Since :attr:`OAIArgument.set_` is not supported by resumption token,
changing the requested set may result in falsely valid
resumption token. But changing the requested set in the
middle of a list request sequence should be seen as bad
behaviour by the requester/harvester.
:param int cursor: Optional parameter for the current position in list.
:param str from_: Optional parameter for from datestamp.
Converted to :obj:`datetime.datetime` on init.
:param str until: Optional parameter for until datestamp.
Converted to :obj:`datetime.datetime` on init.
:param int complete_list_size: Optional parameter for the umber of records in
the complete list.
:param str metadata_prefix: Optional parameter for the requested metadata prefix.
:param str set_: Optional parameter containing requested set information.
"""
#: Store ResumptionToken attribute keys and values.
Attribute = collections.namedtuple('Attribute', ['key', 'value'])
_cursor = Attribute('cursor', 0)
_from = Attribute('from', None)
_until = Attribute('until', None)
_set = Attribute('set', None)
_complete_list_size = Attribute('completeListSize', None)
_metadata_prefix = Attribute('metadataPrefix', None)
def __init__(self, cursor=0, from_=None, until=None, complete_list_size=None,
metadata_prefix=None, set_=None, from_req=False):
self._cursor = self._cursor._replace(value=int(cursor))
self.from_str = None
if from_:
# Convert to datetime.datetime
self._from = self._from._replace(
value=as_supported_datetime(from_)
)
self.from_str = from_
if until:
# Convert to datetime.datetime
self._until = self._until._replace(
value=as_supported_datetime(until)
)
self.until_str = until
else:
utcnow = datetime.datetime.now(datetime.timezone.utc)
# Remove microseconds
until = utcnow - datetime.timedelta(microseconds=utcnow.microsecond)
self._until = self._until._replace(value=until)
self.until_str = as_supported_datestring(until)
if complete_list_size:
self._complete_list_size = self._complete_list_size._replace(
value=int(complete_list_size)
)
if set_:
self._set = self._set._replace(value=set_)
self._metadata_prefix = self._metadata_prefix._replace(
value=metadata_prefix
)
# request_arg is True if the resumption_token was created from a
# HTTP Request.
self._from_req = from_req
# Response list size is used to calculate the final response & cursor position
self._response_list_size = None
def _is_final_list(self):
if self.response_list_size is None:
raise ValueError("Unable to determine final list: response_list_size is None")
return self._cursor.value + self.response_list_size >= self._complete_list_size.value
@classmethod
def _load_dict(cls, dct):
# dct is a dict containing strings or NoneTypes
validated = {}
for key, required, cast in (
(cls._cursor.key, True, int),
(cls._from.key, False, None),
(cls._until.key, True, None),
(cls._complete_list_size.key, True, int),
(cls._metadata_prefix.key, True, None),
(cls._set.key, False, None)
):
if key not in dct:
_logger.warning("Resumptiontoken missing key: '%s'", key)
raise oaierrors.BadResumptionToken(EXC_MSG_INVALID_RESUMPTIONTOKEN)
value = dct[key]
if required and value is None:
_logger.warning("Resumptiontoken missing mandatory value for %s", key)
raise oaierrors.BadResumptionToken(EXC_MSG_INVALID_RESUMPTIONTOKEN)
if cast:
try:
value = cast(value)
except ValueError as exc:
_logger.warning("Invalid value '%s' for resumptiontoken key %s", value, key)
raise oaierrors.BadResumptionToken(EXC_MSG_INVALID_RESUMPTIONTOKEN) from exc
validated[key] = value
token = cls(cursor=validated[cls._cursor.key],
from_=validated[cls._from.key],
until=validated[cls._until.key],
complete_list_size=validated[cls._complete_list_size.key],
metadata_prefix=validated[cls._metadata_prefix.key],
set_=validated[cls._set.key],
from_req=True)
return token
[docs] @classmethod
def load_arg(cls, argument):
"""Create new resumption token from request arguments.
Use to load resumption token from OAI request.
:param str argument: Resumption token argument.
This comes from HTTP-request.
:returns: New :obj:`ResumptionToken`
"""
result = {}
for _arg in argument.split('&'):
values = _arg.split(':', 1)
if len(values) != 2:
raise oaierrors.BadResumptionToken(EXC_MSG_INVALID_RESUMPTIONTOKEN)
_key, _val = values
if _val == 'None':
_val = None
result.update({_key: _val})
return cls._load_dict(result)
@property
def response_list_size(self):
return self._response_list_size
@response_list_size.setter
def response_list_size(self, size):
if self._from_req:
self._cursor = self._cursor._replace(value=int(self._cursor.value + size))
self._response_list_size = size
@property
def cursor(self):
if self._from_req and self.response_list_size is None:
raise ValueError("Unable to determine cursor: response_list_size is None")
return self._cursor.value
@property
def from_(self):
return self._from.value
@property
def until(self):
return self._until.value
@property
def set_(self):
return self._set.value
@property
def metadata_prefix(self):
return self._metadata_prefix.value
@property
def complete_list_size(self):
return self._complete_list_size.value
@complete_list_size.setter
def complete_list_size(self, size):
"""Set the number of records in the complete query response.
:note: Resumption token is invalid if the number of records for the complete
query response has been changed between requests.
:param int size: Number of records for the complete query response.
:raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.BadResumptionToken`
if list sizes don't match.
"""
if not self._complete_list_size.value:
self._complete_list_size = self._complete_list_size._replace(value=int(size))
elif not self._complete_list_size.value == size:
# If size has changed between requests, the resumption token is no longer valid.
_ctx = "Requested completeListSize: {}".format(self._complete_list_size.value)
_ctx += " Actual completeListSize: {}".format(size)
raise oaierrors.BadResumptionToken(context=_ctx)
@property
def encoded(self):
"""Get encoded Resumption Token.
Returns uri-encoded representation of the resumption token if the
list request sequence is ongoing. If the list request sequence is over,
returns None.
:returns: uri-encoded represenation of the token, or None
:rtype: str or None
"""
if self._is_final_list():
return None
attrs = []
for attr in [self._cursor, self._from, self._until, self._set,
self._complete_list_size, self._metadata_prefix]:
key, value = attr
if value is not None and key in (self._until.key, self._from.key):
value = as_supported_datestring(value)
attrs.append('{}:{}'.format(key, value))
_str = '&'.join(attrs)
return quote(_str)
[docs]class Arguments:
"""Arguments of OAI-protocol.
Store arguments. Convert datestamps string to datetime objects.
Validate arguments for each verb.
:param str verb: requested OAI verb.
:param str resumption_token: requested resumption token.
:param str identifier: requested identifier.
:param str metadata_prefix: requested metadata prefix.
:param str set_: requested set.
:param str from_: requested datestamp for from attribute.
:param str until: requested datestamp for until attribute.
:raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.OAIError` for OAI errors.
"""
verb_value_identify = 'Identify'
verb_value_list_sets = 'ListSets'
verb_value_list_metadata_formats = 'ListMetadataFormats'
verb_value_list_identifiers = 'ListIdentifiers'
verb_value_list_records = 'ListRecords'
verb_value_get_record = 'GetRecord'
query_key_verb = 'verb'
query_key_resumption_token = 'resumptionToken'
query_key_identifier = 'identifier'
query_key_metadata_prefix = 'metadataPrefix'
query_key_set = 'set'
query_key_from = 'from'
query_key_until = 'until'
#: Define supported verbs
supported_verbs = [
verb_value_identify,
verb_value_list_sets,
verb_value_list_metadata_formats,
verb_value_list_identifiers,
verb_value_list_records,
verb_value_get_record
]
#: Define resumption token verbs
resumable_verbs = [
verb_value_list_sets,
verb_value_list_identifiers,
verb_value_list_records
]
def __init__(self,
verb,
resumption_token=None,
identifier=None,
metadata_prefix=None,
set_=None,
from_=None,
until=None):
if verb is None:
raise oaierrors.MissingVerb()
if verb not in self.supported_verbs:
raise oaierrors.BadVerb(context=verb)
self.verb = verb
self.identifier = identifier
self._requested_until = as_supported_datetime(until, raise_oai_exc=False) if until\
else None
self.resumption_token = self._init_resumption_token(
resumption_token, metadata_prefix, from_, until, set_)
self.metadata_prefix = self._get_metadata_prefix(metadata_prefix)
self._validate_verb_arguments()
@classmethod
def _load_dict(cls, dct):
return cls(dct.get(cls.query_key_verb),
resumption_token=dct.get(cls.query_key_resumption_token),
identifier=dct.get(cls.query_key_identifier),
metadata_prefix=dct.get(cls.query_key_metadata_prefix),
set_=dct.get(cls.query_key_set),
from_=dct.get(cls.query_key_from),
until=dct.get(cls.query_key_until))
@classmethod
def load_args(cls, args):
args_keys = [item[0] for item in args]
# BadArgument should be risen if the request includes illegal arguments
unknown_keys = set(args_keys).difference({
Arguments.query_key_verb,
Arguments.query_key_resumption_token,
Arguments.query_key_identifier,
Arguments.query_key_metadata_prefix,
Arguments.query_key_set,
Arguments.query_key_from,
Arguments.query_key_until})
if unknown_keys != set():
raise oaierrors.BadArgument(msg="Illegal arguments",
context=', '.join(unknown_keys))
# BadArgument should be risen if the request contains repeated arguments
if len(set(args_keys)) != len(args_keys):
raise oaierrors.BadArgument(msg="Repeated arguments",
context=', '.join(
'{}={}'.format(k, v) for k, v in args))
# Convert to dict for convenience
args = dict(args)
return cls._load_dict(args)
def _get_metadata_prefix(self, metadata_prefix_arg):
metadata_prefix = self.resumption_token.metadata_prefix if\
self.is_verb_resumable() else\
metadata_prefix_arg
if self.verb in [self.verb_value_list_identifiers,
self.verb_value_list_records,
self.verb_value_get_record]:
if not metadata_prefix:
raise oaierrors.BadArgument(
msg='Missing argument',
context='\"metadataPrefix\"')
return metadata_prefix
def _init_resumption_token(self, resumption_token_arg,
metadata_prefix_arg,
from_arg, until_arg, set_):
if not self.is_verb_resumable():
if resumption_token_arg:
raise oaierrors.BadArgument(
msg="Got resumptionToken-parameter for verb which does not support it",
context=self.verb
)
return None
if resumption_token_arg:
resumption_token = ResumptionToken.load_arg(resumption_token_arg)
if metadata_prefix_arg is not None and\
resumption_token.metadata_prefix != metadata_prefix_arg:
raise oaierrors.BadArgument(
msg="metadataPrefix in resumptionToken differs from metadataPrefix-parameter.",
context="%s != %s"
% (resumption_token.metadata_prefix, metadata_prefix_arg)
)
else:
resumption_token = ResumptionToken(from_=from_arg,
until=until_arg,
metadata_prefix=metadata_prefix_arg,
set_=set_)
return resumption_token
[docs] def is_verb_resumable(self):
"""Is the requested verb a resumable list request?
:returns: True if verb is resumable False otherwise
:rtype: bool
"""
return self.verb in self.resumable_verbs
[docs] def get_local_identifier(self):
"""Get requested local identifier.
Local identifier does not have prefixes for
oai and namespace. It is used to identify
records locally.
:returns: Local identifier if applicable for the request.
:rtype: str
:raises: :exc:`kuha_oai_pmh_repo_handler.oai.errors.IdDoesNotExist`
for invalid identifier.
"""
identifier = Headers.as_local_id(self.identifier)
if identifier is None:
raise oaierrors.IdDoesNotExist(msg='Invalid identifier structure',
context=self.identifier)
return identifier
[docs] def is_selective(self):
"""Return True if request is selective.
Selective refers to selective harvesting supported by OAI-PMH.
:returns: True if selective, False if not.
:rtype: bool
"""
return any([self.resumption_token.set_,
self.resumption_token.from_,
self._requested_until])
def _validate_verb_arguments(self):
if self.verb == self.verb_value_get_record and not self.identifier:
raise oaierrors.BadArgument(
msg='Missing argument',
context='\"identifier\"')
from_ = self.resumption_token.from_ if self.is_verb_resumable() else None
until = self._requested_until
if from_ and until and from_ > until:
raise oaierrors.BadArgument(
msg='Invalid date range',
context='from argument is later than until argument')