# Source code for kuha_common.server

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Common server functions & classes used in Kuha.
"""
import uuid
import datetime
import time
import json
import logging
import traceback

import tornado.web
import tornado.httpserver
import tornado.ioloop

# Module-wide logger; every JSON-encoded log entry in this module goes
# through it.
logger = logging.getLogger('json_logger')

#: Level for :func:`log_request`. Use level from python logging-module.
#: The value is looked up with ``getattr(logger, REQUEST_LOG_LEVEL)``, so
#: it must be the name of a logger method (for example ``'info'``).
REQUEST_LOG_LEVEL = 'info'
#: Level for :func:`log_exception`. Use level from python logging-module.
#: The value is looked up with ``getattr(logger, EXCEPTION_LOG_LEVEL)``, so
#: it must be the name of a logger method (for example ``'error'``).
EXCEPTION_LOG_LEVEL = 'error'
#: ``strftime`` format (microsecond precision) used by
#: :func:`_log_timestamp` for the time a log entry is written.
TIMESTAMP_FMT_LOG = '%Y-%m-%dT%H:%M:%S.%fZ'
#: ``strftime`` format (second precision) used by :func:`log_request` for
#: the start time of the request.
TIMESTAMP_FMT_REQUEST = '%Y-%m-%dT%H:%M:%SZ'
#: Largest request body that :func:`_log_get_request_body_str` returns for
#: logging. Compared against the length in bytes (str bodies are measured
#: after UTF-8 encoding).
REQUEST_LOG_BODY_MAXLEN = 1024


def _log_timestamp():
    """Return the current UTC time formatted with ``TIMESTAMP_FMT_LOG``.

    :returns: str -- current UTC time as a formatted timestamp.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. A timezone-aware
    # UTC datetime carries the same wall-clock fields, so the formatted
    # result is unchanged (the format has no ``%z`` directive).
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    return now_utc.strftime(TIMESTAMP_FMT_LOG)


def _log_get_request_body_str(request_body):
    """Return the request body as a loggable string, or None if too large.

    :param request_body: body of the request; either a unicode string
                         (anything with an ``encode`` attribute) or bytes.
    :returns: str or None -- the body as a string, or None when its size
              in bytes exceeds ``REQUEST_LOG_BODY_MAXLEN``. An oversized
              body is written to the debug log instead.
    """
    is_text = hasattr(request_body, 'encode')
    # The limit is measured in bytes, so text is sized after UTF-8 encoding.
    if is_text:
        size_in_bytes = len(request_body.encode('utf-8'))
    else:
        size_in_bytes = len(request_body)
    if size_in_bytes > REQUEST_LOG_BODY_MAXLEN:
        logger.debug("Omitting large request body:")
        logger.debug(request_body)
        return None
    if is_text:
        return request_body
    try:
        return request_body.decode('utf-8')
    except UnicodeDecodeError:
        # Not valid UTF-8: fall back to the ``str()`` form of the bytes.
        return str(request_body)


def log_request(handler):
    """Write a JSON-encoded log entry describing a handled request.

    The entry is emitted through the module logger at the level named by
    ``REQUEST_LOG_LEVEL``.

    :param handler: handler for the current request.
    :type handler: subclass of ``tornado.web.RequestHandler``
    """
    request = handler.request
    try:
        host, port = request.host.split(':')
    except ValueError:
        # Host did not split into exactly two parts (host and port).
        host, port = request.host_name, None
    headers = dict(request.headers.items())
    logged_at = _log_timestamp()
    started_at = time.strftime(TIMESTAMP_FMT_REQUEST,
                               time.gmtime(request._start_time))
    body = None
    elapsed_ms = 1000.0 * request.request_time()
    if request.method in ('POST', 'PUT', 'PATCH'):
        body = _log_get_request_body_str(request.body)
        if body is None:
            body = 'Omitting large request body'
        elif headers.get('Content-Type') == RequestHandler.CONTENT_TYPE_JSON:
            try:
                body = json.loads(body)
            except json.decoder.JSONDecodeError:
                # Declared as JSON but not parseable: log the raw string.
                pass
    entry = {
        'request_time_ms': elapsed_ms,
        'request_timestamp': started_at,
        'log_timestamp': logged_at,
        'host': host,
        'port': port,
        'level': REQUEST_LOG_LEVEL,
        'status': handler.get_status(),
        'request': {
            'method': request.method,
            'url': request.full_url(),
            'remote_ip': request.remote_ip,
            'headers': headers,
            'request_body': body
        }
    }
    # json.dumps escapes all non-ASCII characters, because its
    # ensure_ascii parameter defaults to True.
    emit = getattr(logger, REQUEST_LOG_LEVEL)
    emit(json.dumps(entry))
