Source code for kuha_common.testing.testcases

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Test cases for Kuha
"""
import os
import random
import string
import unittest
import asyncio

from tornado.httpclient import HTTPClient
from tornado.escape import (
    json_encode,
    json_decode
)

from kuha_common import cli_setup

from kuha_common.document_store.query import Query

from kuha_common.document_store.records import (
    COLLECTIONS,
    Study,
    Variable,
    Question,
    StudyGroup
)


class KuhaUnitTestCase(unittest.TestCase):
    """Base class for unittests.

    * Assertion methods to check record equality.
    * Helper methods to provide access to dummydata.
    * Registry of generated dummyvalues (``cls._dummyvalues``) that is
      used to reject duplicates and to generate unique values.
    """

    #: Override in subclass to lookup dummydata from different directory.
    dummydata_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'dummydata')

    @classmethod
    def setUpClass(cls):
        """Reset the dummyvalue registry and reserve both record languages."""
        # None is registered up front, so set_val(None) is rejected as a duplicate.
        cls._dummyvalues = [None]
        cls.record_lang_1 = cls.set_val('lang_1')
        cls.record_lang_2 = cls.set_val('lang_2')
        super(KuhaUnitTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Drop the record languages and reset the dummyvalue registry."""
        delattr(cls, 'record_lang_1')
        delattr(cls, 'record_lang_2')
        cls._dummyvalues = [None]
        super(KuhaUnitTestCase, cls).tearDownClass()

    @classmethod
    def get_dummydata_path(cls, path):
        """Get absolute path to dummydatafile

        :param path: Path. A relative path is resolved against
                     :attr:`dummydata_dir`; an absolute path is
                     returned unchanged.
        :returns: Absolute path.
        :rtype: str
        """
        if not os.path.isabs(path):
            path = os.path.abspath(os.path.join(cls.dummydata_dir, path))
        return path

    @classmethod
    def get_dummydata(cls, path):
        """Get dummydata by reading file from ``path``

        :param path: path to file. Resolved with :meth:`get_dummydata_path`.
        :returns: Contents of the file.
        :rtype: str
        """
        path = cls.get_dummydata_path(path)
        with open(path, 'r') as _f:
            data = _f.read()
        return data

    @classmethod
    def remove_dummyfile_if_exists(cls, path):
        """Remove dummyfile from ``path`` if it exists.

        :param path: path to dummyfile. Resolved with
                     :meth:`get_dummydata_path`.
        :returns: None
        """
        path = cls.get_dummydata_path(path)
        if os.path.exists(path):
            os.remove(path)

    @classmethod
    def set_val(cls, value):
        """Assign value as dummyvalue.

        :param value: Value to assign
        :returns: value
        :raises: :exc:`ValueError` if the value has already been assigned
                 or generated.
        """
        if value in cls._dummyvalues:
            raise ValueError('%s already in use' % (value,))
        cls._dummyvalues.append(value)
        return value

    @classmethod
    def gen_val(cls, lenght=None, unique=False, chars=None):
        """Generate & assign dummyvalue.

        :param lenght: lenght of the value. A falsy value (None or 0)
                       selects a random lenght between 1 and 50.
        :type lenght: int or None
        :param unique: should the value be unique
        :type unique: bool
        :param chars: use specific characters. Defaults to ASCII letters,
                      a space and digits.
        :type chars: str or None.
        :returns: generated value
        :rtype: str
        :raises: :exc:`ValueError` if a unique value could not be
                 generated within the retry limit.
        """
        lenght = lenght if lenght else random.randint(1, 50)
        if not chars:
            whitespace = ' '
            chars = string.ascii_letters + whitespace + string.digits

        def _val():
            return ''.join(random.choice(chars) for _ in range(lenght))

        value = _val()
        if unique:
            # Regenerate on collision, but give up eventually so that an
            # exhausted value space cannot loop forever.
            retries = 100
            while value in cls._dummyvalues:
                if not retries:
                    raise ValueError("Maximum retries exceeded in gen_val()")
                value = _val()
                retries -= 1
        # Every generated value is recorded so later unique values cannot
        # collide with it.
        cls._dummyvalues.append(value)
        return value

    @classmethod
    def gen_id(cls):
        """Generate Id.

        The id is a unique 10 character string of ASCII letters and digits.

        :returns: Generated id.
        :rtype: str
        """
        return cls.gen_val(10, True, string.ascii_letters + string.digits)

    @classmethod
    def generate_dummy_study(cls):
        """Generate and return a Study with dummydata.

        All language-aware fields are added with ``cls.record_lang_1``.

        :returns: study with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Study`
        """
        study = Study()
        study.add_study_number(cls.gen_id())
        study.add_persistent_identifiers(cls.gen_val())
        study.add_persistent_identifiers(cls.gen_val())
        study.add_identifiers(cls.gen_val(), cls.record_lang_1,
                              **{Study.identifiers.attr_agency.name: cls.gen_val()})
        study.add_identifiers(cls.gen_val(), cls.record_lang_1,
                              **{Study.identifiers.attr_agency.name: cls.gen_val()})
        study.add_study_titles(cls.gen_val(), cls.record_lang_1)
        study.add_parallel_titles(cls.gen_val(), cls.record_lang_1)
        study.add_principal_investigators(cls.gen_val(), cls.record_lang_1,
                                          **{Study.principal_investigators.attr_organization.name: cls.gen_val()})
        study.add_publishers(cls.gen_val(), cls.record_lang_1,
                             **{Study.publishers.attr_abbreviation.name: cls.gen_val()})
        study.add_distributors(cls.gen_val(), cls.record_lang_1,
                               **{Study.distributors.attr_abbreviation.name: cls.gen_val(),
                                  Study.distributors.attr_uri.name: cls.gen_val()})
        study.add_document_uris(cls.gen_val(), cls.record_lang_1,
                                **{Study.document_uris.attr_location.name: cls.gen_val(),
                                   Study.document_uris.attr_description.name: cls.gen_val()})
        study.add_publication_dates(cls.gen_val(), cls.record_lang_1)
        study.add_publication_years(cls.gen_val(), cls.record_lang_1,
                                    **{Study.publication_years.attr_distribution_date.name: cls.gen_val()})
        study.add_abstract(cls.gen_val(), cls.record_lang_1)
        study.add_classifications(cls.gen_val(), cls.record_lang_1,
                                  **{Study.classifications.attr_system_name.name: cls.gen_val(),
                                     Study.classifications.attr_uri.name: cls.gen_val(),
                                     Study.classifications.attr_description.name: cls.gen_val()})
        study.add_keywords(cls.gen_val(), cls.record_lang_1,
                           **{Study.keywords.attr_system_name.name: cls.gen_val(),
                              Study.keywords.attr_uri.name: cls.gen_val(),
                              Study.keywords.attr_description.name: cls.gen_val()})
        study.add_time_methods(cls.gen_val(), cls.record_lang_1,
                               **{Study.time_methods.attr_system_name.name: cls.gen_val(),
                                  Study.time_methods.attr_uri.name: cls.gen_val(),
                                  Study.time_methods.attr_description.name: cls.gen_val()})
        study.add_sampling_procedures(cls.gen_val(), cls.record_lang_1,
                                      **{Study.sampling_procedures.attr_system_name.name: cls.gen_val(),
                                         Study.sampling_procedures.attr_uri.name: cls.gen_val(),
                                         Study.sampling_procedures.attr_description.name: cls.gen_val()})
        study.add_collection_modes(cls.gen_val(), cls.record_lang_1,
                                   **{Study.collection_modes.attr_system_name.name: cls.gen_val(),
                                      Study.collection_modes.attr_uri.name: cls.gen_val(),
                                      Study.collection_modes.attr_description.name: cls.gen_val()})
        study.add_analysis_units(cls.gen_val(), cls.record_lang_1,
                                 **{Study.analysis_units.attr_system_name.name: cls.gen_val(),
                                    Study.analysis_units.attr_uri.name: cls.gen_val(),
                                    Study.analysis_units.attr_description.name: cls.gen_val()})
        study.add_collection_periods(cls.gen_val(), cls.record_lang_1,
                                     **{Study.collection_periods.attr_event.name: cls.gen_val()})
        study.add_study_area_countries(cls.gen_val(), cls.record_lang_1,
                                       **{Study.study_area_countries.attr_abbreviation.name: cls.gen_val()})
        study.add_universes(cls.gen_val(), cls.record_lang_1,
                            **{Study.universes.attr_included.name: True})
        study.add_universes(cls.gen_val(), cls.record_lang_1,
                            **{Study.universes.attr_included.name: False})
        study.add_data_access(cls.gen_val(), cls.record_lang_1)
        study.add_data_access_descriptions(cls.gen_val(), cls.record_lang_1)
        study.add_file_names(cls.gen_val(), cls.record_lang_1)
        study.add_instruments(cls.gen_val(), cls.record_lang_1,
                              **{Study.instruments.attr_instrument_name.name: cls.gen_val()})
        study.add_study_groups(cls.gen_id(), cls.record_lang_1,
                               **{Study.study_groups.attr_name.name: cls.gen_val()})
        study.add_copyrights(cls.gen_val(), cls.record_lang_1)
        return study

    @classmethod
    def generate_dummy_variable(cls):
        """Generate and return a Variable with dummydata.

        Variable labels are added in both record languages; codelist
        codes are added once as missing and once as not missing.

        :returns: variable with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Variable`
        """
        var = Variable()
        var.add_study_number(cls.gen_id())
        var.add_variable_name(cls.gen_id())
        var.add_question_identifiers(cls.gen_val())
        var.add_variable_labels(cls.gen_val(), cls.record_lang_1)
        var.add_variable_labels(cls.gen_val(), cls.record_lang_2)
        var.add_codelist_codes(cls.gen_val(), cls.record_lang_1,
                               **{Variable.codelist_codes.attr_label.name: cls.gen_val(),
                                  Variable.codelist_codes.attr_missing.name: True})
        var.add_codelist_codes(cls.gen_val(), cls.record_lang_1,
                               **{Variable.codelist_codes.attr_label.name: cls.gen_val(),
                                  Variable.codelist_codes.attr_missing.name: False})
        return var

    @classmethod
    def generate_dummy_question(cls):
        """Generate and return a Question with dummydata.

        :returns: question with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Question`
        """
        question = Question()
        question.add_study_number(cls.gen_id())
        question.add_question_identifier(cls.gen_id())
        question.add_variable_name(cls.gen_val())
        question.add_question_texts(cls.gen_val(), cls.record_lang_1)
        question.add_research_instruments(cls.gen_val(), cls.record_lang_1)
        question.add_codelist_references(cls.gen_val(), cls.record_lang_1)
        return question

    @classmethod
    def generate_dummy_studygroup(cls):
        """Generate and return a StudyGroup with dummydata.

        :returns: studygroup with dummydata.
        :rtype: :obj:`kuha_common.document_store.records.StudyGroup`
        """
        studygroup = StudyGroup()
        studygroup.add_study_group_identifier(cls.gen_id())
        studygroup.add_study_group_names(cls.gen_val(), cls.record_lang_1)
        studygroup.add_study_numbers(cls.gen_id())
        return studygroup

    def setUp(self):
        """Format testcase values and initialize event loop.

        Call asynchronous code synchronously::

            self._loop.run_until_complete(coro())

        """
        super().setUp()
        # get loop so we can call asynchronous code synchronously.
        # Usage: self._loop.run_until_complete(coro())
        try:
            self._loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions no longer create a loop implicitly
            # when none is set for the current thread.
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
        self._patchers = []
        self._stored_result = None

    def tearDown(self):
        """Stop patchers.

        Patchers are stopped in reverse start order. Stopping them in
        start order would leave a stale mock in place when the same
        target has been patched more than once. Stopped patchers are
        removed from the internal list.
        """
        try:
            while self._patchers:
                self._patchers.pop().stop()
        finally:
            super().tearDown()

    async def await_and_store_result(self, coro):
        """Await coroutine and store returning result.

        The result is stored to ``self._stored_result``.

        Example::

            self._loop.run_until_complete(self.await_and_store_result(coro()))

        :param coro: Coroutine or Future to await
        """
        self._stored_result = await coro

    # Patchers

    def init_patcher(self, patcher):
        """Initialize patcher, store for later use, return it.

        The patcher is stopped in :meth:`tearDown`.

        :param patcher: Patch to start.
        :type patcher: :obj:`unittest.mock._patch`
        :returns: MagicMock acting as patched object.
        :rtype: :class:`unittest.mock.MagicMock`
        """
        _mock = patcher.start()
        self._patchers.append(patcher)
        return _mock

    # Assertions

    def assert_records_are_equal(self, first, second, msg=None):
        """Assert two Document Store records are equal.

        Records are compared by their exported dictionaries, excluding
        metadata and id.

        :param first: First record to compare.
        :param second: Second record to compare.
        :param msg: Optional message to output on assertion.
        """
        if msg is None:
            msg = "Record instances are not equal. Expecting equal."
        self.assertEqual(first.export_dict(include_metadata=False, include_id=False),
                         second.export_dict(include_metadata=False, include_id=False),
                         msg=msg)

    def assert_records_are_not_equal(self, first, second, msg=None):
        """Assert two Document Store records are not equal.

        Records are compared by their exported dictionaries, excluding
        metadata and id.

        :param first: First record to compare.
        :param second: Second record to compare.
        :param msg: Optional message to output on assertion.
        """
        if msg is None:
            msg = "Record instances are equal. Expecting different instances."
        self.assertNotEqual(first.export_dict(include_metadata=False, include_id=False),
                            second.export_dict(include_metadata=False, include_id=False),
                            msg=msg)

    def assert_mock_meth_has_calls(self, mock_meth, call, *calls):
        """Assert mock_meth was called with arguments.

        This calls Mock.assert_has_calls and tests for call count.

        The actual benefit of using this method over the built-in
        assert_has_calls is that this method tries to pinpoint the
        actual call that was missing when assert_has_calls raised
        AssertionError. This is useful when mock_meth has had multiple
        calls. The built-in assert_has_calls will notify of all calls
        that the mock_meth has had, while this method will notify of
        the actual call that was missing.

        :param mock_meth: Mocked method that is target of testing.
        :param call: Call that should be found. Instance of
                     :class:`unittest.mock._Call`. Repeat this argument
                     to test for multiple calls.
        :raises: :exc:`AssertionError` if calls not found.
        """
        expected_calls = [call, *calls]
        self.assertEqual(mock_meth.call_count, len(expected_calls))
        try:
            mock_meth.assert_has_calls(expected_calls)
        except AssertionError as exc:
            original_error = exc
        else:
            return None
        # Try to pinpoint the actual call that was not found.
        # This will not work properly in all conditions
        # so we must make sure to raise the original assertionerror
        # unconditionally
        try:
            for single_call in expected_calls:
                mock_meth.assert_has_calls([single_call])
        except AssertionError as exc:
            raise exc from original_error
        raise original_error
