kuha_client
Kuha Client communicates with Document Store and provides a simple way of inserting, updating and deleting records by reading a batch of XML files stored in the filesystem.
- class kuha_client.SourceFile(path)[source]
Represents a file containing document store records.
Stores file path to path and file modification timestamp to timestamp.
- Parameters:
path (str) – Absolute path to a file.
- class kuha_client.Cache[source]
In-memory cache implementation
Cache keeps track of processed files and Document Store records parsed from the files. It contains a deque of loaded filepaths in _loaded_paths.
This cache does not store the cached results for later use. It only keeps them for the lifetime of the Cache instance. This cache is primarily used to warn if a record has already been processed (see result()) and print a summary of processed records (see print_summary()). The runner may also be interested in knowing if any files were processed in a run (see has_loaded_paths()) and what IDs were processed during a run (see get_processed_ids_for_collection()).
Usage example. A new record (‘some_id’) is created to ‘studies’-collection from file ‘/some/path’:
>>> cache = Cache()
>>> cache.register_collection('studies')
>>> cache.load_file('/some/path')
# Process file and determine result
[...]
>>> cache.result('studies', 'some_id', cache.CREATE)
>>> cache.unload_file()
If a file cannot be processed, use forget_file():
>>> cache = Cache()
>>> cache.register_collection('studies')
>>> cache.load_file('/some/path')
# Attempt to process file, but fail to do so.
[...]
>>> cache.forget_file()
- property has_loaded_paths
True if cache has loaded paths
- Returns:
True if this instance has loaded any filepaths
- property has_results
Return True if there are cached results
- Returns:
True if this instance has cached results
- async invalidate()[source]
Invalidate cache
Sets self.invalidated to True.
Invalidating a cache typically renders all previously cached results invalid. This cache implementation only caches results for the lifetime of the cache instance. It does not actually store the results for later use. If this cache implementation is used, there is typically no reason to call invalidate. This is mainly here to support cache invalidation in derived classes.
If there is need to empty the cached results, simply create a new Cache instance.
A Cache instance cannot be invalidated if it already has cached results. Creating a new instance effectively empties the cached results.
- Raises:
ValueError if attempting to invalidate a cache that already has results.
- register_collection(collection)[source]
Register collection
Collection must be registered to Cache before any results can be added to it. Collections cannot be registered after the cache instance has cached results.
Register collections that are subject to changes before making any operations that may alter these collections. Typically this is done before making any sync runs with BatchProcessor or similar implementation.
- Seealso:
self.registered_collections() and self.result().
- Parameters:
collection (str) – Collection to register
- Raises:
ValueError if this cache instance already has cached results.
- Raises:
ValueError if this cache instance already has registered this collection.
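The registration rules above can be illustrated with a toy model. This is a minimal sketch, not the code of kuha_client.Cache: the class name ToyCache is hypothetical, result() is synchronous here for brevity, and rejecting an unregistered collection with ValueError is an assumption.

```python
class ToyCache:
    """Illustrative stand-in, NOT kuha_client.Cache."""

    def __init__(self):
        # collection name -> {record id: processing result}
        self._results = {}

    @property
    def has_results(self):
        # True once any registered collection holds at least one result.
        return any(self._results.values())

    def register_collection(self, collection):
        # Rule 1: no registration after results have been cached.
        if self.has_results:
            raise ValueError("cache already has cached results")
        # Rule 2: a collection can be registered only once.
        if collection in self._results:
            raise ValueError("collection already registered: %s" % collection)
        self._results[collection] = {}

    def result(self, collection, record_id, result):
        # Assumption: an unregistered collection is rejected with ValueError.
        if collection not in self._results:
            raise ValueError("collection not registered: %s" % collection)
        self._results[collection][record_id] = result
```

With this model, registering 'studies' twice fails, and so does registering any collection after the first call to result().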
- property registered_collections
Return registered collections
- Returns:
Registered collections
- Return type:
- get_processed_ids_for_collection(collection)[source]
Get a list of every ID that was processed for a collection.
- async result(coll, _id, result)[source]
Add record’s processing result.
Call when processing of a file has led to some result in Document Store.
- Parameters:
- Raises:
ValueError – if result is not valid.
- class kuha_client.FileLoggingCache(path)[source]
File logging cache implementation
Logs the cache in a pickled file that can be loaded to speed up processing for consecutive runs.
- Parameters:
path (str) – Path to file. The file is loaded if it exists and created if it does not.
- remove_lost_files()[source]
Remove files loaded from self._path but not in current batch.
Compare filepaths in self._files and self._loaded_paths. Build a new list containing files common in self._files and self._loaded_paths and assign it to self._files
- async result(coll, _id, result)[source]
Save ID to collection IDs of the currently processed file.
If the result was to delete the ID or if there is no file being processed at the moment, do not save the id.
- kuha_client.open_file_logging_cache(path, cache_class=None)[source]
Use file logging cache implementation in a context manager.
Handles loading of cache and removing lost files and saving upon completion of the context.
- Parameters:
path (str) – Path to file cache.
cache_class – file logging cache implementation. Defaults to FileLoggingCache
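The load-on-enter, save-on-exit pattern described above can be sketched with the standard library. This is an illustration under stated assumptions, not the implementation of open_file_logging_cache: the name open_pickled_state is hypothetical, the cached state is a plain dict, the step that removes lost files is left out, and saving only when the context body completes without an exception is an assumption.

```python
import os
import pickle
from contextlib import contextmanager


@contextmanager
def open_pickled_state(path):
    """Load state from `path` if present, yield it, save it on clean exit."""
    state = {}
    if os.path.exists(path):
        # Reuse the state written by a previous run.
        with open(path, "rb") as handle:
            state = pickle.load(handle)
    yield state
    # Reached only if the with-body finished without raising.
    with open(path, "wb") as handle:
        pickle.dump(state, handle)
```

A second run that opens the same path sees whatever the first run stored, which is what allows consecutive runs to skip work.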
- exception kuha_client.DocumentStoreHTTPError(error_response)[source]
Raise if DocumentStore response payload contains errors.
- async kuha_client.send_create_record_request(collection, record_dict)[source]
Send HTTP request to create a new record.
Issue HTTP POST to Document Store using record_dict as request body. The record will be created to the specified collection.
- Parameters:
- Returns:
Newly created record ID.
- Raises:
DocumentStoreHTTPError if Document Store response contains an error message.
- async kuha_client.send_update_record_request(collection, record_dict, record_id)[source]
Send HTTP request to update existing record.
Issue HTTP PUT to Document Store using record_dict as request body. The request will attempt to overwrite existing record identified by record_id. The record will be updated to the specified collection.
- Parameters:
- Returns:
None
- Raises:
DocumentStoreHTTPError if Document Store responds with an error message.
- async kuha_client.send_delete_record_request(collection, record_id=None, hard_delete=False)[source]
Send HTTP request to delete existing record/records.
Issue HTTP DELETE to Document Store collection. Use record_id to specify a single record to delete, or omit it to delete all records from the collection. Set hard_delete to True to use physical deletions instead of logical deletions, which are the default.
- Parameters:
- Returns:
Document Store HTTP response body.
- Return type:
- Raises:
DocumentStoreHTTPError if Document Store responds with an error message.
- kuha_client.iterate_xml_directory(directory)[source]
Recursively iterate over XML-files in directory.
- Parameters:
directory (str) – Absolute path to directory.
- Returns:
generator for iterating XML-files.
- kuha_client.iterate_xml_files_recursively(*paths)[source]
Helper for batch processing XML-files.
Check each path. If a path points to a file yield its absolute path. If it points to a directory, recursively iterate paths to each XML-file found and yield each file’s absolute path.
- Parameters:
path (str) – Repeatable positional argument. Path to file or directory.
- Returns:
generator for iterating absolute paths to xml-files
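The behaviour described for iterate_xml_files_recursively can be approximated with os.walk. This is a sketch, not the library's code: the name iter_xml_paths is hypothetical, and matching XML files by a case-insensitive ".xml" suffix is an assumption.

```python
import os
from typing import Iterator


def iter_xml_paths(*paths: str) -> Iterator[str]:
    """Yield absolute paths: files directly, directories via their XML files."""
    for path in paths:
        if os.path.isdir(path):
            # Directories are walked recursively; only XML files are yielded.
            for root, _dirs, files in os.walk(path):
                for name in sorted(files):
                    if name.lower().endswith(".xml"):
                        yield os.path.abspath(os.path.join(root, name))
        elif os.path.isfile(path):
            # A path that points to a file is yielded as an absolute path.
            yield os.path.abspath(path)
```

Because the function is a generator, a caller can start processing the first file before the whole directory tree has been scanned.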
- class kuha_client.CollectionMethods(cache)[source]
CollectionMethods base class.
Base class for operations being performed against a particular collection. Use by inheriting and defining abstract methods. Every subclass must define collection class attribute, which gets checked on init.
- Parameters:
cache – Initialized cache implementation object.
- async prepare_for_upsert_run()[source]
Prepare for upsert run
Invalidate cache if the schema version in docstore differs from the schema version in records and collections have not yet been registered to cache. Register collections to cache.
Note that this method will be responsible for possible data migrations upon schema version change in the future.
- async query_record(record)[source]
Query record from Document Store.
This method is called from upsert() method. If this method returns a falsy value then the upsert() will never call update_record(), but will call create_record() instead.
- Parameters:
record – Record to query for.
- Returns:
Result of the query.
- Return type:
instance of record, or None
- async query_distinct_ids()[source]
Query distinct IDs from collection that are not deleted
This method is used to lookup ids that are present in DocStore and that should be deleted.
- Returns:
Distinct ids
- Return type:
- async remove_records(_id=None, hard_delete=True)[source]
Remove records from collection
- Parameters:
_id (str or None) – Submit to remove a single record by id.
- Returns:
True on success
- async remove_record_by_id(_id)[source]
Remove record from collection.
- Seealso:
- Parameters:
_id – Id of the record to be removed.
- Returns:
True on success, False on fail.
- Return type:
- async upsert(record)[source]
Update or insert record.
If record already exists in Document Store, compare the old one with new. If they do not match, update new record’s metadata with certain values from old and issue an update request to Document Store. If record does not exist in Document Store, create it.
- Parameters:
record – Document Store record.
- Returns:
ID of the record in Document Store.
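The update-or-insert decision can be sketched against an in-memory stand-in. Everything here is hypothetical: InMemoryStore, the dict-shaped records and the "identifier" key are illustration only, and the only value carried over from the old record is its ID, since the source does not list which metadata is preserved.

```python
import asyncio
from typing import Dict, Optional


class InMemoryStore:
    """Hypothetical stand-in for Document Store, keyed by record ID."""

    def __init__(self) -> None:
        self.records: Dict[str, dict] = {}
        self._next_id = 1

    async def query(self, identifier: str) -> Optional[dict]:
        for rec in self.records.values():
            if rec["identifier"] == identifier:
                return rec
        return None

    async def create(self, record: dict) -> str:
        rec_id = str(self._next_id)
        self._next_id += 1
        self.records[rec_id] = {**record, "_id": rec_id}
        return rec_id

    async def update(self, rec_id: str, record: dict) -> None:
        self.records[rec_id] = {**record, "_id": rec_id}


async def upsert(store: InMemoryStore, record: dict) -> str:
    old = await store.query(record["identifier"])
    if not old:
        # Falsy query result: create instead of update.
        return await store.create(record)
    rec_id = old["_id"]
    if {k: v for k, v in old.items() if k != "_id"} != record:
        # Records differ: overwrite the stored one, keeping its ID.
        await store.update(rec_id, record)
    return rec_id
```

Upserting the same identifier twice with different content returns the same ID both times and leaves a single record in the store.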
- async create_record(record)[source]
Insert new record to Document Store.
- Seealso:
- Parameters:
record – Record to insert
- Returns:
Inserted record ID.
- Return type:
- exception kuha_client.NoSuchCollectionMethod[source]
Explicitly raised when BatchProcessor cannot find a required collection. Use to catch such conditions in caller logic.
- class kuha_client.BatchProcessor(collections_methods, parsers=None, cache=None, fail_on_parse=True)[source]
Process a batch of files and sync them to Document Store.
- Parameters:
- async upsert_from_parser(parser)[source]
Update/Insert records from parser.
Iterate through all collection_methods and parse records that belong to the collections. Call upsert(record) for each collection method.
- Parameters:
parser – Parser yielding records.
- async upsert_paths(*paths)[source]
Upsert records found recursively from paths.
- Parameters:
*paths – one or more paths to recurse to look for files to parse.
- async remove_absent(collection, methods)[source]
Remove absent records from collection.
Query every distinct ID from collection using methods.query_distinct_ids(). Compare these IDs to the ones that were processed in this batch. Remove every record that was not processed in this batch using methods.remove_record_by_id().
- Parameters:
collection (str) – Currently processed collection.
methods – CollectionMethods-subclass instance containing specialized methods for this collection.
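The comparison at the heart of remove_absent is a set difference. A minimal sketch, assuming a hypothetical FakeMethods class in place of a real CollectionMethods subclass and a plain iterable of processed IDs in place of the cache:

```python
import asyncio
from typing import Iterable, List, Set


class FakeMethods:
    """Hypothetical stand-in for a CollectionMethods subclass."""

    def __init__(self, stored_ids: Iterable[str]) -> None:
        self.stored: Set[str] = set(stored_ids)

    async def query_distinct_ids(self) -> Set[str]:
        return set(self.stored)

    async def remove_record_by_id(self, _id: str) -> bool:
        self.stored.discard(_id)
        return True


async def remove_absent(methods: FakeMethods, processed_ids: Iterable[str]) -> List[str]:
    stored = await methods.query_distinct_ids()
    # IDs present in the store but not seen in this batch are absent.
    absent = sorted(stored - set(processed_ids))
    for _id in absent:
        await methods.remove_record_by_id(_id)
    return absent
```

Note that an empty set of processed IDs would mark every stored record as absent, which is the situation remove_absent_records() guards against.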
- async remove_absent_records()[source]
Remove records that were not present in this batch.
If cache does not contain any loaded filepaths, absent records will not be removed, since doing so would remove all records in all collections. In that case the method logs an error message and returns False.
If cache contains loaded paths, this method will iterate all _collection_methods and call remove_absent with each collection and collection_method.
To remove all records from collection, use self.remove_records().
- Returns:
False if no files were loaded for processing.
- Return type:
- async remove_records(rec_or_class=None, hard_delete=True)[source]
Remove records using collection method for collection.
Give record as rec_or_class parameter to remove a single record. Give record class as rec_or_class parameter to remove all records in collection. Leave rec_or_class None, to remove all records from all collections. Set hard_delete to False to use logical deletions instead of physical ones.
- Parameters:
rec_or_class – Document Store record instance or class.
hard_delete (bool) – False to use logical deletions, True (default) to use physical.
- upsert_run(lookup_paths, remove_absent=False)[source]
Upsert run with batch of records.
Run upsert_paths() in event loop. If remove_absent is True also run remove_absent_records() in event loop. When remove_absent is True, the processor will synchronize records from lookup_paths to Document Store.
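The synchronous wrapper around the two coroutines can be sketched as follows. The coroutine bodies are placeholders that only record what was called, and the use of asyncio.run is an assumption; the source only states that the coroutines are run in an event loop.

```python
import asyncio
from typing import List, Sequence, Tuple


async def upsert_paths(paths: Sequence[str], log: List[Tuple]) -> None:
    # Placeholder: a real implementation would parse and upsert records.
    log.append(("upsert", tuple(paths)))


async def remove_absent_records(log: List[Tuple]) -> None:
    # Placeholder: a real implementation would delete unprocessed records.
    log.append(("remove_absent",))


def upsert_run(lookup_paths: Sequence[str], remove_absent: bool = False) -> List[Tuple]:
    """Blocking entry point that drives the coroutines to completion."""
    log: List[Tuple] = []

    async def _run() -> None:
        await upsert_paths(lookup_paths, log)
        if remove_absent:
            # Removal runs only after every path has been upserted.
            await remove_absent_records(log)

    asyncio.run(_run())
    return log
```

Keeping the coroutines separate from the blocking wrapper lets callers that already run an event loop await them directly.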
- remove_run(rec_or_class=None, hard_delete=True)[source]
Remove run removes records from Document Store.
Run remove_records() in event loop. This method passes all parameters to remove_records(). See remove_records() for parameter descriptions.
- Parameters:
rec_or_class – Document Store record or class.
hard_delete (bool) – True to physically delete records. False to use logical deletions.
impl.py
Implementations for client collection methods.
Define CollectionMethods subclasses for supported collections. Subclass BatchProcessor to support StudyGroups gathered from multiple source files.
- class kuha_client.impl.StudyMethods(cache)[source]
Define StudyMethods
- Parameters:
cache – Initialized cache implementation.
- async query_record(record)[source]
Query Study record.
- Parameters:
record (kuha_common.document_store.records.Study) – Study record to query for.
- Returns:
Study found from Document Store or None.
- Return type:
- async query_distinct_ids()[source]
Get distinct ids from Study collection.
The query filters out logically deleted Study ids.
- Returns:
Distinct ids from Study collection
- Return type:
- async remove_record_by_id(_id)[source]
Remove Study and relatives by Study id
Will remove Study and its relative records. Also removes the reference to this study from relative StudyGroups.
- Parameters:
_id (str) – Id of the record to remove
- Raises:
ValueError if Study delete request returns an unexpected response.
- class kuha_client.impl.VariableMethods(cache)[source]
Define VariableMethods
- async query_record(record)[source]
Query Variable record.
- Parameters:
record (kuha_common.document_store.records.Variable) – Variable record to query for.
- Returns:
Variable found from Document Store or None.
- Return type:
- class kuha_client.impl.QuestionMethods(cache)[source]
Define QuestionMethods
- async query_record(record)[source]
Query Question record.
- Parameters:
record (kuha_common.document_store.records.Question) – Question record to query for.
- Returns:
Question found from Document Store.
- Return type:
- class kuha_client.impl.StudyGroupMethods(*args, **kwargs)[source]
Define StudyGroup methods
Keeps track of found study groups and does not issue requests to Document Store right away. Implements method really_upsert() which actually performs the requests to Document Store.
This implementation must be used with a compatible BatchProcessor implementation that understands the behaviour. Mainly the upsert_paths() method must call really_upsert() after all files in batch have been processed. See StudyGroupsBatchProcessor for compatible BatchProcessor implementation details.
- async query_record(record)[source]
Query StudyGroup record.
- Parameters:
record (kuha_common.document_store.records.StudyGroup) – StudyGroup record to query for.
- Returns:
StudyGroup found from Document Store or None.
- Return type:
- async query_distinct_ids()[source]
Query distinct StudyGroup ids.
Filters out all logically deleted StudyGroups.
- Returns:
Distinct StudyGroup ids.
- Return type:
- async really_upsert()[source]
Special method to actually perform requests to Document Store.
Iterates through all pending study groups and calls upsert(study_group) for each. If cache implements a filecache, calls add_id(‘study_groups’, obj_id) for each file that has a reference to the study_group.
- async upsert(record)[source]
Override to handle special case for StudyGroups
StudyGroups are gathered from all sourcefiles and should be upserted at the end of the run.
- Parameters:
record (kuha_common.document_store.records.StudyGroup) – StudyGroup record.
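The deferred behaviour of StudyGroupMethods can be illustrated with a small collector. This is a sketch, not the library's code: the class DeferredGroupUpserter is hypothetical, requests are replaced by appends to a list, and merging study references per group ID is an assumption about what gathering from all source files involves.

```python
import asyncio
from typing import Dict, List, Set, Tuple


class DeferredGroupUpserter:
    """Hypothetical sketch: collect study groups first, send them at the end."""

    def __init__(self) -> None:
        self.pending: Dict[str, Set[str]] = {}
        self.sent: List[Tuple[str, List[str]]] = []

    async def upsert(self, group_id: str, study_numbers: List[str]) -> None:
        # No request is issued here; references from several files are merged.
        self.pending.setdefault(group_id, set()).update(study_numbers)

    async def really_upsert(self) -> None:
        # Called once, after every file in the batch has been processed.
        for group_id, studies in sorted(self.pending.items()):
            self.sent.append((group_id, sorted(studies)))
        self.pending.clear()
```

A batch processor compatible with this pattern has to call really_upsert() itself once the last file is done, since upsert() alone never reaches the store.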
- kuha_client.impl.collection_methods(collections=None)[source]
Get implemented collection methods by collection names
- class kuha_client.impl.StudyGroupsBatchProcessor(collections_methods, parsers=None, cache=None, fail_on_parse=True)[source]
Subclass BatchProcessor to handle StudyGroups.
Define a BatchProcessor implementation which understands the processing of StudyGroups in StudyGroupMethods
sync.py
Define command line interface for synchronizing files to Document Store.
Example run from command line. Sync folder xml_files to Document Store:
python -m kuha_client.sync --document-store-url=http://localhost:6001/v0 --file-cache cache.pickle xml_files
Print help:
python -m kuha_client.sync -h
- kuha_client.sync.get_proc(collection_methods=None, parsers=None, **kw)[source]
Convenience function to instantiate a StudyGroupsBatchProcessor with default parameters.
Additional keyword arguments are passed to StudyGroupsBatchProcessor.
- Parameters:
collection_methods (list or None) – Collection methods parameter for StudyGroupsBatchProcessor. Defaults to None, which implies the use of all implemented collection methods from impl.py.
parsers (list or None) – Parsers parameter for StudyGroupsBatchProcessor. Defaults to None, which implies the use of all implemented Record Parsers from kuha_common.document_store.mappings.ddi.
- Returns:
StudyGroupsBatchProcessor instance.
- Return type:
StudyGroupsBatchProcessor
kuha_delete.py
Callable module that serves as an entry point to delete records from DocumentStore.
Example run from command line. Delete study with ID:
python -m kuha_client.kuha_delete --document-store-url=http://localhost:6001/v0 studies 5afa741d6fb71d7b2d333982
Print help:
python -m kuha_client.kuha_delete -h