#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Common server functions & classes used in Kuha.
"""
import uuid
import datetime
import time
import json
import logging
import traceback
import tornado.web
import tornado.httpserver
import tornado.ioloop
#: Logger that receives the JSON-encoded messages built by
#: :func:`log_request` and :func:`log_exception`.
logger = logging.getLogger('json_logger')
#: Level for :func:`log_request`. Use level from python logging-module.
#: The value is looked up as an attribute of :data:`logger`, so it must be
#: the name of a logger method (e.g. ``'info'``).
REQUEST_LOG_LEVEL = 'info'
#: Level for :func:`log_exception`. Use level from python logging-module.
#: The value is looked up as an attribute of :data:`logger`, so it must be
#: the name of a logger method (e.g. ``'error'``).
EXCEPTION_LOG_LEVEL = 'error'
#: ``strftime`` format (microsecond precision) used by :func:`_log_timestamp`.
TIMESTAMP_FMT_LOG = '%Y-%m-%dT%H:%M:%S.%fZ'
#: ``strftime`` format (second precision) used by :func:`log_request`
#: for the request start time.
TIMESTAMP_FMT_REQUEST = '%Y-%m-%dT%H:%M:%SZ'
#: Maximum request body size that :func:`_log_get_request_body_str` returns
#: for logging. Measured in bytes (unicode bodies are UTF-8 encoded first).
REQUEST_LOG_BODY_MAXLEN = 1024
def _log_timestamp():
    """Return the current UTC time formatted for log messages.

    :returns: str -- current UTC time rendered with :data:`TIMESTAMP_FMT_LOG`.
    """
    # datetime.utcnow() is deprecated since Python 3.12. An aware UTC
    # datetime renders identically here because the format has no %z.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    return now_utc.strftime(TIMESTAMP_FMT_LOG)
def _log_get_request_body_str(request_body):
    """Return the request body as a string suitable for logging.

    :param request_body: body of the request.
    :type request_body: str or bytes
    :returns: str or None -- the body as a string, or None when its size
              in bytes exceeds :data:`REQUEST_LOG_BODY_MAXLEN`.
    """
    # Anything with an ``encode`` method is treated as a unicode string,
    # everything else as bytes.
    is_text = hasattr(request_body, 'encode')
    raw = request_body.encode('utf-8') if is_text else request_body
    if len(raw) > REQUEST_LOG_BODY_MAXLEN:
        logger.debug("Omitting large request body:")
        logger.debug(request_body)
        return None
    if is_text:
        return request_body
    try:
        return request_body.decode('utf-8')
    except UnicodeDecodeError:
        # Not valid UTF-8: fall back to the bytes repr so it can be logged.
        return str(request_body)
def log_request(handler):
    """Log request output to JSON.

    :meth:`WebApplication.log_request` delegates to this function.

    :param handler: handler for the current request.
    :type handler: subclass of ``tornado.web.RequestHandler``
    """
    request = handler.request
    try:
        host, port = request.host.split(':')
    except ValueError:
        # Host did not split into exactly two parts: log no port.
        host, port = request.host_name, None
    headers = dict(request.headers.items())
    log_time = _log_timestamp()
    started_at = time.strftime(TIMESTAMP_FMT_REQUEST,
                               time.gmtime(request._start_time))
    elapsed_ms = 1000.0 * request.request_time()

    # Only methods that are expected to carry a payload get a logged body.
    body = None
    if request.method in ('POST', 'PUT', 'PATCH'):
        body = _log_get_request_body_str(request.body)
        if body is None:
            body = 'Omitting large request body'
        elif headers.get('Content-Type') == RequestHandler.CONTENT_TYPE_JSON:
            try:
                body = json.loads(body)
            except json.decoder.JSONDecodeError:
                # Keep the raw string when the body is not valid JSON.
                pass

    request_info = {
        'method': request.method,
        'url': request.full_url(),
        'remote_ip': request.remote_ip,
        'headers': headers,
        'request_body': body
    }
    msg = {
        'request_time_ms': elapsed_ms,
        'request_timestamp': started_at,
        'log_timestamp': log_time,
        'host': host,
        'port': port,
        'level': REQUEST_LOG_LEVEL,
        'status': handler.get_status(),
        'request': request_info
    }
    # Parameter ensure_ascii in json.dumps is set to True as a default.
    # Therefore all non-ASCII characters are escaped.
    emit = getattr(logger, REQUEST_LOG_LEVEL)
    emit(json.dumps(msg))
def log_exception(typ, value, tb, correlation_id):
    """Log exception output to JSON. Gets called
    from :class:`RequestHandler`.

    :param typ: type of exception
    :param value: caught exception
    :param tb: traceback
    :param correlation_id: correlation id of the request that
                           ended in exception. May be None when no
                           correlation id was assigned to the request;
                           it is then logged as ``null``.
    :type correlation_id: :class:`CorrelationID` or None
    """
    # Every frame of the traceback is included, in the order returned by
    # traceback.extract_tb().
    traceback_for_json = [
        {
            'file': filename,
            'lineno': lineno,
            'function': function,
            'text': text
        }
        for filename, lineno, function, text in traceback.extract_tb(tb)
    ]
    status_code = (str(value.status_code)
                   if hasattr(value, 'status_code') else None)
    # Guard against a missing correlation id so that logging the original
    # exception never fails with an AttributeError of its own.
    request_correlation_id = (correlation_id.get()
                              if correlation_id is not None else None)
    msg = {
        'level': EXCEPTION_LOG_LEVEL,
        'log_time': _log_timestamp(),
        'exception_type': str(typ),
        'msg': str(value),
        'http_status_code': status_code,
        'request_correlation_id': request_correlation_id,
        'traceback': traceback_for_json
    }
    getattr(logger, EXCEPTION_LOG_LEVEL)(json.dumps(msg))
def str_api_endpoint(api_version, suffix=None):
    """Helper function to prepend endpoints with api_version.

    :param api_version: version of the api.
    :type api_version: str
    :param suffix: api endpoint. A falsy value (None or empty string)
                   yields only the version prefix.
    :type suffix: str
    :returns: str -- endpoint prepended with ``api_version``
    """
    if suffix:
        return r"/{}/{}".format(api_version, suffix)
    return r"/{}".format(api_version)
class CorrelationID:
    """Create and store correlation id of the request."""

    # Header key shared by all instances; change it with set_header_key().
    _header_key = 'X-REQUEST-ID'

    def __init__(self, headers):
        """Take the correlation id from ``headers`` or create a new one.

        :param headers: request headers. Must support ``in`` and item
                        access by header key.
        """
        key = self._header_key
        self._id = headers[key] if key in headers else self.create()

    def __repr__(self):
        return "{}({!r})".format(type(self).__name__, self.get())

    @staticmethod
    def create():
        """Create and return new correlation id.

        :returns: uuid.UUID -- correlation id
        """
        return uuid.uuid4()

    def get(self):
        """Return stored correlation id as str.

        :returns: str -- correlation id
        """
        return str(self._id)

    def as_header(self):
        """Return stored correlation id to be used as http-header.

        :returns: dict -- correlation id with header key for using it in
                  ``tornado.httpclient.HTTPRequest`` as header
        """
        return {self.get_header_key(): self.get()}

    @classmethod
    def set_header_key(cls, key):
        """Configure header key for correlation id.

        :param key: HTTP header key to use with correlation id.
        :type key: str
        """
        cls._header_key = key

    @classmethod
    def get_header_key(cls):
        """Get HTTP header key for correlation id.

        :returns: str -- header key.
        """
        return cls._header_key
class KuhaServerError(Exception):
    """Base class for common HTTP-exceptions

    :param msg: error message.
    :param status_code: HTTP status code associated with the error.
    :type status_code: int
    :param context: optional additional information. When truthy it is
                    appended to the message in ``str(exception)``.
    """

    def __init__(self, msg, status_code=500, context=None):
        super().__init__(msg)
        self.msg = msg
        self.status_code = status_code
        self.context = context

    def __str__(self):
        if self.context:
            return "{}: {}".format(self.msg, self.context)
        return str(self.msg)
class InvalidContentType(KuhaServerError):
    """Invalid content type HTTP-exception. Uses HTTP status code 415.

    :param requested_content_type: content type of the request.
    :param supported_content_type: content type supported by the endpoint.
    """

    def __init__(self, requested_content_type, supported_content_type):
        message = "Invalid content type: {}. Endpoint supports {}".format(
            requested_content_type, supported_content_type)
        super().__init__(message, 415)
class BadRequest(KuhaServerError):
    """Bad request HTTP-exception. Uses HTTP status code 400.

    :param msg: error message. A generic message is used when falsy.
    """

    def __init__(self, msg=None):
        message = msg or (
            "The request could not be understood by the server "
            "due to malformed or missing syntax.")
        super().__init__(message, 400)
class ResourceNotFound(KuhaServerError):
    """Resource not found HTTP-exception. Uses HTTP status code 404.

    :param msg: error message. A generic message is used when falsy.
    :param context: optional additional information passed on
                    to :class:`KuhaServerError`.
    """

    def __init__(self, msg=None, context=None):
        message = msg or 'Resource not found'
        super().__init__(message, 404, context)
class RequestHandler(tornado.web.RequestHandler):
    """Common request handler for kuha server applications.

    Subclass in application specific handlers.
    """

    CONTENT_TYPE_JSON = 'application/json'
    CONTENT_TYPE_XML = 'text/xml'
    CONTENT_TYPE_POST = 'application/x-www-form-urlencoded'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Assigned in prepare(), or lazily in log_exception().
        self._correlation_id = None
        # Assigned in set_output_content_type().
        self._output_content_type = None
        self._output_charset = None

    async def prepare(self):
        """Prepare each request.

        Look for correlation id; create one if not found.
        Set correlation id to response header.
        """
        self._correlation_id = CorrelationID(self.request.headers)
        self.set_header(CorrelationID.get_header_key(),
                        self._correlation_id.get())

    def set_output_content_type(self, ct, charset='UTF-8'):
        """Sets content type for responses.

        :param ct: content type for response.
        :type ct: str
        :param charset: charset definition for response content type.
        :type charset: str
        :raises Exception: if ``ct`` is neither :attr:`CONTENT_TYPE_JSON`
                           nor :attr:`CONTENT_TYPE_XML`.
        """
        if ct not in (self.CONTENT_TYPE_JSON, self.CONTENT_TYPE_XML):
            raise Exception("Unsupported content type: {}".format(ct))
        self._output_content_type = ct
        self._output_charset = charset
        self.set_header('Content-Type', '{}; charset={}'.format(ct, charset))

    def log_exception(self, typ, value, tb):
        """Overrides tornados exception logging.

        Sends HTTP errors as responses.
        Calls customised :func:`log_exception` which outputs JSON encoded log
        messages with request correlation ids.
        For easier debugging it also calls ``tornado.web.RequestHandler.log_exception``
        to output full traceback.

        :param typ: type of exception
        :param value: caught exception
        :param tb: traceback
        """
        if self._correlation_id is None:
            # The exception was raised before prepare() assigned a
            # correlation id. Create one now so the JSON log entry can
            # still be written.
            self._correlation_id = CorrelationID(self.request.headers)
        if isinstance(value, (KuhaServerError, tornado.web.HTTPError)):
            self.send_error(value.status_code, reason=str(value))
        log_exception(typ, value, tb, self._correlation_id)
        super().log_exception(typ, value, tb)

    def assert_request_content_type(self, supported_content_type):
        """Assert request has correct content type header.

        The header value must equal ``supported_content_type`` exactly.

        :param supported_content_type: content type supported by endpoint.
        :type supported_content_type: str
        :raises InvalidContentType: if request has invalid content type.
        """
        requested_content_type = self.request.headers.get('Content-Type')
        if requested_content_type != supported_content_type:
            raise InvalidContentType(requested_content_type,
                                     supported_content_type)

    def write_error(self, status_code, **kwargs):
        r"""Overrides ``tornado.web.RequestHandler.write_error``.

        Outputs error messages in preferred content type.

        :param status_code: HTTP status code.
        :type status_code: int
        :param \*\*kwargs: keyword arguments are passed to
                           ``tornado.web.RequestHandler.write_error``
                           if output content type is not application/json
        """
        if self._output_content_type == self.CONTENT_TYPE_JSON:
            self.finish({
                'code': status_code,
                'message': self._reason
            })
        else:
            super().write_error(status_code, **kwargs)
class WebApplication(tornado.web.Application):
    """Override ``tornado.web.Application`` to make sure server applications
    are using correct initialization parameters.

    The application is always created with ``autoreload`` and ``debug``
    switched off.
    """

    def __init__(self, handlers):
        settings = {'autoreload': False, 'debug': False}
        super().__init__(handlers, **settings)

    def log_request(self, handler):
        """Override ``tornado.web.Application.log_request``.

        Delegates to the module level :func:`log_request`, which is the
        server's own implementation.

        :param handler: Handler of current request.
        :type handler: Subclass of ``tornado.web.RequestHandler``
        """
        log_request(handler)
def serve(web_app, port, process_count=0, on_exit=None):
    """Serve web application.

    Binds an HTTP server to ``port``, starts it and runs the current
    IOLoop. When the loop exits, the server is stopped and ``on_exit``
    is called.

    :param web_app: application to serve
    :type web_app: :obj:`tornado.web.Application`
    :param port: Port to listen to.
    :type port: int
    :param process_count: number of processes to spawn. 0 = forks one process per cpu.
    :type process_count: int
    :param on_exit: callback on server/ioloop stop.
    :type on_exit: function
    """
    http_server = tornado.httpserver.HTTPServer(web_app)
    http_server.bind(port)
    # 0 = fork one process per cpu, 1 = single process
    http_server.start(process_count)
    logging.info("Serving on port %s...", port)
    loop = None
    try:
        loop = tornado.ioloop.IOLoop.current()
        loop.start()
    finally:
        # Cleanup runs whether the loop returned normally or raised.
        http_server.stop()
        if on_exit:
            on_exit()
        if loop:
            loop.stop()