Source code for kuha_oai_pmh_repo_handler.list_records

"""Run list records sequence on-demand against an OAI-PMH Repo Handler.

This helper script runs through the entire ListRecords sequence with a given metadataPrefix
and optional selective harvesting conditions. It can be used to ensure that all records within
a repository are good to serve, by catching timeouts from the Document Store Client and
non-serializable Document Store records.

It logs the time it takes to complete the full sequence and prints out all identifiers
found with the requested conditions.

If any error conditions are encountered, the best place to look for the cause is
the Kuha OAI-PMH Repo Handler log output and Kuha Document Store log output.
"""
import sys
import time
import argparse
import logging
from xml.etree import ElementTree as ET
from urllib.parse import quote
from contextlib import contextmanager

from tornado.httpclient import HTTPClient


NAMESPACES = {'oai': 'http://www.openarchives.org/OAI/2.0/',
              'xml': 'http://www.w3.org/XML/1998/namespace'}
DEFAULT_ARGS = {'base_url': "http://localhost:6003/v0/oai",
                'loglevel': 'INFO',
                'timeout': 30}


class InvalidOAIResponse(Exception):
    """The response was not expected.

    Raised when:

    * HTTP response code is invalid
    * Result cannot be parsed as XML
    * OAI response has error <error> element
    """


@contextmanager
def _writer(output=None):
    _filehandle = sys.stdout if output is None else open(output, 'w')
    try:
        yield lambda line: print(line, file=_filehandle)
    finally:
        if output:
            _filehandle.close()


def _report_identifiers(record_identifiers, output):
    output("Found %i distinct records of which %i are deleted."
           % (len(record_identifiers['all']), len(record_identifiers['deleted'])))
    output("Distinct record identifiers (all records):")
    for identifier in sorted(record_identifiers['all']):
        output(identifier)
    if not record_identifiers['deleted']:
        return
    output("Deleted record identifiers:")
    for identifier in sorted(record_identifiers['deleted']):
        output(identifier)


class _Request:
    """Request and interpret response."""

    def __init__(self, base_url, metadata_prefix, url_params=None, **kwargs):
        self._base_url = base_url
        self._metadata_prefix = metadata_prefix
        self._url_params = url_params
        self._fetch_args = kwargs
        self._client = HTTPClient()
        # Encountered identifiers
        self.record_identifiers = {'all': [], 'deleted': []}
        # Time in seconds
        self.sequence_time = None

    def _request(self, url):
        response = self._client.fetch(url, **self._fetch_args)
        logging.debug(response.body)
        if response.code != 200:
            raise InvalidOAIResponse(
                "Got invalid HTTP response code (%d != 200) from url %s"
                % (response.code, url))
        try:
            xmlroot = ET.fromstring(response.body)
        except ET.ParseError as exc:
            raise InvalidOAIResponse(
                "Unable to parse response body as XML. url: %s" % (url,)) from exc
        oai_error_el = xmlroot.find('./oai:error', NAMESPACES)
        if oai_error_el is not None:
            code, cdata = oai_error_el.get('code'), ''.join(oai_error_el.itertext())
            raise InvalidOAIResponse("Error code %s (%s) in OAI response from url: %s"
                                     % (code, cdata, url))
        token = None
        token_element = xmlroot.find('./oai:ListRecords/oai:resumptionToken', NAMESPACES)
        if token_element is None:
            logging.warning(
                "No resumptionToken element from url %s. "
                "If target of requests is a Kuha OAI-PMH Repo Handler the response "
                "for ListRecords should always contain a resumptionToken element.", url)
        elif token_element.text is not None:
            token = ''.join(token_element.itertext())
        return xmlroot, token

    def _store_identifiers(self, root):
        for header_el in root.findall('./oai:ListRecords/oai:record/oai:header', NAMESPACES):
            identifier = ''.join(header_el.find('./oai:identifier', NAMESPACES).itertext())
            self.record_identifiers['all'].append(identifier)
            if header_el.get('status') == 'deleted':
                self.record_identifiers['deleted'].append(identifier)

    def run_sequence(self):
        """Run ListRecords sequence, store encountered identifiers and time the full sequence.

        Identifiers will be stored in :attr:`record_identifiers`.
        Time in seconds will be stored in :attr:`sequence_time`.
        """
        url = self._base_url + '?verb=ListRecords&metadataPrefix={}'.format(self._metadata_prefix)
        if self._url_params:
            url += '&' + '&'.join(['%s=%s' % (k, v) for k, v in self._url_params.items()])
        logging.debug("Requesting url %s ...", url)
        start_time = time.time()
        xmlroot, resumption_token = self._request(url)
        while resumption_token is not None:
            self._store_identifiers(xmlroot)
            url = self._base_url + '?verb=ListRecords&resumptionToken={}'.format(
                quote(resumption_token))
            logging.debug("Resuming sequence via url %s ...", url)
            xmlroot, resumption_token = self._request(url)
        self.sequence_time = time.time() - start_time
        logging.debug("No resumptionToken from url %s", url)
        self._store_identifiers(xmlroot)