class KuhaEndToEndTestCase(KuhaUnitTestCase):
    """Base class for end-to-end tests.

    * HTTPClient for interacting with Document Store.
    * Assertion methods to check returning payload and status codes.
    """

    JSON_HEADERS = {'Content-Type': 'application/json'}
    POST_FORM_HEADERS = {'Content-Type': 'application/x-www-form-urlencoded'}

    unsupported_args = None
    http_client = None

    @classmethod
    def setUpClass(cls):
        """Load settings when they are not loaded yet and create the HTTP client."""
        settings_loaded = cli_setup.settings.is_settings_loaded()
        if not settings_loaded:
            cls.unsupported_args = cls.load_cli_args(sysexit_to_skiptest=True)
        cls.http_client = HTTPClient()
        super(KuhaEndToEndTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Close the HTTP client and replace settings with a fresh instance."""
        cls.http_client.close()
        cli_setup.settings = cli_setup.Settings()
        super(KuhaEndToEndTestCase, cls).tearDownClass()

    @staticmethod
    def _resolve_collection(rec_or_coll):
        """Return the collection of a record or record class.

        Anything without a ``get_collection`` attribute is returned
        unchanged.
        """
        if hasattr(rec_or_coll, 'get_collection'):
            return rec_or_coll.get_collection()
        return rec_or_coll

    @staticmethod
    def load_cli_args(sysexit_to_skiptest=False):
        """Load command line arguments. Setup Document Store URL.

        :param sysexit_to_skiptest: Mask SystemExit as
                                    :exc:`unittest.SkipTest`. Useful
                                    when missing command line arguments
                                    should not terminate the test run,
                                    but skip tests requiring the
                                    arguments.
        :type sysexit_to_skiptest: bool
        :returns: arguments not known to
                  :class:`kuha_common.cli_setup.settings`
                  (= arguments external to Kuha)
        :rtype: list
        """
        parser_loaded = cli_setup.settings.is_parser_loaded()
        if not parser_loaded:
            cli_setup.settings.load_parser()
        cli_setup.add_document_store_url(cli_setup.settings.parser, required=True)
        try:
            opts, args = cli_setup.settings.parser.parse_known_args()
        except SystemExit:
            if not sysexit_to_skiptest:
                raise
            # Unload parser on skiptest here since skipping won't call tearDownClass
            cli_setup.settings = cli_setup.Settings()
            raise unittest.SkipTest("Document Store URL not set")
        cli_setup.settings.set(opts)
        cli_setup.settings.setup_document_store_query()
        return args

    @staticmethod
    def get_record_url(rec_or_coll, _id=None):
        """Get URL to Document Store records or single record.

        :param rec_or_coll: record, record class or collection
        :param _id: Optional record ID.
        :type _id: str or None
        :returns: URL to Document Store collection or single record.
        :rtype: str
        """
        collection = KuhaEndToEndTestCase._resolve_collection(rec_or_coll)
        url_parts = [cli_setup.settings.get().document_store_url, collection]
        if _id is not None:
            url_parts.append(_id)
        return '/'.join(url_parts)

    @staticmethod
    def get_query_url(rec_or_coll, query_type=None):
        """Get URL to Document Store query endpoint for ``collection``

        :param rec_or_coll: Collection to query.
        :type rec_or_coll: str, record, or record class
        :param query_type: Optional query type
        :returns: URL to query endpoint.
        :rtype: str
        :raises: :exc:`ValueError` for an invalid ``query_type``.
        """
        collection = KuhaEndToEndTestCase._resolve_collection(rec_or_coll)
        base_url = cli_setup.settings.get().document_store_url
        url = '/'.join([base_url, 'query', collection])
        if query_type:
            if not Query.is_valid_query_type(query_type):
                raise ValueError("Invalid query-type {}".format(query_type))
            url = url + '?query_type={}'.format(query_type)
        return url

    # Communication with Document Store

    @classmethod
    def GET_to_document_store(cls, rec_or_coll, _id=None):  # pylint: disable=C0103
        """GET to Document Store returns record(s).

        :param rec_or_coll: record or collection to get.
        :param _id: Optional ObjectId. Will take precedence over
                    ``rec_or_coll`` id.
        :returns: response body decoded from JSON, or None for an
                  empty body.
        """
        use_record_id = _id is None and hasattr(rec_or_coll, 'get_id')
        if use_record_id:
            try:
                _id = rec_or_coll.get_id()
            except TypeError:
                # Fall back to the collection URL.
                pass
        response = cls.http_client.fetch(cls.get_record_url(rec_or_coll, _id))
        body = response.body
        if not body:
            return None
        # Note that if body contains multiple streamed JSON records, json_decode will fail.
        return json_decode(body)

    @classmethod
    def POST_to_document_store(cls, record):  # pylint: disable=C0103
        """POST to Document Store creates record.

        The record is posted without metadata and id.

        :param record: Record to post.
        :returns: response body decoded from JSON
        """
        payload = json_encode(record.export_dict(include_metadata=False, include_id=False))
        url = cls.get_record_url(record.get_collection())
        response = cls.http_client.fetch(url, method='POST', body=payload,
                                         headers=cls.JSON_HEADERS)
        return json_decode(response.body)

    @classmethod
    def DELETE_to_document_store(cls, rec_or_coll=None, _id=None):  # pylint: disable=C0103
        """DELETE to Document Store deletes record(s).

        Call without arguments to delete all records from all collections.

        :param rec_or_coll: Collection to delete from.
        :type rec_or_coll: str or None
        :param _id: ID of the record to delete.
        :type _id: str or None
        :returns: None
        :raises: :exc:`ValueError` if ``_id`` is given without
                 ``rec_or_coll``.
        """
        if rec_or_coll is None:
            if _id is not None:
                raise ValueError("Give collection with record id")
            for coll in COLLECTIONS:
                cls.http_client.fetch(cls.get_record_url(coll), method="DELETE")
            return
        collection = cls._resolve_collection(rec_or_coll)
        cls.http_client.fetch(cls.get_record_url(collection, _id), method="DELETE")
        return

    @classmethod
    def query_document_store(cls, rec_or_coll, query, query_type=None):
        """Execute query against Document Store query API.

        :param rec_or_coll: Collection to query.
        :type rec_or_coll: str or record class or record instance
        :param query: Query.
        :param query_type: Type of Query.
        :returns: query results
        :rtype: None if query returned no results, dict for results.
        """
        response = cls.http_client.fetch(cls.get_query_url(rec_or_coll, query_type),
                                         method="POST",
                                         body=json_encode(query),
                                         headers=cls.JSON_HEADERS)
        body = response.body
        if not body:
            return None
        # Note that if body contains multiple streamed JSON records, json_decode will fail.
        return json_decode(body)

    @classmethod
    def get_collection_record_count(cls, rec_or_coll):
        """Return number of records for collection in Document Store.

        :param rec_or_coll: Document Store record, Document Store record
                            class or collection.
        :returns: record count in Document Store.
        :rtype: int
        """
        collection = cls._resolve_collection(rec_or_coll)
        count_url = cls.get_query_url(collection, Query.query_type_count)
        response = cls.http_client.fetch(count_url, method="POST",
                                         body=json_encode({}),
                                         headers=cls.JSON_HEADERS)
        return json_decode(response.body)['count']

    def assert_document_store_is_empty(self):
        """Assert Document Store contains no records.

        :raises: :exc:`AssertionError` if Document Store has records.
        """
        for coll in COLLECTIONS:
            self.assertEqual(self.get_collection_record_count(coll), 0)