Source code for kuha_common.query

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Perform query operations against Kuha Document Store.

Offers high-level query methods that provide an easy access point
with all the actions and properties needed to perform queries.
To query the Document Store the caller only needs to use the methods
defined in class QueryController and the records defined in
:mod:`kuha_common.document_store.records`.
"""

from inspect import iscoroutinefunction

from .document_store.client import JSONStreamClient
from .document_store.query import (
    Query,
    QueryException,
    FilterKeyConstants
)


class ResultHandler:
    """Turn result payloads into records and dispatch them to a callback.

    The most recent outcome is kept in :attr:`result`. At construction
    time the instance attribute :meth:`handle` is bound to either a plain
    method or a coroutine method, depending on whether ``on_record`` is a
    coroutine function, so callers can always hand ``handle`` to the
    client without caring which kind of callback was supplied.

    :param record_constructor: Callable used to build a Document Store
                               record out of a payload.
    :type record_constructor: :class:`kuha_common.document_store.records.Study` or
                              :class:`kuha_common.document_store.records.Variable` or
                              :class:`kuha_common.document_store.records.Question` or
                              :class:`kuha_common.document_store.records.StudyGroup`
    :param on_record: Callback invoked with the constructed record
                      instance as its only argument.
    :type on_record: function or coroutinefunction or None
    :return: :obj:`ResultHandler`
    """

    def __init__(self, record_constructor, on_record=None):
        self.record_constructor = record_constructor
        self.on_record = on_record
        self.result = None
        # Only inspect the callback when one was actually supplied.
        self.on_record_is_coroutine = (
            bool(on_record) and iscoroutinefunction(on_record)
        )
        self.handle = (
            self._async_handle_payload
            if self.on_record_is_coroutine
            else self._handle_payload
        )

    def _construct_record(self, payload):
        """Build a record from ``payload``; an empty dict yields None."""
        return None if payload == {} else self.record_constructor(payload)

    def _handle_payload(self, payload):
        """Synchronous handler.

        With a callback: store the callback's return value and return None.
        Without a callback: store and return the constructed record.
        """
        record = self._construct_record(payload)
        if self.on_record:
            self.result = self.on_record(record)
            return None
        self.result = record
        return record

    async def _async_handle_payload(self, payload):
        """Coroutine handler: await the callback and store what it returns."""
        self.result = await self.on_record(self._construct_record(payload))
class QueryController:
    """Asynchronous controller to query the Document Store.

    Use to build queries and automatically fetch responses
    using HTTP as a protocol and JSON as exchange format.

    Optional record_limit parameter may be given at initialization time
    to limit the number of records that are requested in a single
    HTTP request.

    :param headers: Optional headers parameter to store headers
                    that are used in each query application wide
                    as HTTP headers.
    :type headers: dict
    :param record_limit: Optional record_limit parameter which is
                         used to limit the number of records requested
                         in a single HTTP request. ``0`` (the default)
                         means no limit.
    :type record_limit: int
    :raises TypeError: if ``headers`` is not a dict (or None), or if
                       ``record_limit`` is not a non-negative int.

    Example::

        from kuha_common.document_store import Study

        query_ctrl = QueryController()
        study = await query_ctrl.query_single(
            Study,
            fields=[Study._metadata, Study.study_number],
            _filter={Study.study_number: 1234}
        )
    """

    fk_constants = FilterKeyConstants

    def __init__(self, headers=None, record_limit=0):
        self.client = JSONStreamClient
        # Shared client instance; used for the count query and for queueing
        # requests when a record_limit is in effect.
        self.stream = self.client()
        self.headers = headers or {}
        if not isinstance(self.headers, dict):
            raise TypeError("headers must be a dict or None")
        if not isinstance(record_limit, int) or record_limit < 0:
            # Zero is accepted: query_multiple treats a falsy record_limit
            # as "no limit", so only negative values are rejected.
            raise TypeError("record_limit must be a non-negative int")
        self.record_limit = record_limit

    def _new_client(self):
        """Return a fresh client instance for a one-off request."""
        return self.client()

    def _build_headers(self, headers=None):
        """Return a new dict of controller-wide headers merged with ``headers``.

        Per-call ``headers`` override controller-wide headers with the
        same key. The stored controller headers are not modified.
        """
        _headers = self.headers.copy()
        if headers:
            _headers.update(headers)
        return _headers

    async def query_single(self, record, on_record=None, headers=None, **kwargs):
        r"""Query single record.

        :param record: class used to construct the record instance.
        :type record: Subclass of :py:class:`kuha_common.document_store.records.RecordBase`
        :param on_record: Optional callback function that gets passed the
                          returned and instantiated record object.
        :type on_record: function or coroutinefunction
        :param headers: Optional headers for this query.
                        Headers get added to headers given for
                        QueryController at initialization time.
                        Note that it will overwrite headers with same key.
        :type headers: dict
        :param \*\*kwargs: Keyword arguments contain parameters for the query.
                           They get passed to
                           :meth:`kuha_common.document_store.query.Query.construct`
        :returns: If ``on_record`` is given, whatever the callback returned
                  (None for a callback without a return value); otherwise
                  the instantiated record object, or None if the payload
                  was empty.
        :raises QueryException: if query parameters given as keyword arguments
                                contain limit-parameter.
        """
        if Query.k_limit in kwargs:
            raise QueryException('Cannot use limit-parameter for single record -query')
        _query = Query(
            Query.construct(**kwargs),
            record.collection
        )
        # A single-record query always asks for exactly one record.
        _query.set_limit(1)
        result_handler = ResultHandler(record, on_record)
        client = self._new_client()
        await client.fetch(
            result_handler.handle,
            _query.get_endpoint(),
            _query.get_query(),
            headers=self._build_headers(headers)
        )
        return result_handler.result

    async def query_multiple(self, record, on_record, headers=None, **kwargs):
        r"""Query multiple records.

        Queries the document store for multiple records. Behaviour depends
        on whether :attr:`.record_limit` has been set:

        **If there is a record_limit**

            - the queries are split in multiple HTTP requests and queued.
            - This method returns a
              :meth:`kuha_common.document_store.client.JSONStreamClient.run_queued_requests`,
              which is to be called without arguments when the queries are
              to be executed.
            - on_record must be a normal function that takes the
              constructed record instance as parameter.

        **If there is no record_limit**

            - this method returns nothing.
            - The ``on_record`` callback gets called with each instantiated
              record object.
            - ``on_record`` may be a normal function or a coroutine.

        :param record: class used to construct the record instance.
        :type record: Subclass of :py:class:`kuha_common.document_store.records.RecordBase`
        :param on_record: callback that gets called with each
                          instantiated record instance.
        :type on_record: function or coroutine
        :param headers: optional headers used for this query.
        :type headers: dict
        :param \*\*kwargs: Keyword arguments contain parameters for the query.
                           They get passed to
                           :meth:`kuha_common.document_store.query.Query.construct`
        :returns: None if no :attr:`record_limit`, else
                  :meth:`kuha_common.document_store.client.JSONStreamClient.run_queued_requests`
        :raises NotImplementedError: if :attr:`record_limit` is set and
                                     ``on_record`` is a coroutine function.
        """
        if self.record_limit:
            return await self._query_multiple_with_limit(record, on_record, headers, **kwargs)
        await self._query_multiple(record, on_record, headers, **kwargs)

    async def _query_multiple(self, record, on_record, headers=None, **kwargs):
        """Fetch all matching records in one request using a fresh client.

        Returns the result stored by the handler after the fetch completes.
        """
        _query = Query(
            Query.construct(**kwargs),
            record.collection
        )
        result_handler = ResultHandler(record, on_record)
        client = self._new_client()
        await client.fetch(
            result_handler.handle,
            _query.get_endpoint(),
            _query.get_query(),
            headers=self._build_headers(headers)
        )
        return result_handler.result

    async def _query_multiple_with_limit(self, record, on_record, headers=None, **kwargs):
        """Queue the query as several requests of at most ``record_limit`` records.

        First performs a count query on the shared stream, then queues one
        select request per batch. The queued requests are not executed
        here; the stream's ``run_queued_requests`` is returned for the
        caller to invoke.

        :raises NotImplementedError: if ``on_record`` is a coroutine function.
        """
        if iscoroutinefunction(on_record):
            raise NotImplementedError(
                'Cannot use coroutine as on_record callback '
                'when querying with record_limit.'
            )
        _query = Query(
            Query.construct(**kwargs),
            record.collection
        )
        count_result = {}
        _query.set_query_type(_query.query_type_count)
        headers = self._build_headers(headers)
        # Get overall count, regardless of skip and limit parameters.
        await self.stream.fetch(
            count_result.update,
            _query.get_endpoint(),
            _query.get_query(strip_invalid_params=True),
            headers=headers
        )
        # Format skip.
        skip = _query.get_skip() or 0
        # Number of records that will be fetched: overall count - skip.
        count = count_result['count'] - skip
        if count <= 0:
            # Zero records will be fetched.
            return self.stream.run_queued_requests
        # Per-request limit. Must be less or equal to count.
        limit = min(self.record_limit, count)
        # Get limit of query if any (set by the caller).
        _query_limit = _query.get_limit()
        if _query_limit:
            _query_limit = abs(_query_limit)  # Ensure positive int
            # query-limit is a no-op if it is larger than the number of
            # records that will be fetched; otherwise it caps the total.
            count = min(count, _query_limit)
            # query-limit may also be smaller than self.record_limit.
            limit = min(limit, _query_limit)
        # Make the actual queries
        url = _query.get_endpoint()
        result_handler = ResultHandler(record, on_record)
        _query.set_query_type(_query.query_type_select).set_limit(limit)
        # Integer arithmetic: number of full batches and the size of the
        # trailing partial batch. Avoids float division on record counts.
        full_batches, remains = divmod(count, limit)
        for _ in range(full_batches):
            _query.set_skip(skip)
            await self.stream.queue_request(
                result_handler.handle,
                url,
                _query.get_query(),
                headers=headers
            )
            # After query the new skip = skip + limit
            skip += limit
        if remains:
            # Last query with limit = remains
            _query.set_skip(skip).set_limit(remains)
            await self.stream.queue_request(
                result_handler.handle,
                url,
                _query.get_query(),
                headers=headers
            )
        return self.stream.run_queued_requests

    async def query_count(self, record, headers=None, **kwargs):
        r"""Query the number of records.

        :param record: class used to construct the record instance.
        :type record: Subclass of :py:class:`kuha_common.document_store.records.RecordBase`
        :param headers: optional headers used for this query.
        :type headers: dict
        :param \*\*kwargs: Keyword arguments contain parameters for the query.
                           They get passed to
                           :meth:`kuha_common.document_store.query.Query.construct`
        :returns: Number of records
        :rtype: int
        """
        _query = Query(
            Query.construct(**kwargs),
            record.collection,
            Query.query_type_count
        )
        result = {}
        await self._new_client().fetch(
            result.update,
            _query.get_endpoint(),
            _query.get_query(),
            headers=self._build_headers(headers)
        )
        return result['count']

    async def query_distinct(self, record, headers=None, **kwargs):
        r"""Query distinct values.

        :param record: record to query for.
        :type record: Subclass of :py:class:`kuha_common.document_store.records.RecordBase`
        :param headers: optional headers used for this query.
        :type headers: dict
        :param \*\*kwargs: Keyword arguments contain parameters for the query.
                           They get passed to
                           :meth:`kuha_common.document_store.query.Query.construct_distinct`
        :returns: distinct values: {fieldname : [value1, value2, value3]}.
                  Note that contained values may be dictionaries or strings,
                  depending on what is stored in requested field.
        :rtype: dict
        """
        _query = Query(
            Query.construct_distinct(**kwargs),
            record.collection,
            Query.query_type_distinct
        )
        result = {}
        await self._new_client().fetch(
            result.update,
            _query.get_endpoint(),
            _query.get_query(),
            headers=self._build_headers(headers)
        )
        return result