Source code for kuha_client.impl

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2022 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Implementations for client collection methods.

Define CollectionMethods subclasses for supported collections.
Subclass BatchProcessor to support StudyGroups gathered from multiple
source files.
"""
import logging
from kuha_common.query import QueryController
from kuha_common.document_store import (
    Study,
    Variable,
    Question,
    StudyGroup
)
from kuha_common.document_store.constants import REC_STATUS_DELETED
from kuha_client import (
    BatchProcessor,
    CollectionMethods,
    NoSuchCollectionMethod,
    send_delete_record_request,
    send_update_record_request,
)


_logger = logging.getLogger(__name__)


class StudyMethods(CollectionMethods):
    """Collection methods for Study records.

    Removing a Study also touches records in the Variable, Question and
    StudyGroup collections, so those collections are registered to the
    cache when the instance is created.

    :param cache: Initialized cache implementation.
    """

    collection = Study.get_collection()

    def __init__(self, cache):
        super().__init__(cache)
        # Make the cache aware of every collection holding Study relatives.
        for relative_cls in (Variable, Question, StudyGroup):
            self._cache.register_collection(relative_cls.get_collection())

    async def query_record(self, record):
        """Look up the stored Study that corresponds to ``record``.

        The lookup is done by study number.

        :param :obj:`kuha_common.document_store.records.Study` record:
            Study record to query for.
        :returns: Study found from Document Store or None.
        :rtype: :obj:`kuha_common.document_store.records.Study` or None
        """
        controller = QueryController()
        lookup = {Study.study_number: record.study_number.get_value()}
        return await controller.query_single(Study, _filter=lookup)

    async def query_distinct_ids(self):
        """Collect the distinct ids of the Study collection.

        Ids of logically deleted Studies are left out by the query filter.

        :returns: Distinct ids from Study collection
        :rtype: set
        """
        controller = QueryController()
        not_deleted = {
            Study._metadata.attr_status: {
                QueryController.fk_constants.not_equal: REC_STATUS_DELETED}}
        distinct = await controller.query_distinct(
            Study, fieldname=Study._id, _filter=not_deleted)
        return set(distinct[Study._id.path])

    async def _remove_relative(self, relative_record):
        """Delete one relative record and report the deletion to the cache.

        :param relative_record: Record instance to delete. Its collection
            and id are read from the record itself.
        """
        collection = relative_record.get_collection()
        record_id = relative_record.get_id()
        await send_delete_record_request(collection, record_id)
        await self._cache.result(collection, record_id, self._cache.DELETE)

    async def remove_record_by_id(self, _id):
        """Remove a Study and its relatives by Study id.

        Steps, in order:

        1. Query the Study to learn its study number.
        2. Delete the Study and report the deletion to the cache.
        3. Delete every Variable and Question having the same study number.
        4. Remove the study number from every StudyGroup that refers to it.

        :param str _id: Id of the record to remove
        :raises: :exc:`ValueError` if Study delete request returns an
            unexpected response.
        """
        # NOTE(review): if no Study matches ``_id`` the query presumably
        # returns None and the attribute access below raises AttributeError
        # -- confirm whether callers can ever pass an unknown id.
        study = await QueryController().query_single(
            Study, _filter={Study._id: _id}, fields=Study.study_number)
        study_number = study.study_number.get_value()

        delete_response = await send_delete_record_request(
            Study.get_collection(), record_id=_id)
        if delete_response['result'] != 'delete_successful':
            # Raise here to get traceback.
            raise ValueError(
                "Unexpected response for delete: '%s'" % (delete_response,))
        await self._cache.result(Study.get_collection(), _id, self._cache.DELETE)

        # Variables first, then Questions: both are matched by study number
        # and removed one record at a time through the same callback.
        for relative_cls in (Variable, Question):
            await QueryController().query_multiple(
                relative_cls,
                self._remove_relative,
                _filter={relative_cls.study_number: study_number},
                fields=relative_cls._id)

        async def _detach_from_study_group(study_group):
            """Drop the removed study number from one StudyGroup.

            Exports the group without id and metadata, removes
            ``study_number`` from its list of study numbers and sends the
            result as an update.

            :param :obj:`StudyGroup` study_group: StudyGroup instance to
                update
            """
            group_id = study_group.get_id()
            exported = study_group.export_dict(
                include_id=False, include_metadata=False)
            numbers = StudyGroup.study_numbers.name.value_from_dict(exported)
            numbers.remove(study_number)
            exported[StudyGroup.study_numbers.path] = numbers
            _logger.debug("Removing study number '%s' from study group '%s'",
                          study_number, group_id)
            await send_update_record_request(
                StudyGroup.get_collection(), exported, group_id)
            await self._cache.result(
                StudyGroup.get_collection(), group_id, self._cache.UPDATE)

        await QueryController().query_multiple(
            StudyGroup,
            _detach_from_study_group,
            _filter={StudyGroup.study_numbers: study_number}
        )
class VariableMethods(CollectionMethods):
    """Collection methods for Variable records."""

    collection = Variable.get_collection()

    async def query_record(self, record):
        """Look up the stored Variable that corresponds to ``record``.

        A Variable is identified by the combination of its study number and
        variable name.

        :param :obj:`kuha_common.document_store.records.Variable` record:
            Variable record to query for.
        :returns: Variable found from Document Store or None.
        :rtype: :obj:`kuha_common.document_store.records.Variable` or None
        """
        controller = QueryController()
        lookup = {
            Variable.study_number: record.study_number.get_value(),
            Variable.variable_name: record.variable_name.get_value(),
        }
        return await controller.query_single(Variable, _filter=lookup)

    async def query_distinct_ids(self):
        """Collect the distinct ids of the Variable collection.

        Ids of logically deleted Variables are left out by the query filter.

        :returns: Distinct ids from Variable collection.
        :rtype: set
        """
        controller = QueryController()
        not_deleted = {
            Variable._metadata.attr_status: {
                QueryController.fk_constants.not_equal: REC_STATUS_DELETED}}
        distinct = await controller.query_distinct(
            Variable, fieldname=Variable._id, _filter=not_deleted)
        return set(distinct[Variable._id.path])
class QuestionMethods(CollectionMethods):
    """Collection methods for Question records."""

    collection = Question.get_collection()

    async def query_record(self, record):
        """Look up the stored Question that corresponds to ``record``.

        A Question is identified by the combination of its study number and
        question identifier.

        :param :obj:`kuha_common.document_store.records.Question` record:
            Question record to query for.
        :returns: Question found from Document Store (presumably None when
            nothing matches, as documented for the other collections --
            confirm).
        :rtype: :obj:`kuha_common.document_store.records.Question`
        """
        controller = QueryController()
        lookup = {
            Question.study_number: record.study_number.get_value(),
            Question.question_identifier: record.question_identifier.get_value(),
        }
        return await controller.query_single(Question, _filter=lookup)

    async def query_distinct_ids(self):
        """Collect the distinct ids of the Question collection.

        Ids of logically deleted Questions are left out by the query filter.

        :returns: Distinct ids from Questions collection.
        :rtype: set
        """
        controller = QueryController()
        not_deleted = {
            Question._metadata.attr_status: {
                QueryController.fk_constants.not_equal: REC_STATUS_DELETED}}
        distinct = await controller.query_distinct(
            Question, fieldname=Question._id, _filter=not_deleted)
        return set(distinct[Question._id.path])
class StudyGroupMethods(CollectionMethods):
    """Define StudyGroup methods

    Keeps track of found study groups and does not issue requests to
    Document Store right away. Implements method :meth:`really_upsert`
    which actually performs the requests to Document Store.

    This implementation must be used with a compatible BatchProcessor
    implementation that understands the behaviour. Mainly the
    upsert_paths() method must call :meth:`really_upsert` after all files
    in batch have been processed. See :class:`StudyGroupsBatchProcessor`
    for compatible BatchProcessor implementation details.
    """

    collection = StudyGroup.get_collection()

    def __init__(self, *args, **kwargs):
        # Pending study groups: {<id>: <record>}
        self._pending_study_groups = {}
        # Pending files with studygroup info: {<id>: [<sourcefile>]}
        self._pending_study_groups_files = {}
        super().__init__(*args, **kwargs)

    def _has_filecache(self):
        """Return True if the cache exposes a ``current_file`` attribute."""
        return hasattr(self._cache, 'current_file')

    async def query_record(self, record):
        """Query StudyGroup record.

        The lookup is done by study group identifier.

        :param :obj:`kuha_common.document_store.records.StudyGroup` record:
            StudyGroup record to query for.
        :returns: StudyGroup found from Document Store or None.
        :rtype: :obj:`kuha_common.document_store.records.StudyGroup` or None
        """
        return await QueryController().query_single(StudyGroup, _filter={
            StudyGroup.study_group_identifier:
            record.study_group_identifier.get_value()})

    async def query_distinct_ids(self):
        """Query distinct StudyGroup ids.

        Filters out all logically deleted StudyGroups.

        :returns: Distinct StudyGroup ids.
        :rtype: set
        """
        ids = await QueryController().query_distinct(
            StudyGroup, fieldname=StudyGroup._id,
            _filter={StudyGroup._metadata.attr_status: {
                QueryController.fk_constants.not_equal: REC_STATUS_DELETED}})
        return set(ids[StudyGroup._id.path])

    async def really_upsert(self):
        """Special method to actually perform requests to Document Store.

        Iterates through all pending study groups and calls the parent
        upsert(study_group) for each. If the cache implements a filecache,
        calls add_id(self.collection, obj_id) for each pending file that has
        a reference to the study_group.

        Pending study groups are kept after the call, so a later call
        upserts the accumulated records again. The pending file lists are
        consumed on the first call.
        """
        for sg_id, study_group in self._pending_study_groups.items():
            obj_id = await super().upsert(study_group)
            if not self._has_filecache():
                continue
            # The file list for a group is consumed here while the group
            # itself stays pending. Use a default so that a repeated call
            # does not raise KeyError for groups whose files were already
            # handled by an earlier call.
            for _file in self._pending_study_groups_files.pop(sg_id, ()):
                _file.add_id(self.collection, obj_id)

    async def upsert(self, record):
        """Override to handle special case for StudyGroups

        StudyGroups are gathered from all sourcefiles and should be
        upserted at the end of the run. This method only stores the record
        as pending (merging it into an already pending record with the same
        study group identifier) and, when the cache implements a filecache,
        remembers the cache's current file for the study group. No request
        is sent to Document Store here; see :meth:`really_upsert`.

        :param :obj:`kuha_common.document_store.records.StudyGroup` record:
            StudyGroup record.
        """
        sg_id = record.study_group_identifier.get_value()
        if sg_id not in self._pending_study_groups:
            self._pending_study_groups[sg_id] = record
        else:
            self._pending_study_groups[sg_id].updates(record)
        if self._has_filecache():
            self._pending_study_groups_files.setdefault(sg_id, []).append(
                self._cache.current_file)

    async def remove_records_with_no_study_numbers(self):
        """Remove StudyGroup records that do not have any references to Studies

        Queries ids of StudyGroups whose study numbers list is empty and
        which are not logically deleted, then removes each by id.
        """
        ids_with_no_study_numbers = await QueryController().query_distinct(
            StudyGroup, fieldname=StudyGroup._id,
            _filter={QueryController.fk_constants.and_: [
                {StudyGroup.study_numbers: []},
                {StudyGroup._metadata.attr_status: {
                    QueryController.fk_constants.not_equal: REC_STATUS_DELETED}}
            ]})
        for _id in ids_with_no_study_numbers[StudyGroup._id.path]:
            await self.remove_record_by_id(_id)
# Registry of implemented collection methods, keyed by the ``collection``
# class attribute of each CollectionMethods subclass.
COLLECTIONS_METHODS = {
    methods_cls.collection: methods_cls
    for methods_cls in (
        StudyMethods,
        VariableMethods,
        QuestionMethods,
        StudyGroupMethods,
    )
}
def collection_methods(collections=None):
    """Return implemented collection methods, optionally limited by name.

    :param list or None collections: Collection names whose methods are
        wanted, returned in the given order. Leave None to return all
        implemented collection methods.
    :returns: Implemented collection methods
    :rtype: list
    :raises: :exc:`KeyError` if a requested collection has no
        implementation in ``COLLECTIONS_METHODS``.
    """
    if collections is None:
        return list(COLLECTIONS_METHODS.values())
    return [COLLECTIONS_METHODS[name] for name in collections]
class StudyGroupsBatchProcessor(BatchProcessor):
    """BatchProcessor that cooperates with :class:`StudyGroupMethods`.

    StudyGroupMethods postpones its Document Store requests; this processor
    triggers them once the parent has processed the paths, and removes
    StudyGroups without study references after absent records are removed.
    """

    async def upsert_paths(self, *paths):
        """Upsert records from paths.

        Runs the parent upsert_paths() first and then calls
        StudyGroupMethods.really_upsert(), to make sure the StudyGroups are
        actually updated and inserted. If no methods are registered for the
        StudyGroup collection, a warning is logged and StudyGroups are not
        upserted.
        """
        await super().upsert_paths(*paths)
        try:
            study_group_methods = self._methods_for_collection(
                StudyGroup.get_collection())
        except NoSuchCollectionMethod:
            _logger.warning("Could not find methods for StudyGroup. Will not upsert StudyGroups.")
            return
        await study_group_methods.really_upsert()

    async def remove_absent_records(self):
        """Remove records that were not found in this batch.

        After the parent has removed absent records, removes StudyGroups
        that do not contain any references to studies.

        :returns: False if no filepaths were processed (see parent method).
                  Otherwise True.
        """
        parent_result = await super().remove_absent_records()
        if parent_result is False:
            # Stopping here is a matter of contract, not of correctness:
            # when the parent reports that no files were processed in this
            # batch, this process must not remove any records. Going on
            # with StudyGroups would not corrupt the database, since no
            # records were processed and none were removed.
            return False
        try:
            study_group_methods = self._methods_for_collection(
                StudyGroup.get_collection())
        except NoSuchCollectionMethod:
            _logger.warning("Could not find methods for StudyGroup. Will not remove absent StudyGroups.")
            return True
        await study_group_methods.remove_records_with_no_study_numbers()
        return True