kuha_client

Kuha Client communicates with Document Store and provides a simple way of inserting, updating and deleting records by reading a batch of XML files stored in the filesystem.

class kuha_client.SourceFile(path)[source]

Represents a file containing document store records.

Stores file path to path and file modification timestamp to timestamp.

Parameters

path (str) – Absolute path to a file.

add_id(coll, _id)[source]

Add document store collection + id combination.

Attach Document Store ID + collection to this sourcefile, implying that the record in the collection is parsed from this file.

Parameters
  • coll (str) – Document Store collection.

  • _id (str) – Document Store record ID.

list_ids(coll)[source]

List ids for certain collection in this sourcefile.

If this sourcefile does not contain any records from the collection, return an empty list.

Parameters

coll (str) – Document store collection.

Returns

List of Document Store IDs parsed from this file.
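The bookkeeping described for add_id() and list_ids() can be sketched with a small stand-in class. SourceFileSketch below is hypothetical and is not the library's implementation; it only mirrors the documented behaviour (IDs grouped per collection, an empty list for an unknown collection). Storing None as timestamp for a missing file is an assumption of this sketch.

```python
import os


class SourceFileSketch:
    """Hypothetical stand-in mirroring the documented SourceFile behaviour."""

    def __init__(self, path):
        self.path = path
        # Assumption: a missing file gets no timestamp instead of raising.
        self.timestamp = os.path.getmtime(path) if os.path.exists(path) else None
        self._ids = {}  # collection name -> list of record IDs

    def add_id(self, coll, _id):
        # Remember that the record with this ID was parsed from this file.
        self._ids.setdefault(coll, []).append(_id)

    def list_ids(self, coll):
        # Unknown collections yield an empty list rather than an error.
        return list(self._ids.get(coll, []))


source = SourceFileSketch("/tmp/example_study.xml")
source.add_id("studies", "5afa741d6fb71d7b2d333982")
```

Calling source.list_ids("studies") returns the single ID added above, while source.list_ids("variables") returns an empty list.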

class kuha_client.Cache[source]

In-memory cache implementation

Cache keeps track of processed files and Document Store records parsed from the files. It contains a deque of loaded filepaths in loaded_paths.

register_collection(collection)[source]

Registers and formats a collection.

Can be called multiple times with same collection parameter, but will not overwrite previously registered collection.

Parameters

collection (str) – collection name

get_processed_ids_for_collection(collection)[source]

Get a list of every ID that was processed for a collection.

Parameters

collection (str) – collection name

Returns

List of processed ids

Return type

list

async result(coll, _id, result)[source]

Add record’s processing result.

Call when processing of a file has led to some result in Document Store.

Parameters
  • coll (str) – Document Store collection.

  • _id (str) – Document Store record ID.

  • result (str) – Result. One of self._valid_results.

Raises

ValueError – if result is not valid.

print_summary()[source]

Print a summary of operations recorded in this cache.

async load_file(path)[source]

Load file to process it. Return True if the file needs processing.

Parameters

path (str) – File path.

Returns

True

Return type

bool

async unload_file()[source]

Unload current file.

This cache implementation does not need unloading, so this is a no-op.

Returns

True

async forget_file()[source]

Forget a file that was previously loaded.

class kuha_client.FileLoggingCache(path)[source]

File logging cache implementation

Logs the cache in a pickled file that can be loaded to speed up processing for consecutive runs.

Parameters

path (str) – Path to file. Will be loaded if exists, or created if does not exist.

remove_lost_files()[source]

Remove files loaded from self._path but not in current batch.

Compare filepaths in self.files and self.loaded_paths. Build a new list containing files common in self.files and self.loaded_paths and assign it to self.files
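The comparison described above amounts to keeping only the cached entries whose paths were loaded in the current batch. A minimal sketch, assuming both collections hold plain path strings (the real cache may store richer objects); keep_loaded_files is a hypothetical helper, not part of kuha_client:

```python
from collections import deque


def keep_loaded_files(files, loaded_paths):
    """Return the entries of files that also appear in loaded_paths."""
    loaded = set(loaded_paths)  # set lookup keeps the filter linear
    return [path for path in files if path in loaded]


# Paths restored from a previous run's cache file.
files = ["/data/a.xml", "/data/b.xml", "/data/c.xml"]
# Paths actually loaded in this batch; b.xml has disappeared.
loaded_paths = deque(["/data/c.xml", "/data/a.xml"])

kept = keep_loaded_files(files, loaded_paths)
```

Here kept preserves the order of files and drops /data/b.xml, which was cached earlier but is absent from the current batch.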

save()[source]

Save FileLog to self._path.

async result(coll, _id, result)[source]

Save ID to collection IDs of the currently processed file.

If the result was to delete the ID or if there is no file being processed at the moment, do not save the id.

Parameters
  • coll (str) – ID belongs to this collection

  • _id (str) – Record ID.

  • result (str) – Processing result.

async load_file(path)[source]

Load file to process it. Return True if the file needs processing.

Parameters

path (str) – File path.

Returns

True if file needs processing.

Return type

bool

async unload_file()[source]

Unload current file.

Returns

True

kuha_client.open_file_logging_cache(path, cache_class=None)[source]

Use file logging cache implementation in a context manager.

Handles loading of cache and removing lost files and saving upon completion of the context.

Parameters
  • path (str) – Path to file cache.

  • cache_class – file logging cache implementation. Defaults to FileLoggingCache
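The load, clean-up and save sequence described above can be sketched with contextlib. PickleCacheSketch and open_cache_sketch are hypothetical stand-ins, not the library's code; in particular, skipping the save when the block raises is an assumption of this sketch.

```python
import contextlib
import os
import pickle
import tempfile


class PickleCacheSketch:
    """Hypothetical stand-in for a file logging cache."""

    def __init__(self, path):
        self._path = path
        self.files = []
        self.loaded_paths = []
        # Load the previous run's state if the cache file exists.
        if os.path.exists(path):
            with open(path, "rb") as handle:
                self.files = pickle.load(handle)

    def remove_lost_files(self):
        self.files = [f for f in self.files if f in self.loaded_paths]

    def save(self):
        with open(self._path, "wb") as handle:
            pickle.dump(self.files, handle)


@contextlib.contextmanager
def open_cache_sketch(path, cache_class=None):
    cache = (cache_class or PickleCacheSketch)(path)
    yield cache
    # Reached only when the with-block finishes without raising.
    cache.remove_lost_files()
    cache.save()


cache_path = os.path.join(tempfile.mkdtemp(), "cache.pickle")

# First run: one file is processed and written to the cache on exit.
with open_cache_sketch(cache_path) as cache:
    cache.files.append("/data/a.xml")
    cache.loaded_paths.append("/data/a.xml")

# Second run: the cached entry is restored, but no file is loaded,
# so it counts as lost and is dropped when the context exits.
with open_cache_sketch(cache_path) as cache:
    reloaded = list(cache.files)
```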

exception kuha_client.DocumentStoreHTTPError(error_response)[source]

Raise if DocumentStore response payload contains errors.

async kuha_client.send_create_record_request(collection, record_dict)[source]

Send HTTP request to create a new record.

Issue HTTP POST to Document Store using record_dict as request body. The record will be created in the specified collection.

Parameters
  • collection (str) – Specify collection.

  • record_dict (dict) – Record as a dict. Used as request body.

Returns

Newly created record ID.

Raises

DocumentStoreHTTPError if Document Store response contains an error message.

async kuha_client.send_update_record_request(collection, record_dict, record_id)[source]

Send HTTP request to update existing record.

Issue HTTP PUT to Document Store using record_dict as request body. The request will attempt to overwrite the existing record identified by record_id. The record will be updated in the specified collection.

Parameters
  • collection (str) – Specify collection.

  • record_dict (dict) – Record as a dict. Used in request body.

  • record_id (str) – Record ID.

Returns

None

Raises

DocumentStoreHTTPError if Document Store responds with an error message.

async kuha_client.send_delete_record_request(collection, record_id=None, hard_delete=False)[source]

Send HTTP request to delete existing record/records.

Issue HTTP DELETE to Document Store collection. Use record_id to specify a single record to delete, or leave it as None to delete all records from the collection. Set hard_delete to True to use physical deletions instead of logical deletions, which are the default.

Parameters
  • collection (str) – Specify collection to delete records from.

  • record_id (str) – Delete by id. Leave None to delete all records from collection.

  • hard_delete (bool) – True to issue physical delete request. False to logically delete records.

Returns

Document Store HTTP response body.

Return type

dict

Raises

DocumentStoreHTTPError if Document Store responds with an error message.

kuha_client.iterate_xml_directory(directory)[source]

Recursively iterate over XML-files in directory.

Parameters

directory (str) – Absolute path to directory.

Returns

generator for iterating XML-files.

kuha_client.iterate_xml_files_recursively(*paths)[source]

Helper for batch processing XML-files.

Check each path. If a path points to a file yield its absolute path. If it points to a directory, recursively iterate paths to each XML-file found and yield each file’s absolute path.

Parameters

path (str) – Repeatable positional argument. Path to file or directory.

Returns

generator for iterating absolute paths to xml-files
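The traversal described above can be sketched with os.walk. iter_xml_sketch is a hypothetical helper, not the library's function; matching files by a case-insensitive .xml suffix and silently skipping nonexistent paths are assumptions of this sketch.

```python
import os
import tempfile


def iter_xml_sketch(*paths):
    """Yield absolute paths: files as given, directories searched for XML."""
    for path in paths:
        if os.path.isdir(path):
            for root, _dirs, names in os.walk(path):
                for name in sorted(names):
                    if name.lower().endswith(".xml"):
                        yield os.path.abspath(os.path.join(root, name))
        elif os.path.isfile(path):
            yield os.path.abspath(path)


# Build a small directory tree to iterate over.
base = tempfile.mkdtemp()
os.makedirs(os.path.join(base, "nested"))
for rel in ("a.xml", "notes.txt", os.path.join("nested", "b.xml")):
    with open(os.path.join(base, rel), "w") as handle:
        handle.write("<root/>")

found = sorted(iter_xml_sketch(base))
```

With this tree, found contains the absolute paths of a.xml and nested/b.xml, and notes.txt is skipped.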

class kuha_client.CollectionMethods(cache)[source]

CollectionMethods base class.

Base class for operations being performed against a particular collection. Use by inheriting and defining abstract methods. Every subclass must define collection class attribute, which gets checked on init.

Parameters

cache – Initialized cache implementation object.

async query_record(record)[source]

Query record from Document Store.

This method is called from upsert() method. If this method returns a falsy value then the upsert() will never call update_record(), but will call create_record() instead.

Parameters

record – Record to query for.

Returns

Result of the query.

Return type

instance of record, or None

async query_distinct_ids()[source]

Query distinct IDs from collection that are not deleted

This method is used to look up ids that are present in Document Store and that should be deleted.

Returns

Distinct ids

Return type

set

async remove_records(_id=None, hard_delete=True)[source]

Remove records from collection

Parameters

_id (str or None) – Submit to remove a single record by id.

Returns

True on success

async remove_record_by_id(_id)[source]

Remove record from collection.

Seealso

send_delete_record_request()

Parameters

_id – Id of the record to be removed.

Returns

True on success, False on fail.

Return type

bool

async upsert(record)[source]

Update or insert record.

If record already exists in Document Store, compare the old one with new. If they do not match, update new record’s metadata with certain values from old and issue an update request to Document Store. If record does not exist in Document Store, create it.

Parameters

record – Document Store record.

Returns

ID of the record in Document Store.
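The decision flow of upsert() can be illustrated with an in-memory stand-in. InMemoryMethodsSketch is hypothetical: it replaces HTTP requests with a dict, and the record fields (study_number, title, _id) are invented for the example. Copying _id from the old record stands in for the "certain values from old" mentioned above, which this document does not enumerate.

```python
import asyncio


class InMemoryMethodsSketch:
    """Hypothetical collection methods backed by a dict instead of HTTP."""

    collection = "studies"

    def __init__(self):
        self.store = {}  # record ID -> stored record dict
        self.calls = []  # operation chosen by each upsert() call

    async def query_record(self, record):
        for old in self.store.values():
            if old["study_number"] == record["study_number"]:
                return old
        return None  # falsy result makes upsert() create the record

    async def create_record(self, record):
        _id = str(len(self.store) + 1)
        self.store[_id] = dict(record, _id=_id)
        self.calls.append("create")
        return _id

    async def update_record(self, new, old):
        new = dict(new, _id=old["_id"])  # carry identity over from old
        if new == old:
            self.calls.append("skip")
            return False  # nothing changed, no update needed
        self.store[old["_id"]] = new
        self.calls.append("update")
        return True

    async def upsert(self, record):
        old = await self.query_record(record)
        if not old:
            return await self.create_record(record)
        await self.update_record(record, old)
        return old["_id"]


async def demo():
    methods = InMemoryMethodsSketch()
    first = await methods.upsert({"study_number": "s1", "title": "A"})
    same = await methods.upsert({"study_number": "s1", "title": "A"})
    changed = await methods.upsert({"study_number": "s1", "title": "B"})
    return methods.calls, (first, same, changed)


calls, ids = asyncio.run(demo())
```

The three calls result in a create, a skipped update and a real update, and all three return the same record ID.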

async create_record(record)[source]

Insert new record to Document Store.

Seealso

send_create_record_request()

Parameters

record – Record to insert

Returns

Inserted record ID.

Return type

str

async update_record(new, old)[source]

Update existing Document Store record.

Seealso

send_update_record_request()

Parameters
  • new – New record.

  • old – Old record.

Returns

False if record does not need updating.

Return type

bool

exception kuha_client.NoSuchCollectionMethod[source]

Explicitly raised when BatchProcessor cannot find a required collection. Use to catch such conditions in caller logic.

class kuha_client.BatchProcessor(collections_methods, parsers=None, cache=None, fail_on_parse=True)[source]

Process a batch of files and sync them to Document Store.

Parameters
  • collections_methods (list) – CollectionMethods subclasses

  • parsers (list) – XML parsers

  • cache – Optional cache implementation.

  • fail_on_parse (bool) – Fail on parsing errors or bypass file.

async upsert_from_parser(parser)[source]

Update/Insert records from parser.

Iterate through all collection_methods and parse records that belong to the collections. Call upsert(record) for each collection method.

Parameters

parser – Parser yielding records.

async upsert_paths(*paths)[source]

Upsert records found recursively from paths.

Parameters

*paths – one or more paths to recurse to look for files to parse.

async remove_absent(collection, methods)[source]

Remove absent records from collection.

Query every distinct ID from collection using methods.query_distinct_ids(). Compare these IDs to the ones that were processed in this batch. Remove every record that was not processed in this batch using methods.remove_record_by_id().

Parameters
  • collection (str) – Currently processed collection.

  • methods – CollectionMethods-subclass instance containing specialized methods for this collection.
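The comparison described above is a set difference between the IDs stored in Document Store and the IDs processed in the batch. A minimal sketch, with MethodsSketch as a hypothetical in-memory stand-in for a CollectionMethods subclass and the processed IDs passed in directly instead of being read from the cache:

```python
import asyncio


class MethodsSketch:
    """Hypothetical stand-in holding record IDs in memory."""

    def __init__(self, ids):
        self.ids = set(ids)
        self.removed = []

    async def query_distinct_ids(self):
        return set(self.ids)

    async def remove_record_by_id(self, _id):
        self.ids.discard(_id)
        self.removed.append(_id)
        return True


async def remove_absent_sketch(processed_ids, methods):
    stored = await methods.query_distinct_ids()
    # Every stored ID that was not processed in this batch is absent.
    for _id in sorted(stored - set(processed_ids)):
        await methods.remove_record_by_id(_id)


methods = MethodsSketch({"id-1", "id-2", "id-3"})
asyncio.run(remove_absent_sketch(["id-1", "id-3"], methods))
```

After the run only id-2 has been removed, because it was stored but not processed in the batch.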

async remove_absent_records()[source]

Remove records that were not present in this batch.

If the cache does not contain any loaded filepaths, this method will not remove absent records, since doing so would remove all records in all collections. In that case it logs an error message and returns False.

If cache contains loaded paths, this method will iterate all _collection_methods and call remove_absent with each collection and collection_method.

To remove all records from collection, use self.remove_records().

Returns

False if no files were loaded for processing.

Return type

bool

async remove_records(rec_or_class=None, hard_delete=True)[source]

Remove records using collection method for collection.

Give record as rec_or_class parameter to remove a single record. Give record class as rec_or_class parameter to remove all records in collection. Leave rec_or_class None, to remove all records from all collections. Set hard_delete to False to use logical deletions instead of physical ones.

Parameters
  • rec_or_class – Document Store record instance or class.

  • hard_delete (bool) – False to use logical deletions, True (default) to use physical.

upsert_run(lookup_paths, remove_absent=False)[source]

Upsert run with a batch of records.

Run upsert_paths() in event loop. If remove_absent is True also run remove_absent_records() in event loop. When remove_absent is True, the processor will synchronize records from lookup_paths to Document Store.

Parameters
  • lookup_paths (list) – Paths to look for source files.

  • remove_absent (bool) – Set to True to remove records that were not found from lookup_paths.
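The relationship between the synchronous upsert_run() and the coroutines it drives can be sketched as follows. ProcessorSketch is hypothetical and only records which coroutines were awaited; using asyncio.run() to drive the event loop is an assumption, since this document does not say how the loop is managed.

```python
import asyncio


class ProcessorSketch:
    """Hypothetical processor that logs calls instead of doing real work."""

    def __init__(self):
        self.log = []

    async def upsert_paths(self, *paths):
        self.log.append(("upsert", paths))

    async def remove_absent_records(self):
        self.log.append(("remove_absent",))
        return True

    def upsert_run(self, lookup_paths, remove_absent=False):
        async def _run():
            await self.upsert_paths(*lookup_paths)
            if remove_absent:
                # Only with this flag does the run act as a full sync.
                await self.remove_absent_records()

        asyncio.run(_run())


processor = ProcessorSketch()
processor.upsert_run(["xml_files"], remove_absent=True)
```

With remove_absent left at False, only the upsert step would be logged.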

remove_run(rec_or_class=None, hard_delete=True)[source]

Remove run removes records from Document Store.

Run remove_records() in event loop. This method passes all parameters to remove_records(). See remove_records() for parameter descriptions.

Parameters
  • rec_or_class – Document Store record or class.

  • hard_delete (bool) – True to physically delete records. False to use logical deletions.

print_summary()[source]

Print a summary of operations performed in this batch run.

This is a proxy to self._cache.print_summary().

impl.py

Implementations for client collection methods.

Define CollectionMethods subclasses for supported collections. Subclass BatchProcessor to support StudyGroups gathered from multiple source files.

class kuha_client.impl.StudyMethods(cache)[source]

Define StudyMethods

Parameters

cache – Initialized cache implementation.

async query_record(record)[source]

Query Study record.

Parameters

record (kuha_common.document_store.records.Study) – Study record to query for.

Returns

Study found from Document Store or None.

Return type

kuha_common.document_store.records.Study or None

async query_distinct_ids()[source]

Get distinct ids from Study collection.

The query filters out logically deleted Study ids.

Returns

Distinct ids from Study collection

Return type

set

async remove_record_by_id(_id)[source]

Remove Study and relatives by Study id

Will remove Study and its relative records. Also removes the reference to this study from relative StudyGroups.

Parameters

_id (str) – Id of the record to remove

Raises

ValueError if Study delete request returns an unexpected response.

class kuha_client.impl.VariableMethods(cache)[source]

Define VariableMethods

async query_record(record)[source]

Query Variable record.

Parameters

record (kuha_common.document_store.records.Variable) – Variable record to query for.

Returns

Variable found from Document Store or None.

Return type

kuha_common.document_store.records.Variable or None

async query_distinct_ids()[source]

Query for distinct Variable ids

The query filters out logically deleted Variables.

Returns

Distinct ids from Variable collection.

Return type

set

class kuha_client.impl.QuestionMethods(cache)[source]

Define QuestionMethods

async query_record(record)[source]

Query Question record.

Parameters

record (kuha_common.document_store.records.Question) – Question record to query for.

Returns

Question found from Document Store.

Return type

kuha_common.document_store.records.Question

async query_distinct_ids()[source]

Query distinct ids from Question collection.

The query filters out logically deleted questions.

Returns

Distinct ids from Questions collection.

Return type

set

class kuha_client.impl.StudyGroupMethods(*args, **kwargs)[source]

Define StudyGroup methods

Keeps track of found study groups and does not issue requests to Document Store right away. Implements method really_upsert() which actually performs the requests to Document Store.

This implementation must be used with a compatible BatchProcessor implementation that understands the behaviour. Mainly the upsert_paths() method must call really_upsert() after all files in batch have been processed. See StudyGroupsBatchProcessor for compatible BatchProcessor implementation details.
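The deferred behaviour described above can be sketched with an in-memory stand-in. DeferredUpsertSketch is hypothetical, and the record fields (study_group, study_numbers) and the merging of study numbers across files are assumptions made for illustration, not documented details of StudyGroupMethods.

```python
import asyncio


class DeferredUpsertSketch:
    """Hypothetical stand-in that collects records before sending them."""

    def __init__(self):
        self.pending = {}  # group name -> set of study numbers seen so far
        self.sent = []     # what would have been sent to Document Store

    async def upsert(self, record):
        # Do not send anything yet; only remember what was found.
        numbers = self.pending.setdefault(record["study_group"], set())
        numbers.update(record["study_numbers"])

    async def really_upsert(self):
        # Called once after every file in the batch has been processed.
        for name, numbers in sorted(self.pending.items()):
            self.sent.append((name, sorted(numbers)))
        self.pending.clear()


async def demo():
    methods = DeferredUpsertSketch()
    # The same group is referenced from two different source files.
    await methods.upsert({"study_group": "g1", "study_numbers": ["s1"]})
    await methods.upsert({"study_group": "g1", "study_numbers": ["s2"]})
    before = list(methods.sent)
    await methods.really_upsert()
    return before, methods.sent


before, after = asyncio.run(demo())
```

Nothing is sent until really_upsert() runs, at which point the group is sent once with both study numbers.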

async query_record(record)[source]

Query StudyGroup record.

Parameters

record (kuha_common.document_store.records.StudyGroup) – StudyGroup record to query for.

Returns

StudyGroup found from Document Store or None.

Return type

kuha_common.document_store.records.StudyGroup or None

async query_distinct_ids()[source]

Query distinct StudyGroup ids.

Filters out all logically deleted StudyGroups.

Returns

Distinct StudyGroup ids.

Return type

set

async really_upsert()[source]

Special method to actually perform requests to Document Store.

Iterates through all pending study groups and calls upsert(study_group) for each. If the cache implements a filecache, calls add_id(‘study_groups’, obj_id) for each file that has a reference to the study_group.

async upsert(record)[source]

Override to handle special case for StudyGroups

StudyGroups are gathered from all sourcefiles and should be upserted at the end of the run.

Parameters

record (kuha_common.document_store.records.StudyGroup) – StudyGroup record.

async remove_records_with_no_study_numbers()[source]

Remove StudyGroup records that do not have any references to Studies

Filters out StudyGroups that have been logically deleted.

kuha_client.impl.collection_methods(collections=None)[source]

Get implemented collection methods by collection names

Parameters

collections (list or None) – Returns collection methods for these collections. Leave None to return all collection methods.

Returns

Implemented collection methods

Return type

list

class kuha_client.impl.StudyGroupsBatchProcessor(collections_methods, parsers=None, cache=None, fail_on_parse=True)[source]

Subclass BatchProcessor to handle StudyGroups.

Define a BatchProcessor implementation which understands the processing of StudyGroups in StudyGroupMethods

async upsert_paths(*paths)[source]

Upsert records from paths.

Calls parent upsert_paths() first and calls StudyGroupMethods.really_upsert() after, to make sure the StudyGroups are actually updated and inserted.

async remove_absent_records()[source]

Remove records that were not found in this batch.

Removes StudyGroups that do not contain any references to studies.

Returns

False if no filepaths were processed (see parent method). Otherwise True.

sync.py

Define command line interface for synchronizing files to Document Store.

Example run from command line. Sync folder xml_files to Document Store:

python -m kuha_client.sync --document-store-url=http://localhost:6001/v0 --file-cache cache.pickle xml_files

Print help:

python -m kuha_client.sync -h

kuha_client.sync.get_proc(collection_methods=None, parsers=None, **kw)[source]

Convenience function to instantiate a StudyGroupsBatchProcessor with default parameters.

Additional keyword arguments are passed to StudyGroupsBatchProcessor.

Parameters
  • collection_methods (list or None) – Collection methods parameter for StudyGroupsBatchProcessor. Defaults to None, which implies the use of all implemented collection methods from impl.py.

  • parsers (list or None) – Parsers parameter for StudyGroupsBatchProcessor. Defaults to None, which implies the use of all implemented Record Parsers from kuha_common.document_store.mappings.ddi.

Returns

StudyGroupsBatchProcessor instance.

Return type

StudyGroupsBatchProcessor

kuha_client.sync.configure()[source]

Define configuration interface, return loaded settings.

Returns

Loaded settings

Return type

argparse.Namespace

kuha_client.sync.cli()[source]

Run sync command line interface.

Interpret configuration and call BatchProcessor implementation.

kuha_delete.py

Callable module that serves as an entry point to delete records from Document Store.

Example run from command line. Delete study with ID:

python -m kuha_client.kuha_delete --document-store-url=http://localhost:6001/v0 studies 5afa741d6fb71d7b2d333982

Print help:

python -m kuha_client.kuha_delete -h

kuha_client.kuha_delete.cli()[source]

Parse command line arguments and call BatchProcessor.remove_run()