Source code for kuha_document_store.handlers

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Define handlers for responding to HTTP-requests.
"""
import logging

from json.decoder import JSONDecodeError

from kuha_common import server
from kuha_common.document_store.query import Query


[docs]class BaseHandler(server.RequestHandler): """BaseHandler to derive from. Provides common methods for subclasses. :note: use from a subclass """
[docs] async def prepare(self): """Prepare for each request. Set output content type. """ await super().prepare() self.set_output_content_type(self.CONTENT_TYPE_JSON)
[docs] def get_db(self): """Get database object stored in settings. :return: database object. :rtype: :obj:`kuha_document_store.database.DocumentStoreDatabase` """ return self.application.settings['db']
[docs] def assert_body_not_empty(self, msg=None): """Assert that request body contains data. :exc:`kuha_common.server.BadRequest` is raised if body is empty. :param msg: Optional message for exception. :type msg: str :raises: :exc:`kuha_common.server.BadRequest` if body is empty. """ if not self.request.body: raise server.BadRequest(msg)
async def _write_and_flush(self, data, error=None): if error: logging.error(error) return # Data will be a valid json document or python dict self.write(data) await self.flush()
[docs]class RestApiHandler(BaseHandler): """Handle requests to REST api. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._error = None self._affected_resource = None self._result = None def _respond(self): self.finish({ 'error': self._error, 'affected_resource': self._affected_resource, 'result': self._result })
[docs] async def get(self, collection, resource_id=None): """HTTP-GET to REST api endpoint. Respond with single record or multiple records, depending on whether ``resource_id`` is requested. :note: Results will be streamed. :param collection: type of the requested collection. :type collection: str :param resource_id: optional ID of the requested resource. If left out of request, will return all records of requested type. :type resource_id: str or None :raises: :exc:`kuha_common.server.BadRequest` if there are recoverable errors in database operation. The error message is passed to BadRequest. See: :attr:`kuha_document_store.database.DocumentStoreDatabase.recoverable_errors` :raises: :exc:`kuha_common.server.ResourceNotFound` if requested resource_id does not return results. """ db = self.get_db() if resource_id: try: await db.query_by_oid(collection, resource_id, self._write_and_flush, not_found_exception=server.ResourceNotFound) except db.recoverable_errors as exc: raise server.BadRequest(str(exc)) else: await db.query_multiple(collection, {}, self._write_and_flush) self.finish()
[docs] async def post(self, collection, resource_id=None): """HTTP-POST to REST api endpoint. Create new resource from data submitted in request body. :param collection: collection type to create. :type collection: str :param resource_id: receives resource_id for completeness in handler configuration. It is however a :exc:`kuha_common.server.BadRequest` if one is submitted. :type resource_id: str or None :raises: :exc:`kuha_common.server.BadRequest` if request contains resource_id or if database operations raise recoverable errors. See: :attr:`kuha_document_store.database.DocumentStoreDatabase.recoverable_errors` """ if resource_id: raise server.BadRequest( "POST only supports request with no resource id" ) self.assert_request_content_type(self.CONTENT_TYPE_JSON) self.assert_body_not_empty() db = self.get_db() try: db_result = await db.insert_json(collection, self.request.body) except db.recoverable_errors as exc: # Will catch exceptions from invalid json format and database validation & duplicate key. raise server.BadRequest(str(exc)) self.set_status(201) self._result = 'insert_successful' self._affected_resource = str(db_result.inserted_id) self._respond()
[docs] async def put(self, collection, resource_id=None): """HTTP-PUT to REST api endpoint. Replace existing resource with data in request body. :param collection: collection type to replace. :type collection: str :param resource_id: resource ID to replace. Optional for completeness in handler configuration. It is however a :exc:`kuha_common.server.BadRequest` if not submitted. :type resource_id: str or None :raises: :exc:`kuha_common.server.BadRequest` if requested endpoint does not contain resource_id or if database operation raises one of :attr:`kuha_document_store.database.DocumentStoreDatabase.recoverable_errors` :raises: :exc:`kuha_common.server.ResourceNotFound` if resource_id returns no results. """ if resource_id is None: raise server.BadRequest( "PUT only supports request with resource id" ) self.assert_request_content_type(self.CONTENT_TYPE_JSON) self.assert_body_not_empty() db = self.get_db() try: db_result = await db.replace_json(collection, resource_id, self.request.body, server.ResourceNotFound) except db.recoverable_errors as exc: raise server.BadRequest(str(exc)) self._result = 'replace_successful' self._affected_resource = str(db_result.upserted_id) if db_result.upserted_id else resource_id self._respond()
[docs] async def delete(self, collection, resource_id=None): """HTTP-DELETE to REST api endpoint. Delete resource or all resources of certain type. :param str collection: type of collection :param str or None resource_id: resource ID to delete. :raises: :exc:`kuha_common.server.BadRequest` if database operation raises one of :attr:`kuha_document_store.database.DocumentStoreDatabase.recoverable_errors` :raises: :exc:`kuha_common.server.ResourceNotFound` if resoure_id returns no results. """ types_hard_deletes = {'soft': False, 'hard': True} query_args = self.get_valid_query_arguments(('delete_type', 1)) requested_type = query_args.get('delete_type', 'soft') hard_delete = types_hard_deletes.get(requested_type) if hard_delete is None: raise server.BadRequest( "Invalid delete type '%s'. Endpoint supports %s" % (requested_type, ', '.join("\'%s\'" % (type_,) for type_ in types_hard_deletes))) db = self.get_db() try: deleted_count = await db.delete_records(collection, oid=resource_id, hard_delete=hard_delete) except db.recoverable_errors as exc: raise server.BadRequest(str(exc)) if resource_id is not None and deleted_count == 0: if hard_delete is False and await db.count(collection, await db.oid_filter(resource_id)): raise server.Conflict("Resource is logically deleted") raise server.ResourceNotFound(context=resource_id) self._affected_resource = resource_id or 'affected %s records' % (deleted_count,) self._result = 'delete_successful' self._respond()
[docs]class QueryHandler(BaseHandler): """Handle request to query endpoint. :note: Results will be streamed. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._query = None self._query_type = None
[docs] async def prepare(self): """Prepare for each request. Request content type must be JSON. Request body must not be empty. Requested query type must be supported and query must have valid parameters. """ await super().prepare() self.assert_request_content_type(self.CONTENT_TYPE_JSON) self.assert_body_not_empty("Give query-object in message body") self._query_type = await self._get_query_type() self._query = await self._get_query()
async def _get_query_type(self): query_type = self.get_query_argument( Query.k_query_type, Query.query_type_select ) if not Query.is_valid_query_type(query_type): raise server.BadRequest("Invalid query type: {}. Supported query types: {}.". format(query_type, ', '.join(Query.supported_query_types))) return query_type async def _get_query(self): query = self.request.body # Decode to dict try: query = self.get_db().json_decode(query) except JSONDecodeError as exc: raise server.BadRequest(str(exc)) if not Query.is_valid_query(query, self._query_type): supported = Query.get_valid_params(self._query_type) raise server.BadRequest("Invalid query parameters: {}. Supported parameters: {}". format(query, ', '.join(supported))) return query
[docs] async def post(self, collection): """HTTP-POST to query endpoint. Streams the results one JSON document at a time. Thus, the result of a response for multiple records will not a a valid JSON document. :note: Body must be a JSON object. :param collection: collection (resource type) to query. :type collection: str """ logging.debug("Begin query %s %s %s", self._query_type, collection, self._query) _filter = self._query.get(Query.k_filter) if self._query_type == Query.query_type_select: _fields = self._query.get(Query.k_fields) _skip = self._query.get(Query.k_skip, 0) _limit = self._query.get(Query.k_limit, 0) _sort_by = self._query.get(Query.k_sort_by) _sort_order = self._query.get(Query.k_sort_order) await self.get_db().query_multiple(collection, _filter, self._write_and_flush, fields=_fields, skip=_skip, limit=_limit, sort_by=_sort_by, sort_order=_sort_order) elif self._query_type == Query.query_type_count: result = await self.get_db().count(collection, _filter) await self._write_and_flush({'count': result}) elif self._query_type == Query.query_type_distinct: fieldname = self._query[Query.k_fieldname] result = await self.get_db().query_distinct(collection, fieldname, _filter) await self._write_and_flush({fieldname: result}) logging.debug("Finished query %s %s %s", self._query_type, collection, self._query) self.finish()