Source code for kuha_document_store.handlers

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Define handlers for responding to HTTP-requests.
"""
import logging

from json.decoder import JSONDecodeError

from kuha_common import server
from kuha_common.document_store.query import Query
from kuha_common.document_store.mappings.exceptions import MappingError


class BaseHandler(server.RequestHandler):
    """Common base class for the request handlers of this module.

    Collects the helpers every handler needs: output content type
    setup, database lookup, request body validation and writing
    output in flushed chunks.

    :note: use from a subclass
    """

    async def prepare(self):
        """Run before each request.

        Calls the parent implementation and declares JSON as the
        output content type.
        """
        await super().prepare()
        self.set_output_content_type(self.CONTENT_TYPE_JSON)

    def get_db(self):
        """Return the database object kept in application settings.

        :return: database object.
        :rtype: :obj:`kuha_document_store.database.DocumentStoreDatabase`
        """
        settings = self.application.settings
        return settings['db']

    def assert_body_not_empty(self, msg=None):
        """Ensure that the request carries body data.

        :param msg: Optional message for exception.
        :type msg: str
        :raises: :exc:`kuha_common.server.BadRequest` if body is empty.
        """
        if self.request.body:
            return
        raise server.BadRequest(msg)

    async def _write_and_flush(self, data, error=None):
        """Write a single chunk of output and flush it.

        If ``error`` is truthy it is logged and nothing gets written.

        :param data: chunk to write.
        :param error: optional error; when given, ``data`` is ignored.
        """
        if not error:
            # Data will be a valid json document or python dict
            self.write(data)
            await self.flush()
            return
        logging.error(error)
class RestApiHandler(BaseHandler):
    """Handle requests to REST api.

    POST, PUT and DELETE respond with a JSON object holding the keys
    ``error``, ``affected_resource`` and ``result``. GET streams the
    matching records instead.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._error = None
        self._affected_resource = None
        self._result = None

    def _respond(self):
        """Finish the request with the collected response attributes."""
        self.finish({
            'error': self._error,
            'affected_resource': self._affected_resource,
            'result': self._result
        })

    async def get(self, collection, resource_id=None):
        """HTTP-GET to REST api endpoint.

        Respond with single record or multiple records, depending
        on whether ``resource_id`` is requested.

        :note: Results will be streamed.

        :param collection: type of the requested collection.
        :type collection: str
        :param resource_id: optional ID of the requested resource.
                            If left out of request, will return all
                            records of requested type.
        :type resource_id: str or None
        :raises: :exc:`kuha_common.server.BadRequest` if there are
                 recoverable errors in database operation. The error
                 message is passed to BadRequest. See:
                 :attr:`kuha_document_store.database.DocumentStoreDatabase.recoverable_errors`
        :raises: :exc:`kuha_common.server.ResourceNotFound` if requested
                 resource_id does not return results.
        """
        db = self.get_db()
        if resource_id:
            try:
                await db.query_by_oid(collection, resource_id, self._write_and_flush,
                                      not_found_exception=server.ResourceNotFound)
            except db.recoverable_errors as exc:
                # Chain explicitly so the database error stays the
                # documented cause of the BadRequest.
                raise server.BadRequest(msg=str(exc)) from exc
        else:
            await db.query_multiple(collection, {}, self._write_and_flush)
        self.finish()

    async def post(self, collection, resource_id=None):
        """HTTP-POST to REST api endpoint.

        Create new resource from data submitted in request body.
        Responds with status 201 on success.

        :param collection: collection type to create.
        :type collection: str
        :param resource_id: receives resource_id for completeness
                            in handler configuration. It is however
                            a :exc:`kuha_common.server.BadRequest`
                            if one is submitted.
        :type resource_id: str or None
        :raises: :exc:`kuha_common.server.BadRequest` if request
                 contains resource_id or if database operations raise
                 recoverable errors. See:
                 :attr:`kuha_document_store.database.DocumentStoreDatabase.recoverable_errors`
        """
        if resource_id:
            raise server.BadRequest(
                "POST only supports request with no resource id"
            )
        self.assert_request_content_type(self.CONTENT_TYPE_JSON)
        self.assert_body_not_empty()
        db = self.get_db()
        try:
            db_result = await db.insert_json(collection, self.request.body)
        except db.recoverable_errors as exc:
            # Will catch exceptions from invalid json format and database validation & duplicate key.
            raise server.BadRequest(msg=str(exc)) from exc
        self.set_status(201)
        self._result = 'insert_successful'
        self._affected_resource = str(db_result.inserted_id)
        self._respond()

    async def put(self, collection, resource_id=None):
        """HTTP-PUT to REST api endpoint.

        Replace existing resource with data in request body.

        :param collection: collection type to replace.
        :type collection: str
        :param resource_id: resource ID to replace. Optional for
                            completeness in handler configuration.
                            It is however a
                            :exc:`kuha_common.server.BadRequest`
                            if not submitted.
        :type resource_id: str or None
        :raises: :exc:`kuha_common.server.BadRequest` if requested
                 endpoint does not contain resource_id or if database
                 operation raises one of
                 :attr:`kuha_document_store.database.DocumentStoreDatabase.recoverable_errors`
        :raises: :exc:`kuha_common.server.ResourceNotFound` if
                 resource_id returns no results.
        """
        # Truthiness check, like GET/POST/DELETE: an empty id is
        # treated the same as a missing one.
        if not resource_id:
            raise server.BadRequest(
                "PUT only supports request with resource id"
            )
        self.assert_request_content_type(self.CONTENT_TYPE_JSON)
        self.assert_body_not_empty()
        db = self.get_db()
        try:
            db_result = await db.replace_json(collection, resource_id,
                                              self.request.body,
                                              server.ResourceNotFound)
        except db.recoverable_errors as exc:
            raise server.BadRequest(msg=str(exc)) from exc
        self._result = 'replace_successful'
        # Report the upserted id when the database gives one,
        # otherwise the id that was requested.
        self._affected_resource = str(db_result.upserted_id)\
            if db_result.upserted_id else resource_id
        self._respond()

    async def delete(self, collection, resource_id=None):
        """HTTP-DELETE to REST api endpoint.

        Delete resource or all resources of certain type.

        :param collection: type of collection
        :type collection: str
        :param resource_id: resource ID to delete. If left out, all
                            records of the collection are deleted.
        :type resource_id: str or None
        :raises: :exc:`kuha_common.server.BadRequest` if database
                 operation raises one of
                 :attr:`kuha_document_store.database.DocumentStoreDatabase.recoverable_errors`
        :raises: :exc:`kuha_common.server.ResourceNotFound` if
                 resource_id returns no results.
        """
        db = self.get_db()
        if resource_id:
            try:
                db_result = await db.delete_by_oid(collection, resource_id)
            except db.recoverable_errors as exc:
                raise server.BadRequest(msg=str(exc)) from exc
            if db_result.deleted_count == 0:
                raise server.ResourceNotFound(context=resource_id)
            self._affected_resource = resource_id
        else:
            db_result = await db.delete_many(collection, {})
            self._affected_resource = 'affected %s records' % db_result.deleted_count
        self._result = 'delete_successful'
        self._respond()
