#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Perform query operations against Kuha Document Store.
Offers high-level query methods to facilitate an easy access point
with all necessary actions and properties needed to perform queries.
To query the Document Store the caller only needs to use methods
defined in class QueryController and records defined in
:mod:`kuha_common.document_store.records`
"""
from inspect import iscoroutinefunction
from .document_store.client import JSONStreamClient
from .document_store.query import (
Query,
QueryException,
FilterKeyConstants
)
class ResultHandler:
    """Turn result payloads into records and dispatch them to a callback.

    On construction the instance attribute :attr:`handle` is bound to either
    a plain method or a coroutine method, depending on whether ``on_record``
    is a coroutine function. :attr:`handle` receives a single payload,
    constructs a record from it and, if a callback was given, calls
    ``on_record`` with that record. The outcome is stored in :attr:`result`.

    :param record_constructor: Class to construct Document Store record.
    :type record_constructor: :class:`kuha_common.document_store.records.Study` or
                              :class:`kuha_common.document_store.records.Variable` or
                              :class:`kuha_common.document_store.records.Question` or
                              :class:`kuha_common.document_store.records.StudyGroup`
    :param on_record: Callback called with constructed record instance as parameter.
    :type on_record: function or coroutinefunction or None
    :return: :obj:`ResultHandler`
    """

    def __init__(self, record_constructor, on_record=None):
        # Holds the latest record (no callback) or the latest return value
        # of the callback. Overwritten on every call to ``handle``.
        self.result = None
        self.on_record_is_coroutine = False
        self.record_constructor = record_constructor
        self.on_record = on_record
        if self.on_record:
            self.on_record_is_coroutine = iscoroutinefunction(self.on_record)
        # Pick the handler once, so callers can use ``handle`` without
        # caring whether the callback must be awaited.
        if self.on_record_is_coroutine:
            self.handle = self._async_handle_payload
        else:
            self.handle = self._handle_payload

    def _construct_record(self, payload):
        """Build a record from ``payload``.

        :param payload: Payload passed to the record constructor.
        :returns: None if ``payload`` equals an empty dict, otherwise
                  the result of ``record_constructor(payload)``.
        """
        if payload == {}:
            return None
        return self.record_constructor(payload)

    def _handle_payload(self, payload):
        """Synchronous handler: construct a record and store the result.

        Without a callback the record itself is stored in :attr:`result`
        and returned. With a callback the callback's return value is stored
        in :attr:`result` and None is returned.

        :param payload: Payload passed to the record constructor.
        :returns: Constructed record if there is no callback, else None.
        """
        record = self._construct_record(payload)
        if not self.on_record:
            self.result = record
            return self.result
        self.result = self.on_record(record)
        return None

    async def _async_handle_payload(self, payload):
        """Coroutine handler: construct a record and await the callback.

        Only bound to :attr:`handle` when ``on_record`` is a coroutine
        function. The awaited return value is stored in :attr:`result`.

        :param payload: Payload passed to the record constructor.
        """
        record = self._construct_record(payload)
        self.result = await self.on_record(record)
class QueryController:
    """Asynchronous controller to query the Document Store.

    Use to build queries and automatically fetch
    responses using HTTP as a protocol and JSON as
    exchange format.

    Optional record_limit parameter may be given at
    initialization time to limit the number of records
    that are requested in a single HTTP request.

    :param headers: Optional headers parameter to store
                    headers that are used in each query
                    application wide as HTTP headers.
    :type headers: dict
    :param record_limit: Optional record_limit parameter
                         which is used to limit the number
                         of records requested in a single
                         HTTP request.
    :type record_limit: int
    :raises TypeError: if ``headers`` is truthy and not a dict, or if
                       ``record_limit`` is not an int or is negative.

    Example::

        from kuha_common.document_store import Study
        query_ctrl = QueryController()
        study = await query_ctrl.query_single(
            Study,
            fields=[Study._metadata, Study.study_number],
            _filter={Study.study_number: 1234}
        )
    """

    fk_constants = FilterKeyConstants

    def __init__(self, headers=None, record_limit=0):
        self.client = JSONStreamClient
        # Shared client instance used for the count request and for
        # queueing requests when record_limit is in effect.
        self.stream = self.client()
        self.headers = headers or {}
        if not isinstance(self.headers, dict):
            raise TypeError("headers must be a dict or None")
        # A record_limit of 0 is accepted; query_multiple treats a falsy
        # record_limit as "no limit".
        if not isinstance(record_limit, int) or record_limit < 0:
            raise TypeError("record_limit must be a positive int")
        self.record_limit = record_limit

    def _new_client(self):
        """Return a fresh client instance for a one-off request."""
        return self.client()

    def _build_headers(self, headers=None):
        """Merge per-query headers on top of the controller-wide headers.

        :param headers: Optional headers for a single query. Keys present
                        here overwrite controller-wide headers.
        :type headers: dict or None
        :returns: New dict; the controller-wide headers are not mutated.
        :rtype: dict
        """
        _headers = self.headers.copy()
        if headers:
            _headers.update(headers)
        return _headers

    async def query_single(self, record, on_record=None, headers=None, **kwargs):
        r"""Query single record.

        :param record: class used to construct the record instance.
        :type record: Subclass of :py:class:`kuha_common.document_store.records.RecordBase`
        :param on_record: Optional callback function that gets passed the returned and
                          instantiated record object.
        :type on_record: function or coroutinefunction
        :param headers: Optional headers for this query. Headers get added to headers given
                        for QueryController at initialization time. Note that it will overwrite
                        headers with same key.
        :type headers: dict
        :param \*\*kwargs: Keyword arguments contain parameters for the query. They get passed
                           to :meth:`kuha_common.document_store.query.Query.construct`
        :returns: The instantiated record object if no on_record callback was given,
                  else whatever the on_record callback returned.
        :raises QueryException: if query parameters given as keyword arguments contain
                                limit-parameter.
        """
        if Query.k_limit in kwargs:
            raise QueryException('Cannot use limit-parameter for single record -query')
        _query = Query(
            Query.construct(**kwargs),
            record.collection
        )
        # A single-record query always asks for exactly one record.
        _query.set_limit(1)
        result_handler = ResultHandler(record, on_record)
        client = self._new_client()
        await client.fetch(
            result_handler.handle,
            _query.get_endpoint(),
            _query.get_query(),
            headers=self._build_headers(headers)
        )
        return result_handler.result

    async def query_multiple(self, record, on_record, headers=None, **kwargs):
        r"""Query multiple records.

        Queries the document store for multiple records. Behaviour depends on
        whether :attr:`.record_limit` has been set:

        **If there is a record_limit**

        - the queries are split in multiple HTTP requests and queued.
        - This method returns a
          :meth:`kuha_common.document_store.client.JSONStreamClient.run_queued_requests`,
          which is to be called without arguments when the queries are to be executed.
        - on_record must be a normal function that takes the constructed record instance
          as parameter.

        **If there is no record_limit**

        - this method returns nothing.
        - The ``on_record`` callback gets called with each instantiated record object.
        - ``on_record`` may be a normal function or a coroutine.

        :param record: class used to construct the record instance.
        :type record: Subclass of :py:class:`kuha_common.document_store.records.RecordBase`
        :param on_record: callback that gets called with each instantiated record instance.
        :type on_record: function or coroutine
        :param headers: optional headers used for this query.
        :type headers: dict
        :param \*\*kwargs: Keyword arguments contain parameters for the query. They get passed
                           to :meth:`kuha_common.document_store.query.Query.construct`
        :returns: None if no :attr:`record_limit`, else
                  :meth:`kuha_common.document_store.client.JSONStreamClient.run_queued_requests`
        :raises NotImplementedError: if :attr:`record_limit` is set and ``on_record``
                                     is a coroutine function.
        """
        if self.record_limit:
            return await self._query_multiple_with_limit(record, on_record, headers, **kwargs)
        await self._query_multiple(record, on_record, headers, **kwargs)
        return None

    async def _query_multiple(self, record, on_record, headers=None, **kwargs):
        """Fetch all matching records with a single request.

        Parameters are the same as for :meth:`query_multiple`.

        :returns: The result stored by the result handler after the fetch.
        """
        _query = Query(
            Query.construct(**kwargs),
            record.collection
        )
        result_handler = ResultHandler(record, on_record)
        client = self._new_client()
        await client.fetch(
            result_handler.handle,
            _query.get_endpoint(),
            _query.get_query(),
            headers=self._build_headers(headers)
        )
        return result_handler.result

    async def _query_multiple_with_limit(self, record, on_record, headers=None, **kwargs):
        """Queue requests of at most :attr:`record_limit` records each.

        First performs a count query, then queues one select request per
        page on the shared stream client. The requests are not executed
        here; the caller runs them through the returned callable.

        Parameters are the same as for :meth:`query_multiple`.

        :returns: ``run_queued_requests`` of the shared stream client.
        :raises NotImplementedError: if ``on_record`` is a coroutine function.
        """
        if iscoroutinefunction(on_record):
            raise NotImplementedError(
                'Cannot use coroutine as on_record callback '
                'when querying with record_limit.'
            )
        _query = Query(
            Query.construct(**kwargs),
            record.collection
        )
        count_response = {}
        _query.set_query_type(_query.query_type_count)
        headers = self._build_headers(headers)
        # Get overall count, regardless of skip and limit parameters.
        await self.stream.fetch(
            count_response.update,
            _query.get_endpoint(),
            _query.get_query(strip_invalid_params=True),
            headers=headers
        )
        # Format skip.
        skip = _query.get_skip() or 0
        # Number of records that will be fetched: overall count - skip.
        count = count_response['count'] - skip
        if count <= 0:
            # Zero records will be fetched.
            return self.stream.run_queued_requests
        # Per-request limit. Must be less or equal to count.
        limit = min(self.record_limit, count)
        # Get limit of query if any (set by the caller).
        _query_limit = _query.get_limit()
        if _query_limit:
            _query_limit = abs(_query_limit)  # Ensure positive int
            # The caller's limit caps the total number of records fetched;
            # it is a no-op if larger than what would be fetched anyway.
            count = min(count, _query_limit)
            # It may also be smaller than self.record_limit.
            limit = min(limit, _query_limit)
        # Make the actual queries
        url = _query.get_endpoint()
        result_handler = ResultHandler(record, on_record)
        _query.set_query_type(_query.query_type_select).set_limit(limit)
        # Integer arithmetic: float division would lose precision for
        # very large counts.
        full_pages, remains = divmod(count, limit)
        for _ in range(full_pages):
            _query.set_skip(skip)
            await self.stream.queue_request(
                result_handler.handle,
                url,
                _query.get_query(),
                headers=headers
            )
            # After query the new skip = skip + limit
            skip += limit
        if remains:
            # Last query with limit = remains
            _query.set_skip(skip).set_limit(remains)
            await self.stream.queue_request(
                result_handler.handle,
                url,
                _query.get_query(),
                headers=headers
            )
        return self.stream.run_queued_requests

    async def query_count(self, record, headers=None, **kwargs):
        r"""Query the number of records.

        :param record: class used to construct the record instance.
        :type record: Subclass of :py:class:`kuha_common.document_store.records.RecordBase`
        :param headers: optional headers used for this query.
        :type headers: dict
        :param \*\*kwargs: Keyword arguments contain parameters for the query. They get passed
                           to :meth:`kuha_common.document_store.query.Query.construct`
        :returns: Number of records
        :rtype: int
        """
        _query = Query(
            Query.construct(**kwargs),
            record.collection,
            Query.query_type_count
        )
        result = {}
        await self._new_client().fetch(
            result.update,
            _query.get_endpoint(),
            _query.get_query(),
            headers=self._build_headers(headers)
        )
        return result['count']

    async def query_distinct(self, record, headers=None, **kwargs):
        r"""Query distinct values.

        :param record: record to query for.
        :type record: Subclass of :py:class:`kuha_common.document_store.records.RecordBase`
        :param headers: optional headers used for this query.
        :type headers: dict
        :param \*\*kwargs: Keyword arguments contain parameters for the query. They get passed
                           to :meth:`kuha_common.document_store.query.Query.construct_distinct`
        :returns: distinct values: {fieldname : [value1, value2, value3]}. Note that contained
                  values may be dictionaries or strings, depending on what is stored in
                  requested field.
        :rtype: dict
        """
        _query = Query(
            Query.construct_distinct(**kwargs),
            record.collection,
            Query.query_type_distinct
        )
        result = {}
        await self._new_client().fetch(
            result.update,
            _query.get_endpoint(),
            _query.get_query(),
            headers=self._build_headers(headers)
        )
        return result