Source code for kuha_common.document_store.query

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Access query properties by convenience methods
to help build valid queries against the Document Store.
"""

import datetime

from .field_types import FieldTypeFactory, FieldAttribute

from .records import (
    STUDIES,
    VARIABLES,
    QUESTIONS,
    STUDY_GROUPS
)

from .constants import (
    MDB_EXISTS,
    MDB_LESS_THAN,
    MDB_GREATER_THAN_OR_EQUAL,
    MDB_ISODATE,
    MDB_FIELDNAME_ID,
    MDB_OID
)


class FilterKeyConstants:
    """Namespace for the keys recognised inside query filter conditions.

    Every attribute is an alias of a ``MDB_*`` constant imported from
    the ``constants`` module of this package.
    """

    #: Key of an exists-condition.
    exists = MDB_EXISTS
    #: Key wrapping a datetime string inside a date-range condition.
    isodate = MDB_ISODATE
    #: Key for the start of a date-range (greater than or equal).
    from_ = MDB_GREATER_THAN_OR_EQUAL
    #: Key for the end of a date-range (less than).
    until = MDB_LESS_THAN
class QueryException(Exception):
    """Exception for errors raised by :class:`Query`.

    Has an optional context parameter for giving some additional
    information about the context of the exception.

    :param msg: Description of the error.
    :param context: Optional value that caused the error. It is appended
                    to the string representation whenever it is not None.
    """

    def __init__(self, msg, context=None):
        super().__init__(msg)
        self.msg = msg
        self.context = context

    def __str__(self):
        # Compare against None instead of relying on truthiness: contexts
        # such as 0, '' or {} are falsy but are exactly the offending
        # values a caller needs to see in the message.
        if self.context is not None:
            return "{}: {}".format(self.msg, self.context)
        return str(self.msg)
def _field_path(field):
    """Return the path of a record field.

    :param field: :class:`FieldTypeFactory` or :class:`FieldAttribute`
                  instance, or any other value (typically a path string).
    :returns: ``field.path`` for field instances, otherwise ``field``
              unchanged.
    """
    if isinstance(field, (FieldTypeFactory, FieldAttribute)):
        return field.path
    return field


def _build_fields(fields):
    """Build the value of the ``fields`` query parameter.

    :param fields: A single field or a list/tuple of fields. Each field is
                   resolved with :func:`_field_path`.
    :returns: List of field paths.
    :rtype: list
    """
    # Tuples are treated like lists; previously a tuple was wrapped as a
    # single (invalid) field.
    if isinstance(fields, (list, tuple)):
        return [_field_path(field) for field in fields]
    return [_field_path(fields)]


def _build_filter(_filter):
    """Build the value of the ``_filter`` query parameter.

    Keys are resolved with :func:`_field_path`, so both record fields and
    plain path strings are accepted. A value that has a ``get`` attribute
    (a dict-like condition) must contain at least one of the keys declared
    in :class:`FilterKeyConstants` (``from_``, ``until``, ``exists``). The
    date-range and exists parts of a single condition are merged into one
    condition for the field. Any other value is used as-is, except for the
    id field, whose value is wrapped as ``{MDB_OID: value}``.

    :param _filter: Mapping of field to condition.
    :type _filter: dict
    :returns: Filter with field paths as keys.
    :rtype: dict
    :raises: :exc:`QueryException` for a dict-like condition that contains
             none of the supported keys.
    """
    result = {}
    for field, condition in _filter.items():
        path = _field_path(field)
        if not hasattr(condition, 'get'):
            if path == MDB_FIELDNAME_ID:
                condition = {MDB_OID: condition}
            result[path] = condition
            continue
        from_ = condition.get(FilterKeyConstants.from_)
        until = condition.get(FilterKeyConstants.until)
        exists = condition.get(FilterKeyConstants.exists)
        if from_ is None and until is None and exists is None:
            raise QueryException("Unknown filter condition", condition)
        built = {}
        if from_ is not None or until is not None:
            built.update(Query.build_query_for_date_range(from_, until))
        if exists is not None:
            # Merge instead of replacing, so a date-range given in the same
            # condition is not silently dropped.
            built.update(Query.build_query_for_exists(exists))
        result[path] = built
    return result


def _validate_limit(limit):
    """Validate the limit query parameter.

    Booleans are rejected explicitly because they are instances of int.

    :param limit: limit to validate.
    :type limit: int
    :returns: limit
    :rtype: int
    :raises: :exc:`QueryException` if limit is not an integer.
    """
    if isinstance(limit, bool) or not isinstance(limit, int):
        raise QueryException("Limit query parameter must be an integer", limit)
    return limit


def _validate_skip(skip):
    """Validate the skip query parameter.

    Booleans are rejected explicitly because they are instances of int.

    :param skip: skip to validate.
    :type skip: int
    :returns: skip
    :rtype: int
    :raises: :exc:`QueryException` if skip is not an integer or is negative.
    """
    if isinstance(skip, bool) or not isinstance(skip, int) or skip < 0:
        raise QueryException(
            "Skip query parameter must be an integer, 0 or greater", skip)
    return skip


def _validate_sort_order(sort_order):
    """Validate sort order.

    Sort order must be an integer -1 or 1. Because Python's booleans
    are also integers (True == 1) and floats compare with integers
    (1.0 == 1), they need to be compared explicitly.

    :param sort_order: sort order to validate.
    :type sort_order: int
    :returns: Sort order
    :rtype: int
    :raises: :exc:`QueryException` for invalid sort order.
    """
    valid_values = (-1, 1)
    if sort_order not in valid_values or isinstance(sort_order, (float, bool)):
        raise QueryException(
            "Sort order parameter must be one of %s" % (
                ', '.join(str(x) for x in valid_values),),
            sort_order)
    return sort_order
class Query:
    """Manipulate query properties without compromising the
    validity of the constructed query. Build the correct url for
    different query types.

    :note: This class provides low-level operations. Use
           :class:`kuha_common.query.QueryController` for easy access
           to query actions and properties.

    Example::

        from kuha_common.document_store import Study, Query
        query = Query(Query.construct(_filter={Study.study_number:'123'}),
                      Study.collection)

    :param query: Actual query containing the properties such as
                  _filter, fields, sort_by etc.
    :type query: dict
    :param query_document: One of the supported query documents declared in
                           :attr:`Query.supported_query_documents` and
                           specified in
                           :mod:`kuha_common.document_store.records`
    :type query_document: str
    :param query_type: Optional query_type parameter. Defaults to
                       :attr:`Query.query_type_select`. Other valid values
                       are :attr:`Query.query_type_count` and
                       :attr:`Query.query_type_distinct`.
    :type query_type: str
    :raises: :exc:`QueryException` if query_document, query_type or
             query is invalid.
    """

    #: Base url of the Document Store. Set with :meth:`set_base_url`.
    base_url = None

    #: Query parameter for filtering results.
    k_filter = '_filter'
    #: Query parameter for fields to contain in results.
    k_fields = 'fields'
    #: Query parameters for limiting returned results.
    k_limit = 'limit'
    #: Query parameter for skipping number of results from the beginning
    #: of the resultset.
    k_skip = 'skip'
    #: Query parameter for sort order.
    k_sort_order = 'sort_order'
    #: Query parameter for sorting by a certain field.
    k_sort_by = 'sort_by'
    #: Query parameter for distinct queries. Specifies a field from which
    #: the distinct values are to be fetched.
    k_fieldname = 'fieldname'
    #: Name of the url argument that carries the query type
    #: (see :meth:`get_endpoint`).
    k_query_type = 'query_type'

    #: Query type for `select` queries.
    #: Using this query type gets records as a response.
    query_type_select = 'select'
    #: Query type for count queries.
    #: Using this query type the query returns an integer.
    query_type_count = 'count'
    #: Query type for distinct queries.
    #: Using this query type the returning object contains all
    #: distinct values for a certain field.
    query_type_distinct = 'distinct'

    #: Parameters valid for select queries.
    params_basic = [
        k_filter,
        k_fields,
        k_limit,
        k_skip,
        k_sort_order,
        k_sort_by
    ]
    #: Parameters valid for count queries.
    params_count = [
        k_filter
    ]
    #: Parameters valid for distinct queries.
    params_distinct = [
        k_fieldname,
        k_filter
    ]

    #: strftime format used by :meth:`as_supported_datetime_str`.
    fmt_datetime = '%Y-%m-%dT%H:%M:%SZ'

    supported_query_types = [
        query_type_select,
        query_type_count,
        query_type_distinct
    ]

    supported_query_documents = [
        STUDIES,
        VARIABLES,
        QUESTIONS,
        STUDY_GROUPS
    ]

    def __init__(self, query, query_document, query_type=query_type_select):
        self.query_document = self.validate_query_document(query_document)
        # The query type must be validated before the query, because the
        # set of valid query parameters depends on the query type.
        self.query_type = self.validate_query_type(query_type)
        self.query = self.validate_query(query)

    @classmethod
    def set_base_url(cls, base_url):
        """Configure document store url used as a base
        when constructing the endpoint url for queries.

        :param base_url: Base url of the Document Store.
        :type base_url: str
        """
        cls.base_url = base_url

    @classmethod
    def as_supported_datetime_str(cls, datetime_obj):
        """Get datetime object as a supported datetime string.

        :param datetime_obj: Python datetime-object to be converted to str.
        :type datetime_obj: datetime-object.
        :returns: String representation of the datetime-object that is
                  suitable for querying.
        :rtype: str
        """
        # NOTE(review): the format appends a literal 'Z' and performs no
        # timezone conversion -- presumably callers pass UTC datetimes;
        # confirm against callers.
        return datetime.datetime.strftime(datetime_obj, cls.fmt_datetime)

    @classmethod
    def construct(cls, **kwargs):
        r"""Construct valid query parameters.

        Example::

            from kuha_common.document_store import Query, Study
            params = Query.construct(_filter={Study.study_number:'123'},
                                     fields=[Study._metadata, Study._id,
                                             Study.abstract],
                                     sort_by=Study._id)
            query = Query(params, Study.collection)

        :param \*\*kwargs: keys should be valid query properties,
                           while values should hold corresponding query
                           values supported by the key.
        :returns: Valid query, ready to be sent to Document Store.
        :rtype: dict
        :raises: :exc:`QueryException` for an unsupported key or an
                 invalid value.
        """
        # Maps each query parameter to the function that builds/validates
        # its value. Built once, outside the loop.
        builders = {
            cls.k_filter: _build_filter,
            cls.k_fields: _build_fields,
            cls.k_limit: _validate_limit,
            cls.k_skip: _validate_skip,
            cls.k_sort_order: _validate_sort_order,
            cls.k_sort_by: _field_path,
        }
        query_dict = {}
        for key, value in kwargs.items():
            if key not in cls.params_basic:
                raise QueryException("Invalid query parameter", key)
            query_dict[key] = builders[key](value)
        return query_dict

    @classmethod
    def construct_distinct(cls, **kwargs):
        r"""Construct valid query parameters for distinct queries.

        :param \*\*kwargs: keys should be valid query properties,
                           while values should hold corresponding query
                           values supported by the key.
        :returns: Valid query, ready to be sent to Document Store.
        :rtype: dict
        :raises: :exc:`QueryException` for an unsupported key or an
                 invalid value.
        """
        builders = {
            cls.k_filter: _build_filter,
            cls.k_fieldname: _field_path,
        }
        query_dict = {}
        for key, value in kwargs.items():
            if key not in cls.params_distinct:
                raise QueryException("Invalid query parameter", key)
            query_dict[key] = builders[key](value)
        return query_dict

    @classmethod
    def build_query_for_date_range(cls, from_=None, until=None):
        """Build query filter for date-range.

        :param from_: start of the date-range.
        :type from_: datetime-object
        :param until: end of the date-range.
        :type until: datetime-object
        :returns: date-range query-filter with datetime-objects converted
                  into string representation. Empty if both parameters
                  are None.
        :rtype: dict
        """
        date_query_dict = {}
        if from_ is not None:
            date_query_dict[FilterKeyConstants.from_] = {
                FilterKeyConstants.isodate:
                cls.as_supported_datetime_str(from_)
            }
        if until is not None:
            date_query_dict[FilterKeyConstants.until] = {
                FilterKeyConstants.isodate:
                cls.as_supported_datetime_str(until)
            }
        return date_query_dict

    @classmethod
    def build_query_for_exists(cls, exists):
        """Build query for exists-query.

        :param exists: whether the field should exist or not.
        :type exists: bool
        :returns: valid exists query for filter.
        :rtype: dict
        :raises: ValueError for invalid boolean values in exists-parameter.
        """
        if not isinstance(exists, bool):
            raise ValueError("Exists must be boolean: True or False")
        return {FilterKeyConstants.exists: exists}

    @classmethod
    def get_valid_params(cls, query_type=None):
        """Return valid query parameters for the query type.

        :param query_type: Optional query_type for which the
                           query-parameters should be valid for.
                           Defaults to :attr:`query_type_select`.
        :type query_type: str
        :returns: Valid query parameters.
        :rtype: list
        :raises: KeyError if query_type is not a supported query type.
        """
        if query_type is None:
            query_type = cls.query_type_select
        return {
            cls.query_type_select: cls.params_basic,
            cls.query_type_count: cls.params_count,
            cls.query_type_distinct: cls.params_distinct
        }[query_type]

    @classmethod
    def is_valid_query(cls, query, query_type):
        """Check the validity of query parameters.

        :param query: Full query to validate.
        :type query: dict
        :param query_type: Query type to validate against.
        :type query_type: str
        :returns: Whether or not the query-parameters given are valid.
        :rtype: bool
        """
        valid_params = cls.get_valid_params(query_type)
        return set(query.keys()).issubset(valid_params)

    @classmethod
    def is_valid_query_type(cls, query_type):
        """Check the validity of query_type.

        :param query_type: Query type to validate.
        :type query_type: str
        :returns: Whether or not the query type is valid.
        :rtype: bool
        """
        return query_type in cls.supported_query_types

    def is_valid_query_document(self, query_document):
        """Check the validity of query document.

        :param query_document: Query document to validate.
        :type query_document: str
        :returns: Whether or not the query document is valid.
        :rtype: bool
        """
        return query_document in self.supported_query_documents

    def is_valid_param(self, parameter):
        """Check the validity of a single query parameter.

        The parameter is checked against the parameters of the default
        (select) query type, not against the query type of the instance.

        :param parameter: Query parameter to validate.
        :type parameter: str
        :returns: Whether or not the parameter is valid.
        :rtype: bool
        """
        valid_params = self.get_valid_params()
        return parameter in valid_params

    def validate_query(self, query):
        """Validate query parameters.

        Checks parameters' validity for chosen query type.
        Raises :exc:`QueryException` if invalid.

        :param query: Query parameters.
        :type query: dict
        :returns: Query parameters.
        :rtype: dict
        :raises: :exc:`QueryException` if query parameters are invalid.
        """
        if not self.is_valid_query(query, self.query_type):
            raise QueryException("Invalid query", query)
        return query

    def validate_query_type(self, query_type):
        """Validate query type.

        Checks that the query type is supported by Document Store.
        Raises :exc:`QueryException` for invalid query type.

        :param query_type: Query type to validate.
        :type query_type: str
        :returns: Query type.
        :rtype: str
        :raises: :exc:`QueryException` if query type is invalid.
        """
        if not self.is_valid_query_type(query_type):
            raise QueryException("Invalid query_type", query_type)
        return query_type

    def validate_query_document(self, query_document):
        """Validates query document.

        Checks that the query document is supported by Document Store.
        Raises :exc:`QueryException` if invalid.

        :param query_document: Query document to validate.
        :type query_document: str
        :returns: Query document
        :rtype: str
        :raises: :exc:`QueryException` if query document is invalid.
        """
        if not self.is_valid_query_document(query_document):
            raise QueryException("Invalid query_document", query_document)
        return query_document

    def get_endpoint(self):
        """Get correct endpoint for querying the Document Store.

        Builds the endpoint by consulting configured values
        and the instantiated query for query_type and query_document

        :returns: Full url to Document Store endpoint which
                  handles the constructed query.
        :rtype: str
        :raises: ValueError if the base url has not been set.
        """
        if not self.base_url:
            raise ValueError(
                "Base URL is not set. Cannot construct url to endpoint.")
        query_args_str = '?{}={}'.format(self.k_query_type, self.query_type)
        return "{base}/query/{document_type}{query_args}".format(
            base=self.base_url,
            document_type=self.query_document,
            query_args=query_args_str)

    def get_query(self, strip_invalid_params=True):
        """Returns the constructed query parameters.

        If the query type has been changed after initialization,
        for example to get the count of records, this method strips
        the invalid query parameters from the returned query. When doing
        so, it does not change the stored query parameters, but rather
        makes a copy of them for manipulating and returning.

        :param strip_invalid_params: Whether to strip the unsupported
                                     (=invalid) query parameters out of the
                                     returned query.
        :type strip_invalid_params: bool
        :returns: Constructed query parameters ready to submit to
                  Document Store.
        :rtype: dict
        """
        if not strip_invalid_params:
            return self.query
        valid_params = set(self.get_valid_params(self.query_type))
        # Shallow copy that keeps only the parameters valid for the
        # current query type; the stored query stays untouched.
        return {key: value for key, value in self.query.items()
                if key in valid_params}

    def get_limit(self):
        """Get query limit parameter.

        :returns: Query limit (int) if set. None if not set.
        :rtype: int or None
        """
        return self.query.get(self.k_limit)

    def get_skip(self):
        """Get query skip parameter.

        :returns: Query skip (int) if set. None if not set.
        :rtype: int or None
        """
        return self.query.get(self.k_skip)

    def set_limit(self, limit):
        """Set limit parameter for query.

        Limit controls how many results should be returned.

        :param limit: Limit parameter for query.
        :type limit: int
        :returns: self for easy aggregation of manipulation methods.
        :rtype: instantiated :class:`Query()`
        :raises: :exc:`QueryException` for invalid limit.
        """
        self.query[self.k_limit] = _validate_limit(limit)
        return self

    def set_skip(self, skip):
        """Set skip parameter for query.

        Skip controls how many results should be skipped from the
        start (offset).

        :param skip: Skip parameter for query.
        :type skip: int
        :returns: self for easy aggregation of manipulation methods.
        :rtype: instantiated :class:`Query()`
        :raises: :exc:`QueryException` for invalid skip.
        """
        self.query[self.k_skip] = _validate_skip(skip)
        return self

    def set_fields(self, fields):
        """Set fields parameter for query.

        Field controls which fields of the record should be returned.
        `fields` can be a list of strings in the form used by MongoDB
        or a list of :mod:`kuha_common.document_store.records`
        class-variables.

        Example::

            from kuha_common.document_store import Query, Study
            _params = Query.construct(_filter={Study.study_number:'123'})
            _query = Query(_params, Study.collection)
            _query.set_fields([Study.abstract, Study.study_number])

        :param fields: Fields parameter for query.
        :type fields: list
        :returns: self for easy aggregation of manipulation methods.
        :rtype: instantiated :class:`Query()`
        """
        self.query[self.k_fields] = _build_fields(fields)
        return self

    def set_sort_by(self, sort_by):
        """Set sort_by parameter for query.

        Determines sorting of the returned results.
        `sort_by` can be a string in the form used by MongoDB
        or a :mod:`kuha_common.document_store.records` class-variable.

        :param sort_by: Sort by parameter for query.
        :type sort_by: str or class-variable of a record.
        :returns: self for easy aggregation of manipulation methods.
        :rtype: instantiated :class:`Query()`
        """
        self.query[self.k_sort_by] = _field_path(sort_by)
        return self

    def set_sort_order(self, order):
        """Set sort order for the query.

        Determines the order which the returned results are to be
        sorted by.

        :note: Valid values come from pymongo. They actually depend on
               the mongodb driver, but since this is a caller API we don't
               want to make pymongo a dependency.

        :param order: Sort order. Must be either 1 or -1.
        :type order: int
        :returns: self for easy aggregation of manipulation methods.
        :rtype: instantiated :class:`Query()`
        :raises: :exc:`QueryException` for invalid order values.
        """
        self.query[self.k_sort_order] = _validate_sort_order(order)
        return self

    def set_query_type(self, query_type):
        """Set query type.

        :param query_type: Valid query type for the query to be constructed.
        :type query_type: str
        :returns: self for easy aggregation of manipulation methods.
        :rtype: instantiated :class:`Query()`
        :raises: :exc:`QueryException` if query type is invalid.
        """
        self.query_type = self.validate_query_type(query_type)
        return self

    def add_query_statement(self, field, statement):
        """Add query statement.

        Manipulates the _filter parameter of the query parameters.
        Creates the _filter parameter if the query does not have one yet.
        Raises a :exc:`QueryException` if the field already has a
        statement declared in _filter.

        :param field: Field to target the statement to. A record
                      class-variable is converted to its path.
        :type field: str or class-variable of a record.
        :param statement: Statement to filter the results by.
        :type statement: str
        :returns: self for easy aggregation of manipulation methods.
        :rtype: instantiated :class:`Query()`
        :raises: :exc:`QueryException` if the field already has a
                 statement.
        """
        field = _field_path(field)
        # setdefault: a query constructed without a _filter must not fail
        # with a KeyError when the first statement is added.
        _filter = self.query.setdefault(self.k_filter, {})
        if field in _filter:
            raise QueryException("Field already has query statement")
        _filter[field] = statement
        return self

    def add_query_statements(self, **kwargs):
        r"""Add multiple query statements to filter the returned results.

        Manipulates the _filter parameter of the query parameters.

        :param \*\*kwargs: key-value pairs that are to be added to the
                           _filter parameter.
        :returns: self for easy aggregation of manipulation methods.
        :rtype: instantiated :class:`Query()`
        :raises: :exc:`QueryException` if any of the fields already has
                 a statement.
        """
        for _k, _v in kwargs.items():
            self.add_query_statement(_k, _v)
        return self