#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Test cases for Kuha
"""
import os
import random
import string
import unittest
import asyncio
import copy
from tornado.httpclient import HTTPClient
from tornado.escape import (
json_encode,
json_decode
)
from kuha_common import (
conf,
server
)
from kuha_common.document_store import (
query,
client
)
from kuha_common.document_store.query import Query
from kuha_common.document_store.records import (
REC_STATUS_DELETED,
record_by_collection,
COLLECTIONS,
Study,
Variable,
Question,
StudyGroup
)
# Module-level storage for the asyncio event loop shared across testcases.
# setUp() lazily creates the loop on first use and re-sets it as the current
# event loop for every test.
_STORE = {'eventloop': None}
class KuhaUnitTestCase(unittest.TestCase):
    """Base class for unittests.

    * Assertion methods to check record equality.
    * Helper methods to provide access to dummydata.
    """

    #: Override in subclass to lookup dummydata from a different directory.
    dummydata_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'dummydata')

    @classmethod
    def prepare_dummyvalues(cls):
        """Initialize the dummyvalue registry and record languages."""
        # Seed with None so generated values can never collide with "no value".
        cls._dummyvalues = [None]
        cls.record_lang_1 = cls.set_val('lang_1')
        cls.record_lang_2 = cls.set_val('lang_2')

    @classmethod
    def discard_dummyvalues(cls):
        """Discard record languages and reset the dummyvalue registry."""
        delattr(cls, 'record_lang_1')
        delattr(cls, 'record_lang_2')
        cls._dummyvalues = [None]

    @classmethod
    def setUpClass(cls):
        cls.prepare_dummyvalues()
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        cls.discard_dummyvalues()
        super().tearDownClass()

    @classmethod
    def get_dummydata_path(cls, path):
        """Get absolute path to dummydatafile.

        :param path: Path. Gets turned into an absolute if it isn't.
        :returns: Absolute path.
        :rtype: str
        """
        if not os.path.isabs(path):
            path = os.path.abspath(os.path.join(cls.dummydata_dir, path))
        return path

    @classmethod
    def get_dummydata(cls, path):
        """Get dummydata by reading file from ``path``.

        :param path: path to file.
        :returns: Contents of the file.
        """
        path = cls.get_dummydata_path(path)
        with open(path, 'r') as _f:
            data = _f.read()
        return data

    @classmethod
    def remove_dummyfile_if_exists(cls, path):
        """Remove dummyfile from ``path`` if it exists.

        :param path: path to dummyfile.
        :returns: None
        """
        path = cls.get_dummydata_path(path)
        if os.path.exists(path):
            os.remove(path)

    @classmethod
    def set_val(cls, value):
        """Assign value as dummyvalue.

        :param value: Value to assign
        :returns: value
        :raises ValueError: if ``value`` is already assigned.
        """
        if value in cls._dummyvalues:
            raise ValueError('%s already in use' % (value,))
        cls._dummyvalues.append(value)
        return value

    @classmethod
    def gen_val(cls, lenght=None, unique=False, chars=None):
        """Generate & assign dummyvalue.

        .. note:: The parameter name ``lenght`` (sic) is kept for
           backwards compatibility with existing keyword-argument callers.

        :param lenght: lenght of the value
        :type lenght: int or None
        :param unique: should the value be unique
        :type unique: bool
        :param chars: use specific characters.
        :type chars: str or None.
        :returns: generated value
        :rtype: str
        :raises ValueError: if a unique value cannot be generated
            within 100 retries.
        """
        lenght = lenght if lenght else random.randint(1, 50)
        if not chars:
            whitespace = ' '
            chars = string.ascii_letters + whitespace + string.digits

        def _val():
            return ''.join(
                random.choice(chars) for _ in range(lenght)
            )

        value = _val()
        if unique:
            # Bounded retry: with a short lenght and few chars the value
            # space may be exhausted, so give up eventually.
            retries = 100
            while value in cls._dummyvalues:
                if not retries:
                    raise ValueError("Maximum retries exceeded in gen_val()")
                value = _val()
                retries -= 1
        cls._dummyvalues.append(value)
        return value

    @classmethod
    def gen_id(cls):
        """Generate Id.

        :returns: Generated id.
        :rtype: str
        """
        return cls.gen_val(10, True, string.ascii_letters + string.digits)

    @classmethod
    def generate_dummy_study(cls):
        """Generate and return a Study with dummydata.

        :returns: study with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Study`
        """
        study = Study()
        study.add_study_number(cls.gen_id())
        study.add_persistent_identifiers(cls.gen_val())
        study.add_persistent_identifiers(cls.gen_val())
        study.add_identifiers(cls.gen_val(), cls.record_lang_1,
                              **{Study.identifiers.attr_agency.name: cls.gen_val()})
        study.add_identifiers(cls.gen_val(), cls.record_lang_1,
                              **{Study.identifiers.attr_agency.name: cls.gen_val()})
        study.add_study_titles(cls.gen_val(), cls.record_lang_1)
        study.add_parallel_titles(cls.gen_val(), cls.record_lang_1)
        study.add_principal_investigators(
            cls.gen_val(), cls.record_lang_1,
            **{Study.principal_investigators.attr_organization.name: cls.gen_val(),
               Study.principal_investigators.attr_external_link.name: cls.gen_val(),
               Study.principal_investigators.attr_external_link_title.name: cls.gen_val(),
               Study.principal_investigators.attr_external_link_role.name: cls.gen_val(),
               Study.principal_investigators.attr_external_link_uri.name: cls.gen_val()})
        study.add_publishers(cls.gen_val(), cls.record_lang_1,
                             **{Study.publishers.attr_abbreviation.name: cls.gen_val()})
        study.add_distributors(cls.gen_val(), cls.record_lang_1,
                               **{Study.distributors.attr_abbreviation.name: cls.gen_val(),
                                  Study.distributors.attr_uri.name: cls.gen_val()})
        study.add_document_uris(cls.gen_val(), cls.record_lang_1,
                                **{Study.document_uris.attr_location.name: cls.gen_val(),
                                   Study.document_uris.attr_description.name: cls.gen_val()})
        study.add_publication_dates(cls.gen_val(), cls.record_lang_1)
        study.add_publication_years(cls.gen_val(), cls.record_lang_1,
                                    **{Study.publication_years.attr_distribution_date.name: cls.gen_val()})
        study.add_abstract(cls.gen_val(), cls.record_lang_1)
        study.add_classifications(cls.gen_val(), cls.record_lang_1,
                                  **{Study.classifications.attr_system_name.name: cls.gen_val(),
                                     Study.classifications.attr_uri.name: cls.gen_val(),
                                     Study.classifications.attr_description.name: cls.gen_val()})
        study.add_keywords(cls.gen_val(), cls.record_lang_1,
                           **{Study.keywords.attr_system_name.name: cls.gen_val(),
                              Study.keywords.attr_uri.name: cls.gen_val(),
                              Study.keywords.attr_description.name: cls.gen_val()})
        study.add_time_methods(cls.gen_val(), cls.record_lang_1,
                               **{Study.time_methods.attr_system_name.name: cls.gen_val(),
                                  Study.time_methods.attr_uri.name: cls.gen_val(),
                                  Study.time_methods.attr_description.name: cls.gen_val()})
        study.add_sampling_procedures(cls.gen_val(), cls.record_lang_1,
                                      **{Study.sampling_procedures.attr_system_name.name: cls.gen_val(),
                                         Study.sampling_procedures.attr_uri.name: cls.gen_val(),
                                         Study.sampling_procedures.attr_description.name: cls.gen_val()})
        study.add_collection_modes(cls.gen_val(), cls.record_lang_1,
                                   **{Study.collection_modes.attr_system_name.name: cls.gen_val(),
                                      Study.collection_modes.attr_uri.name: cls.gen_val(),
                                      Study.collection_modes.attr_description.name: cls.gen_val()})
        study.add_analysis_units(cls.gen_val(), cls.record_lang_1,
                                 **{Study.analysis_units.attr_system_name.name: cls.gen_val(),
                                    Study.analysis_units.attr_uri.name: cls.gen_val(),
                                    Study.analysis_units.attr_description.name: cls.gen_val()})
        study.add_collection_periods(cls.gen_val(), cls.record_lang_1,
                                     **{Study.collection_periods.attr_event.name: cls.gen_val()})
        study.add_study_area_countries(cls.gen_val(), cls.record_lang_1,
                                       **{Study.study_area_countries.attr_abbreviation.name: cls.gen_val()})
        # One included and one excluded universe to cover both flags.
        study.add_universes(cls.gen_val(), cls.record_lang_1,
                            **{Study.universes.attr_included.name: True})
        study.add_universes(cls.gen_val(), cls.record_lang_1,
                            **{Study.universes.attr_included.name: False})
        study.add_data_access(cls.gen_val(), cls.record_lang_1)
        study.add_data_access_descriptions(cls.gen_val(), cls.record_lang_1)
        study.add_file_names(cls.gen_val(), cls.record_lang_1)
        study.add_instruments(cls.gen_val(), cls.record_lang_1,
                              **{Study.instruments.attr_instrument_name.name: cls.gen_val()})
        study.add_study_groups(cls.gen_id(), cls.record_lang_1,
                               **{Study.study_groups.attr_name.name: cls.gen_val()})
        study.add_copyrights(cls.gen_val(), cls.record_lang_1)
        return study

    @classmethod
    def generate_dummy_variable(cls):
        """Generate and return a Variable with dummydata.

        :returns: variable with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Variable`
        """
        var = Variable()
        var.add_study_number(cls.gen_id())
        var.add_variable_name(cls.gen_id())
        var.add_question_identifiers(cls.gen_val())
        var.add_variable_labels(cls.gen_val(), cls.record_lang_1)
        var.add_variable_labels(cls.gen_val(), cls.record_lang_2)
        # One missing and one non-missing code to cover both flags.
        var.add_codelist_codes(cls.gen_val(), cls.record_lang_1,
                               **{Variable.codelist_codes.attr_label.name: cls.gen_val(),
                                  Variable.codelist_codes.attr_missing.name: True})
        var.add_codelist_codes(cls.gen_val(), cls.record_lang_1,
                               **{Variable.codelist_codes.attr_label.name: cls.gen_val(),
                                  Variable.codelist_codes.attr_missing.name: False})
        return var

    @classmethod
    def generate_dummy_question(cls):
        """Generate and return a Question with dummydata.

        :returns: question with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Question`
        """
        question = Question()
        question.add_study_number(cls.gen_id())
        question.add_question_identifier(cls.gen_id())
        question.add_variable_name(cls.gen_val())
        question.add_question_texts(cls.gen_val(), cls.record_lang_1)
        question.add_research_instruments(cls.gen_val(), cls.record_lang_1)
        question.add_codelist_references(cls.gen_val(), cls.record_lang_1)
        return question

    @classmethod
    def generate_dummy_studygroup(cls):
        """Generate and return a StudyGroup with dummydata.

        :returns: studygroup with dummydata.
        :rtype: :obj:`kuha_common.document_store.records.StudyGroup`
        """
        studygroup = StudyGroup()
        studygroup.add_study_group_identifier(cls.gen_id())
        studygroup.add_study_group_names(cls.gen_val(), cls.record_lang_1)
        studygroup.add_study_numbers(cls.gen_id())
        return studygroup

    @staticmethod
    def _to_json_bytestring(value):
        # JSON-encode and return as UTF-8 bytes, mirroring an HTTP body.
        return bytes(json_encode(value), encoding='utf8')

    @staticmethod
    def run_until_complete(coro):
        """Run coroutine until it's completed.

        :param coro: Coroutine function.
        """
        asyncio.run(coro)

    def setUp(self):
        """Format testcase values and initialize event loop.

        Call asynchronous code synchronously::

            self.run_until_complete(coro())
        """
        super().setUp()
        # Create the shared loop lazily, then (re)install it as the
        # current event loop for this test.
        if _STORE['eventloop'] is None:
            _STORE['eventloop'] = asyncio.new_event_loop()
        asyncio.set_event_loop(_STORE['eventloop'])
        self._patchers = []
        self._resets = []
        self._stored_result = None
        self._isolate_conf()

    def tearDown(self):
        """Stop patchers and restore module-level defaults."""
        for patcher in self._patchers:
            patcher.stop()
        for reset in self._resets:
            reset()
        # Restore client/server/query module state possibly mutated by tests.
        client.JSONStreamClient.connect_timeout = client.DS_CLIENT_CONNECT_TIMEOUT
        client.JSONStreamClient.request_timeout = client.DS_CLIENT_REQUEST_TIMEOUT
        client.JSONStreamClient.max_clients = client.DS_CLIENT_MAX_CLIENTS
        server._server_conf['server_process_count'] = server.DEFAULT_PROCESS_COUNT
        query.Query.base_url = None
        super().tearDown()

    def _isolate_conf(self):
        # Snapshot module-level configuration and register a reset callback
        # so each testcase runs against isolated configuration state.
        stored = copy.deepcopy(conf._STORED)

        def reset():
            setattr(conf, '_STORED', stored)

        self._resets.append(reset)
        return reset

    async def await_and_store_result(self, coro):
        """Await coroutine and store returning result.

        Example::

            self.run_until_complete(self.await_and_store_result(coro()))

        :param coro: Coroutine or Future to await
        """
        self._stored_result = await coro

    # Patchers

    def init_patcher(self, patcher):
        """Initialize patcher, store for later use, return it.

        :param patcher: Patch to start.
        :type patcher: :obj:`unittest.mock._patch`
        :returns: MagicMock acting as patched object.
        :rtype: :class:`unittest.mock.MagicMock`
        """
        _mock = patcher.start()
        self._patchers.append(patcher)
        return _mock

    # Assertions

    def assert_records_are_equal(self, first, second, msg=None):
        """Assert two Document Store records are equal.

        :param first: First record to compare.
        :param second: Second record to compare.
        :param msg: Optional message to output on assertion.
        """
        if msg is None:
            msg = "Record instances are not equal. Expecting equal."
        self.assertEqual(first.export_dict(include_metadata=False, include_id=False),
                         second.export_dict(include_metadata=False, include_id=False),
                         msg=msg)

    def assert_records_are_not_equal(self, first, second, msg=None):
        """Assert two Document Store records are not equal.

        :param first: First record to compare.
        :param second: Second record to compare.
        :param msg: Optional message to output on assertion.
        """
        if msg is None:
            msg = "Record instances are equal. Expecting different instances."
        self.assertNotEqual(first.export_dict(include_metadata=False, include_id=False),
                            second.export_dict(include_metadata=False, include_id=False),
                            msg=msg)

    def assert_mock_meth_has_calls(self, mock_meth, call, *calls):
        """Assert moch_meth was called with arguments.

        This calls Mock.assert_has_calls and tests for call count. The actual
        benefit of using this method over the built-in assert_has_calls is that
        this method tries to pinpoint the actual call that was missing when
        assert_has_calls raised AssertionError. This is useful when mock_meth
        has had multiple calls. The built-in assert_has_calls will notify of
        all calls that the mock_meth has had, while this method will notify of the
        actual call that was missing.

        :param mock_meth: Mocked method that is target of testing.
        :param call: Call that should be found. Instance of :class:`unittest.mock._Call`
            Repeat this argument to test for multiple calls.
        :raises: :exc:`AssertionError` if calls not found.
        """
        exception = None
        calls = list((call,) + calls)
        self.assertEqual(mock_meth.call_count, len(calls))
        try:
            mock_meth.assert_has_calls(calls)
        except AssertionError as exc:
            exception = exc
        if exception is None:
            return None
        # Try to pinpoint the actual call that was not found.
        # This will not work properly in all conditions
        # so we must make sure to raise the original assertionerror
        # unconditionally
        try:
            for single_call in calls:
                mock_meth.assert_has_calls([single_call])
        except AssertionError as exc:
            raise exc from exception
        raise exception
class KuhaEndToEndTestCase(KuhaUnitTestCase):
    """Base class for end-to-end tests.

    * HTTPClient for interacting with Document Store.
    * Assertion methods to check returning payload and status codes.
    """

    JSON_HEADERS = {'Content-Type': 'application/json'}
    POST_FORM_HEADERS = {'Content-Type': 'application/x-www-form-urlencoded'}
    unsupported_args = None
    # Shared synchronous HTTP client; created in setUpClass, closed in tearDownClass.
    http_client = None

    @classmethod
    def setUpClass(cls):
        # End-to-end tests require a live Document Store; skip when the
        # KUHA_DS_URL environment variable is not present.
        if 'KUHA_DS_URL' not in os.environ:
            raise unittest.SkipTest("KUHA_DS_URL environment variable is not set.")
        try:
            conf.get_conf('document_store_url')
        except AttributeError:
            # Not configured yet: take the URL from the environment.
            conf.add_conf('document_store_url', os.environ['KUHA_DS_URL'])
        cls.http_client = HTTPClient()
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        cls.http_client.close()
        # Reset configuration namespace shared at module level.
        conf._STORED['conf'] = conf.configargparse.Namespace()
        super().tearDownClass()

    @staticmethod
    def settings():
        # Shortcut to the full configuration namespace.
        return conf.get_conf()

    @staticmethod
    def get_record_url(rec_or_coll, _id=None):
        """Get URL to Document Store records or single record.

        :param rec_or_coll: record, record class or collection
        :param _id: Optional record ID.
        :type _id: str or None
        :returns: URL to Document Store collection or single record.
        :rtype: str
        """
        collection = rec_or_coll.get_collection() if hasattr(rec_or_coll, 'get_collection') else rec_or_coll
        if _id is not None:
            return '/'.join([conf.get_conf('document_store_url'), collection, _id])
        return '/'.join([conf.get_conf('document_store_url'), collection])

    @staticmethod
    def get_query_url(rec_or_coll, query_type=None):
        """Get URL to Document Store query endpoint for ``collection``.

        :param rec_or_coll: Collection to query.
        :type rec_or_coll: str, record, or record class
        :param query_type: Optional query type
        :returns: URL to query endpoint.
        :rtype: str
        :raises ValueError: if ``query_type`` is not a valid query type.
        """
        collection = rec_or_coll.get_collection() if hasattr(rec_or_coll, 'get_collection') else rec_or_coll
        url = '/'.join([conf.get_conf('document_store_url'), 'query', collection])
        if not query_type:
            return url
        if not Query.is_valid_query_type(query_type):
            raise ValueError("Invalid query-type {}".format(query_type))
        return url + '?query_type={}'.format(query_type)

    # Communication with Document Store

    @classmethod
    def GET_to_document_store(cls, rec_or_coll, _id=None):  # pylint: disable=C0103
        """GET to Document Store returns record(s).

        :param rec_or_coll: record or collection to get.
        :param _id: Optional ObjectId. Will take precedence over ``rec_or_coll`` id.
        :returns: response body
        """
        if _id is None and hasattr(rec_or_coll, 'get_id'):
            try:
                _id = rec_or_coll.get_id()
            except TypeError:
                # Record has no id assigned; fall back to collection URL.
                pass
        url = cls.get_record_url(rec_or_coll, _id)
        response = cls.http_client.fetch(url)
        if not response.body:
            return None
        # Note that if body contains multiple streamed JSON records, json_decode will fail.
        return json_decode(response.body)

    @classmethod
    def POST_to_document_store(cls, record):  # pylint: disable=C0103
        """POST to Document Store creates record.

        :param record: Record to post.
        :returns: response body
        """
        url = cls.get_record_url(record.get_collection())
        response = cls.http_client.fetch(url, method='POST',
                                         body=json_encode(record.export_dict(include_metadata=False,
                                                                             include_id=False)),
                                         headers=cls.JSON_HEADERS)
        return json_decode(response.body)

    @classmethod
    def DELETE_to_document_store(cls, rec_or_coll=None, _id=None, hard_delete=True):  # pylint: disable=C0103
        """DELETE to Document Store deletes record(s).

        Call without arguments to delete all records from all collections.

        :param str or None rec_or_coll: Collection to delete from.
        :param str or None _id: ID of the record to delete.
        :param bool hard_delete: Set to False to do a logical delete instead.
        :returns: Response body or bodies wrapped in dict ({<collection>: <resp_body>})
        :raises ValueError: if ``_id`` is given without a collection.
        """
        if _id is not None and rec_or_coll is None:
            raise ValueError("Give collection with record id")

        def _url(rec_or_coll, _id=None):
            delete_type = 'hard' if hard_delete else 'soft'
            return '%s?delete_type=%s' % (cls.get_record_url(rec_or_coll, _id), delete_type)

        if rec_or_coll is None:
            # No target given: wipe every collection and collect responses.
            rval = {}
            for coll in COLLECTIONS:
                resp = cls.http_client.fetch(_url(coll), method="DELETE")
                rval.update({coll: json_decode(resp.body)})
            return rval
        collection = rec_or_coll.get_collection() if\
            hasattr(rec_or_coll, 'get_collection') else\
            rec_or_coll
        url = _url(collection, _id)
        resp = cls.http_client.fetch(url, method="DELETE")
        return json_decode(resp.body)

    @classmethod
    def query_document_store(cls, rec_or_coll, query, query_type=None):
        """Execute query against Document Store query API.

        :param rec_or_coll: Collection to query.
        :type rec_or_coll: str or record class or record instance
        :param query: Query.
        :param query_type: Type of Query.
        :returns: query results
        :rtype: None if query returned no results, dict for results.
        """
        url = cls.get_query_url(rec_or_coll, query_type)
        response = cls.http_client.fetch(url, method="POST", body=json_encode(query), headers=cls.JSON_HEADERS)
        if not response.body:
            return None
        # Note that if body contains multiple streamed JSON records, json_decode will fail.
        return json_decode(response.body)

    @classmethod
    def get_collection_record_count(cls, rec_or_coll, exclude_deleted=True):
        """Return number or records for collection in Document Store.

        :param rec_or_coll: Document Store record, Document Store record class or collection.
        :param bool exclude_deleted: exclude logically deleted records from count.
        :returns: record count in Document Store.
        :rtype: int
        """
        collection = rec_or_coll.get_collection() if\
            hasattr(rec_or_coll, 'get_collection') else\
            rec_or_coll
        url = cls.get_query_url(collection, Query.query_type_count)
        query_dict = {}
        if exclude_deleted:
            # Filter out logically deleted records from the count query.
            rec = record_by_collection(collection)
            query_dict = {
                query.Query.k_filter: {
                    rec._metadata.attr_status.path: {
                        query.FilterKeyConstants.not_equal: REC_STATUS_DELETED}}}
        response = cls.http_client.fetch(url, method="POST", body=json_encode(query_dict), headers=cls.JSON_HEADERS)
        _dict = json_decode(response.body)
        return _dict['count']

    def assert_document_store_is_empty(self, exclude_deleted=True):
        """Assert Document Store contains no records.

        :param bool exclude_deleted: exclude logically deleted records from count.
        :raises: :exc:`AssertionError` if Document Store has records.
        """
        for coll in COLLECTIONS:
            count = self.get_collection_record_count(coll, exclude_deleted=exclude_deleted)
            self.assertEqual(count, 0)