Source code for kuha_common.server

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Common server functions & classes used in Kuha.
"""
import uuid
import json
import logging

import tornado.web
import tornado.httpserver
import tornado.ioloop
from py12flogging.log_formatter import LM_KEY_HOSTIP

from kuha_common import conf


# Default for ``--server-process-count``: 0 lets tornado fork one process
# per processor core, 1 runs a single process.
DEFAULT_PROCESS_COUNT = 0
# Request bodies longer than this many bytes (UTF-8) are left out of the
# logging context.
REQUEST_LOG_BODY_MAXLEN = 1024
# Mutable server settings: written by configure(), read by serve().
_server_conf = dict(server_process_count=DEFAULT_PROCESS_COUNT)

_logger = logging.getLogger(__name__)


class LoggingContext:
    """Ordered store of logging-context entries.

    Every entry is a ``(key, value, overwrite)`` three-tuple, kept in the
    order it was pushed.
    """

    def __init__(self):
        self._ctx = []

    def iter_ctx(self):
        """Yield the stored ``(key, value, overwrite)`` entries in push order."""
        yield from self._ctx

    def push_to_ctx(self, key, value, overwrite=False):
        """Append one entry to the context.

        :param key: context key.
        :param value: context value.
        :param bool overwrite: flag stored alongside the entry and handed to
                               the pusher in :func:`serverlog_ctx_populator`.
        """
        entry = (key, value, overwrite)
        self._ctx.append(entry)

    def clear(self):
        """Drop every entry by starting a fresh list."""
        self._ctx = []


# Module-wide context shared by request_to_ctx(), log_request() and
# RequestHandler.log_exception().
_ctx = LoggingContext()


def serverlog_ctx_populator(pusher):
    """Feed every entry of the module-wide logging context to ``pusher``.

    :param pusher: callable invoked positionally as
                   ``pusher(key, value, overwrite)`` once per entry.
                   Presumably supplied by the py12flogging log formatter —
                   confirm against the registration site.
    """
    for ctx_key, ctx_value, ctx_overwrite in _ctx.iter_ctx():
        pusher(ctx_key, ctx_value, ctx_overwrite)


def _get_request_body_str(request_body):
    """Return a request body as a string suitable for logging.

    Text (anything with an ``encode`` method) and bytes are both accepted.
    The size is measured in bytes (text is measured in its UTF-8 encoding)
    and compared against ``REQUEST_LOG_BODY_MAXLEN``.

    :param request_body: request body as ``str`` or ``bytes``.
    :returns: the body as ``str``, or ``None`` when it is too large. An
              oversized body is written to the debug log instead. Bytes
              that are not valid UTF-8 are returned as their ``str()``
              representation.
    """
    is_text = hasattr(request_body, 'encode')
    size_in_bytes = (len(request_body.encode('utf-8')) if is_text
                     else len(request_body))
    if size_in_bytes > REQUEST_LOG_BODY_MAXLEN:
        _logger.debug("Omitting large request body:")
        _logger.debug(request_body)
        return None
    if is_text:
        return request_body
    try:
        return request_body.decode('utf-8')
    except UnicodeDecodeError:
        return str(request_body)


def _is_json_content_type(content_type):
    """Tell whether a Content-Type header value denotes JSON.

    Media type parameters such as ``; charset=utf-8`` are ignored and the
    media type itself is compared case-insensitively.

    :param content_type: Content-Type header value, or None when absent.
    :returns: bool
    """
    if not content_type:
        return False
    media_type = content_type.split(';', 1)[0].strip().lower()
    return media_type == RequestHandler.CONTENT_TYPE_JSON


def request_to_ctx(handler, overwrite=False):
    """Set information of http traffic to logging context.

       * status
       * host_ip
       * correlation_id
       * request_time_ms
       * request.method
       * request.url
       * request.headers
       * request.remote_ip
       * request.request_body

    The body is recorded only for POST, PUT and PATCH requests. Bodies
    larger than ``REQUEST_LOG_BODY_MAXLEN`` bytes are replaced by a
    placeholder text. A body sent as JSON (``application/json``, with or
    without parameters such as ``charset``) is decoded so it is logged as
    structured data. If decoding fails, the raw string is logged.

    :param :obj:`tornado.web.RequestHandler` handler: Current request handler
    :param bool overwrite: Overwrite HTTP request information. Host IP is always overwritten.
                           Defaults to False.
    """
    request = handler.request
    try:
        host_ip, _ = request.host.split(':')
    except ValueError:
        # Host header without a port (or with several colons, e.g. IPv6):
        # fall back to tornado's parsed host name.
        host_ip = request.host_name
    # handler.request.headers is a tornado.httputil.HTTPHeaders object,
    # which can be cast to a dict. If headers contain multiple values for
    # the same key, the values are joined by a comma.
    # NOTE(review): every header, including any Authorization/Cookie value,
    # ends up in the log context; consider redacting sensitive headers.
    headers = dict(request.headers)
    request_body = None
    if request.method in ('POST', 'PUT', 'PATCH'):
        request_body = _get_request_body_str(request.body)
        if request_body is None:
            request_body = 'Omitting large request body'
        elif _is_json_content_type(headers.get('Content-Type')):
            try:
                request_body = json.loads(request_body)
            except json.decoder.JSONDecodeError:
                # Malformed JSON: keep logging the raw string.
                pass
    # Before prepare() has run, the handler has no correlation id yet; derive
    # one from the request headers (or create a new one).
    if not hasattr(handler, 'correlation_id') or handler.correlation_id is None:
        corr_id = CorrelationID(request.headers).get()
    else:
        corr_id = handler.correlation_id.get()
    _ctx.push_to_ctx('correlation_id', corr_id, overwrite=overwrite)
    _ctx.push_to_ctx('status', handler.get_status(), overwrite=overwrite)
    _ctx.push_to_ctx('request_time_ms', 1000.0 * request.request_time(), overwrite=overwrite)
    _ctx.push_to_ctx(LM_KEY_HOSTIP, host_ip, overwrite=True)
    _ctx.push_to_ctx('request', {'method': request.method,
                                 'url': request.full_url(),
                                 'headers': headers,
                                 'request_body': request_body,
                                 'remote_ip': request.remote_ip}, overwrite=overwrite)


def log_request(handler):
    """Log one completed HTTP request.

    Installed as the application's ``log_function`` by ``WebApplication``
    unless the caller supplies another one. Requests that ended in an
    exception are usually still answered, so they arrive here too.

    The log level follows the response status: below 400 is ``info``, 400-499
    is ``warning``, anything else is ``error``.

    :param :obj:`tornado.web.RequestHandler` handler: Current request handler.
    :returns: None
    """
    request_to_ctx(handler, overwrite=True)
    status = handler.get_status()
    if status < 400:
        emit = _logger.info
    elif status < 500:
        emit = _logger.warning
    else:
        emit = _logger.error
    emit('%d %s', status, handler._request_summary())
    # This is the final log message about this request, so reset the shared
    # context before the next request starts filling it.
    _ctx.clear()


def str_api_endpoint(api_version, suffix=None):
    """Build an endpoint path rooted at the API version.

    ``str_api_endpoint('v0')`` gives ``'/v0'``.
    ``str_api_endpoint('v0', 'studies')`` gives ``'/v0/studies'``.
    An empty or missing ``suffix`` yields just the version path.

    :param api_version: version of the api.
    :type api_version: str
    :param suffix: api endpoint.
    :type suffix: str
    :returns: str -- endpoint prepended with ``api_version``
    """
    endpoint = "/{}".format(api_version)
    if suffix:
        endpoint += "/{}".format(suffix)
    return endpoint


class CorrelationID:
    """Create and store correlation id of the request."""

    _header_key = 'X-REQUEST-ID'

    def __init__(self, headers):
        """Take the correlation ID from ``headers`` or create a fresh one.

        An absent or empty header value results in a newly created ID.

        :param :obj:`tornado.httputil.HTTPHeaders` headers: Request headers.
        """
        incoming = headers.get(self._header_key)
        self._id = incoming if incoming else self.create()

    @staticmethod
    def create():
        """Create and return a new correlation id.

        :returns: uuid.UUID -- random (version 4) correlation id
        """
        return uuid.uuid4()

    def get(self):
        """Return the stored correlation id as str.

        :returns: str -- correlation id
        """
        return str(self._id)

    def as_header(self):
        """Return the stored correlation id as an HTTP header mapping.

        :returns: dict -- ``{header_key: correlation_id}``, usable as headers
                  of ``tornado.httpclient.HTTPRequest``
        """
        header_key = self.get_header_key()
        return {header_key: self.get()}

    @classmethod
    def get_header_key(cls):
        """Get HTTP header key for correlation id.

        :returns: str -- header key.
        """
        return cls._header_key


class InvalidContentType(tornado.web.HTTPError):
    """Invalid content type HTTP-exception (415).

    :param requested_content_type: content type sent by the client; may be
                                   None when the request had no Content-Type.
    :param supported_content_type: content type the endpoint supports.
    """

    def __init__(self, requested_content_type, supported_content_type):
        log_message = "Invalid content type: {}. Endpoint supports {}".format(
            requested_content_type, supported_content_type)
        # tornado treats log_message as a %-format string: HTTPError.__str__
        # and RequestHandler.log_exception format it with the exception args
        # (none here). The requested content type comes from the client, so
        # escape literal '%' to keep that formatting from raising.
        log_message = log_message.replace('%', '%%')
        super().__init__(status_code=415, log_message=log_message)


class BadRequest(tornado.web.HTTPError):
    """Bad request HTTP-exception (400).

    :param log_message: optional description of the problem; a generic
                        message is used when omitted.
    """

    def __init__(self, log_message=None):
        log_message = log_message or ("The request could not be understood by the server due to "
                                      "malformed or missing syntax.")
        # tornado treats log_message as a %-format string (formatted with the
        # exception args, of which there are none here). Messages passed in
        # may embed client-supplied text such as query parameter keys, so
        # escape literal '%' to keep that formatting from raising.
        if isinstance(log_message, str):
            log_message = log_message.replace('%', '%%')
        super().__init__(status_code=400, log_message=log_message)


class ResourceNotFound(tornado.web.HTTPError):
    """Resource not found HTTP-exception (404).

    :param log_message: optional message; defaults to ``'Resource not found'``.
    :param context: optional extra information appended as ``': <context>'``.
    """

    def __init__(self, log_message=None, context=None):
        log_message = log_message or 'Resource not found'
        if context is not None:
            log_message += ': %s' % (context,)
        # tornado treats log_message as a %-format string (formatted with the
        # exception args, of which there are none here). The context may
        # carry request-derived values, so escape literal '%' to keep that
        # formatting from raising.
        if isinstance(log_message, str):
            log_message = log_message.replace('%', '%%')
        super().__init__(status_code=404, log_message=log_message)


class Conflict(tornado.web.HTTPError):
    """Resource in conflicting state (409).

    :param log_message: optional description of the conflict.
    """

    def __init__(self, log_message=None):
        # tornado treats log_message as a %-format string (formatted with the
        # exception args, of which there are none here); escape literal '%'
        # so arbitrary message text cannot break that formatting.
        if isinstance(log_message, str):
            log_message = log_message.replace('%', '%%')
        super().__init__(status_code=409, log_message=log_message)


class RequestHandler(tornado.web.RequestHandler):
    """Common request handler for kuha server applications.

    Subclass in application specific handlers.
    """

    CONTENT_TYPE_JSON = 'application/json'
    CONTENT_TYPE_XML = 'text/xml'
    CONTENT_TYPE_POST = 'application/x-www-form-urlencoded'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Assigned in prepare(); None until then.
        self._correlation_id = None
        # Assigned by set_output_content_type(); write_error() consults the
        # content type to decide whether to render errors as JSON.
        self._output_content_type = None
        self._output_charset = None

    async def prepare(self):
        """Prepare each request.

        Look for correlation id; create one if not found. Set correlation id
        to response header. Prepare logging context with HTTP request
        information through the application's ``request_to_ctx`` setting.
        """
        self._correlation_id = CorrelationID(self.request.headers)
        self.set_header(CorrelationID.get_header_key(), self._correlation_id.get())
        self.application.settings['request_to_ctx'](self)

    @property
    def correlation_id(self):
        """Public read-only interface to get response correlation ID.

        To get request correlation ID, use
        request.headers.get(correlation_id.get_header_key())

        :returns: CorrelationID, or None before :meth:`prepare` has run.
        :rtype: :obj:`CorrelationID`
        """
        return self._correlation_id

    def set_output_content_type(self, ct, charset='UTF-8'):
        """Sets content type for responses.

        :param ct: content type for response; either
                   :attr:`CONTENT_TYPE_JSON` or :attr:`CONTENT_TYPE_XML`.
        :type ct: str
        :param charset: charset definition for response content type.
        :type charset: str
        :raises ValueError: for any other content type. ValueError subclasses
                            Exception, so existing ``except Exception``
                            handlers keep working.
        """
        if ct not in (self.CONTENT_TYPE_JSON, self.CONTENT_TYPE_XML):
            raise ValueError("Unsupported content type: {}".format(ct))
        self._output_content_type = ct
        self._output_charset = charset
        self.set_header('Content-Type', '{}; charset={}'.format(ct, charset))

    def log_exception(self, typ, value, tb):
        """Overrides tornado's exception logging.

        Sends HTTP errors as responses, refreshes the logging context, then
        calls ``tornado.web.RequestHandler.log_exception`` for easier
        debugging. The shared logging context is cleared afterwards.

        :param typ: type of exception
        :param value: caught exception
        :param tb: traceback
        """
        if isinstance(value, tornado.web.HTTPError):
            # str(value) includes the formatted log_message in the reason.
            self.send_error(value.status_code, reason=str(value))
        self.application.settings['request_to_ctx'](self, overwrite=True)
        super().log_exception(typ, value, tb)
        _ctx.clear()

    def assert_request_content_type(self, supported_content_type):
        """Assert request has correct content type header.

        The comparison is an exact string match against the header value.

        :param supported_content_type: content type supported by endpoint.
        :type supported_content_type: str
        :raises InvalidContentType: if request has invalid content type.
        """
        request_ct = self.request.headers.get('Content-Type')
        if request_ct != supported_content_type:
            raise InvalidContentType(request_ct, supported_content_type)

    @staticmethod
    def _validate_occurrence(dct, key, occurrence):
        """Specialized method used to validate query parameter occurrences.

        Pops ``key`` from ``dct`` and decodes its values from UTF-8 bytes to
        strings. If occurrence is 0, the decoded values are returned without
        counting them. Otherwise the number of values must equal
        ``occurrence``; if occurrence is 1, the single value is returned
        instead of a list.

        :param dict dct: query parameters in a dictionary form: tornado's
                         RequestHandler.request.query_arguments.
        :param str key: Key to lookup from dct.
        :param int occurrence: number of occurrences.
        :returns: validated value
        :rtype: str if occurrence is 1, else list
        :raises BadRequest: if a value is not valid UTF-8 or the number of
                            values does not match ``occurrence``.
        """
        try:
            value = [byt.decode('utf8') for byt in dct.pop(key)]
        except UnicodeDecodeError as exc:
            # Query values are raw client-supplied bytes; undecodable input is
            # a client error (400), not an internal server error (500).
            raise BadRequest("Invalid encoding in query parameter '%s'. "
                             "Expected UTF-8." % (key,)) from exc
        if occurrence == 0:
            return value
        len_val = len(value)
        if len_val != occurrence:
            raise BadRequest("Invalid number of query parameters for key '%s'. Expected %d, "
                             "got %d" % (key, occurrence, len_val))
        if occurrence == 1:
            return value[0]
        return value

    def get_valid_query_arguments(self, *valid_keys_occurrences):
        """Validate and return query arguments.

        Validation is performed based on query argument keys. The value is
        not checked. Request can be checked to contain no query arguments by
        calling this method without arguments.

        :param valid_keys_occurrences: optional positional arguments contain
            all valid keys and their expected occurrences. The parameter is a
            two-tuple: (<key>, <number_of_expected_occurrences>). If the
            number of expected occurrences is 0, no validation is performed
            based on occurrences. If the number of expected occurrences is 1,
            the value is popped out from the list container:
            {<key>: [<val>]} -> {<key>: <val>}
        :returns: validated query arguments.
        :rtype: dict
        :raises: :exc:`BadRequest` for invalid query argument keys.
        """
        # Shallow copy: _validate_occurrence pops keys, and whatever remains
        # afterwards is an unexpected key.
        query_args = dict(self.request.query_arguments)
        valid_query_args = {}
        for valid_key, occurrence in valid_keys_occurrences:
            if valid_key in query_args:
                valid_query_args[valid_key] = self._validate_occurrence(
                    query_args, valid_key, occurrence)
        if query_args:
            raise BadRequest('Invalid query parameters: %s' %
                             (', '.join("'%s'" % (x,) for x in query_args.keys()),))
        return valid_query_args

    def write_error(self, status_code, **kwargs):
        r"""Overrides ``tornado.web.RequestHandler.write_error``.

        Outputs error messages in preferred content type.

        :param status_code: HTTP status code.
        :type status_code: int
        :param \*\*kwargs: keyword arguments are passed to
                           ``tornado.web.RequestHandler.write_error`` if
                           output content type is not application/json
        """
        if self._output_content_type == self.CONTENT_TYPE_JSON:
            self.finish({
                'code': status_code,
                'message': self._reason
            })
        else:
            super().write_error(status_code, **kwargs)


class WebApplication(tornado.web.Application):
    """tornado Application that installs kuha's logging hooks by default.

    Unless given explicitly, the ``request_to_ctx`` setting defaults to
    :func:`request_to_ctx` and ``log_function`` defaults to
    :func:`log_request`. All other arguments go to
    ``tornado.web.Application`` unchanged.
    """

    def __init__(self, *args, **kwargs):
        kwargs.setdefault('request_to_ctx', request_to_ctx)
        kwargs.setdefault('log_function', log_request)
        super().__init__(*args, **kwargs)


def serve(web_app, port, on_exit=None):
    """Serve web application until the IOLoop stops.

    The number of server processes comes from the module configuration set
    by :func:`configure`.

    :param web_app: application to serve
    :type web_app: :obj:`tornado.web.Application`
    :param port: Port to listen to.
    :type port: int
    :param on_exit: callback on server/ioloop stop.
    :type on_exit: function
    """
    http_server = tornado.httpserver.HTTPServer(web_app)
    http_server.bind(port)
    # 0 = fork one process per cpu, 1 = single process
    http_server.start(_server_conf['server_process_count'])
    _logger.info("Serving on port %s...", port)
    loop = None
    try:
        loop = tornado.ioloop.IOLoop.current()
        loop.start()
    finally:
        # Runs on normal shutdown as well as on exceptions such as
        # KeyboardInterrupt.
        http_server.stop()
        if on_exit:
            on_exit()
        if loop:
            loop.stop()


def add_cli_args():
    """Register the server's settings with :mod:`kuha_common.conf`.

    Adds ``--server-process-count`` (environment variable
    ``SERVER_PROCESS_COUNT``), an int defaulting to ``DEFAULT_PROCESS_COUNT``.
    """
    help_text = ('Set number of processes that tornado server will fork. '
                 '0 forks one process for each processor core.')
    conf.add('--server-process-count', help=help_text,
             default=DEFAULT_PROCESS_COUNT, type=int,
             env_var='SERVER_PROCESS_COUNT')


def configure(settings):
    """Store the parsed server settings for use by :func:`serve`.

    :param settings: parsed settings object exposing a
                     ``server_process_count`` attribute.
    """
    process_count = settings.server_process_count
    _server_conf['server_process_count'] = process_count