Source code for kuha_common.testing.testcases

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Test cases for Kuha."""
import os
import random
import string
import unittest
import asyncio

from tornado.httpclient import HTTPClient
from tornado.escape import (

from kuha_common import cli_setup

from kuha_common.document_store.query import Query

from kuha_common.document_store.records import (

class KuhaUnitTestCase(unittest.TestCase):
    """Base class for unittests.

    * Assertion methods to check record equality.
    * Helper methods to provide access to dummydata.
    """

    #: Override in subclass to lookup dummydata from different directory.
    dummydata_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'dummydata')

    @classmethod
    def setUpClass(cls):
        """Reset the dummyvalue registry and reserve the two record languages."""
        cls._dummyvalues = [None]
        cls.record_lang_1 = cls.set_val('lang_1')
        cls.record_lang_2 = cls.set_val('lang_2')
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Remove the record languages and reset the dummyvalue registry."""
        delattr(cls, 'record_lang_1')
        delattr(cls, 'record_lang_2')
        cls._dummyvalues = [None]
        super().tearDownClass()

    @classmethod
    def get_dummydata_path(cls, path):
        """Get absolute path to dummydatafile

        :param path: Path. A relative path is resolved against
                     :attr:`dummydata_dir`; an absolute path is returned as is.
        :returns: Absolute path.
        :rtype: str
        """
        if os.path.isabs(path):
            return path
        return os.path.abspath(os.path.join(cls.dummydata_dir, path))

    @classmethod
    def get_dummydata(cls, path):
        """Get dummydata by reading file from ``path``

        :param path: path to file. Resolved with :meth:`get_dummydata_path`.
        :returns: Contents of the file.
        :rtype: str
        """
        path = cls.get_dummydata_path(path)
        with open(path, 'r') as _f:
            return _f.read()

    @classmethod
    def remove_dummyfile_if_exists(cls, path):
        """Remove dummyfile from ``path`` if it exists.

        :param path: path to dummyfile. Resolved with :meth:`get_dummydata_path`.
        :returns: None
        """
        path = cls.get_dummydata_path(path)
        if os.path.exists(path):
            os.remove(path)

    @classmethod
    def set_val(cls, value):
        """Assign value as dummyvalue.

        :param value: Value to assign
        :returns: value
        :raises: :exc:`ValueError` if the value is already registered.
        """
        if value in cls._dummyvalues:
            raise ValueError('%s already in use' % (value,))
        cls._dummyvalues.append(value)
        return value

    @classmethod
    def gen_val(cls, lenght=None, unique=False, chars=None):
        """Generate & assign dummyvalue.

        The generated value is always recorded in the dummyvalue registry,
        whether or not ``unique`` was requested.

        :param lenght: length of the value. A falsy value picks a random
                       length between 1 and 50.
        :type lenght: int or None
        :param unique: should the value be unique
        :type unique: bool
        :param chars: use specific characters. Defaults to ASCII letters,
                      a space and digits.
        :type chars: str or None.
        :returns: generated value
        :rtype: str
        :raises: :exc:`ValueError` if no unique value was found within
                 the retry limit.
        """
        lenght = lenght or random.randint(1, 50)
        if not chars:
            whitespace = ' '
            chars = string.ascii_letters + whitespace + string.digits

        def _val():
            return ''.join(random.choice(chars) for _ in range(lenght))

        value = _val()
        if unique:
            retries = 100
            while value in cls._dummyvalues:
                if not retries:
                    raise ValueError("Maximum retries exceeded in gen_val()")
                value = _val()
                retries -= 1
        cls._dummyvalues.append(value)
        return value

    @classmethod
    def gen_id(cls):
        """Generate Id.

        :returns: Generated id: 10 unique characters of ASCII letters and digits.
        :rtype: str
        """
        return cls.gen_val(10, True, string.ascii_letters + string.digits)

    @classmethod
    def generate_dummy_study(cls):
        """Generate and return a Study with dummydata.

        :returns: study with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Study`
        """
        # FIXME(review): every ``**{...}`` below is a set display, not a dict.
        # The attribute-name keys appear to have been lost from this copy of
        # the source. Unpacking a set with ``**`` raises TypeError, so the keys
        # must be restored from the upstream module before this method can run.
        study = Study()
        study.add_study_number(cls.gen_id())
        study.add_persistent_identifiers(cls.gen_val())
        study.add_persistent_identifiers(cls.gen_val())
        study.add_identifiers(cls.gen_val(), cls.record_lang_1, **{cls.gen_val()})
        study.add_identifiers(cls.gen_val(), cls.record_lang_1, **{cls.gen_val()})
        study.add_study_titles(cls.gen_val(), cls.record_lang_1)
        study.add_parallel_titles(cls.gen_val(), cls.record_lang_1)
        study.add_principal_investigators(cls.gen_val(), cls.record_lang_1,
                                          **{cls.gen_val()})
        study.add_publishers(cls.gen_val(), cls.record_lang_1, **{cls.gen_val()})
        study.add_distributors(cls.gen_val(), cls.record_lang_1,
                               **{cls.gen_val(), cls.gen_val()})
        study.add_document_uris(cls.gen_val(), cls.record_lang_1,
                                **{cls.gen_val(), cls.gen_val()})
        study.add_publication_dates(cls.gen_val(), cls.record_lang_1)
        study.add_publication_years(cls.gen_val(), cls.record_lang_1,
                                    **{cls.gen_val()})
        study.add_abstract(cls.gen_val(), cls.record_lang_1)
        study.add_classifications(cls.gen_val(), cls.record_lang_1,
                                  **{cls.gen_val(), cls.gen_val(), cls.gen_val()})
        study.add_keywords(cls.gen_val(), cls.record_lang_1,
                           **{cls.gen_val(), cls.gen_val(), cls.gen_val()})
        study.add_time_methods(cls.gen_val(), cls.record_lang_1,
                               **{cls.gen_val(), cls.gen_val(), cls.gen_val()})
        study.add_sampling_procedures(cls.gen_val(), cls.record_lang_1,
                                      **{cls.gen_val(), cls.gen_val(), cls.gen_val()})
        study.add_collection_modes(cls.gen_val(), cls.record_lang_1,
                                   **{cls.gen_val(), cls.gen_val(), cls.gen_val()})
        study.add_analysis_units(cls.gen_val(), cls.record_lang_1,
                                 **{cls.gen_val(), cls.gen_val(), cls.gen_val()})
        study.add_collection_periods(cls.gen_val(), cls.record_lang_1,
                                     **{cls.gen_val()})
        study.add_study_area_countries(cls.gen_val(), cls.record_lang_1,
                                       **{cls.gen_val()})
        study.add_universes(cls.gen_val(), cls.record_lang_1, **{True})
        study.add_universes(cls.gen_val(), cls.record_lang_1, **{False})
        study.add_data_access(cls.gen_val(), cls.record_lang_1)
        study.add_data_access_descriptions(cls.gen_val(), cls.record_lang_1)
        study.add_file_names(cls.gen_val(), cls.record_lang_1)
        study.add_instruments(cls.gen_val(), cls.record_lang_1, **{cls.gen_val()})
        study.add_study_groups(cls.gen_id(), cls.record_lang_1, **{cls.gen_val()})
        study.add_copyrights(cls.gen_val(), cls.record_lang_1)
        return study

    @classmethod
    def generate_dummy_variable(cls):
        """Generate and return a Variable with dummydata.

        :returns: variable with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Variable`
        """
        # FIXME(review): the ``**{...}`` arguments below are set displays whose
        # dict keys appear to have been lost from this copy of the source.
        # Unpacking a set with ``**`` raises TypeError; restore the keys from
        # the upstream module.
        var = Variable()
        var.add_study_number(cls.gen_id())
        var.add_variable_name(cls.gen_id())
        var.add_question_identifiers(cls.gen_val())
        var.add_variable_labels(cls.gen_val(), cls.record_lang_1)
        var.add_variable_labels(cls.gen_val(), cls.record_lang_2)
        var.add_codelist_codes(cls.gen_val(), cls.record_lang_1,
                               **{cls.gen_val(), True})
        var.add_codelist_codes(cls.gen_val(), cls.record_lang_1,
                               **{cls.gen_val(), False})
        return var

    @classmethod
    def generate_dummy_question(cls):
        """Generate and return a Question with dummydata.

        :returns: question with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Question`
        """
        question = Question()
        question.add_study_number(cls.gen_id())
        question.add_question_identifier(cls.gen_id())
        question.add_variable_name(cls.gen_val())
        question.add_question_texts(cls.gen_val(), cls.record_lang_1)
        question.add_research_instruments(cls.gen_val(), cls.record_lang_1)
        question.add_codelist_references(cls.gen_val(), cls.record_lang_1)
        return question

    @classmethod
    def generate_dummy_studygroup(cls):
        """Generate and return a StudyGroup with dummydata.

        :returns: studygroup with dummydata.
        :rtype: :obj:`kuha_common.document_store.records.StudyGroup`
        """
        studygroup = StudyGroup()
        studygroup.add_study_group_identifier(cls.gen_id())
        studygroup.add_study_group_names(cls.gen_val(), cls.record_lang_1)
        studygroup.add_study_numbers(cls.gen_id())
        return studygroup

    def setUp(self):
        """Format testcase values and initialize event loop.

        Call asynchronous code synchronously::

            self._loop.run_until_complete(coro())
        """
        super().setUp()
        # Get a loop so we can call asynchronous code synchronously.
        # Usage: self._loop.run_until_complete(coro())
        try:
            self._loop = asyncio.get_event_loop()
        except RuntimeError:
            # get_event_loop() raises when no current loop exists instead of
            # creating one implicitly; create and install one ourselves.
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
        self._patchers = []
        self._stored_result = None

    def tearDown(self):
        """Stop patchers started via :meth:`init_patcher`."""
        for patcher in self._patchers:
            patcher.stop()
        # setUp() chains to the parent, so tearDown() does too.
        super().tearDown()

    async def await_and_store_result(self, coro):
        """Await coroutine and store returning result.

        The result is stored in ``self._stored_result``.

        Example::

            self._loop.run_until_complete(self.await_and_store_result(coro()))

        :param coro: Coroutine or Future to await
        """
        self._stored_result = await coro

    # Patchers

    def init_patcher(self, patcher):
        """Initialize patcher, store for later use, return it.

        Stored patchers are stopped in :meth:`tearDown`.

        :param patcher: Patch to start.
        :type patcher: :obj:`unittest.mock._patch`
        :returns: MagicMock acting as patched object.
        :rtype: :class:`unittest.mock.MagicMock`
        """
        _mock = patcher.start()
        self._patchers.append(patcher)
        return _mock

    # Assertions

    def assert_records_are_equal(self, first, second, msg=None):
        """Assert two Document Store records are equal.

        Records are compared by their exported dictionaries, with
        metadata and id excluded.

        :param first: First record to compare.
        :param second: Second record to compare.
        :param msg: Optional message to output on assertion.
        """
        if msg is None:
            msg = "Record instances are not equal. Expecting equal."
        self.assertEqual(first.export_dict(include_metadata=False, include_id=False),
                         second.export_dict(include_metadata=False, include_id=False),
                         msg=msg)

    def assert_records_are_not_equal(self, first, second, msg=None):
        """Assert two Document Store records are not equal.

        Records are compared by their exported dictionaries, with
        metadata and id excluded.

        :param first: First record to compare.
        :param second: Second record to compare.
        :param msg: Optional message to output on assertion.
        """
        if msg is None:
            msg = "Record instances are equal. Expecting different instances."
        self.assertNotEqual(first.export_dict(include_metadata=False, include_id=False),
                            second.export_dict(include_metadata=False, include_id=False),
                            msg=msg)

    def assert_mock_meth_has_calls(self, mock_meth, call, *calls):
        """Assert mock_meth was called with arguments.

        This calls Mock.assert_has_calls and tests for call count.

        The actual benefit of using this method over the built-in
        assert_has_calls is that this method tries to pinpoint the
        actual call that was missing when assert_has_calls raised
        AssertionError. This is useful when mock_meth has had multiple
        calls. The built-in assert_has_calls will notify of all calls
        that the mock_meth has had, while this method will notify of
        the actual call that was missing.

        :param mock_meth: Mocked method that is target of testing.
        :param call: Call that should be found. Instance of
                     :class:`unittest.mock._Call`. Repeat this argument
                     to test for multiple calls.
        :raises: :exc:`AssertionError` if calls not found.
        """
        expected = [call, *calls]
        self.assertEqual(mock_meth.call_count, len(expected))
        try:
            mock_meth.assert_has_calls(expected)
        except AssertionError as exc:
            original = exc
        else:
            return None
        # Try to pinpoint the actual call that was not found.
        # This will not work properly in all conditions
        # so we must make sure to raise the original AssertionError
        # unconditionally.
        try:
            for single_call in expected:
                mock_meth.assert_has_calls([single_call])
        except AssertionError as exc:
            raise exc from original
        raise original