def log_exception(typ, value, tb, correlation_id):
    """Log exception output to JSON.

    Gets called from :class:`RequestHandler`. The entry is emitted through
    the module logger at the level named by ``EXCEPTION_LOG_LEVEL``.

    :param typ: type of exception
    :param value: caught exception
    :param tb: traceback
    :param correlation_id: correlation id of the request that ended in
                           exception. May be None when the exception was
                           raised before a correlation id was assigned to
                           the request; the logged id is then null.
    :type correlation_id: :class:`CorrelationID` or None
    """
    # Every frame of the traceback is included, in the order given by
    # traceback.extract_tb().
    traceback_for_json = [
        {
            'file': frame.filename,
            'lineno': frame.lineno,
            'function': frame.name,
            'text': frame.line
        }
        for frame in traceback.extract_tb(tb)
    ]
    status_code = None
    if hasattr(value, 'status_code'):
        status_code = str(value.status_code)
    # Logging must not fail with AttributeError just because the request
    # never got a correlation id.
    request_correlation_id = None
    if correlation_id is not None:
        request_correlation_id = correlation_id.get()
    msg = {
        'level': EXCEPTION_LOG_LEVEL,
        'log_time': _log_timestamp(),
        'exception_type': str(typ),
        'msg': str(value),
        'http_status_code': status_code,
        'request_correlation_id': request_correlation_id,
        'traceback': traceback_for_json
    }
    getattr(logger, EXCEPTION_LOG_LEVEL)(json.dumps(msg))
def str_api_endpoint(api_version, suffix=None):
    """Build an endpoint path that starts with the api version.

    :param api_version: version of the api.
    :type api_version: str
    :param suffix: api endpoint. When omitted or otherwise falsy, the
                   path consists of the api version only.
    :type suffix: str
    :returns: str -- endpoint prepended with ``api_version``
    """
    # str.format() ignores surplus positional arguments, so the same call
    # serves both templates.
    template = r"/{}/{}" if suffix else r"/{}"
    return template.format(api_version, suffix)
class CorrelationID:
    """Create and store correlation id of the request.

    The id is read from the request headers using the configured header
    key. If the header is missing or its value is empty, a new id is
    created.
    """

    # HTTP header key used for the correlation id. Shared by all
    # instances; change it with set_header_key().
    _header_key = 'X-REQUEST-ID'

    def __init__(self, headers):
        """Pick the correlation id from ``headers`` or create a new one.

        :param headers: request headers. Must support ``in`` and
                        item access by header key.
        """
        key = self._header_key
        supplied = headers[key] if key in headers else None
        # An empty header value carries no usable id, so it is treated the
        # same as a missing header instead of being stored as the id.
        self._id = supplied if supplied else self.create()

    @staticmethod
    def create():
        """Create and return new correlation id.

        :returns: uuid.UUID -- correlation id
        """
        return uuid.uuid4()

    def get(self):
        """Return stored correlation id as str.

        :returns: str -- correlation id
        """
        return str(self._id)

    def as_header(self):
        """Return stored correlation id to be used as http-header.

        :returns: dict -- correlation id with header key for using it
                  in ``tornado.httpclient.HTTPRequest`` as header
        """
        return {self.get_header_key(): self.get()}

    @classmethod
    def set_header_key(cls, key):
        """Configure header key for correlation id.

        :param key: HTTP header key to use with correlation id.
        :type key: str
        """
        cls._header_key = key

    @classmethod
    def get_header_key(cls):
        """Get HTTP header key for correlation id.

        :returns: str -- header key.
        """
        return cls._header_key
class KuhaServerError(Exception):
    """Base class for common HTTP-exceptions.

    :param msg: error message.
    :param status_code: HTTP status code of the error. Defaults to 500.
    :param context: optional extra detail; when truthy it is appended to
                    the message in the string form of the exception.
    """

    def __init__(self, msg, status_code=500, context=None):
        super().__init__(msg)
        self.msg, self.status_code, self.context = msg, status_code, context

    def __str__(self):
        if not self.context:
            return str(self.msg)
        return "{}: {}".format(self.msg, self.context)
class InvalidContentType(KuhaServerError):
    """Invalid content type HTTP-exception (status code 415).

    :param requested_content_type: content type of the request.
    :param supported_content_type: content type the endpoint supports.
    """

    def __init__(self, requested_content_type, supported_content_type):
        message = "Invalid content type: {}. Endpoint supports {}".format(
            requested_content_type, supported_content_type)
        super().__init__(message, 415)
class BadRequest(KuhaServerError):
    """Bad request HTTP-exception (status code 400).

    :param msg: error message. A generic message is used when omitted
                or otherwise falsy.
    """

    def __init__(self, msg=None):
        fallback = ("The request could not be understood by the server "
                    "due to malformed or missing syntax.")
        super().__init__(msg or fallback, 400)
class ResourceNotFound(KuhaServerError):
    """Resource not found HTTP-exception (status code 404).

    :param msg: error message. Defaults to 'Resource not found' when
                omitted or otherwise falsy.
    :param context: optional extra detail passed on to
                    :class:`KuhaServerError`.
    """

    def __init__(self, msg=None, context=None):
        super().__init__(msg or 'Resource not found', 404, context)
