Source code for kuha_client.kuha_client

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Kuha Client communicates with Document Store and provides
a simple way of importing, updating and deleting records by
reading a batch of XML files stored in filesystem.

.. todo:: Separate ProcessorLog from BatchProcessor and FileLog.
          Re-think sourcefiles handling.

"""

import os.path
from glob import iglob
import logging
import time
import pickle
from contextlib import contextmanager

from tornado import ioloop

from kuha_common import cli_setup
from kuha_common.query import QueryController
from kuha_common.server import CorrelationID
from kuha_common.document_store.client import JSONStreamClient
from kuha_common.document_store.mappings.ddi import (
    DDI122RecordParser,
    DDI25RecordParser,
    DDI31RecordParser
)
from kuha_common.document_store.records import (
    Study,
    Variable,
    Question,
    StudyGroup,
    COLLECTIONS
)


_LOG = logging.getLogger(__name__)


# ############### #
# MANAGE FILE LOG #
# ############### #


class SourceFile:
    """File used as a source for Document Store records.

    :param path: Path to source file
    """

    def __init__(self, path):
        self.path = path
        self.timestamp = os.path.getmtime(path)
        self.study_ids = []
        self.variable_ids = []
        self.question_ids = []
        self.study_group_ids = []


class FileLog:
    """Keep track of processed files.

    :param path: Path to filelog
    """

    def __init__(self, path):
        self.path = path
        self._current_file = None
        if os.path.exists(path):
            self.load()
        else:
            self.timestamp = time.time()
            self.files = []
        self._pending_study_group_files = {}

    def set_current(self, _file):
        """Set current source file.

        :param _file: source file currently processed.
        :type _file: :obj:`SourceFile`
        """
        self._current_file = _file

    def add_pending_study_group(self, study_group_identifier):
        """Add StudyGroup record to queue waiting to be processed.

        :param study_group_identifier: Id of pending StudyGroup
        """
        if study_group_identifier in self._pending_study_group_files:
            self._pending_study_group_files[study_group_identifier].append(self._current_file)
        else:
            self._pending_study_group_files.update({study_group_identifier: [self._current_file]})

    def pop_pending_study_group_files(self, study_group_identifier):
        """Return and remove source files containing references to ``study_group_identifier``.

        :param study_group_identifier: StudyGroup identifier.
        :returns: source files referencing ``study_group_identifier``
        :rtype: list
        """
        return self._pending_study_group_files.pop(study_group_identifier)

    def add_id(self, collection, _id):
        """Add id to current source file's collection of Document Store record IDs.

        :param collection: Document Store collection the ID belongs to.
        :type collection: str
        :param _id: ObjectId (ID in Document Store) of the Record.
        """
        _LOG.debug("Add id %s to collection %s for filepath %s",
                   _id, collection, self._current_file.path)
        {Study.get_collection(): self._current_file.study_ids,
         Variable.get_collection(): self._current_file.variable_ids,
         Question.get_collection(): self._current_file.question_ids,
         StudyGroup.get_collection(): self._current_file.study_group_ids}[collection].append(_id)

    def get_ids(self, collection):
        """Return list of ObjectIds for ``collection`` in current file.

        :param collection: Document Store collection.
        :type collection: str
        :returns: ObjectIds
        :rtype: list
        """
        results = {Study.get_collection(): self._current_file.study_ids,
                   Variable.get_collection(): self._current_file.variable_ids,
                   Question.get_collection(): self._current_file.question_ids,
                   StudyGroup.get_collection(): self._current_file.study_group_ids}[collection]
        _LOG.debug("Return ids for %s collection: %s", collection, results)
        return results

    def get_filepaths(self):
        """Get paths from :attr:`self.files`

        Iterate through each :obj:`SourceFile` in :attr:`self.files`
        and gather their paths. Return the paths.

        :returns: List of filepaths
        :rtype: list
        """
        return [x.path for x in self.files]

    def load(self):
        """Load FileLog from :attr:`self.path`.

        Populates :attr:`self.timestamp` and :attr:`self.files`.
        """
        with open(self.path, 'rb') as _file:
            _contents = pickle.load(_file)
        self.timestamp = _contents['timestamp']
        self.files = _contents['files']

    def save(self):
        """Save FileLog to :attr:`self.path`."""
        _LOG.debug("Saving filelog with files %s", [_file.path for _file in self.files])
        _contents = {'files': self.files, 'timestamp': self.timestamp}
        with open(self.path, 'wb') as _file:
            pickle.dump(_contents, _file)

    def has_match(self, path):
        """Check whether the source file found at ``path`` matches a file
        in this filelog by both path and modified timestamp.

        :param path: Path to source file.
        :returns: True if path and timestamp match.
        :rtype: bool
        """
        source_file = SourceFile(path)
        for index, _file in enumerate(self.files):
            if _file.path == path:
                if _file.timestamp == source_file.timestamp:
                    self.set_current(_file)
                    return True
                self.files[index] = source_file
                self.set_current(source_file)
                return False
        self.files.append(source_file)
        self.set_current(source_file)
        return False

    def remove_files_by_path_difference(self, paths):
        """Remove each :obj:`SourceFile` from :attr:`self.files` whose
        path is not in ``paths``.

        Compare the contained source file paths to ``paths`` and remove
        every sourcefile whose path is not found.

        :param paths: list of filepaths to compare.
        """
        contained_paths = self.get_filepaths()
        to_be_removed = set(contained_paths) - set(paths)
        self.files[:] = [x for x in self.files if x.path not in to_be_removed]


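# A minimal usage sketch of FileLog (hedged: the log path and source file
# below are hypothetical). The pickled log records each processed file's
# path, mtime and record ids, so unchanged files can be bypassed next run:
#
#     log = FileLog('/tmp/kuha_filelog.pickle')
#     if not log.has_match('/data/study_1.xml'):
#         pass  # parse the file, then record ids via log.add_id(...)
#     log.save()

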
# ############ #
# HTTP Traffic #
# ############ #


class DocumentStoreHTTPError(Exception):
    """Raise if DocumentStore response payload contains errors."""

    def __init__(self, error_response):
        self.error_response = error_response
        msg = 'DocumentStore responds with error: %s' % (self.error_response,)
        super().__init__(msg)


def _raise_response_error(response):
    if 'error' in response and response['error']:
        _LOG.error(response['error'])
        raise DocumentStoreHTTPError(response['error'])


# URL HELPERS

def _get_document_store_url():
    settings = cli_setup.get_settings()
    if not settings.document_store_url:
        raise ValueError("settings.document_store_url not set")
    return settings.document_store_url


def get_import_url(collection=None, importer=None):
    """Construct URL to Document Store import endpoint.

    :param collection: Optional parameter to limit the import to a certain collection.
    :type collection: str
    :param importer: Optional parameter to set importer. Defaults to 'ddi_c'.
    :type importer: str
    :returns: Constructed URL
    :rtype: str
    """
    if importer is None:
        importer = 'ddi_c'
    # If the Document Store endpoint should have a different path suffix
    # depending on the importer, the mapping to the suffix should be done here.
    url_items = [_get_document_store_url(), 'import', importer]
    if collection:
        url_items.append(collection)
    return '/'.join(url_items)

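
# For illustration (assumptions: settings.document_store_url is
# 'http://localhost:6001/v0' and Study.get_collection() returns 'studies'):
#
#     get_import_url()                      -> 'http://localhost:6001/v0/import/ddi_c'
#     get_import_url(collection='studies')  -> 'http://localhost:6001/v0/import/ddi_c/studies'
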

def _get_record_url(collection, document_id=None):
    url = _get_document_store_url()
    url += '/{}'.format(collection)
    if document_id:
        url += '/{}'.format(document_id)
    return url


_correlation_id = CorrelationID({})


async def _send_create_record_request(collection, record_dict):
    url = _get_record_url(collection)
    client = JSONStreamClient()
    response = {}
    try:
        await client.fetch(response.update, url, body=record_dict,
                           headers=_correlation_id.as_header())
    finally:
        _raise_response_error(response)
    return response['affected_resource']


async def _send_update_record_request(collection, record_dict, record_id):
    url = _get_record_url(collection, record_id)
    client = JSONStreamClient()
    response = {}
    try:
        await client.fetch(response.update, url, body=record_dict,
                           headers=_correlation_id.as_header(), method='PUT')
    finally:
        _raise_response_error(response)


async def _send_delete_record_request(collection, record_id=None):
    url = _get_record_url(collection, record_id)
    client = JSONStreamClient()
    response = {}
    try:
        await client.fetch(response.update, url, method='DELETE',
                           headers=_correlation_id.as_header())
    finally:
        _raise_response_error(response)


async def _send_xml_post_request(xml, url):
    """Submit XML as HTTP-POST body.

    :param xml: full xml document as string.
    :type xml: str or bytes
    :param url: url to submit to.
    :type url: str
    """
    if not hasattr(xml, 'decode'):
        xml = xml.encode('utf-8')
    client = JSONStreamClient()
    response = {}
    headers = _correlation_id.as_header()
    headers.update({'Content-Type': 'text/xml'})
    try:
        await client.fetch(response.update, url, method='POST',
                           headers=headers, body=xml)
    finally:
        _raise_response_error(response)
    return response


# ############### #
# PROCESS RECORDS #
# ############### #


async def query_record(record):
    """Query single record by unique fields.

    :param record: record to query.
    :type record: :obj:`kuha_common.document_store.records.Study` or
                  :obj:`kuha_common.document_store.records.Variable` or
                  :obj:`kuha_common.document_store.records.Question` or
                  :obj:`kuha_common.document_store.records.StudyGroup`
    :returns: found record if any.
    :rtype: :obj:`kuha_common.document_store.records.Study` or
            :obj:`kuha_common.document_store.records.Variable` or
            :obj:`kuha_common.document_store.records.Question` or
            :obj:`kuha_common.document_store.records.StudyGroup`
    """
    async def query_study():
        """Query Document Store study.

        :returns: Found Study.
        :rtype: :obj:`kuha_common.document_store.records.Study`
        """
        return await QueryController().query_single(
            Study, _filter={Study.study_number: record.study_number.get_value()})

    async def query_variable():
        """Query Document Store variable.

        :returns: Found Variable.
        :rtype: :obj:`kuha_common.document_store.records.Variable`
        """
        return await QueryController().query_single(
            Variable, _filter={Variable.study_number: record.study_number.get_value(),
                               Variable.variable_name: record.variable_name.get_value()})

    async def query_question():
        """Query Document Store question.

        :returns: Found Question.
        :rtype: :obj:`kuha_common.document_store.records.Question`
        """
        return await QueryController().query_single(
            Question, _filter={Question.study_number: record.study_number.get_value(),
                               Question.question_identifier: record.question_identifier.get_value()})

    async def query_studygroup():
        """Query Document Store StudyGroup.

        :returns: Found StudyGroup.
        :rtype: :obj:`kuha_common.document_store.records.StudyGroup`
        """
        return await QueryController().query_single(
            StudyGroup,
            _filter={StudyGroup.study_group_identifier: record.study_group_identifier.get_value()})

    func = {Study.get_collection(): query_study,
            Variable.get_collection(): query_variable,
            Question.get_collection(): query_question,
            StudyGroup.get_collection(): query_studygroup}[record.get_collection()]
    return await func()

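
# A hedged sketch of query_record usage: given a freshly parsed Study,
# check whether the Document Store already holds it (looked up by its
# unique study_number). Must run inside the ioloop:
#
#     async def already_stored(study):
#         return await query_record(study) is not None
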

async def query_distinct_ids(collection):
    """Query collection for distinct ObjectIds.

    :param collection: record's collection.
    :type collection: str
    :returns: set of distinct ids.
    :rtype: set
    """
    async def distinct_study_ids():
        """Query distinct Study ids.

        :returns: distinct study ids
        :rtype: set
        """
        ids = await QueryController().query_distinct(Study, fieldname=Study._id)
        return set(ids[Study._id.path])

    async def distinct_variable_ids():
        """Query distinct Variable ids.

        :returns: distinct variable ids
        :rtype: set
        """
        ids = await QueryController().query_distinct(Variable, fieldname=Variable._id)
        return set(ids[Variable._id.path])

    async def distinct_question_ids():
        """Query distinct Question ids.

        :returns: distinct question ids
        :rtype: set
        """
        ids = await QueryController().query_distinct(Question, fieldname=Question._id)
        return set(ids[Question._id.path])

    async def distinct_studygroup_ids():
        """Query distinct StudyGroup ids.

        :returns: distinct study group ids
        :rtype: set
        """
        ids = await QueryController().query_distinct(StudyGroup, fieldname=StudyGroup._id)
        return set(ids[StudyGroup._id.path])

    func = {Study.get_collection(): distinct_study_ids,
            Variable.get_collection(): distinct_variable_ids,
            Question.get_collection(): distinct_question_ids,
            StudyGroup.get_collection(): distinct_studygroup_ids}[collection]
    return await func()


def iterate_xml_directory(directory):
    """Recursively iterate over XML-files in directory.

    :param directory: Absolute path to directory.
    :type directory: str
    :returns: generator for iterating XML-files.
    """
    _dir = '{}/**/*.xml'.format(directory)
    return iglob(_dir, recursive=True)


def iterate_xml_files_recursively(*paths):
    """Helper for batch processing XML-files.

    Check each path. If a path points to a file, yield its absolute
    path. If it points to a directory, recursively iterate paths to
    each XML-file found and yield each file's absolute path.

    :param paths: Paths to file or directory.
    :type paths: list
    :returns: generator for iterating absolute paths to xml-files
    """
    for path in paths:
        abspath = path if os.path.isabs(path) else os.path.abspath(path)
        if os.path.isfile(abspath):
            yield abspath
        elif os.path.isdir(abspath):
            for _file in iterate_xml_directory(abspath):
                yield _file
        else:
            _LOG.error("Path is neither file nor directory. Discarding: %s", path)

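
# Usage sketch (hypothetical paths): directories are recursed for *.xml
# files, plain file paths are yielded as-is in absolute form:
#
#     for path in iterate_xml_files_recursively('/data/ddi', 'single.xml'):
#         print(path)
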

class BatchProcessor:
    """Processor for operations performed in a single run.

    Keeps a record of what gets done. Collects StudyGroups from records
    and updates them accordingly. Facilitates access to operations needed
    to perform tasks against the Document Store API.

    :param collections: List of collections to process. Use None to
                        process all collections.
    :type collections: list or None
    :param file_log: Keep track of processed source files and record
                     ObjectIds related to them.
    :type file_log: :obj:`FileLog`
    :param sourcefiletype: Controls how the mapping from sourcefile to
                           Document Store records is done. None sets the
                           default :attr:`SOURCEFILETYPE_DDIC`
    :type sourcefiletype: str or None
    """

    _CREATE = 'CREATE'
    _UPDATE = 'UPDATE'
    _DELETE = 'DELETE'
    _IMPORT = 'IMPORT'
    _NO_CHANGES = 'NO_CHANGES'

    SOURCEFILETYPE_DDIC = 'ddi_c'
    SOURCEFILETYPE_DDI_31 = 'ddi_31'
    SOURCEFILETYPE_DDI_122_NESSTAR = 'ddi_122_nesstar'

    def __init__(self, collections=None, file_log=None, sourcefiletype=None):
        self.collections = collections if collections else COLLECTIONS
        self.file_log = file_log
        self.sourcefiletype = self.SOURCEFILETYPE_DDIC if sourcefiletype is None else sourcefiletype
        self.study_group_records = {}
        self.processed_ids = {Study.get_collection(): {},
                              Variable.get_collection(): {},
                              Question.get_collection(): {},
                              StudyGroup.get_collection(): {}}
        self.sourcefiles = []
        if self.collections == COLLECTIONS:
            self.import_urls = [get_import_url(importer=sourcefiletype)]
        else:
            self.import_urls = []
            for coll in self.collections:
                self.import_urls.append(get_import_url(collection=coll, importer=sourcefiletype))

    @classmethod
    def get_supported_sourcefiletypes(cls):
        """Get supported source file types.

        :returns: supported source file types.
        :rtype: list
        """
        return [cls.SOURCEFILETYPE_DDI_122_NESSTAR,
                cls.SOURCEFILETYPE_DDI_31,
                cls.SOURCEFILETYPE_DDIC]

    @classmethod
    @contextmanager
    def with_file_log(cls, file_log_path, collections=None, sourcefiletype=None):
        """Initiate BatchProcessor with File Log.

        :param file_log_path: path to file log.
        :type file_log_path: str
        :param collections: collections to process.
        :type collections: list or None
        :param sourcefiletype: file type of source file.
        :type sourcefiletype: str or None
        """
        file_log = FileLog(file_log_path)
        processor = cls(collections, file_log, sourcefiletype)
        yield processor
        file_log.remove_files_by_path_difference(processor.sourcefiles)
        file_log.save()

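    # A minimal sketch of the context manager (hypothetical paths). On exit,
    # source files no longer found from the lookup paths are dropped from
    # the log and the log is saved:
    #
    #     with BatchProcessor.with_file_log('/tmp/filelog.pickle') as proc:
    #         proc.upsert_run(['/data/ddi'])
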
    def sourcefileparser(self, path):
        """Initiate sourcefileparser, which depends on :attr:`self.sourcefiletype`

        :param path: path to source file to be parsed.
        :returns: iterative parser
        """
        parser = {self.SOURCEFILETYPE_DDI_122_NESSTAR: DDI122RecordParser,
                  self.SOURCEFILETYPE_DDIC: DDI25RecordParser,
                  self.SOURCEFILETYPE_DDI_31: DDI31RecordParser}[self.sourcefiletype]
        return parser.from_file(path).select

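    # For example (hedged; the path is hypothetical), with the default
    # 'ddi_c' sourcefiletype this returns DDI25RecordParser.from_file(path).select,
    # a callable yielding records for the requested collection:
    #
    #     parse = processor.sourcefileparser('/data/study_1.xml')
    #     for record in parse(Study.get_collection()):
    #         ...
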
    async def _log_result(self, collection, _id, result):
        if _id in self.processed_ids[collection]:
            _LOG.warning("COLLECTION: %s ID: %s already processed", collection, _id)
        self.processed_ids[collection].update({_id: result})
        if self.file_log and result != self._DELETE:
            self.file_log.add_id(collection, _id)
        _LOG.debug("%s %s %s", result, collection, _id)

    async def _process_file(self, path):
        self.sourcefiles.append(path)
        if self.file_log and self.file_log.has_match(path):
            # Add ids to processed_ids
            for coll in self.collections:
                self.processed_ids[coll].update(
                    {_id: self._NO_CHANGES for _id in self.file_log.get_ids(coll)})
            _LOG.debug("No need to process %s", path)
            return False
        _LOG.debug("Must process %s", path)
        return True

    async def create(self, record):
        """Create new record.

        :param record: populated record instance which gets created.
        :type record: :obj:`kuha_common.document_store.records.Study` or
                      :obj:`kuha_common.document_store.records.Variable` or
                      :obj:`kuha_common.document_store.records.Question` or
                      :obj:`kuha_common.document_store.records.StudyGroup`
        :returns: ObjectId of the new record.
        :rtype: str
        """
        record_dict = record.export_dict(include_metadata=False, include_id=False)
        new_id = await _send_create_record_request(record.get_collection(), record_dict)
        await self._log_result(record.get_collection(), new_id, self._CREATE)
        return new_id

    async def upsert(self, record):
        """Upsert record.

        If the record exists, compare it with the existing one. If the
        records differ, discard the existing record and store the new one
        to the DocumentStore with the existing ObjectId. If the record
        does not exist, insert it to the DocumentStore.

        :param record: populated record instance which gets upserted.
        :type record: :obj:`kuha_common.document_store.records.Study` or
                      :obj:`kuha_common.document_store.records.Variable` or
                      :obj:`kuha_common.document_store.records.Question` or
                      :obj:`kuha_common.document_store.records.StudyGroup`
        :returns: ObjectId of the record.
        :rtype: str
        """
        old_record = await query_record(record)
        if not old_record:
            # New record
            new_id = await self.create(record)
            return new_id
        old_id = old_record.get_id()
        rec_dict = record.export_dict(include_metadata=False, include_id=False)
        if rec_dict == old_record.export_dict(include_metadata=False, include_id=False):
            # Nothing changed.
            await self._log_result(record.get_collection(), old_id, self._NO_CHANGES)
            return old_id
        # Update record
        await _send_update_record_request(record.get_collection(), rec_dict, old_id)
        await self._log_result(record.get_collection(), old_id, self._UPDATE)
        return old_id

    async def upsert_from_parser(self, parser):
        """Upsert records to :attr:`self.collections` from parser.

        :param parser: Parser that generates Document Store records.
        """
        for coll in self.collections:
            for record in parser(coll):
                if coll == StudyGroup.get_collection():
                    await self.add_study_group(record)
                    continue
                await self.upsert(record)

    async def upsert_paths(self, *paths):
        r"""Upsert records found recursively from paths.

        :param \*paths: one or more paths to recurse to look for files to parse.
        """
        for path in iterate_xml_files_recursively(*paths):
            if await self._process_file(path):
                _LOG.info("Process path %s", path)
                parser = self.sourcefileparser(path)
                await self.upsert_from_parser(parser)
            else:
                _LOG.info("Bypass path %s", path)

    async def upsert_study_groups(self):
        """Upsert collected StudyGroups."""
        for study_group in self.study_group_records.values():
            if self.file_log:
                files = self.file_log.pop_pending_study_group_files(
                    study_group.study_group_identifier.get_value())
                self.file_log.set_current(files.pop())
                _id = await self.upsert(study_group)
                for _file in files:
                    self.file_log.set_current(_file)
                    self.file_log.add_id(study_group.get_collection(), _id)
            else:
                await self.upsert(study_group)

    async def add_study_group(self, study_group):
        """Add StudyGroup for later processing.

        Look up whether the StudyGroup has been stored before and update
        its contents. If it is not found, store it as a new one.

        :param study_group: StudyGroup to add for later processing.
        :type study_group: :obj:`kuha_common.document_store.records.StudyGroup`
        """
        if study_group.study_group_identifier.get_value() not in self.study_group_records:
            self.study_group_records.update(
                {study_group.study_group_identifier.get_value(): study_group})
        else:
            self.study_group_records[study_group.study_group_identifier.get_value()].updates(study_group)
        if self.file_log:
            self.file_log.add_pending_study_group(study_group.study_group_identifier.get_value())

    async def import_source(self, source_data):
        """Import source data to Document Store."""
        for url in self.import_urls:
            response = await _send_xml_post_request(source_data, url)
            for doc in response['imported_docs']:
                if 'study_group_identifier' in doc:
                    coll = StudyGroup.get_collection()
                elif 'question_identifier' in doc:
                    coll = Question.get_collection()
                elif 'variable_name' in doc:
                    coll = Variable.get_collection()
                elif 'study_number' in doc:
                    coll = Study.get_collection()
                else:
                    _LOG.error('Unable to interpret Document Store collection '
                               'from response: %s', response)
                    raise ValueError('Unable to determine collection from response')
                await self._log_result(coll, doc['id'], self._IMPORT)

    async def import_source_files(self, *paths):
        r"""Import files from paths.

        :param \*paths: one or more paths to lookup for source files.
        """
        for path in iterate_xml_files_recursively(*paths):
            if await self._process_file(path):
                _LOG.info("Process path %s", path)
                with open(path, 'r') as _file:
                    source_data = _file.read()
                await self.import_source(source_data)
            else:
                _LOG.info("Bypass path %s", path)

    async def remove_absent(self, collection):
        """Remove records from collection which were not present in
        the current upsert run.

        :param collection: collection to process.
        :type collection: str
        """
        distinct_ids_in_db = await query_distinct_ids(collection)
        processed = list(self.processed_ids[collection].keys())
        ids_to_remove = distinct_ids_in_db.difference(set(processed))
        for _id in ids_to_remove:
            if collection == Study.get_collection():
                await self.remove_study_and_relatives_by_studyid(_id)
            else:
                await _send_delete_record_request(collection, _id)
            await self._log_result(collection, _id, self._DELETE)

    async def remove_absent_records(self):
        """Remove records which were not present in the current upsert run."""
        for coll in self.collections:
            await self.remove_absent(coll)

    async def remove_record(self, record):
        """Remove record or records.

        If ``record`` is an instance of a DocumentStore record, remove it
        from the DocumentStore. If ``record`` is a record class, remove
        all records from its collection.

        :param record: Record to remove, or class whose records will be removed.
        :type record: DocumentStore record instance or class.
        """
        is_instance = True
        _id = None
        try:
            _id = record.get_id()
        except TypeError:
            is_instance = False
        if not is_instance:
            # Remove all records from collection.
            await _send_delete_record_request(record.get_collection())
            return
        if not _id:
            record = await query_record(record)
            _id = record.get_id()
        # Remove single record.
        await _send_delete_record_request(record.get_collection(), _id)
        await self._log_result(record.get_collection(), _id, self._DELETE)

    async def remove_study_and_relatives_by_studyid(self, study_id):
        """Remove study and relative records.

        For a single study the process should remove:

        * the actual Study,
        * Variables referenced from the Study,
        * Questions referenced from the Study,
        * references to the Study from StudyGroups.

        :note: Does not remove a StudyGroup even if all references to
               studies are removed.

        :param study_id: ObjectId of the study to remove.
        :type study_id: str
        """
        study = await QueryController().query_single(
            Study, _filter={Study._id: study_id}, fields=Study.study_number)
        study_number = study.study_number.get_value()
        await _send_delete_record_request(Study.get_collection(), study_id)
        await QueryController().query_multiple(
            Variable, self.remove_record,
            _filter={Variable.study_number: study_number}, fields=Variable._id)
        await QueryController().query_multiple(
            Question, self.remove_record,
            _filter={Question.study_number: study_number}, fields=Question._id)

        async def remove_study_number_from_study_group(study_group):
            """Remove study number from StudyGroup.study_numbers.

            :param study_group: StudyGroup
            :type study_group: :obj:`kuha_common.document_store.records.StudyGroup`
            """
            _id = study_group.get_id()
            _dict = study_group.export_dict(include_id=False, include_metadata=False)
            vals = StudyGroup.study_numbers.name.value_from_dict(_dict)
            vals.remove(study_number)
            _dict[StudyGroup.study_numbers.path] = vals
            _LOG.debug("Removing study number %s from study group %s", study_number, _id)
            await _send_update_record_request(StudyGroup.get_collection(), _dict, _id)

        await QueryController().query_multiple(
            StudyGroup, remove_study_number_from_study_group,
            _filter={StudyGroup.study_numbers: study_number})

    def import_run(self, lookup_paths):
        """Main entry point for import run.

        :param lookup_paths: list of paths to lookup for source files.
        :type lookup_paths: list
        """
        ioloop.IOLoop.current().run_sync(lambda: self.import_source_files(*lookup_paths))

    def upsert_run(self, lookup_paths, remove_absent=False):
        """Main entry point for upsert run.

        Upsert records found from ``lookup_paths``. Remove absent records
        if ``remove_absent`` is True.

        :param lookup_paths: list of paths to lookup for source files.
        :type lookup_paths: sequence
        :param remove_absent: True will remove all records not found from
                              lookup_paths.
        :type remove_absent: bool
        """
        _LOG.info("Upsert from paths: %s", ", ".join(lookup_paths))
        ioloop.IOLoop.current().run_sync(lambda: self.upsert_paths(*lookup_paths))
        if StudyGroup.get_collection() in self.collections:
            _LOG.info("Upsert StudyGroups")
            ioloop.IOLoop.current().run_sync(lambda: self.upsert_study_groups())
        if remove_absent:
            _LOG.info("Remove absent records")
            ioloop.IOLoop.current().run_sync(lambda: self.remove_absent_records())

    def remove_run(self, record_or_class=None):
        """Main entry point for remove run.

        :param record_or_class: Record or RecordClass to remove. If None,
                                will remove every record in every collection.
        """
        if record_or_class:
            ioloop.IOLoop.current().run_sync(lambda: self.remove_record(record_or_class))
        else:
            for _class in [Study, Variable, Question, StudyGroup]:
                ioloop.IOLoop.current().run_sync(lambda: self.remove_record(_class))
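

# End-to-end usage sketch (hedged: the paths are hypothetical, and cli_setup
# must be configured with document_store_url before any requests are sent):
#
#     with BatchProcessor.with_file_log('/tmp/filelog.pickle') as processor:
#         processor.upsert_run(['/data/ddi'], remove_absent=True)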