Source code for kuha_client.sync

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2022 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Define command line interface for synchronizing files to Document Store.

Example run from command line. Sync folder xml_files to Document Store::

    python -m kuha_client.sync --document-store-url=http://localhost:6001/v0 --file-cache cache.pickle xml_files

Print help::

    python -m kuha_client.sync -h

"""
import sys
from kuha_common import (
    conf,
    cli_setup
)
from kuha_common.document_store.mappings.ddi import (
    DDI122RecordParser,
    DDI122NesstarRecordParser,
    DDI25RecordParser,
    DDI31RecordParser,
    DDI33RecordParser
)
from kuha_client import (
    impl,
    open_file_logging_cache
)


def get_proc(collection_methods=None, parsers=None, **kw):
    """Build a StudyGroupsBatchProcessor, filling in defaults for omitted arguments.

    Every additional keyword argument is forwarded unchanged to
    StudyGroupsBatchProcessor.

    :param list or None collection_methods: Collection methods handed to
                                            StudyGroupsBatchProcessor. A falsy
                                            value (None, the default, or an
                                            empty list) is replaced with the
                                            result of ``impl.collection_methods()``.
    :param list or None parsers: Record parsers handed to
                                 StudyGroupsBatchProcessor. A falsy value
                                 (None, the default, or an empty list) is
                                 replaced with the DDI record parsers this
                                 module imports from
                                 :mod:`kuha_common.document_store.mappings.ddi`.
    :returns: StudyGroupsBatchProcessor instance.
    :rtype: :obj:`StudyGroupsBatchProcessor`
    """
    if not collection_methods:
        collection_methods = impl.collection_methods()
    if not parsers:
        parsers = [
            DDI122RecordParser,
            DDI122NesstarRecordParser,
            DDI25RecordParser,
            DDI31RecordParser,
            DDI33RecordParser,
        ]
    return impl.StudyGroupsBatchProcessor(collection_methods, parsers=parsers, **kw)
def configure():
    """Declare the command line / environment configuration and load it.

    Loads the ``kuha_client.sync`` configuration, registers the print and
    config arguments, then registers the sync specific options
    (``--file-cache``, ``--no-remove``, ``--print-summary``,
    ``--no-fail-on-parse``, ``--collection``) and the positional ``paths``.
    Finally the logging, Document Store client and Document Store query
    modules are set up through :func:`cli_setup.setup_common_modules`.

    :returns: Loaded settings, as returned by
              :func:`cli_setup.setup_common_modules`.
    :rtype: :obj:`argparse.Namespace`
    """
    conf.load('kuha_client.sync', package='kuha_client', env_var_prefix='KUHA_')
    conf.add_print_arg()
    conf.add_config_arg()

    # Ordered table of (positional names, keyword options) for conf.add().
    # The order is kept as declared so that options are registered in the
    # same sequence as before.
    arguments = (
        (('--file-cache',), {
            'type': str,
            'env_var': 'FILE_CACHE',
            'help': 'Path to a cache file. Leave unset to not use file caching.',
        }),
        (('--no-remove',), {
            'action': 'store_true',
            'env_var': 'NO_REMOVE',
            'help': "Don't remove records that were not found in this batch.",
        }),
        (('--print-summary',), {
            'action': 'store_true',
            'env_var': 'PRINT_SUMMARY',
            'help': ("Prints a summary of operations that were "
                     "performed during the sync run."),
        }),
        (('--no-fail-on-parse',), {
            'action': 'store_true',
            'env_var': 'NO_FAIL_ON_PARSE',
            'help': ("Do not stop processing a batch when a file cannot be "
                     "properly parsed. Instead log an error and bypass the file. "
                     "Enabling this option makes the synchronization process "
                     "more fault tolerant. Note that if a file is not parsed "
                     "properly it's records are not stored correctly and the "
                     "database content will not reflect the batch of files "
                     "that were being processed."),
        }),
        (('--collection',), {
            'type': str,
            'action': 'append',
            'env_var': 'COLLECTION',
            'help': ('Specific collection to process. '
                     'If not given, will process all collections.'),
            'choices': impl.COLLECTIONS_METHODS.keys(),
        }),
        (('paths',), {
            'nargs': '+',
            'help': ("Paths to files to synchronize. If path points to a "
                     "folder, it and its subfolders will be searched for "
                     "'.xml'-suffixed files"),
        }),
    )
    for names, options in arguments:
        conf.add(*names, **options)

    return cli_setup.setup_common_modules(
        cli_setup.MOD_LOGGING,
        cli_setup.MOD_DS_CLIENT,
        cli_setup.MOD_DS_QUERY,
    )
def cli():
    """Entry point of the sync command line interface.

    Loads settings via :func:`configure`, builds a batch processor with
    :func:`get_proc` and runs ``upsert_run`` for the configured paths.
    When ``--file-cache`` is set, the processor is created and run inside
    the :func:`open_file_logging_cache` context and receives the opened
    cache. If ``--print-summary`` is set, the processor's summary is
    printed after the run has finished.
    """
    settings = configure()
    collection_methods = impl.collection_methods(settings.collection)
    # The command line flags are negative ("--no-..."); turn them into the
    # positive switches the processor expects.
    remove_absent = settings.no_remove is False
    proc_kwargs = {
        'collection_methods': collection_methods,
        'fail_on_parse': settings.no_fail_on_parse is False,
    }

    if not settings.file_cache:
        proc = get_proc(**proc_kwargs)
        proc.upsert_run(settings.paths, remove_absent=remove_absent)
    else:
        with open_file_logging_cache(settings.file_cache) as cache:
            proc = get_proc(cache=cache, **proc_kwargs)
            proc.upsert_run(settings.paths, remove_absent=remove_absent)

    # The summary is printed only after the run (and the cache context, if
    # any) has completed.
    if settings.print_summary:
        proc.print_summary()
# Run the command line interface only when this module is executed as a
# script (for example ``python -m kuha_client.sync``), not when imported.
if __name__ == '__main__':
    exit_status = cli()
    sys.exit(exit_status)