class RequestHandler(tornado.web.RequestHandler):
    """Common request handler for kuha server applications.

    Subclass in application specific handlers.
    """

    CONTENT_TYPE_JSON = 'application/json'
    CONTENT_TYPE_XML = 'text/xml'
    CONTENT_TYPE_POST = 'application/x-www-form-urlencoded'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Assigned in prepare().
        self._correlation_id = None
        # Assigned in set_output_content_type().
        self._output_content_type = None
        self._output_charset = None

    async def prepare(self):
        """Prepare each request.

        Take the correlation id from the request headers, or create one
        if none was sent, and set it to the response headers.
        """
        correlation_id = CorrelationID(self.request.headers)
        self._correlation_id = correlation_id
        self.set_header(CorrelationID.get_header_key(), correlation_id.get())

    def set_output_content_type(self, ct, charset='UTF-8'):
        """Set content type and charset for responses.

        :param ct: content type for response. Must be either
                   ``CONTENT_TYPE_JSON`` or ``CONTENT_TYPE_XML``.
        :type ct: str
        :param charset: charset definition for response content type.
        :type charset: str
        :raises Exception: if ``ct`` is not a supported content type.
        """
        supported = (self.CONTENT_TYPE_JSON, self.CONTENT_TYPE_XML)
        if ct not in supported:
            raise Exception("Unsupported content type: {}".format(ct))
        self._output_content_type, self._output_charset = ct, charset
        header_value = '{}; charset={}'.format(ct, charset)
        self.set_header('Content-Type', header_value)

    def log_exception(self, typ, value, tb):
        """Override tornado's exception logging.

        Exceptions of type :class:`KuhaServerError` or
        ``tornado.web.HTTPError`` are sent as error responses. Every
        exception is then logged twice: as a JSON encoded message with
        the request correlation id via the module level
        :func:`log_exception`, and with
        ``tornado.web.RequestHandler.log_exception``.

        :param typ: type of exception
        :param value: caught exception
        :param tb: traceback
        """
        responds_with_error = isinstance(
            value, (KuhaServerError, tornado.web.HTTPError))
        if responds_with_error:
            self.send_error(value.status_code, reason=str(value))
        # Inside a method this name resolves to the module level function,
        # not to this method.
        log_exception(typ, value, tb, self._correlation_id)
        super().log_exception(typ, value, tb)

    def assert_request_content_type(self, supported_content_type):
        """Assert request has correct content type header.

        :param supported_content_type: content type supported by endpoint.
        :type supported_content_type: str
        :raises InvalidContentType: if the request's Content-Type header
                                    differs from the supported one.
        """
        requested = self.request.headers.get('Content-Type')
        if requested != supported_content_type:
            raise InvalidContentType(requested, supported_content_type)

    def write_error(self, status_code, **kwargs):
        r"""Override ``tornado.web.RequestHandler.write_error``.

        Outputs error messages in preferred content type.

        :param status_code: HTTP status code.
        :type status_code: int
        :param \*\*kwargs: keyword arguments are passed to
                           ``tornado.web.RequestHandler.write_error``
                           if output content type is not application/json
        """
        if self._output_content_type != self.CONTENT_TYPE_JSON:
            super().write_error(status_code, **kwargs)
            return
        self.finish({
            'code': status_code,
            'message': self._reason
        })
class WebApplication(tornado.web.Application):
    """Override ``tornado.web.Application`` to make sure server
    applications are using correct initialization parameters.
    """

    def __init__(self, handlers):
        # Autoreload and debug are always switched off.
        settings = {'autoreload': False, 'debug': False}
        super().__init__(handlers, **settings)

    def log_request(self, handler):
        """Override ``tornado.web.Application.log_request``.

        Delegates to the module level :func:`log_request`, which is the
        server's own request logging implementation.

        :param handler: Handler of current request.
        :type handler: Subclass of ``tornado.web.RequestHandler``
        """
        log_request(handler)
def serve(web_app, port, process_count=0, on_exit=None):
    """Serve web application until the ioloop stops.

    :param web_app: application to serve
    :type web_app: :obj:`tornado.web.Application`
    :param port: Port to listen to.
    :type port: int
    :param process_count: number of processes to spawn.
                          0 = forks one process per cpu.
    :type process_count: int
    :param on_exit: callback on server/ioloop stop.
    :type on_exit: function
    """
    http_server = tornado.httpserver.HTTPServer(web_app)
    http_server.bind(port)
    # 0 = fork one process per cpu, 1 = single process
    http_server.start(process_count)
    logging.info("Serving on port %s...", port)
    loop = None
    try:
        loop = tornado.ioloop.IOLoop.current()
        loop.start()
    finally:
        # Shutdown order: stop the server, run the exit callback, then
        # stop the ioloop if one was obtained.
        http_server.stop()
        if on_exit:
            on_exit()
        if loop:
            loop.stop()