kuha_client

Kuha Client communicates with Document Store and provides a simple way of inserting, updating and deleting records by reading a batch of XML files stored in the filesystem.

class kuha_client.SourceFile(path)[source]

Represents a file containing document store records.

Stores file path to path and file modification timestamp to timestamp.

Parameters:

path (str) – Absolute path to a file.

add_id(coll, _id)[source]

Add document store collection + id combination.

Attach Document Store ID + collection to this sourcefile, implying that the record in the collection is parsed from this file.

Parameters:
  • coll (str) – Document Store collection.

  • _id (str) – Document Store record ID.

list_ids(coll)[source]

List ids for certain collection in this sourcefile.

If this sourcefile does not contain any records from the collection, return an empty list.

Parameters:

coll (str) – Document store collection.

Returns:

List of Document Store IDs parsed from this file.
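
The bookkeeping described for add_id() and list_ids() can be sketched roughly as follows. This is a hypothetical stand-in (SourceFileSketch is not part of kuha_client) that assumes IDs are kept in a per-collection list; the real class may store them differently.

```python
import os
from collections import defaultdict


class SourceFileSketch:
    """Hypothetical stand-in that mimics the documented SourceFile behaviour."""

    def __init__(self, path):
        self.path = path
        # The documented class stores the file modification timestamp.
        # Assumption: this sketch uses None when the file does not exist.
        self.timestamp = os.path.getmtime(path) if os.path.exists(path) else None
        self._ids = defaultdict(list)

    def add_id(self, coll, _id):
        # Remember that record `_id` in collection `coll` came from this file.
        self._ids[coll].append(_id)

    def list_ids(self, coll):
        # Return a copy; an unknown collection yields an empty list.
        return list(self._ids.get(coll, []))


source = SourceFileSketch('/some/path')
source.add_id('studies', 'some_id')
print(source.list_ids('studies'))    # ['some_id']
print(source.list_ids('variables'))  # []
```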

class kuha_client.Cache[source]

In-memory cache implementation

Cache keeps track of processed files and Document Store records parsed from the files. It contains a deque of loaded filepaths in _loaded_paths.

This cache does not store the cached results for later use. It only keeps them for the lifetime of the Cache instance. This cache is primarily used to warn if a record has already been processed (see result()) and print a summary of processed records (see print_summary()). The runner may also be interested in knowing if any files were processed in a run (see has_loaded_paths()) and what IDs were processed during a run (get_processed_ids_for_collection()).

Usage example, to be run inside a coroutine because load_file(), result() and unload_file() are coroutines. A new record (‘some_id’) is created in the ‘studies’ collection from file ‘/some/path’:

>>> cache = Cache()
>>> cache.register_collection('studies')
>>> await cache.load_file('/some/path')
# Process file and determine result
[...]
>>> await cache.result('studies', 'some_id', cache.CREATE)
>>> await cache.unload_file()

If a file cannot be processed, use forget_file():

>>> cache = Cache()
>>> cache.register_collection('studies')
>>> await cache.load_file('/some/path')
# Attempt to process file, but fail to do so.
[...]
>>> await cache.forget_file()

property has_loaded_paths

True if cache has loaded paths

Returns:

True if this instance has loaded any filepaths

property has_results

Return True if there are cached results

Returns:

True if this instance has cached results

async invalidate()[source]

Invalidate cache

Sets self.invalidated to True.

Invalidating a cache typically renders all previously cached results invalid. This cache implementation only caches results for the lifetime of the cache instance. It does not actually store the results for later use. If this cache implementation is used, there is typically no reason to call invalidate. This is mainly here to support cache invalidation in derived classes.

If there is need to empty the cached results, simply create a new Cache instance.

Cache instance cannot be invalidated, if it already has cached results. Creating a new instance effectively empties the cached results.

Raises:

ValueError if attempting to invalidate a cache that already has results.

register_collection(collection)[source]

Register collection

Collection must be registered to Cache before any results can be added to it. Collections cannot be registered after the cache instance has cached results.

Register collections that are subject to changes before making any operations that may alter these collections. Typically this is done before making any sync runs with BatchProcessor or similar implementation.

Seealso:

self.registered_collections() and self.result().

Parameters:

collection (str) – Collection to register

Raises:

ValueError if this cache instance already has cached results.

Raises:

ValueError if this cache instance already has registered this collection.
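
The ordering rules for registration can be illustrated with a small stand-in. RegistrationSketch and try_register are hypothetical names, and the 'create' string is a placeholder for a valid result value; only the two documented ValueError conditions are modelled.

```python
class RegistrationSketch:
    """Hypothetical stand-in for the documented registration rules."""

    def __init__(self):
        self._collections = []
        self._results = []

    def register_collection(self, collection):
        # Rule 1: no registrations once results have been cached.
        if self._results:
            raise ValueError('cache already has cached results')
        # Rule 2: a collection can be registered only once.
        if collection in self._collections:
            raise ValueError('collection already registered: %s' % collection)
        self._collections.append(collection)

    def result(self, coll, _id, result):
        self._results.append((coll, _id, result))


def try_register(cache, collection):
    """Return the error message, or None when registration succeeds."""
    try:
        cache.register_collection(collection)
    except ValueError as exc:
        return str(exc)
    return None


cache = RegistrationSketch()
cache.register_collection('studies')
duplicate = try_register(cache, 'studies')    # rejected: registered twice
cache.result('studies', 'some_id', 'create')  # placeholder result value
late = try_register(cache, 'variables')       # rejected: results exist
print(duplicate)
print(late)
```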

property registered_collections

Return registered collections

Returns:

Registered collections

Return type:

list

get_processed_ids_for_collection(collection)[source]

Get a list of every ID that was processed for a collection.

Parameters:

collection (str) – collection name

Returns:

List of processed ids

Return type:

list

async result(coll, _id, result)[source]

Add record’s processing result.

Call when processing of a file has led to some result in Document Store.

Parameters:
  • coll (str) – Document Store collection.

  • _id (str) – Document Store record ID.

  • result (str) – Result. One of self._valid_results.

Raises:

ValueError – if result is not valid.

print_summary()[source]

Print a summary of operations recorded in this cache.

async load_file(path)[source]

Load file to process it. Return True if the file needs processing.

Parameters:

path (str) – File path.

Returns:

True

Return type:

bool

async unload_file()[source]

Unload current file.

This cache implementation does not need unloading, so this is a no-op

Returns:

True

async forget_file()[source]

Forget a file that was previously loaded.

class kuha_client.FileLoggingCache(path)[source]

File logging cache implementation

Logs the cache in a pickled file that can be loaded to speed-up processing for consecutive runs.

Parameters:

path (str) – Path to file. The file is loaded if it exists, and created if it does not.

remove_lost_files()[source]

Remove files loaded from self._path but not in current batch.

Compare filepaths in self._files and self._loaded_paths. Build a new list containing files common in self._files and self._loaded_paths and assign it to self._files

async invalidate()[source]

Invalidate cache

save()[source]

Save FileLog to self._path.

async result(coll, _id, result)[source]

Save ID to collection IDs of the currently processed file.

If the result was to delete the ID or if there is no file being processed at the moment, do not save the id.

Parameters:
  • coll (str) – ID belongs to this collection

  • _id (str) – Record ID.

  • result (str) – Processing result.

async load_file(path)[source]

Load file to process it. Return True if the file needs processing.

Parameters:

path (str) – File path.

Returns:

True if file needs processing.

Return type:

bool

async unload_file()[source]

Unload current file.

Returns:

True

async has_cached_results_for_collection(collection)[source]

Return True if stored cache file has any results for collection

Parameters:

collection (str) – Collection to check for results

Returns:

True if cache file has any results for this collection

kuha_client.open_file_logging_cache(path, cache_class=None)[source]

Use file logging cache implementation in a context manager.

Handles loading of cache and removing lost files and saving upon completion of the context.

Parameters:
  • path (str) – Path to file cache.

  • cache_class – file logging cache implementation. Defaults to FileLoggingCache
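
The load, remove-lost-files and save sequence can be sketched with contextlib. Everything here is a simplified, hypothetical stand-in: FileLogSketch keeps only a list of paths, its load_file() is synchronous, and the sketch assumes the cache is saved only when the block exits without an exception, which the documentation above does not specify.

```python
import contextlib
import os
import pickle
import tempfile


class FileLogSketch:
    """Hypothetical stand-in; not the real FileLoggingCache."""

    def __init__(self, path):
        self._path = path
        self._files = []         # paths remembered from earlier runs
        self._loaded_paths = []  # paths seen in the current run
        if os.path.exists(path):
            with open(path, 'rb') as handle:
                self._files = pickle.load(handle)

    def load_file(self, path):
        self._loaded_paths.append(path)
        if path not in self._files:
            self._files.append(path)

    def remove_lost_files(self):
        # Keep only the files that were also seen in the current batch.
        self._files = [p for p in self._files if p in self._loaded_paths]

    def save(self):
        with open(self._path, 'wb') as handle:
            pickle.dump(self._files, handle)


@contextlib.contextmanager
def open_cache_sketch(path, cache_class=None):
    cache = (cache_class or FileLogSketch)(path)
    yield cache
    # Assumption: reached only when the block exits without raising.
    cache.remove_lost_files()
    cache.save()


cache_path = os.path.join(tempfile.mkdtemp(), 'cache.pickle')
with open_cache_sketch(cache_path) as cache:
    cache.load_file('/data/a.xml')
    cache.load_file('/data/b.xml')
with open_cache_sketch(cache_path) as cache:
    cache.load_file('/data/a.xml')  # b.xml is absent from this run

with open(cache_path, 'rb') as handle:
    remaining = pickle.load(handle)
print(remaining)  # ['/data/a.xml']
```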

exception kuha_client.DocumentStoreHTTPError(error_response)[source]

Raise if DocumentStore response payload contains errors.

async kuha_client.send_create_record_request(collection, record_dict)[source]

Send HTTP request to create a new record.

Issue HTTP POST to Document Store using record_dict as request body. The record will be created to the specified collection.

Parameters:
  • collection (str) – Specify collection.

  • record_dict (dict) – Record as a dict. Used as request body.

Returns:

Newly created record ID.

Raises:

DocumentStoreHTTPError if Document Store response contains an error message.

async kuha_client.send_update_record_request(collection, record_dict, record_id)[source]

Send HTTP request to update existing record.

Issue HTTP PUT to Document Store using record_dict as request body. The request will attempt to overwrite the existing record identified by record_id. The record will be updated in the specified collection.

Parameters:
  • collection (str) – Specify collection.

  • record_dict (dict) – Record as a dict. Used in request body.

  • record_id (str) – Record ID.

Returns:

None

Raises:

DocumentStoreHTTPError if Document Store responds with an error message.

async kuha_client.send_delete_record_request(collection, record_id=None, hard_delete=False)[source]

Send HTTP request to delete existing record/records.

Issue HTTP DELETE to Document Store collection. Use record_id to specify a single record to delete; if record_id is None, all records in the collection are deleted. Deletions are logical by default; set hard_delete to True to use physical deletions instead.

Parameters:
  • collection (str) – Specify collection to delete records from.

  • record_id (str) – Delete by id. Leave None to delete all records from collection.

  • hard_delete (bool) – True to issue physical delete request. False to logically delete records.

Returns:

Document Store HTTP response body.

Return type:

dict

Raises:

DocumentStoreHTTPError if Document Store responds with an error message.

kuha_client.iterate_xml_directory(directory)[source]

Recursively iterate over XML-files in directory.

Parameters:

directory (str) – Absolute path to directory.

Returns:

generator for iterating XML-files.

kuha_client.iterate_xml_files_recursively(*paths)[source]

Helper for batch processing XML-files.

Check each path. If a path points to a file yield its absolute path. If it points to a directory, recursively iterate paths to each XML-file found and yield each file’s absolute path.

Parameters:

paths (str) – Repeatable positional argument. Path to file or directory.

Returns:

generator for iterating absolute paths to xml-files
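
A minimal sketch of this kind of traversal, using only the standard library. iter_xml_paths is a hypothetical name, and matching the ".xml" suffix case-insensitively is an assumption; the library's own matching rule is not stated above.

```python
import os
import tempfile


def iter_xml_paths(*paths):
    """Yield absolute paths to files, recursing into directories for *.xml."""
    for path in paths:
        if os.path.isdir(path):
            for root, _dirs, files in os.walk(path):
                for name in sorted(files):
                    # Assumption: XML files are recognised by their suffix.
                    if name.lower().endswith('.xml'):
                        yield os.path.abspath(os.path.join(root, name))
        elif os.path.isfile(path):
            # A path that points to a file is yielded as-is (absolute).
            yield os.path.abspath(path)


# Build a small throwaway tree to iterate over.
base = tempfile.mkdtemp()
os.makedirs(os.path.join(base, 'nested'))
for rel in ('a.xml', 'notes.txt', os.path.join('nested', 'b.xml')):
    with open(os.path.join(base, rel), 'w') as handle:
        handle.write('<root/>')

found = sorted(os.path.relpath(p, base) for p in iter_xml_paths(base))
print(found)
```

In this sketch notes.txt is skipped because it sits inside a directory, while a non-XML file given directly as a path would still be yielded.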

class kuha_client.CollectionMethods(cache)[source]

CollectionMethods base class.

Base class for operations being performed against a particular collection. Use by inheriting and defining abstract methods. Every subclass must define collection class attribute, which gets checked on init.

Parameters:

cache – Initialized cache implementation object.
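
The requirement that every subclass defines a collection class attribute could be enforced along these lines. The class names are hypothetical and the exception type is an assumption; the documentation above only says the attribute is checked on init.

```python
class CollectionMethodsSketch:
    """Hypothetical base class; mirrors only the documented attribute check."""

    collection = None  # subclasses are expected to override this

    def __init__(self, cache):
        if not self.collection:
            # Assumption: the real class may raise a different exception.
            raise NotImplementedError(
                '%s must define a collection class attribute'
                % type(self).__name__)
        self.cache = cache


class StudiesSketch(CollectionMethodsSketch):
    collection = 'studies'


class BrokenSketch(CollectionMethodsSketch):
    pass  # forgot to define `collection`


methods = StudiesSketch(cache=object())
print(methods.collection)  # studies

try:
    BrokenSketch(cache=object())
except NotImplementedError as exc:
    message = str(exc)
    print(message)  # BrokenSketch must define a collection class attribute
```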

async prepare_for_upsert_run()[source]

Prepare for upsert run

Invalidate cache if the schema version in Document Store differs from the schema version in records and collections have not yet been registered to cache. Register collections to cache.

Note that this method will be responsible for possible data migrations upon schema version change in the future.

async query_record(record)[source]

Query record from Document Store.

This method is called from the upsert() method. If this method returns a falsy value, upsert() calls create_record() instead of update_record().

Parameters:

record – Record to query for.

Returns:

Result of the query.

Return type:

instance of record, or None

async query_distinct_ids()[source]

Query distinct IDs from collection that are not deleted

This method is used to lookup ids that are present in DocStore and that should be deleted.

Returns:

Distinct ids

Return type:

set

async remove_records(_id=None, hard_delete=True)[source]

Remove records from collection

Parameters:

_id (str or None) – Submit to remove a single record by id.

Returns:

True on success

async remove_record_by_id(_id)[source]

Remove record from collection.

Seealso:

send_delete_record_request()

Parameters:

_id – Id of the record to be removed.

Returns:

True on success, False on fail.

Return type:

bool

async upsert(record)[source]

Update or insert record.

If record already exists in Document Store, compare the old one with new. If they do not match, update new record’s metadata with certain values from old and issue an update request to Document Store. If record does not exist in Document Store, create it.

Parameters:

record – Document Store record.

Returns:

ID of the record in Document Store.
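
The create-or-update decision can be sketched with an in-memory store. UpsertSketch, the 'key' field and the generated IDs are hypothetical, update_record() returning True after an update is an assumption, and the metadata carry-over from the old record is left out.

```python
import asyncio


class UpsertSketch:
    """Hypothetical in-memory stand-in for a CollectionMethods subclass."""

    def __init__(self):
        self.store = {}  # record id -> record dict
        self.calls = []  # names of the write operations performed

    async def query_record(self, record):
        # A falsy return value makes upsert() create instead of update.
        for _id, old in self.store.items():
            if old['key'] == record['key']:
                return dict(old, _id=_id)
        return None

    async def create_record(self, record):
        _id = 'id-%d' % (len(self.store) + 1)
        self.store[_id] = dict(record)
        self.calls.append('create')
        return _id

    async def update_record(self, new, old):
        _id = old['_id']
        if self.store[_id] == new:
            return False  # records match, nothing to update
        self.store[_id] = dict(new)
        self.calls.append('update')
        return True

    async def upsert(self, record):
        old = await self.query_record(record)
        if not old:
            return await self.create_record(record)
        await self.update_record(new=record, old=old)
        return old['_id']


async def demo():
    methods = UpsertSketch()
    first = await methods.upsert({'key': 'study-1', 'title': 'Old title'})
    second = await methods.upsert({'key': 'study-1', 'title': 'New title'})
    third = await methods.upsert({'key': 'study-1', 'title': 'New title'})
    return first, second, third, methods.calls


first, second, third, calls = asyncio.run(demo())
print(first == second == third, calls)  # True ['create', 'update']
```

The third call finds an identical record, so no write operation is recorded for it.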

async create_record(record)[source]

Insert new record to Document Store.

Seealso:

send_create_record_request()

Parameters:

record – Record to insert

Returns:

Inserted record ID.

Return type:

str

async update_record(new, old)[source]

Update existing Document Store record.

Seealso:

send_update_record_request()

Parameters:
  • new – New record.

  • old – Old record.

Returns:

False if record does not need updating.

Return type:

bool

exception kuha_client.NoSuchCollectionMethod[source]

Explicitly raised when BatchProcessor cannot find a required collection. Use to catch such conditions in caller logic.

class kuha_client.BatchProcessor(collections_methods, parsers=None, cache=None, fail_on_parse=True)[source]

Process a batch of files and sync them to Document Store.

Parameters:
  • collections_methods (list) – CollectionMethods subclasses

  • parsers (list) – XML parsers

  • cache – Optional cache implementation.

  • fail_on_parse (bool) – True to fail on parsing errors, False to bypass the file.

async upsert_from_parser(parser)[source]

Update/Insert records from parser.

Iterate through all collection_methods and parse records that belong to the collections. Call upsert(record) for each collection method.

Parameters:

parser – Parser yielding records.

async upsert_paths(*paths)[source]

Upsert records found recursively from paths.

Parameters:

*paths – one or more paths to recurse to look for files to parse.

async remove_absent(collection, methods)[source]

Remove absent records from collection.

Query every distinct ID from collection using methods.query_distinct_ids(). Compare these IDs to the ones that were processed in this batch. Remove every record that was not processed in this batch using methods.remove_record_by_id().

Parameters:
  • collection (str) – Currently processed collection.

  • methods – CollectionMethods-subclass instance containing specialized methods for this collection.
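
The comparison described above is essentially a set difference. The sketch below uses hypothetical helper names and plain callables in place of a CollectionMethods instance and the cache.

```python
import asyncio


async def remove_absent_sketch(query_distinct_ids, processed_ids,
                               remove_record_by_id):
    """Remove every stored ID that was not processed in this batch."""
    stored = await query_distinct_ids()
    absent = set(stored) - set(processed_ids)
    for _id in sorted(absent):
        await remove_record_by_id(_id)
    return absent


# In-memory stand-in for a Document Store collection.
store = {'id-1', 'id-2', 'id-3'}


async def query_distinct_ids():
    return set(store)


async def remove_record_by_id(_id):
    store.discard(_id)
    return True


removed = asyncio.run(remove_absent_sketch(
    query_distinct_ids, ['id-1', 'id-3'], remove_record_by_id))
print(sorted(removed), sorted(store))  # ['id-2'] ['id-1', 'id-3']
```

The same arithmetic shows why an empty batch is dangerous: with no processed IDs, every stored ID would count as absent.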

async remove_absent_records()[source]

Remove records that were not present in this batch.

If the cache does not contain any loaded filepaths, this method will not remove absent records, since doing so would remove all records in all collections. In that case it logs an error message and returns False.

If cache contains loaded paths, this method will iterate all _collection_methods and call remove_absent with each collection and collection_method.

To remove all records from collection, use self.remove_records().

Returns:

False if no files were loaded for processing.

Return type:

bool

async remove_records(rec_or_class=None, hard_delete=True)[source]

Remove records using collection method for collection.

Give record as rec_or_class parameter to remove a single record. Give record class as rec_or_class parameter to remove all records in collection. Leave rec_or_class None, to remove all records from all collections. Set hard_delete to False to use logical deletions instead of physical ones.

Parameters:
  • rec_or_class – Document Store record instance or class.

  • hard_delete (bool) – False to use logical deletions, True (default) to use physical.

upsert_run(lookup_paths, remove_absent=False)[source]

Upsert run with a batch of records.

Run upsert_paths() in event loop. If remove_absent is True also run remove_absent_records() in event loop. When remove_absent is True, the processor will synchronize records from lookup_paths to Document Store.

Parameters:
  • lookup_paths (list) – Paths to look for source files.

  • remove_absent (bool) – Set to True to remove records that were not found from lookup_paths.
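
A synchronous entry point that drives coroutines in an event loop might look like the sketch below. RunnerSketch is hypothetical, and asyncio.run() is used here for brevity; the library may manage its event loop differently.

```python
import asyncio


class RunnerSketch:
    """Hypothetical stand-in showing a sync wrapper around coroutines."""

    def __init__(self):
        self.log = []

    async def upsert_paths(self, *paths):
        self.log.append(('upsert', paths))

    async def remove_absent_records(self):
        self.log.append(('remove_absent',))
        return True

    def upsert_run(self, lookup_paths, remove_absent=False):
        async def run():
            await self.upsert_paths(*lookup_paths)
            if remove_absent:
                # Only with remove_absent=True does the run fully
                # synchronize the store with the lookup paths.
                await self.remove_absent_records()
        asyncio.run(run())


runner = RunnerSketch()
runner.upsert_run(['xml_files'], remove_absent=True)
print(runner.log)  # [('upsert', ('xml_files',)), ('remove_absent',)]
```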

remove_run(rec_or_class=None, hard_delete=True)[source]

Remove run removes records from Document Store.

Run remove_records() in event loop. This method passes all parameters to remove_records(). See remove_records() for parameter descriptions.

Parameters:
  • rec_or_class – Document Store record or class.

  • hard_delete (bool) – True to physically delete records. False to use logical deletions.

print_summary()[source]

Print a summary of operations performed in this batch run.

This is a proxy to self._cache.print_summary().

impl.py

Implementations for client collection methods.

Define CollectionMethods subclasses for supported collections. Subclass BatchProcessor to support StudyGroups gathered from multiple source files.

class kuha_client.impl.StudyMethods(cache)[source]

Define StudyMethods

Parameters:

cache – Initialized cache implementation.

async query_record(record)[source]

Query Study record.

Parameters:

record (kuha_common.document_store.records.Study) – Study record to query for.

Returns:

Study found from Document Store or None.

Return type:

kuha_common.document_store.records.Study or None

async query_distinct_ids()[source]

Get distinct ids from Study collection.

The query filters out logically deleted Study ids.

Returns:

Distinct ids from Study collection

Return type:

set

async remove_record_by_id(_id)[source]

Remove Study and relatives by Study id

Will remove Study and its relative records. Also removes the reference to this study from relative StudyGroups.

Parameters:

_id (str) – Id of the record to remove

Raises:

ValueError if Study delete request returns an unexpected response.

class kuha_client.impl.VariableMethods(cache)[source]

Define VariableMethods

async query_record(record)[source]

Query Variable record.

Parameters:

record (kuha_common.document_store.records.Variable) – Variable record to query for.

Returns:

Variable found from Document Store or None.

Return type:

kuha_common.document_store.records.Variable or None

async query_distinct_ids()[source]

Query for distinct Variable ids

The query filters out logically deleted Variables.

Returns:

Distinct ids from Variable collection.

Return type:

set

class kuha_client.impl.QuestionMethods(cache)[source]

Define QuestionMethods

async query_record(record)[source]

Query Question record.

Parameters:

record (kuha_common.document_store.records.Question) – Question record to query for.

Returns:

Question found from Document Store.

Return type:

kuha_common.document_store.records.Question

async query_distinct_ids()[source]

Query distinct ids from Question collection.

The query filters out logically deleted questions.

Returns:

Distinct ids from Questions collection.

Return type:

set

class kuha_client.impl.StudyGroupMethods(*args, **kwargs)[source]

Define StudyGroup methods

Keeps track of found study groups and does not issue requests to Document Store right away. Implements method really_upsert() which actually performs the requests to Document Store.

This implementation must be used with a compatible BatchProcessor implementation that understands the behaviour. Mainly the upsert_paths() method must call really_upsert() after all files in batch have been processed. See StudyGroupsBatchProcessor for compatible BatchProcessor implementation details.
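
The deferred behaviour can be sketched as collect first, send later. DeferredUpsertSketch and the 'identifier' and 'study_numbers' fields are hypothetical, and merging study numbers across files is an assumption about why the upsert is postponed.

```python
import asyncio


class DeferredUpsertSketch:
    """Hypothetical stand-in for the collect-then-flush pattern."""

    def __init__(self):
        self.pending = {}  # group identifier -> merged record
        self.sent = []     # records actually "sent" to the store

    async def upsert(self, record):
        # Only collect; merge study numbers for groups seen in several files.
        merged = self.pending.setdefault(
            record['identifier'],
            {'identifier': record['identifier'], 'study_numbers': []})
        for number in record['study_numbers']:
            if number not in merged['study_numbers']:
                merged['study_numbers'].append(number)

    async def really_upsert(self):
        # Called once, after every file in the batch has been processed.
        for record in self.pending.values():
            self.sent.append(record)
        self.pending = {}


async def demo():
    methods = DeferredUpsertSketch()
    await methods.upsert({'identifier': 'serie-1', 'study_numbers': ['study-1']})
    await methods.upsert({'identifier': 'serie-1', 'study_numbers': ['study-2']})
    before = len(methods.sent)  # nothing has been sent yet
    await methods.really_upsert()
    return before, methods.sent


before, sent = asyncio.run(demo())
print(before, sent)
```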

async query_record(record)[source]

Query StudyGroup record.

Parameters:

record (kuha_common.document_store.records.StudyGroup) – StudyGroup record to query for.

Returns:

StudyGroup found from Document Store or None.

Return type:

kuha_common.document_store.records.StudyGroup or None

async query_distinct_ids()[source]

Query distinct StudyGroup ids.

Filters out all logically deleted StudyGroups.

Returns:

Distinct StudyGroup ids.

Return type:

set

async really_upsert()[source]

Special method to actually perform requests to Document Store.

Iterates through all pending study groups and calls upsert(study_group) for each. If cache implements a filecache, calls add_id(‘study_groups’, obj_id) for each file that has a reference to the study_group.

async upsert(record)[source]

Override to handle special case for StudyGroups

StudyGroups are gathered from all sourcefiles and should be upserted at the end of the run.

Parameters:

record (kuha_common.document_store.records.StudyGroup) – StudyGroup record.

async remove_records_with_no_study_numbers()[source]

Remove StudyGroup records that do not have any references to Studies

Filters out StudyGroups that have been logically deleted.

kuha_client.impl.collection_methods(collections=None)[source]

Get implemented collection methods by collection names

Parameters:

collections (list or None) – Returns collection methods for these collections. Leave None to return all collection methods.

Returns:

Implemented collection methods

Return type:

list

class kuha_client.impl.StudyGroupsBatchProcessor(collections_methods, parsers=None, cache=None, fail_on_parse=True)[source]

Subclass BatchProcessor to handle StudyGroups.

Define a BatchProcessor implementation which understands the processing of StudyGroups in StudyGroupMethods

async upsert_paths(*paths)[source]

Upsert records from paths.

Calls parent upsert_paths() first and calls StudyGroupMethods.really_upsert() after, to make sure the StudyGroups are actually updated and inserted.

async remove_absent_records()[source]

Remove records that were not found in this batch.

Removes StudyGroups that do not contain any references to studies.

Returns:

False if no filepaths were processed (see parent method). Otherwise True.

sync.py

Define command line interface for synchronizing files to Document Store.

Example run from command line. Sync folder xml_files to Document Store:

python -m kuha_client.sync --document-store-url=http://localhost:6001/v0 --file-cache cache.pickle xml_files

Print help:

python -m kuha_client.sync -h

kuha_client.sync.get_proc(collection_methods=None, parsers=None, **kw)[source]

Convenience function to instantiate a StudyGroupsBatchProcessor with default parameters.

Additional keyword arguments are passed to StudyGroupsBatchProcessor.

Parameters:
  • collection_methods (list or None) – Collection methods parameter for StudyGroupsBatchProcessor. Defaults to None, which implies the use of all implemented collection methods from impl.py.

  • parsers (list or None) – Parsers parameter for StudyGroupsBatchProcessor. Default to None, which implies to use all implemented Record Parsers from kuha_common.document_store.mappings.ddi.

Returns:

StudyGroupsBatchProcessor instance.

Return type:

StudyGroupsBatchProcessor

kuha_client.sync.configure()[source]

Define configuration interface, return loaded settings.

Returns:

Loaded settings

Return type:

argparse.Namespace

kuha_client.sync.cli()[source]

Run sync command line interface.

Interpret configuration and call BatchProcessor implementation.

kuha_delete.py

This callable module serves as the entry point for deleting records from Document Store.

Example run from command line. Delete study with ID:

python -m kuha_client.kuha_delete --document-store-url=http://localhost:6001/v0 studies 5afa741d6fb71d7b2d333982

Print help:

python -m kuha_client.kuha_delete -h

kuha_client.kuha_delete.cli()[source]

Parse command line arguments and call BatchProcessor.remove_run()