#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Test cases for Kuha
"""
import os
import random
import string
import unittest
import asyncio
from tornado.httpclient import HTTPClient
from tornado.escape import (
json_encode,
json_decode
)
from kuha_common import cli_setup
from kuha_common.document_store.query import Query
from kuha_common.document_store.records import (
COLLECTIONS,
Study,
Variable,
Question,
StudyGroup
)
class KuhaUnitTestCase(unittest.TestCase):
    """Base class for unittests.

    * Assertion methods to check record equality.
    * Helper methods to provide access to dummydata.
    """

    #: Override in subclass to lookup dummydata from different directory.
    dummydata_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'dummydata')

    @classmethod
    def setUpClass(cls):
        # Reset the pool of reserved dummyvalues and reserve two distinct
        # language codes that generated records share.
        cls._dummyvalues = [None]
        cls.record_lang_1 = cls.set_val('lang_1')
        cls.record_lang_2 = cls.set_val('lang_2')
        super(KuhaUnitTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        delattr(cls, 'record_lang_1')
        delattr(cls, 'record_lang_2')
        cls._dummyvalues = [None]
        super(KuhaUnitTestCase, cls).tearDownClass()

    @classmethod
    def get_dummydata_path(cls, path):
        """Get absolute path to dummydatafile.

        :param path: Path. Gets turned into an absolute if it isn't.
        :returns: Absolute path.
        :rtype: str
        """
        if not os.path.isabs(path):
            path = os.path.abspath(os.path.join(cls.dummydata_dir, path))
        return path

    @classmethod
    def get_dummydata(cls, path):
        """Get dummydata by reading file from ``path``.

        :param path: path to file.
        :returns: Contents of the file.
        """
        path = cls.get_dummydata_path(path)
        with open(path, 'r') as _f:
            data = _f.read()
        return data

    @classmethod
    def remove_dummyfile_if_exists(cls, path):
        """Remove dummyfile from ``path`` if it exists.

        :param path: path to dummyfile.
        :returns: None
        """
        path = cls.get_dummydata_path(path)
        if os.path.exists(path):
            os.remove(path)

    @classmethod
    def set_val(cls, value):
        """Assign value as dummyvalue.

        :param value: Value to assign.
        :returns: value
        :raises: :exc:`ValueError` if the value is already reserved.
        """
        if value in cls._dummyvalues:
            raise ValueError('%s already in use' % (value,))
        cls._dummyvalues.append(value)
        return value

    @classmethod
    def gen_val(cls, lenght=None, unique=False, chars=None):
        """Generate & assign dummyvalue.

        Note: the ``lenght`` parameter name is misspelled but kept as-is
        for backward compatibility with callers using keyword arguments.

        :param lenght: length of the value
        :type lenght: int or None
        :param unique: should the value be unique
        :type unique: bool
        :param chars: use specific characters.
        :type chars: str or None.
        :returns: generated value
        :rtype: str
        :raises: :exc:`ValueError` if a unique value cannot be generated
            within 100 retries.
        """
        lenght = lenght if lenght else random.randint(1, 50)
        if not chars:
            whitespace = ' '
            chars = string.ascii_letters + whitespace + string.digits

        def _val():
            return ''.join(
                random.choice(chars) for _ in range(lenght)
            )
        value = _val()
        if unique:
            # Bounded retry loop: regenerate until the value is not already
            # reserved, giving up after 100 attempts.
            retries = 100
            while value in cls._dummyvalues:
                if not retries:
                    raise ValueError("Maximum retries exceeded in gen_val()")
                value = _val()
                retries -= 1
        cls._dummyvalues.append(value)
        return value

    @classmethod
    def gen_id(cls):
        """Generate Id.

        :returns: Generated unique 10-character alphanumeric id.
        :rtype: str
        """
        return cls.gen_val(10, True, string.ascii_letters + string.digits)

    @classmethod
    def generate_dummy_study(cls):
        """Generate and return a Study with dummydata.

        :returns: study with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Study`
        """
        study = Study()
        study.add_study_number(cls.gen_id())
        study.add_persistent_identifiers(cls.gen_val())
        study.add_persistent_identifiers(cls.gen_val())
        study.add_identifiers(cls.gen_val(), cls.record_lang_1,
                              **{Study.identifiers.attr_agency.name: cls.gen_val()})
        study.add_identifiers(cls.gen_val(), cls.record_lang_1,
                              **{Study.identifiers.attr_agency.name: cls.gen_val()})
        study.add_study_titles(cls.gen_val(), cls.record_lang_1)
        study.add_parallel_titles(cls.gen_val(), cls.record_lang_1)
        study.add_principal_investigators(cls.gen_val(), cls.record_lang_1,
                                          **{Study.principal_investigators.attr_organization.name: cls.gen_val()})
        study.add_publishers(cls.gen_val(), cls.record_lang_1,
                             **{Study.publishers.attr_abbreviation.name: cls.gen_val()})
        study.add_distributors(cls.gen_val(), cls.record_lang_1,
                               **{Study.distributors.attr_abbreviation.name: cls.gen_val(),
                                  Study.distributors.attr_uri.name: cls.gen_val()})
        study.add_document_uris(cls.gen_val(), cls.record_lang_1,
                                **{Study.document_uris.attr_location.name: cls.gen_val(),
                                   Study.document_uris.attr_description.name: cls.gen_val()})
        study.add_publication_dates(cls.gen_val(), cls.record_lang_1)
        study.add_publication_years(cls.gen_val(), cls.record_lang_1,
                                    **{Study.publication_years.attr_distribution_date.name: cls.gen_val()})
        study.add_abstract(cls.gen_val(), cls.record_lang_1)
        study.add_classifications(cls.gen_val(), cls.record_lang_1,
                                  **{Study.classifications.attr_system_name.name: cls.gen_val(),
                                     Study.classifications.attr_uri.name: cls.gen_val(),
                                     Study.classifications.attr_description.name: cls.gen_val()})
        study.add_keywords(cls.gen_val(), cls.record_lang_1,
                           **{Study.keywords.attr_system_name.name: cls.gen_val(),
                              Study.keywords.attr_uri.name: cls.gen_val(),
                              Study.keywords.attr_description.name: cls.gen_val()})
        study.add_time_methods(cls.gen_val(), cls.record_lang_1,
                               **{Study.time_methods.attr_system_name.name: cls.gen_val(),
                                  Study.time_methods.attr_uri.name: cls.gen_val(),
                                  Study.time_methods.attr_description.name: cls.gen_val()})
        study.add_sampling_procedures(cls.gen_val(), cls.record_lang_1,
                                      **{Study.sampling_procedures.attr_system_name.name: cls.gen_val(),
                                         Study.sampling_procedures.attr_uri.name: cls.gen_val(),
                                         Study.sampling_procedures.attr_description.name: cls.gen_val()})
        study.add_collection_modes(cls.gen_val(), cls.record_lang_1,
                                   **{Study.collection_modes.attr_system_name.name: cls.gen_val(),
                                      Study.collection_modes.attr_uri.name: cls.gen_val(),
                                      Study.collection_modes.attr_description.name: cls.gen_val()})
        study.add_analysis_units(cls.gen_val(), cls.record_lang_1,
                                 **{Study.analysis_units.attr_system_name.name: cls.gen_val(),
                                    Study.analysis_units.attr_uri.name: cls.gen_val(),
                                    Study.analysis_units.attr_description.name: cls.gen_val()})
        study.add_collection_periods(cls.gen_val(), cls.record_lang_1,
                                     **{Study.collection_periods.attr_event.name: cls.gen_val()})
        study.add_study_area_countries(cls.gen_val(), cls.record_lang_1,
                                       **{Study.study_area_countries.attr_abbreviation.name: cls.gen_val()})
        study.add_universes(cls.gen_val(), cls.record_lang_1,
                            **{Study.universes.attr_included.name: True})
        study.add_universes(cls.gen_val(), cls.record_lang_1,
                            **{Study.universes.attr_included.name: False})
        study.add_data_access(cls.gen_val(), cls.record_lang_1)
        study.add_data_access_descriptions(cls.gen_val(), cls.record_lang_1)
        study.add_file_names(cls.gen_val(), cls.record_lang_1)
        study.add_instruments(cls.gen_val(), cls.record_lang_1,
                              **{Study.instruments.attr_instrument_name.name: cls.gen_val()})
        study.add_study_groups(cls.gen_id(), cls.record_lang_1,
                               **{Study.study_groups.attr_name.name: cls.gen_val()})
        study.add_copyrights(cls.gen_val(), cls.record_lang_1)
        return study

    @classmethod
    def generate_dummy_variable(cls):
        """Generate and return a Variable with dummydata.

        :returns: variable with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Variable`
        """
        var = Variable()
        var.add_study_number(cls.gen_id())
        var.add_variable_name(cls.gen_id())
        var.add_question_identifiers(cls.gen_val())
        var.add_variable_labels(cls.gen_val(), cls.record_lang_1)
        var.add_variable_labels(cls.gen_val(), cls.record_lang_2)
        var.add_codelist_codes(cls.gen_val(), cls.record_lang_1,
                               **{Variable.codelist_codes.attr_label.name: cls.gen_val(),
                                  Variable.codelist_codes.attr_missing.name: True})
        var.add_codelist_codes(cls.gen_val(), cls.record_lang_1,
                               **{Variable.codelist_codes.attr_label.name: cls.gen_val(),
                                  Variable.codelist_codes.attr_missing.name: False})
        return var

    @classmethod
    def generate_dummy_question(cls):
        """Generate and return a Question with dummydata.

        :returns: question with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Question`
        """
        question = Question()
        question.add_study_number(cls.gen_id())
        question.add_question_identifier(cls.gen_id())
        question.add_variable_name(cls.gen_val())
        question.add_question_texts(cls.gen_val(), cls.record_lang_1)
        question.add_research_instruments(cls.gen_val(), cls.record_lang_1)
        question.add_codelist_references(cls.gen_val(), cls.record_lang_1)
        return question

    @classmethod
    def generate_dummy_studygroup(cls):
        """Generate and return a StudyGroup with dummydata.

        :returns: studygroup with dummydata.
        :rtype: :obj:`kuha_common.document_store.records.StudyGroup`
        """
        studygroup = StudyGroup()
        studygroup.add_study_group_identifier(cls.gen_id())
        studygroup.add_study_group_names(cls.gen_val(), cls.record_lang_1)
        studygroup.add_study_numbers(cls.gen_id())
        return studygroup

    def setUp(self):
        """Format testcase values and initialize event loop.

        Call asynchronous code synchronously::

            self._loop.run_until_complete(coro())
        """
        super().setUp()
        # Get loop so we can call asynchronous code synchronously.
        # Usage: self._loop.run_until_complete(coro())
        # NOTE(review): asyncio.get_event_loop() is deprecated without a
        # running loop in newer Python versions; kept for behavioral parity.
        self._loop = asyncio.get_event_loop()
        self._patchers = []
        self._stored_result = None

    def tearDown(self):
        """Stop patchers.
        """
        for patcher in self._patchers:
            patcher.stop()
        super().tearDown()

    async def await_and_store_result(self, coro):
        """Await coroutine and store returning result.

        Example::

            self._loop.run_until_complete(self.await_and_store_result(coro()))

        :param coro: Coroutine or Future to await
        """
        self._stored_result = await coro

    # Patchers

    def init_patcher(self, patcher):
        """Initialize patcher, store for later use, return it.

        The patcher is stopped automatically in :meth:`tearDown`.

        :param patcher: Patch to start.
        :type patcher: :obj:`unittest.mock._patch`
        :returns: MagicMock acting as patched object.
        :rtype: :class:`unittest.mock.MagicMock`
        """
        _mock = patcher.start()
        self._patchers.append(patcher)
        return _mock

    # Assertions

    def assert_records_are_equal(self, first, second, msg=None):
        """Assert two Document Store records are equal.

        Metadata and record IDs are excluded from the comparison.

        :param first: First record to compare.
        :param second: Second record to compare.
        :param msg: Optional message to output on assertion.
        """
        if msg is None:
            msg = "Record instances are not equal. Expecting equal."
        self.assertEqual(first.export_dict(include_metadata=False, include_id=False),
                         second.export_dict(include_metadata=False, include_id=False),
                         msg=msg)

    def assert_records_are_not_equal(self, first, second, msg=None):
        """Assert two Document Store records are not equal.

        Metadata and record IDs are excluded from the comparison.

        :param first: First record to compare.
        :param second: Second record to compare.
        :param msg: Optional message to output on assertion.
        """
        if msg is None:
            msg = "Record instances are equal. Expecting different instances."
        self.assertNotEqual(first.export_dict(include_metadata=False, include_id=False),
                            second.export_dict(include_metadata=False, include_id=False),
                            msg=msg)

    def assert_mock_meth_has_calls(self, mock_meth, call, *calls):
        """Assert mock_meth was called with arguments.

        This calls Mock.assert_has_calls and tests for call count. The actual
        benefit of using this method over the built-in assert_has_calls is that
        this method tries to pinpoint the actual call that was missing when
        assert_has_calls raised AssertionError. This is useful when mock_meth
        has had multiple calls. The built-in assert_has_calls will notify of
        all calls that the mock_meth has had, while this method will notify of the
        actual call that was missing.

        :param mock_meth: Mocked method that is target of testing.
        :param call: Call that should be found. Instance of :class:`unittest.mock._Call`
            Repeat this argument to test for multiple calls.
        :raises: :exc:`AssertionError` if calls not found.
        """
        exception = None
        calls = list((call,) + calls)
        self.assertEqual(mock_meth.call_count, len(calls))
        try:
            mock_meth.assert_has_calls(calls)
        except AssertionError as exc:
            exception = exc
        if exception is None:
            return None
        # Try to pinpoint the actual call that was not found.
        # This will not work properly in all conditions
        # so we must make sure to raise the original assertionerror
        # unconditionally.
        try:
            for single_call in calls:
                mock_meth.assert_has_calls([single_call])
        except AssertionError as exc:
            raise exc from exception
        raise exception
class KuhaEndToEndTestCase(KuhaUnitTestCase):
    """Base class for end-to-end tests.

    * HTTPClient for interacting with Document Store.
    * Assertion methods to check returning payload and status codes.
    """

    #: Headers for JSON request bodies.
    JSON_HEADERS = {'Content-Type': 'application/json'}
    #: Headers for urlencoded form request bodies.
    POST_FORM_HEADERS = {'Content-Type': 'application/x-www-form-urlencoded'}
    #: Command line arguments not consumed by Kuha settings; set in setUpClass.
    unsupported_args = None
    #: Shared synchronous tornado HTTPClient; set in setUpClass.
    http_client = None

    @classmethod
    def setUpClass(cls):
        if not cli_setup.settings.is_settings_loaded():
            cls.unsupported_args = cls.load_cli_args(sysexit_to_skiptest=True)
        cls.http_client = HTTPClient()
        super(KuhaEndToEndTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        cls.http_client.close()
        # Reset settings so later test classes start from a clean state.
        cli_setup.settings = cli_setup.Settings()
        super(KuhaEndToEndTestCase, cls).tearDownClass()

    @staticmethod
    def load_cli_args(sysexit_to_skiptest=False):
        """Load command line arguments. Setup Document Store URL.

        :param sysexit_to_skiptest: Mask SystemExit as :exc:`unittest.SkipTest`.
            Useful when missing command line arguments should not terminate
            the test run, but skip tests requiring the arguments.
        :type sysexit_to_skiptest: bool
        :returns: arguments not known to :class:`kuha_common.cli_setup.settings` (= arguments external to Kuha)
        :rtype: list
        """
        if not cli_setup.settings.is_parser_loaded():
            cli_setup.settings.load_parser()
            cli_setup.add_document_store_url(cli_setup.settings.parser, required=True)
        try:
            opts, args = cli_setup.settings.parser.parse_known_args()
        except SystemExit:
            if sysexit_to_skiptest:
                # Unload parser on skiptest here since skipping won't call tearDownClass
                cli_setup.settings = cli_setup.Settings()
                raise unittest.SkipTest("Document Store URL not set")
            raise
        cli_setup.settings.set(opts)
        cli_setup.settings.setup_document_store_query()
        return args

    @staticmethod
    def get_record_url(rec_or_coll, _id=None):
        """Get URL to Document Store records or single record.

        :param rec_or_coll: record, record class or collection
        :param _id: Optional record ID.
        :type _id: str or None
        :returns: URL to Document Store collection or single record.
        :rtype: str
        """
        collection = rec_or_coll.get_collection() if hasattr(rec_or_coll, 'get_collection') else rec_or_coll
        if _id is not None:
            return '/'.join([cli_setup.settings.get().document_store_url, collection, _id])
        return '/'.join([cli_setup.settings.get().document_store_url, collection])

    @staticmethod
    def get_query_url(rec_or_coll, query_type=None):
        """Get URL to Document Store query endpoint for ``collection``.

        :param rec_or_coll: Collection to query.
        :type rec_or_coll: str, record, or record class
        :param query_type: Optional query type
        :returns: URL to query endpoint.
        :rtype: str
        :raises: :exc:`ValueError` on invalid ``query_type``.
        """
        collection = rec_or_coll.get_collection() if hasattr(rec_or_coll, 'get_collection') else rec_or_coll
        url = '/'.join([cli_setup.settings.get().document_store_url, 'query', collection])
        if not query_type:
            return url
        if not Query.is_valid_query_type(query_type):
            raise ValueError("Invalid query-type {}".format(query_type))
        return url + '?query_type={}'.format(query_type)

    # Communication with Document Store

    @classmethod
    def GET_to_document_store(cls, rec_or_coll, _id=None):  # pylint: disable=C0103
        """GET to Document Store returns record(s).

        :param rec_or_coll: record or collection to get.
        :param _id: Optional ObjectId. Will take precedence over ``rec_or_coll`` id.
        :returns: response body
        """
        if _id is None and hasattr(rec_or_coll, 'get_id'):
            try:
                _id = rec_or_coll.get_id()
            except TypeError:
                pass
        url = cls.get_record_url(rec_or_coll, _id)
        response = cls.http_client.fetch(url)
        if not response.body:
            return None
        # Note that if body contains multiple streamed JSON records, json_decode will fail.
        return json_decode(response.body)

    @classmethod
    def POST_to_document_store(cls, record):  # pylint: disable=C0103
        """POST to Document Store creates record.

        :param record: Record to post.
        :returns: response body
        """
        url = cls.get_record_url(record.get_collection())
        response = cls.http_client.fetch(url, method='POST',
                                         body=json_encode(record.export_dict(include_metadata=False,
                                                                             include_id=False)),
                                         headers=cls.JSON_HEADERS)
        return json_decode(response.body)

    @classmethod
    def DELETE_to_document_store(cls, rec_or_coll=None, _id=None):  # pylint: disable=C0103
        """DELETE to Document Store deletes record(s).

        Call without arguments to delete all records from all collections.

        :param rec_or_coll: Collection to delete from.
        :type rec_or_coll: str or None
        :param _id: ID of the record to delete.
        :type _id: str or None
        :returns: None
        :raises: :exc:`ValueError` if ``_id`` is given without a collection.
        """
        if _id is not None and rec_or_coll is None:
            raise ValueError("Give collection with record id")
        if rec_or_coll is None:
            for coll in COLLECTIONS:
                cls.http_client.fetch(cls.get_record_url(coll), method="DELETE")
            return
        collection = rec_or_coll.get_collection() if\
            hasattr(rec_or_coll, 'get_collection') else\
            rec_or_coll
        url = cls.get_record_url(collection, _id)
        cls.http_client.fetch(url, method="DELETE")
        return

    @classmethod
    def query_document_store(cls, rec_or_coll, query, query_type=None):
        """Execute query against Document Store query API.

        :param rec_or_coll: Collection to query.
        :type rec_or_coll: str or record class or record instance
        :param query: Query.
        :param query_type: Type of Query.
        :returns: query results
        :rtype: None if query returned no results, dict for results.
        """
        url = cls.get_query_url(rec_or_coll, query_type)
        response = cls.http_client.fetch(url, method="POST", body=json_encode(query), headers=cls.JSON_HEADERS)
        if not response.body:
            return None
        # Note that if body contains multiple streamed JSON records, json_decode will fail.
        return json_decode(response.body)

    @classmethod
    def get_collection_record_count(cls, rec_or_coll):
        """Return number of records for collection in Document Store.

        :param rec_or_coll: Document Store record, Document Store record class or collection.
        :returns: record count in Document Store.
        :rtype: int
        """
        collection = rec_or_coll.get_collection() if\
            hasattr(rec_or_coll, 'get_collection') else\
            rec_or_coll
        url = cls.get_query_url(collection, Query.query_type_count)
        response = cls.http_client.fetch(url, method="POST", body=json_encode({}), headers=cls.JSON_HEADERS)
        _dict = json_decode(response.body)
        return _dict['count']

    def assert_document_store_is_empty(self):
        """Assert Document Store contains no records.

        :raises: :exc:`AssertionError` if Document Store has records.
        """
        for coll in COLLECTIONS:
            count = self.get_collection_record_count(coll)
            self.assertEqual(count, 0)