class ImportHandler(BaseHandler):
    """Handle request to import endpoint.

    Responds with a JSON object holding the keys ``error``,
    ``imported_docs`` and ``result``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._error = None
        self._imported_docs = None
        self._result = None

    async def prepare(self):
        """Prepare for each request.

        All requests must define content type for XML.
        All requests must contain body data.
        """
        await super().prepare()
        self.assert_request_content_type(self.CONTENT_TYPE_XML)
        self.assert_body_not_empty("Submit data in message body")

    def _get_importer(self, importer_id):
        """Look up importer from application settings.

        :param importer_id: key of the importer.
        :returns: the configured importer or None if ``importer_id``
                  is not configured.
        """
        return self.application.settings['importers'].get(importer_id)

    def _respond(self):
        """Finish the request with the collected response attributes."""
        self.finish({
            'error': self._error,
            'imported_docs': self._imported_docs,
            'result': self._result
        })

    async def post(self, importer_id, collection=None):
        """HTTP-POST to import endpoint.

        Lookup correct importer. Load iterative parser.
        Pass iterative parser to database for processing.

        :param importer_id: importer to use for importing.
        :type importer_id: str
        :param collection: Optional parameter limits the import to
                           a specific collection (resource type).
        :type collection: str or None
        :raises: :exc:`kuha_common.server.ResourceNotFound` if no
                 importer is configured for ``importer_id``.
        :raises: :exc:`kuha_common.document_store.mappings.exceptions.MappingError`
                 re-raised after a status 400 response has been sent.
        """
        importer = self._get_importer(importer_id)
        if importer is None:
            # The lookup yields None for unknown ids. Calling it
            # would fail with a TypeError and a server error response.
            raise server.ResourceNotFound(context=importer_id)
        try:
            records = importer(self.request.body).select(collection)
            self._imported_docs = await self.get_db().bulk_insert_or_update_record(records)
        except MappingError as exc:
            self.set_status(400)
            self._error = str(exc)
            self._result = 'import failed'
            self._respond()
            # The response is already sent; re-raise so the failure
            # still propagates to the caller.
            raise
        self._result = 'import successful'
        self._respond()
class QueryHandler(BaseHandler):
    """Serve the query endpoint.

    The query type and the decoded query object are resolved in
    :meth:`prepare` and consumed in :meth:`post`.

    :note: Results will be streamed.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._query = None
        self._query_type = None

    async def prepare(self):
        """Validate the request before the HTTP-verb method runs.

        Request content type must be JSON and the body must not be
        empty. The requested query type must be supported and the
        query must have valid parameters.
        """
        await super().prepare()
        self.assert_request_content_type(self.CONTENT_TYPE_JSON)
        self.assert_body_not_empty("Give query-object in message body")
        self._query_type = await self._get_query_type()
        self._query = await self._get_query()

    async def _get_query_type(self):
        """Read the query type from the URL query arguments.

        Falls back to the select query type if none is given.

        :raises: :exc:`kuha_common.server.BadRequest` for an
                 unsupported query type.
        """
        requested = self.get_query_argument(
            Query.k_query_type,
            Query.query_type_select
        )
        if Query.is_valid_query_type(requested):
            return requested
        raise server.BadRequest("Invalid query type: {}. Supported query types: {}.".
                                format(requested, ', '.join(Query.supported_query_types)))

    async def _get_query(self):
        """Decode the request body to a query and validate it.

        :raises: :exc:`kuha_common.server.BadRequest` if the body
                 cannot be decoded as JSON or the query is not
                 valid for the current query type.
        """
        try:
            # Decode to dict
            decoded = self.get_db().json_decode(self.request.body)
        except JSONDecodeError as exc:
            raise server.BadRequest(msg=str(exc))
        if Query.is_valid_query(decoded, self._query_type):
            return decoded
        supported = Query.get_valid_params(self._query_type)
        raise server.BadRequest("Invalid query parameters: {}. Supported parameters: {}".
                                format(decoded, ', '.join(supported)))

    async def post(self, collection):
        """HTTP-POST to query endpoint.

        Streams the results one JSON document at a time. Thus, the
        response for multiple records will not be a valid JSON
        document as a whole.

        :note: Body must be a JSON object.

        :param collection: collection (resource type) to query.
        :type collection: str
        """
        query_type = self._query_type
        query = self._query
        logging.debug("Begin query %s %s %s", query_type, collection, query)
        _filter = query.get(Query.k_filter)
        if query_type == Query.query_type_select:
            # Select: each record goes out through the write callback.
            select_options = {
                'fields': query.get(Query.k_fields),
                'skip': query.get(Query.k_skip, 0),
                'limit': query.get(Query.k_limit, 0),
                'sort_by': query.get(Query.k_sort_by),
                'sort_order': query.get(Query.k_sort_order),
            }
            await self.get_db().query_multiple(collection, _filter,
                                               self._write_and_flush,
                                               **select_options)
        elif query_type == Query.query_type_count:
            count = await self.get_db().count(collection, _filter)
            await self._write_and_flush({'count': count})
        elif query_type == Query.query_type_distinct:
            fieldname = query[Query.k_fieldname]
            distinct = await self.get_db().query_distinct(collection, fieldname, _filter)
            await self._write_and_flush({fieldname: distinct})
        logging.debug("Finished query %s %s %s", query_type, collection, query)
        self.finish()