Source code for kuha_client

# Author(s): Toni Sissala
# Copyright 2022 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Kuha Client communicates with Document Store and provides
a simple way of inserting, updating and deleting records by
reading a batch of XML files stored in the filesystem.
"""
import os.path
from glob import iglob
from collections import deque
from contextlib import contextmanager
import logging
import pickle
import time
import asyncio

from kuha_common import conf
from kuha_common.server import CorrelationID
from kuha_common.document_store.client import JSONStreamClient
from kuha_common.document_store.constants import REC_STATUS_CREATED
from kuha_common.document_store.mappings.exceptions import (
    UnknownXMLRoot,
    MappingError
)


_logger = logging.getLogger(__name__)


class SourceFile:
    """Represents a file containing document store records.

    Stores the file path to :attr:`path` and the file modification
    timestamp to :attr:`timestamp`.

    :param str path: Absolute path to a file.
    """

    def __init__(self, path):
        self.path = path
        self.timestamp = os.path.getmtime(path)
        # {<collection>: [<id1>, <id2>, ...]}
        self._collections_ids = {}

    def add_id(self, coll, _id):
        """Add document store collection + id combination.

        Attach a Document Store ID + collection to this sourcefile,
        implying that the record in the collection is parsed from
        this file.

        :param str coll: Document Store collection.
        :param str _id: Document Store record ID.
        """
        if coll not in self._collections_ids:
            self._collections_ids.update({coll: [_id]})
        else:
            self._collections_ids[coll].append(_id)

    def list_ids(self, coll):
        """List ids for a certain collection in this sourcefile.

        If this sourcefile does not contain any records from the
        collection, return an empty list.

        :param str coll: Document store collection.
        :returns: List of Document Store IDs parsed from this file.
        """
        if coll not in self._collections_ids:
            return []
        return self._collections_ids[coll]

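# Illustrative sketch (the collection name and IDs are made-up example
# values) of how the caches below use SourceFile to remember which
# Document Store records were parsed from which file.
def _example_track_parsed_records(path):
    sourcefile = SourceFile(path)          # stores the path and its mtime
    sourcefile.add_id('studies', 'id_1')   # record id_1 was parsed from this file
    sourcefile.add_id('studies', 'id_2')
    return sourcefile.list_ids('studies')  # -> ['id_1', 'id_2']
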
class Cache:
    """In-memory cache implementation.

    The cache keeps track of processed files and the Document Store
    records parsed from the files. It contains a :obj:`deque` of
    loaded filepaths in :attr:`loaded_paths`.
    """

    CREATE = 'CREATE'
    UPDATE = 'UPDATE'
    DELETE = 'DELETE'
    IMPORT = 'IMPORT'
    NO_CHANGES = 'NO_CHANGES'
    _valid_results = (CREATE, UPDATE, DELETE, IMPORT, NO_CHANGES)

    def __init__(self):
        self._collections = []
        self._processed_ids = {}
        self.loaded_paths = deque()

    def register_collection(self, collection):
        """Registers and formats a collection.

        Can be called multiple times with the same collection parameter,
        but will not overwrite a previously registered collection.

        :param str collection: collection name
        """
        if collection in self._collections:
            return
        self._collections.append(collection)
        self._processed_ids.update({collection: {}})

    def get_processed_ids_for_collection(self, collection):
        """Get a list of every ID that was processed for a collection.

        :param str collection: collection name
        :returns: List of processed ids
        :rtype: list
        """
        return list(self._processed_ids[collection].keys())

    async def result(self, coll, _id, result):
        """Add a record's processing result.

        Call when processing of a file has led to some result in the
        Document Store.

        :param str coll: Document Store collection.
        :param str _id: Document Store record ID.
        :param str result: Result. One of :attr:`self._valid_results`.
        :raises ValueError: if result is not valid.
        """
        if result not in self._valid_results:
            raise ValueError("Invalid result. Got '%s', was expecting one of: %s"
                             % (result, ', '.join(["'%s'" % (x,) for x in self._valid_results])))
        if _id in self._processed_ids[coll]:
            _logger.warning("'%s' in collection '%s' already processed", _id, coll)
        self._processed_ids[coll].update({_id: result})
        _logger.debug("Handling of '%s' in collection '%s' resulted in '%s'", _id, coll, result)

    def print_summary(self):
        """Print a summary of operations recorded in this cache."""
        for collection, ids_results in self._processed_ids.items():
            ops_count = {result: 0 for result in self._valid_results}
            for _id, result in ids_results.items():
                ops_count[result] += 1
            _msg = "{collection:12}: "
            for oper in self._valid_results:
                _msg += "{oper}: {count:4} ".format(oper=oper, count=ops_count[oper])
            print(_msg.format(collection=collection))

    async def load_file(self, path):
        """Load file to process it. Return True if the file needs processing.

        :param str path: File path.
        :returns: True
        :rtype: bool
        """
        self.loaded_paths.append(path)
        return True

    async def unload_file(self):
        """Unload current file.

        This cache implementation does not need unloading, so this is a no-op.

        :returns: True
        """
        return True

    async def forget_file(self):
        """Forget a file that was previously loaded."""
        await self.unload_file()
        self.loaded_paths.pop()

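# Illustrative sketch of the in-memory cache bookkeeping (the collection name
# and IDs are made-up values): register a collection, record a couple of
# results and print the per-collection summary, e.g. via
# asyncio.run(_example_cache_bookkeeping()).
async def _example_cache_bookkeeping():
    cache = Cache()
    cache.register_collection('studies')
    await cache.result('studies', 'id_1', Cache.CREATE)
    await cache.result('studies', 'id_2', Cache.NO_CHANGES)
    cache.print_summary()
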
class FileLoggingCache(Cache):
    """File logging cache implementation.

    Logs the cache in a pickled file that can be loaded to speed up
    processing on consecutive runs.

    :param str path: Path to file. Will be loaded if it exists, or
        created if it does not exist.
    """

    def __init__(self, path):
        self._path = path
        self.current_file = None
        if os.path.exists(self._path):
            with open(self._path, 'rb') as _file:
                _contents = pickle.load(_file)
            self.timestamp = _contents['timestamp']
            self.files = _contents['files']
        else:
            self.timestamp = time.time()
            self.files = []
        super().__init__()

    def remove_lost_files(self):
        """Remove files loaded from :attr:`self._path` but not in the current batch.

        Compare filepaths in :attr:`self.files` and :attr:`self.loaded_paths`.
        Build a new list containing the files common to :attr:`self.files` and
        :attr:`self.loaded_paths` and assign it to :attr:`self.files`.
        """
        contained = {x.path for x in self.files}
        to_be_removed = contained - set(self.loaded_paths)
        self.files[:] = [x for x in self.files if x.path not in to_be_removed]

    def save(self):
        """Save FileLog to :attr:`self._path`."""
        _logger.debug("Saving filelog with files %s", [_file.path for _file in self.files])
        _contents = {'files': self.files, 'timestamp': self.timestamp}
        with open(self._path, 'wb') as _file:
            pickle.dump(_contents, _file)

    async def result(self, coll, _id, result):
        """Save ID to the collection IDs of the currently processed file.

        If the result was to delete the ID, or if there is no file being
        processed at the moment, do not save the ID.

        :param str coll: ID belongs to this collection
        :param str _id: Record ID.
        :param str result: Processing result.
        """
        await super().result(coll, _id, result)
        if result != self.DELETE and self.current_file is not None:
            # current_file is None if there is currently no file being processed.
            self.current_file.add_id(coll, _id)

    async def _copy_ids_to_processed(self, sourcefile):
        for coll in self._collections:
            for _id in sourcefile.list_ids(coll):
                await super().result(coll, _id, self.NO_CHANGES)

    async def load_file(self, path):
        """Load file to process it. Return True if the file needs processing.

        :param str path: File path.
        :returns: True if file needs processing.
        :rtype: bool
        """
        await super().load_file(path)
        sourcefile = SourceFile(path)
        for index, _file in enumerate(self.files):
            if _file.path == path:
                if _file.timestamp == sourcefile.timestamp:
                    await self._copy_ids_to_processed(_file)
                    self.current_file = _file
                    return False
                self.files[index] = sourcefile
                self.current_file = sourcefile
                return True
        self.files.append(sourcefile)
        self.current_file = sourcefile
        return True

    async def unload_file(self):
        """Unload current file.

        :returns: True
        """
        await super().unload_file()
        self.current_file = None
        return True

@contextmanager
def open_file_logging_cache(path, cache_class=None):
    """Use the file logging cache implementation in a context manager.

    Handles loading the cache, and removing lost files and saving upon
    completion of the context.

    :param path: Path to file cache.
    :type path: str
    :param cache_class: file logging cache implementation.
        Defaults to :class:`FileLoggingCache`
    """
    cls = cache_class or FileLoggingCache
    cache = cls(path)
    yield cache
    cache.remove_lost_files()
    cache.save()

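# Illustrative usage of the context manager above (the cache path is a
# made-up example; `collections_methods` and `parsers` come from the caller).
# The cache is loaded from disk if it already exists; lost files are pruned
# and the cache is saved again when the block completes. BatchProcessor is
# defined further below in this module.
def _example_cached_upsert(collections_methods, parsers, lookup_paths):
    with open_file_logging_cache('/var/lib/kuha_client/cache.pickle') as cache:
        processor = BatchProcessor(collections_methods, parsers=parsers,
                                   cache=cache)
        processor.upsert_run(lookup_paths, remove_absent=True)
        processor.print_summary()
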
# ############ #
# HTTP Traffic #
# ############ #
class DocumentStoreHTTPError(Exception):
    """Raise if DocumentStore response payload contains errors."""

    def __init__(self, error_response):
        self.error_response = error_response
        msg = 'DocumentStore responds with error: %s' % (self.error_response,)
        super().__init__(msg)


def _raise_response_error(response, exp_id=None):
    if 'error' in response and response['error']:
        _logger.error(response['error'])
        raise DocumentStoreHTTPError(response['error'])
    if exp_id and response['affected_resource'] != exp_id:
        raise ValueError("Expected document store response with affected_resource '%s' got '%s'"
                         % (exp_id, response['affected_resource']))


# URL HELPERS

def _get_document_store_url():
    settings = conf.get_conf()
    if not settings.document_store_url:
        raise ValueError("settings.document_store_url not set")
    return settings.document_store_url


def _get_record_url(collection, document_id=None):
    url = _get_document_store_url()
    url += '/{}'.format(collection)
    if document_id:
        url += '/{}'.format(document_id)
    return url


_correlation_id = CorrelationID({})

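# For illustration only: assuming settings.document_store_url were configured
# as 'http://localhost:6001/v0' (a made-up example value), the helpers above
# would build record URLs such as
#   _get_record_url('studies')        -> 'http://localhost:6001/v0/studies'
#   _get_record_url('studies', '123') -> 'http://localhost:6001/v0/studies/123'
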
async def send_create_record_request(collection, record_dict):
    """Send HTTP request to create a new record.

    Issue HTTP POST to Document Store using `record_dict` as the request
    body. The record will be created in the specified collection.

    :param str collection: Specify collection.
    :param dict record_dict: Record as a dict. Used as request body.
    :returns: Newly created record ID.
    :raises: :exc:`DocumentStoreHTTPError` if Document Store response
        contains an error message.
    """
    url = _get_record_url(collection)
    client = JSONStreamClient()
    response = {}
    try:
        await client.fetch(response.update, url, body=record_dict,
                           headers=_correlation_id.as_header())
    finally:
        _raise_response_error(response)
    return response['affected_resource']

async def send_update_record_request(collection, record_dict, record_id):
    """Send HTTP request to update an existing record.

    Issue HTTP PUT to Document Store using `record_dict` as the request
    body. The request will attempt to overwrite the existing record
    identified by `record_id`. The record will be updated in the
    specified collection.

    :param str collection: Specify collection.
    :param dict record_dict: Record as a dict. Used in request body.
    :param str record_id: Record ID.
    :returns: None
    :raises: :exc:`DocumentStoreHTTPError` if Document Store responds
        with an error message.
    """
    url = _get_record_url(collection, record_id)
    client = JSONStreamClient()
    response = {}
    try:
        await client.fetch(response.update, url, body=record_dict,
                           headers=_correlation_id.as_header(), method='PUT')
    finally:
        _raise_response_error(response, exp_id=record_id)

async def send_delete_record_request(collection, record_id=None, hard_delete=False):
    """Send HTTP request to delete an existing record or records.

    Issue HTTP DELETE to a Document Store collection. Use `record_id` to
    specify a single record to delete, or leave it out to delete all records
    from the collection. Set `hard_delete` to True to use physical deletions
    instead of logical deletions, which are the default.

    :param str collection: Specify collection to delete records from.
    :param str record_id: Delete by id. Leave None to delete all records
        from the collection.
    :param bool hard_delete: True to issue a physical delete request.
        False to logically delete records.
    :returns: Document Store HTTP response body.
    :rtype: dict
    :raises: :exc:`DocumentStoreHTTPError` if Document Store responds
        with an error message.
    """
    url = '%s?delete_type=%s' % (_get_record_url(collection, record_id),
                                 'hard' if hard_delete else 'soft')
    client = JSONStreamClient()
    response = {}
    try:
        await client.fetch(response.update, url, method='DELETE',
                           headers=_correlation_id.as_header())
    finally:
        _raise_response_error(response, exp_id=record_id)
    return response

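# Hedged sketch composing the three request helpers above into a full
# create-update-delete cycle. Assumes settings.document_store_url points to a
# running Document Store; `collection` and `record_dict` come from the caller.
async def _example_record_lifecycle(collection, record_dict):
    new_id = await send_create_record_request(collection, record_dict)
    await send_update_record_request(collection, record_dict, new_id)
    return await send_delete_record_request(collection, record_id=new_id,
                                            hard_delete=True)
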
# FILESYSTEM
def iterate_xml_directory(directory):
    """Recursively iterate over XML-files in a directory.

    :param directory: Absolute path to directory.
    :type directory: str
    :returns: generator for iterating XML-files.
    """
    _dir = '{}/**/*.xml'.format(directory)
    return iglob(_dir, recursive=True)

def iterate_xml_files_recursively(*paths):
    """Helper for batch processing XML-files.

    Check each path. If a path points to a file, yield its absolute path.
    If it points to a directory, recursively iterate paths to each XML-file
    found and yield each file's absolute path.

    :param str paths: Repeatable positional argument. Path to a file or directory.
    :returns: generator for iterating absolute paths to xml-files
    """
    for path in paths:
        abspath = path if os.path.isabs(path) else os.path.abspath(path)
        if os.path.isfile(abspath):
            yield abspath
        elif os.path.isdir(abspath):
            for _file in iterate_xml_directory(abspath):
                yield _file
        else:
            _logger.error("Path is neither file nor directory. Discarding: '%s'", path)

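# Small illustration (the paths are made-up examples): the arguments may mix
# single files and directories; directories are searched recursively for
# *.xml files and every match is yielded as an absolute path.
def _example_collect_xml_paths():
    return list(iterate_xml_files_recursively('/data/ddi_records',
                                              'single_study.xml'))
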
class CollectionMethods:
    """CollectionMethods base class.

    Base class for operations performed against a particular collection.
    Use by inheriting and defining the abstract methods. Every subclass
    must define the :attr:`collection` class attribute, which gets checked
    on init.

    :param cache: Initialized cache implementation object.
    """

    collection = None

    def __init__(self, cache):
        if self.collection is None:
            raise ValueError("Define 'collection' in subclass")
        cache.register_collection(self.collection)
        self._cache = cache

    async def query_record(self, record):
        """Query a record from the Document Store.

        This method is called from the upsert() method. If this method
        returns a falsy value, upsert() will never call update_record(),
        but will call create_record() instead.

        :param record: Record to query for.
        :returns: Result of the query.
        :rtype: instance of record, or None
        """
        raise NotImplementedError

    async def query_distinct_ids(self):
        """Query distinct IDs from the collection that are not deleted.

        This method is used to look up the IDs present in the Document
        Store, so that records absent from the current batch can be deleted.

        :returns: Distinct ids
        :rtype: set
        """
        raise NotImplementedError

    async def remove_records(self, _id=None, hard_delete=True):
        """Remove records from the collection.

        :param str or None _id: Submit to remove a single record by id.
        :returns: True on success
        """
        # TODO What to do with the cache? At least the file cache will be
        # invalid after calling this.
        response = await send_delete_record_request(self.collection, record_id=_id,
                                                    hard_delete=hard_delete)
        return response['result'] == 'delete_successful'

    async def remove_record_by_id(self, _id):
        """Remove a record from the collection.

        :seealso: :func:`send_delete_record_request`

        :param _id: Id of the record to be removed.
        :returns: True on success, False on failure.
        :rtype: bool
        """
        response = await send_delete_record_request(self.collection, record_id=_id)
        rval = response['result'] == 'delete_successful'
        if rval:
            await self._cache.result(self.collection, _id, self._cache.DELETE)
        return rval

    async def upsert(self, record):
        """Update or insert a record.

        If the record already exists in the Document Store, compare the old
        one with the new one. If they do not match, update the new record's
        metadata with certain values from the old one and issue an update
        request to the Document Store. If the record does not exist in the
        Document Store, create it.

        :param record: Document Store record.
        :returns: ID of the record in the Document Store.
        """
        old_rec = await self.query_record(record)
        coll = record.get_collection()
        if not old_rec:
            new_id = await self.create_record(record)
            await self._cache.result(coll, new_id, self._cache.CREATE)
            return new_id
        old_id = old_rec.get_id()
        if await self.update_record(record, old_rec):
            await self._cache.result(coll, old_id, self._cache.UPDATE)
            return old_id
        await self._cache.result(coll, old_id, self._cache.NO_CHANGES)
        return old_id

    async def create_record(self, record):
        """Insert a new record to the Document Store.

        :seealso: :func:`send_create_record_request`

        :param record: Record to insert
        :returns: Inserted record ID.
        :rtype: str
        """
        coll = record.get_collection()
        new_id = await send_create_record_request(
            coll, record.export_dict(include_metadata=False, include_id=False))
        return new_id

    @staticmethod
    async def _update_metadata_if_deleted(old):
        """Update the old record's metadata if needed.

        Record metadata contains the deletion status and the deleted
        timestamp. If the old record is deleted, reset its metadata so the
        record is no longer marked as deleted.

        :param old: old record
        :returns: True if metadata is changed
        """
        if old._metadata.attr_status.get_value() == REC_STATUS_CREATED\
           and old._metadata.attr_deleted.get_value() is None:
            return False
        old.set_updated()
        old.set_status(REC_STATUS_CREATED)
        old._metadata.attr_deleted.set_value(None)
        return True

    async def update_record(self, new, old):
        """Update an existing Document Store record.

        :seealso: :func:`send_update_record_request`

        :param new: New record.
        :param old: Old record.
        :returns: False if record does not need updating.
        :rtype: bool
        """
        new_dict = new.export_dict(include_metadata=False, include_id=False)
        old_dict = old.export_dict(include_metadata=False, include_id=False)
        if new_dict != old_dict:
            # Records don't match. Update with the new one.
            await send_update_record_request(new.get_collection(), new_dict, old.get_id())
        elif await self._update_metadata_if_deleted(old) is True:
            # Records match, but the old one is deleted. Update metadata to docstore.
            await send_update_record_request(new.get_collection(),
                                             old.export_dict(include_metadata=True,
                                                             include_id=False),
                                             old.get_id())
        else:
            # Records match. Old is not deleted. No need to update.
            return False
        return True

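# A minimal, hypothetical sketch of a CollectionMethods subclass. The
# collection name 'studies' and the stub query implementations are
# placeholders: a real subclass would query the Document Store for its own
# record type here, which this module does not prescribe.
class _ExampleStudyMethods(CollectionMethods):

    collection = 'studies'

    async def query_record(self, record):
        # Should return the matching Document Store record, or None when the
        # record does not exist yet (upsert() then calls create_record()).
        return None

    async def query_distinct_ids(self):
        # Should return a set of distinct, non-deleted record IDs so that
        # remove_absent() can delete records missing from the current batch.
        return set()
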
class NoSuchCollectionMethod(Exception):
    """Explicitly raised when BatchProcessor cannot find methods for a required collection.

    Use to catch such conditions in caller logic.
    """

class BatchProcessor:
    """Process a batch of files and sync them to Document Store.

    :param list collections_methods: CollectionMethods subclasses
    :param list parsers: XML parsers
    :param cache: Optional cache implementation.
    :param bool fail_on_parse: Fail on parsing errors or bypass the file.
    """

    def __init__(self, collections_methods, parsers=None, cache=None, fail_on_parse=True):
        self._collections_methods = collections_methods
        self._parsers = parsers
        self._cache = cache or Cache()
        self._fail_on_parse = fail_on_parse
        self._init_collection_methods()

    def _init_collection_methods(self):
        initialized = []
        for methods in self._collections_methods:
            initialized.append(methods(self._cache))
        self._collections_methods = initialized

    def _sourcefileparser(self, path):
        if self._parsers is None:
            raise ValueError("BatchProcessor has no parsers. Give parsers on init.")
        candidate = None
        for parser in self._parsers:
            _logger.debug("Trying to parse %s with %s", path, type(parser))
            try:
                candidate = parser.from_file(path)
            except UnknownXMLRoot:
                pass
            else:
                break
        if candidate is None:
            _logger.error("Could not find suitable parser for file %s. Bypassing...", path)
        return candidate

    def _methods_for_collection(self, coll):
        for methods in self._collections_methods:
            if methods.collection == coll:
                return methods
        raise NoSuchCollectionMethod("Could not find methods for collection '%s'" % (coll,))

    async def upsert_from_parser(self, parser):
        """Update/Insert records from parser.

        Iterate through all `collection_methods` and parse records that
        belong to the collections. Call upsert(record) for each collection
        method.

        :param parser: Parser yielding records.
        """
        for methods in self._collections_methods:
            for record in parser.select(methods.collection):
                await methods.upsert(record)

    async def upsert_paths(self, *paths):
        r"""Upsert records found recursively from paths.

        :param \*paths: one or more paths to recurse to look for files to parse.
        """
        for path in iterate_xml_files_recursively(*paths):
            if await self._cache.load_file(path) is False:
                _logger.info("Bypass path '%s'", path)
                continue
            try:
                parser = self._sourcefileparser(path)
                if parser is None:
                    _logger.info("Bypass path '%s'", path)
                    continue
                _logger.info("Process path '%s' with parser '%s'", path, parser)
                await self.upsert_from_parser(parser)
            except MappingError:
                await self._cache.forget_file()
                _logger.exception("Unable to parse file '%s'. Is the file valid?", path)
                if self._fail_on_parse:
                    raise
                _logger.warning("Configured to continue on parse errors. Bypassing file '%s'", path)
            except Exception:
                await self._cache.forget_file()
                _logger.exception("Exception while parsing file '%s'", path)
                raise
            else:
                await self._cache.unload_file()

    async def remove_absent(self, collection, methods):
        """Remove absent records from a collection.

        Query every distinct ID from the collection using
        methods.query_distinct_ids(). Compare these IDs to the ones that were
        processed in this batch. Remove every record that was not processed
        in this batch using methods.remove_record_by_id().

        :param str collection: Currently processed collection.
        :param methods: CollectionMethods-subclass instance containing
            specialized methods for this collection.
        """
        distinct_ids = await methods.query_distinct_ids()
        for id_to_remove in distinct_ids.difference(
                set(self._cache.get_processed_ids_for_collection(collection))):
            await methods.remove_record_by_id(id_to_remove)
            _logger.info("Removed '%s' from collection '%s'", id_to_remove, collection)

    async def remove_absent_records(self):
        """Remove records that were not present in this batch.

        If the cache does not contain any loaded filepaths, this method will
        not remove absent records, since that would remove all records in all
        collections. In that case it logs an error message and returns False.

        If the cache contains loaded paths, this method iterates all
        _collection_methods and calls remove_absent with each collection and
        collection_method.

        To remove all records from a collection, use :meth:`self.remove_records`.

        :returns: False if no files were loaded for processing.
        :rtype: bool
        """
        if not self._cache.loaded_paths:
            _logger.error("Did not process any files. Will not remove absent records.")
            return False
        for methods in self._collections_methods:
            await self.remove_absent(methods.collection, methods)
        return True

    async def remove_records(self, rec_or_class=None, hard_delete=True):
        """Remove records using the collection method for the collection.

        Give a record as the `rec_or_class` parameter to remove a single
        record. Give a record class as the `rec_or_class` parameter to remove
        all records in the collection. Leave `rec_or_class` None to remove
        all records from all collections. Set `hard_delete` to False to use
        logical deletions instead of physical ones.

        :param rec_or_class: Document Store record instance or class.
        :param bool hard_delete: False to use logical deletions,
            True (default) to use physical.
        """
        if rec_or_class is None:
            for methods in self._collections_methods:
                await methods.remove_records(hard_delete=hard_delete)
            return
        methods = self._methods_for_collection(rec_or_class.get_collection())
        _id = None
        is_instance = True
        try:
            _id = rec_or_class.get_id()
        except TypeError:
            # class.get_id() raises TypeError: get_id() missing 1 required
            # positional argument: 'self'
            is_instance = False
        if is_instance and _id is None:
            record = await methods.query_record(rec_or_class)
            if record is None:
                _logger.error("Unable to delete by query: no record found matching the conditions.")
                return
            _id = record.get_id()
        await methods.remove_records(_id=_id, hard_delete=hard_delete)

    def upsert_run(self, lookup_paths, remove_absent=False):
        """Upsert run with a batch of records.

        Run upsert_paths() in an event loop. If remove_absent is True, also
        run remove_absent_records() in the event loop. When remove_absent is
        True, the processor will synchronize records from lookup_paths to the
        Document Store.

        :param list lookup_paths: Paths to look for source files.
        :param bool remove_absent: Set to True to remove records that were
            not found from lookup_paths.
        """
        _logger.info("Upsert from paths: %s", ", ".join(lookup_paths))
        asyncio.run(self.upsert_paths(*lookup_paths))
        if remove_absent:
            _logger.info("Remove absent records")
            asyncio.run(self.remove_absent_records())

    def remove_run(self, rec_or_class=None, hard_delete=True):
        """Remove run removes records from the Document Store.

        Run remove_records() in an event loop. This method passes all
        parameters to remove_records(). See :meth:`remove_records` for
        parameter descriptions.

        :param rec_or_class: Document Store record or class.
        :param bool hard_delete: True to physically delete records.
            False to use logical deletions.
        """
        _logger.info('Remove records')
        asyncio.run(self.remove_records(
            rec_or_class=rec_or_class, hard_delete=hard_delete))

    def print_summary(self):
        """Print a summary of operations performed in this batch run.

        This is a proxy to :meth:`self._cache.print_summary`.
        """
        self._cache.print_summary()
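

# End-to-end illustration of the public entry points. Everything here is a
# hedged sketch: `collections_methods` would be a list of CollectionMethods
# subclasses and `parsers` a list of XML parsers with from_file()/select()
# methods; both come from the caller, not from this module.
def _example_full_sync(collections_methods, parsers, lookup_paths):
    processor = BatchProcessor(collections_methods, parsers=parsers,
                               fail_on_parse=False)
    # Insert or update every record found from the paths and remove records
    # that were not present in this batch.
    processor.upsert_run(lookup_paths, remove_absent=True)
    processor.print_summary()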