Source code for kuha_common.document_store.client

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""``kuha_common.document_store.client`` provides
a http client interface to communicate with Kuha Document Store. """

import logging
import collections
from functools import partial
from inspect import iscoroutinefunction
from json.decoder import JSONDecodeError

from tornado.escape import json_encode, json_decode
import tornado.httpclient
import tornado.gen

from kuha_common import conf

from .constants import (
    DS_CLIENT_MAX_CLIENTS,
    DS_CLIENT_REQUEST_TIMEOUT,
    DS_CLIENT_CONNECT_TIMEOUT,
    DS_CLIENT_SLEEP_ON_QUEUE
)

logger = logging.getLogger(__name__)


[docs]class JSONStreamClient: """Base class used for requests. Implements own queue to store requests, since ``tornado.httpclient.AsyncHTTPClient`` starts timers for ``request_timeout`` and ``connect_timeout`` at the moment we call client.fetch(). See https://github.com/tornadoweb/tornado/issues/1400 for more details. Handles JSON decoding of incoming chunks and the encoding of body to JSON. """ #: Sets sleep timer for queue. sleep_on_queue = DS_CLIENT_SLEEP_ON_QUEUE max_clients = DS_CLIENT_MAX_CLIENTS request_timeout = DS_CLIENT_REQUEST_TIMEOUT connect_timeout = DS_CLIENT_CONNECT_TIMEOUT _concurrent_requests = 0 _content_type_json = {'Content-Type': 'application/json'} def __init__(self): tornado.httpclient.AsyncHTTPClient.configure(None, max_clients=self.max_clients) # AsyncHTTPClient() always returns the same instance. self.client = tornado.httpclient.AsyncHTTPClient() self._queued_requests = collections.deque() self._callbacks = collections.deque() self._json_chunk = b''
[docs] def request(self, url, **kwargs): """Constucts a streaming request. :param url: url to request. :type url: str :param kwargs: keyword arguments passed to ``tornado.httpclient.HTTPRequest`` :returns: ``tornado.httpclient.HTTPRequest`` """ logger.debug("Constructing request to %s", url) return tornado.httpclient.HTTPRequest( url, connect_timeout=self.connect_timeout, request_timeout=self.request_timeout, **kwargs )
@classmethod def _increase_concurrent_requests(cls): cls._concurrent_requests += 1 @classmethod def _decrease_concurrent_requests(cls): cls._concurrent_requests -= 1 @classmethod def _get_concurrent_requests(cls): return cls._concurrent_requests async def _on_response(self, future): response = await future logger.debug("Got response code: %s", response.code) self._decrease_concurrent_requests() if response.error: response.rethrow() def _get_full_chunk(self, chunk): prev_chunk = self._json_chunk self._json_chunk = b'' return prev_chunk + chunk def _store_incomplete_chunk(self, chunk): self._json_chunk = chunk def _on_json_chunk(self, callback, json_chunk): chunk = self._get_full_chunk(json_chunk) try: _data = json_decode(chunk) except JSONDecodeError: self._store_incomplete_chunk(chunk) else: if iscoroutinefunction(callback): self._callbacks.append(callback(_data)) else: callback(_data)
[docs] def wrap_streaming_callback(self, callback): """Wrap streaming callback to support chunked JSON responses. :param callback: streaming callback. Gets called with response which is decoded to python object from JSON. :type callback: callable. :returns: Wrapped callback :rtype: :class:`functools.partial` """ return partial(self._on_json_chunk, callback)
[docs] async def execute_stored_callbacks(self): """Executes asynchronous callbacks stored in :attr:`_callbacks` """ while self._callbacks: _call = self._callbacks.popleft() await _call
[docs] async def run_queued_requests(self, queued_requests=None): """Run queued requests. Calls queued requests asynchronically. Sleeps for :attr:`sleep_on_queue` if :attr:`max_clients` reached. :param queued_requests: Optionally pass queued_requests to run. :type queued_requests: :obj:`collections.deque` """ running_requests = [] queued_requests = queued_requests or self._queued_requests while self._get_concurrent_requests() == self.max_clients: # Sleep while waiting for concurrent requests to complete. # No need for a timeout, since the requests and connections get timed-out. # It will eventually timeout every concurrent requests if need be. logger.debug("Max clients reached. Sleeping for %s", self.sleep_on_queue) await tornado.gen.sleep(self.sleep_on_queue) while queued_requests: request = queued_requests.popleft() logger.debug("Run queued request") running_requests.append(self._on_response(self.client.fetch(request))) self._increase_concurrent_requests() if self._get_concurrent_requests() == self.max_clients: await tornado.gen.multi(running_requests) await self.execute_stored_callbacks() running_requests.clear() if running_requests: await tornado.gen.multi(running_requests) await self.execute_stored_callbacks()
[docs] def get_streaming_request(self, streaming_callback, url, body=None, method=None, headers=None, **kw): """Get a streaming request. Sets default headers ``Content-Type: application/json`` if not already given. Encodes body to JSON if given and is not string or bytes. If response is empty (for example query with no results) the streaming callback doesn't get called. Subclass and override to support arbitrary requests. :param streaming_callback: callback which receives the response if any. :type streaming_callback: callable :param url: URL to send request to. :type url: str :param body: Optional request body. String will be supplied as is. Other values will be encoded to JSON. :type body: str, dict, list, tuple, integer, float or None :param method: HTTP method. Defaults to ``POST``. :type method: str or None :param headers: optional request headers. if Content-Type is not set, will set 'Content-Type': 'application/json' as default. :type headers: dict or None :returns: HTTP request :rtype: :obj:`tornado.httpclient.HTTPRequest` """ if headers is None: headers = {} if 'Content-Type' not in headers: headers.update(self._content_type_json) if method is None: method = 'POST' if body is not None and\ not hasattr(body, 'capitalize'): body = json_encode(body) streaming_callback = self.wrap_streaming_callback(streaming_callback) return self.request( url, method=method, headers=headers, body=body, streaming_callback=streaming_callback, **kw )
[docs] async def queue_request(self, *args, **kwargs): r"""Queue request to be run aynchronously by calling ``run_queued_requests``. :param \*args: arguments passed to ``get_streaming_request`` :param \*\*kwargs: keyword arguments passed to ``get_streaming_request``. :returns: :meth:`run_queued_requests` method to call to run the queued requests. """ request = self.get_streaming_request(*args, **kwargs) self._queued_requests.append(request) return self.run_queued_requests
[docs] async def fetch(self, *args, **kwargs): r"""Run single query. :param \*args: arguments passed to ``queue_requests``. :param \*\*kwargs: keyword arguments passed to ``queue_requests`` """ request = self.get_streaming_request(*args, **kwargs) await self.run_queued_requests(collections.deque([request]))
def add_cli_args(): conf.add('--document-store-client-request-timeout', help='Configure the request timeout (in seconds) for Document Store client.', default=DS_CLIENT_REQUEST_TIMEOUT, env_var='DOCUMENT_STORE_CLIENT_REQUEST_TIMEOUT', type=int) conf.add('--document-store-client-connect-timeout', help='Configure the connect timeout (in seconds) for Document Store client.', default=DS_CLIENT_CONNECT_TIMEOUT, env_var='DOCUMENT_STORE_CLIENT_CONNECT_TIMEOUT', type=int) conf.add('--document-store-client-max-clients', help='Configure maximum for simultaneous client connections for Document Store client.', default=DS_CLIENT_MAX_CLIENTS, env_var='DOCUMENT_STORE_CLIENT_MAX_CLIENTS', type=int) def configure(settings): JSONStreamClient.max_clients = settings.document_store_client_max_clients JSONStreamClient.request_timeout = settings.document_store_client_request_timeout JSONStreamClient.connect_timeout = settings.document_store_client_connect_timeout