class KuhaEndToEndTestCase(KuhaUnitTestCase):
    """Base class for end-to-end tests.

    * HTTPClient for interacting with Document Store.
    * Assertion methods to check returning payload and status codes.
    """

    JSON_HEADERS = {'Content-Type': 'application/json'}
    POST_FORM_HEADERS = {'Content-Type': 'application/x-www-form-urlencoded'}

    #: Arguments returned by :meth:`load_cli_args`, when setUpClass loaded them.
    unsupported_args = None
    #: Blocking HTTP client shared by the test class; created in setUpClass.
    http_client = None

    @classmethod
    def setUpClass(cls):
        """Load settings if not yet loaded and create the HTTP client."""
        if not cli_setup.settings.is_settings_loaded():
            cls.unsupported_args = cls.load_cli_args(sysexit_to_skiptest=True)
        cls.http_client = HTTPClient()
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Close the HTTP client and reset the settings."""
        cls.http_client.close()
        # Drop the reference so a closed client cannot be reused by accident.
        cls.http_client = None
        cli_setup.settings = cli_setup.Settings()
        super().tearDownClass()

    @staticmethod
    def _collection_name(rec_or_coll):
        """Return ``rec_or_coll.get_collection()`` when available, else the argument itself."""
        if hasattr(rec_or_coll, 'get_collection'):
            return rec_or_coll.get_collection()
        return rec_or_coll

    @staticmethod
    def load_cli_args(sysexit_to_skiptest=False):
        """Load command line arguments. Setup Document Store URL.

        :param sysexit_to_skiptest: Mask SystemExit as :exc:`unittest.SkipTest`.
                                    Useful when missing command line arguments
                                    should not terminate the test run, but skip
                                    tests requiring the arguments.
        :type sysexit_to_skiptest: bool
        :returns: arguments not known to :class:`kuha_common.cli_setup.settings`
                  (= arguments external to Kuha)
        :rtype: list
        :raises: :exc:`unittest.SkipTest` if argument parsing exits and
                 ``sysexit_to_skiptest`` is True; otherwise the
                 :exc:`SystemExit` propagates.
        """
        if not cli_setup.settings.is_parser_loaded():
            cli_setup.settings.load_parser()
        cli_setup.add_document_store_url(cli_setup.settings.parser, required=True)
        try:
            opts, args = cli_setup.settings.parser.parse_known_args()
        except SystemExit:
            if not sysexit_to_skiptest:
                raise
            # Unload parser on skiptest here since skipping won't call tearDownClass
            cli_setup.settings = cli_setup.Settings()
            raise unittest.SkipTest("Document Store URL not set")
        cli_setup.settings.set(opts)
        cli_setup.settings.setup_document_store_query()
        return args

    @staticmethod
    def get_record_url(rec_or_coll, _id=None):
        """Get URL to Document Store records or single record.

        :param rec_or_coll: record, record class or collection
        :param _id: Optional record ID.
        :type _id: str or None
        :returns: URL to Document Store collection or single record.
        :rtype: str
        """
        parts = [cli_setup.settings.get().document_store_url,
                 KuhaEndToEndTestCase._collection_name(rec_or_coll)]
        if _id is not None:
            parts.append(_id)
        return '/'.join(parts)

    @staticmethod
    def get_query_url(rec_or_coll, query_type=None):
        """Get URL to Document Store query endpoint for ``collection``

        :param rec_or_coll: Collection to query.
        :type rec_or_coll: str, record, or record class
        :param query_type: Optional query type
        :returns: URL to query endpoint.
        :rtype: str
        :raises: :exc:`ValueError` if ``query_type`` is given but not valid.
        """
        collection = KuhaEndToEndTestCase._collection_name(rec_or_coll)
        url = '/'.join([cli_setup.settings.get().document_store_url, 'query', collection])
        if not query_type:
            return url
        if not Query.is_valid_query_type(query_type):
            raise ValueError("Invalid query-type {}".format(query_type))
        return url + '?query_type={}'.format(query_type)

    # Communication with Document Store

    @classmethod
    def GET_to_document_store(cls, rec_or_coll, _id=None):  # pylint: disable=C0103
        """GET to Document Store returns record(s).

        :param rec_or_coll: record or collection to get.
        :param _id: Optional ObjectId. Will take precedence over ``rec_or_coll`` id.
        :returns: decoded response body, or None if the body is empty.
        """
        if _id is None and hasattr(rec_or_coll, 'get_id'):
            try:
                _id = rec_or_coll.get_id()
            except TypeError:
                # Best effort: without a usable id the collection URL is used.
                pass
        url = cls.get_record_url(rec_or_coll, _id)
        response = cls.http_client.fetch(url)
        if not response.body:
            return None
        # Note that if body contains multiple streamed JSON records, json_decode will fail.
        return json_decode(response.body)

    @classmethod
    def POST_to_document_store(cls, record):  # pylint: disable=C0103
        """POST to Document Store creates record.

        The record is sent as JSON without metadata and id.

        :param record: Record to post.
        :returns: decoded response body
        """
        url = cls.get_record_url(record.get_collection())
        payload = json_encode(record.export_dict(include_metadata=False, include_id=False))
        response = cls.http_client.fetch(url, method='POST', body=payload,
                                         headers=cls.JSON_HEADERS)
        return json_decode(response.body)

    @classmethod
    def DELETE_to_document_store(cls, rec_or_coll=None, _id=None):  # pylint: disable=C0103
        """DELETE to Document Store deletes record(s).

        Call without arguments to delete all records from all collections.

        :param rec_or_coll: Collection to delete from.
        :type rec_or_coll: str, record, record class or None
        :param _id: ID of the record to delete.
        :type _id: str or None
        :returns: None
        :raises: :exc:`ValueError` if ``_id`` is given without ``rec_or_coll``.
        """
        if _id is not None and rec_or_coll is None:
            raise ValueError("Give collection with record id")
        if rec_or_coll is None:
            for coll in COLLECTIONS:
                cls.http_client.fetch(cls.get_record_url(coll), method="DELETE")
            return
        url = cls.get_record_url(cls._collection_name(rec_or_coll), _id)
        cls.http_client.fetch(url, method="DELETE")

    @classmethod
    def query_document_store(cls, rec_or_coll, query, query_type=None):
        """Execute query against Document Store query API.

        :param rec_or_coll: Collection to query.
        :type rec_or_coll: str or record class or record instance
        :param query: Query. Sent JSON-encoded as the request body.
        :param query_type: Type of Query.
        :returns: query results
        :rtype: None if query returned no results, dict for results.
        """
        url = cls.get_query_url(rec_or_coll, query_type)
        response = cls.http_client.fetch(url, method="POST", body=json_encode(query),
                                         headers=cls.JSON_HEADERS)
        if not response.body:
            return None
        # Note that if body contains multiple streamed JSON records, json_decode will fail.
        return json_decode(response.body)

    @classmethod
    def get_collection_record_count(cls, rec_or_coll):
        """Return number of records for collection in Document Store.

        :param rec_or_coll: Document Store record, Document Store record class
                            or collection.
        :returns: record count in Document Store.
        :rtype: int
        """
        collection = cls._collection_name(rec_or_coll)
        url = cls.get_query_url(collection, Query.query_type_count)
        response = cls.http_client.fetch(url, method="POST", body=json_encode({}),
                                         headers=cls.JSON_HEADERS)
        return json_decode(response.body)['count']

    def assert_document_store_is_empty(self):
        """Assert Document Store contains no records.

        :raises: :exc:`AssertionError` if Document Store has records.
        """
        for coll in COLLECTIONS:
            count = self.get_collection_record_count(coll)
            self.assertEqual(
                count, 0,
                msg="Collection {} is not empty".format(coll))