def _cli_args():
    parser = argparse.ArgumentParser(
        description="Run ListRecords sequence on-demand against an OAI-PMH repo handler to "
                    "test that all records are harvestable. If any error conditions are "
                    "encountered, the best place to look for the cause is the Kuha OAI-PMH "
                    "Repo Handler log output and Kuha Document Store log output. By default "
                    "the script outputs every identifier it encounters to stdout. "
                    "See --help for more options.")
    parser.add_argument('metadata_prefix', type=str,
                        help="MetadataPrefix used in requests")
    parser.add_argument('-b', '--base-url', type=str, required=False,
                        help="Base url to oai repo handler (default: %s)"
                             % (DEFAULT_ARGS['base_url'],),
                        default=DEFAULT_ARGS['base_url'])
    parser.add_argument('-t', '--request-timeout', type=int, required=False,
                        help="HTTP client request timeout. (default: %s)"
                             % (DEFAULT_ARGS['timeout'],),
                        default=DEFAULT_ARGS['timeout'])
    parser.add_argument('-s', '--set', type=str, required=False, dest='oai_set',
                        help="Harvest selectively using a set parameter")
    parser.add_argument('-f', '--from', type=str, required=False, dest='oai_from',
                        help="Harvest selectively using from parameter")
    parser.add_argument('-u', '--until', type=str, required=False, dest='oai_until',
                        help="Harvest selectively using until parameter")
    parser.add_argument('-o', '--output', type=str, required=False,
                        help="Output found identifiers to file instead of stdout.")
    parser.add_argument('-k', '--insecure', action='store_true',
                        help='Server SSL connection certificate chain is verified to be secure '
                             'by default. This option allows to operate on connections '
                             'considered insecure.')
    parser.add_argument('--auth-username', type=str, required=False,
                        help='HTTP Basic Authentication username')
    parser.add_argument('--auth-password', type=str, required=False,
                        help='HTTP Basic Authentication password')
    parser.add_argument('--loglevel', type=str,
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        help="Loglevel (default: %s). Set to DEBUG to log out every response "
                             "body" % (DEFAULT_ARGS['loglevel'],),
                        default=DEFAULT_ARGS['loglevel'])
    return parser.parse_args()


def _cli_run(request, output_arg):
    """Wrap to simplify exception handling in cli()"""
    logging.info("Begin issuing ListRecords requests. This may take a while...")
    request.run_sequence()
    logging.info("ListRecords sequence completed without errors in %f seconds",
                 request.sequence_time)
    with _writer(output_arg) as output:
        _report_identifiers(request.record_identifiers, output)


def main():
    """Command line interface entry point.

    Gather configuration. Setup application. Run sequence and report
    encountered identifiers.

    :returns: 0 on success
    :rtype: int
    """
    args = _cli_args()
    logging.basicConfig(format='%(asctime)s %(levelname)s : %(message)s',
                        datefmt='%Y-%m-%dT%H:%M:%SZ',
                        level=getattr(logging, args.loglevel))
    url_params = {}
    for url_key, value in (('set', args.oai_set),
                           ('from', args.oai_from),
                           ('until', args.oai_until)):
        if value is not None:
            url_params[url_key] = value
    request = _Request(args.base_url, args.metadata_prefix,
                       request_timeout=args.request_timeout,
                       url_params=url_params,
                       validate_cert=args.insecure is False,
                       auth_username=args.auth_username,
                       auth_password=args.auth_password)
    try:
        _cli_run(request, args.output)
    except KeyboardInterrupt:
        logging.warning("Interrupt by CTRL-C")
    except InvalidOAIResponse:
        logging.exception("Caught InvalidOAIResponse while running the sequence. "
                          "Increase logging level to see response body")
        raise
    except:
        logging.exception("Caught critical error. Exiting...")
        raise
    return 0


if __name__ == '__main__':
    sys.exit(main())