# Source code for kuha_common.testing.testcases

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Test cases for Kuha
"""
import os
import random
import string
import unittest
import asyncio
import copy

from tornado.httpclient import HTTPClient
from tornado.escape import (
    json_encode,
    json_decode
)

from kuha_common import (
    conf,
    server
)
from kuha_common.document_store import (
    query,
    client
)
from kuha_common.document_store.query import Query
from kuha_common.document_store.records import (
    REC_STATUS_DELETED,
    record_by_collection,
    COLLECTIONS,
    Study,
    Variable,
    Question,
    StudyGroup
)


# Module-level cache for the shared asyncio event loop; lazily created and
# installed by KuhaUnitTestCase.setUp() so all tests reuse one loop.
_STORE = {'eventloop': None}


class KuhaUnitTestCase(unittest.TestCase):
    """Base class for unittests.

    * Assertion methods to check record equality.
    * Helper methods to provide access to dummydata.
    """

    #: Override in subclass to lookup dummydata from different directory.
    dummydata_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'dummydata')

    @classmethod
    def prepare_dummyvalues(cls):
        """Reset the dummyvalue registry and assign shared language codes.

        Initializes the class-level registry of used dummyvalues and
        registers the two language codes used by generated records.
        """
        cls._dummyvalues = [None]
        cls.record_lang_1 = cls.set_val('lang_1')
        cls.record_lang_2 = cls.set_val('lang_2')

    @classmethod
    def discard_dummyvalues(cls):
        """Remove the language attributes and clear the dummyvalue registry."""
        delattr(cls, 'record_lang_1')
        delattr(cls, 'record_lang_2')
        cls._dummyvalues = [None]
[docs] @classmethod def setUpClass(cls): cls.prepare_dummyvalues() super(KuhaUnitTestCase, cls).setUpClass()
[docs] @classmethod def tearDownClass(cls): cls.discard_dummyvalues() super(KuhaUnitTestCase, cls).tearDownClass()
[docs] @classmethod def get_dummydata_path(cls, path): """Get absolute path to dummydatafile :param path: Path. Gets turned into an absolute if it isn't :returns: Absolute path. :rtype: str """ if not os.path.isabs(path): path = os.path.abspath(os.path.join(cls.dummydata_dir, path)) return path
[docs] @classmethod def get_dummydata(cls, path): """Get dummydata by reading file from ``path`` :param path: path to file. :returns: Contents of the file. """ path = cls.get_dummydata_path(path) with open(path, 'r') as _f: data = _f.read() return data
[docs] @classmethod def remove_dummyfile_if_exists(cls, path): """Remove dummyfile from ``path`` if it exists. :param path: path to dummyfile. :returns: None """ path = cls.get_dummydata_path(path) if os.path.exists(path): os.remove(path)
[docs] @classmethod def set_val(cls, value): """Assign value as dummyvalue. :param value: Value to assign :returns: value """ if value in cls._dummyvalues: raise ValueError('%s already in use' % (value,)) cls._dummyvalues.append(value) return value
[docs] @classmethod def gen_val(cls, lenght=None, unique=False, chars=None): """Generate & assign dummyvalue. :param lenght: lenght of the value :type lenght: int or None :param unique: should the value be unique :type unique: bool :param chars: use specific characters. :type chars: str or None. :returns: generated value :rtype: str """ lenght = lenght if lenght else random.randint(1, 50) if not chars: whitespace = ' ' chars = string.ascii_letters + whitespace + string.digits def _val(): return ''.join( random.choice(chars) for _ in range(lenght) ) value = _val() if unique: retries = 100 while value in cls._dummyvalues: if not retries: raise ValueError("Maximum retries exceeded in gen_val()") value = _val() retries -= 1 cls._dummyvalues.append(value) return value
[docs] @classmethod def gen_id(cls): """Generate Id. :returns: Generated id. :rtype: str """ return cls.gen_val(10, True, string.ascii_letters + string.digits)
    @classmethod
    def generate_dummy_study(cls):
        """Generate and return a Study with dummydata.

        Populates every Study attribute with generated values; every
        localizable attribute uses ``cls.record_lang_1``.

        :returns: study with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Study`
        """
        study = Study()
        study.add_study_number(cls.gen_id())
        study.add_persistent_identifiers(cls.gen_val())
        study.add_persistent_identifiers(cls.gen_val())
        study.add_identifiers(cls.gen_val(), cls.record_lang_1,
                              **{Study.identifiers.attr_agency.name: cls.gen_val()})
        study.add_identifiers(cls.gen_val(), cls.record_lang_1,
                              **{Study.identifiers.attr_agency.name: cls.gen_val()})
        study.add_study_titles(cls.gen_val(), cls.record_lang_1)
        study.add_parallel_titles(cls.gen_val(), cls.record_lang_1)
        study.add_principal_investigators(
            cls.gen_val(), cls.record_lang_1,
            **{Study.principal_investigators.attr_organization.name: cls.gen_val(),
               Study.principal_investigators.attr_external_link.name: cls.gen_val(),
               Study.principal_investigators.attr_external_link_title.name: cls.gen_val(),
               Study.principal_investigators.attr_external_link_role.name: cls.gen_val(),
               Study.principal_investigators.attr_external_link_uri.name: cls.gen_val()})
        study.add_publishers(cls.gen_val(), cls.record_lang_1,
                             **{Study.publishers.attr_abbreviation.name: cls.gen_val()})
        study.add_distributors(cls.gen_val(), cls.record_lang_1,
                               **{Study.distributors.attr_abbreviation.name: cls.gen_val(),
                                  Study.distributors.attr_uri.name: cls.gen_val()})
        study.add_document_uris(cls.gen_val(), cls.record_lang_1,
                                **{Study.document_uris.attr_location.name: cls.gen_val(),
                                   Study.document_uris.attr_description.name: cls.gen_val()})
        study.add_publication_dates(cls.gen_val(), cls.record_lang_1)
        study.add_publication_years(cls.gen_val(), cls.record_lang_1,
                                    **{Study.publication_years.attr_distribution_date.name: cls.gen_val()})
        study.add_abstract(cls.gen_val(), cls.record_lang_1)
        study.add_classifications(cls.gen_val(), cls.record_lang_1,
                                  **{Study.classifications.attr_system_name.name: cls.gen_val(),
                                     Study.classifications.attr_uri.name: cls.gen_val(),
                                     Study.classifications.attr_description.name: cls.gen_val()})
        study.add_keywords(cls.gen_val(), cls.record_lang_1,
                           **{Study.keywords.attr_system_name.name: cls.gen_val(),
                              Study.keywords.attr_uri.name: cls.gen_val(),
                              Study.keywords.attr_description.name: cls.gen_val()})
        study.add_time_methods(cls.gen_val(), cls.record_lang_1,
                               **{Study.time_methods.attr_system_name.name: cls.gen_val(),
                                  Study.time_methods.attr_uri.name: cls.gen_val(),
                                  Study.time_methods.attr_description.name: cls.gen_val()})
        study.add_sampling_procedures(cls.gen_val(), cls.record_lang_1,
                                      **{Study.sampling_procedures.attr_system_name.name: cls.gen_val(),
                                         Study.sampling_procedures.attr_uri.name: cls.gen_val(),
                                         Study.sampling_procedures.attr_description.name: cls.gen_val()})
        study.add_collection_modes(cls.gen_val(), cls.record_lang_1,
                                   **{Study.collection_modes.attr_system_name.name: cls.gen_val(),
                                      Study.collection_modes.attr_uri.name: cls.gen_val(),
                                      Study.collection_modes.attr_description.name: cls.gen_val()})
        study.add_analysis_units(cls.gen_val(), cls.record_lang_1,
                                 **{Study.analysis_units.attr_system_name.name: cls.gen_val(),
                                    Study.analysis_units.attr_uri.name: cls.gen_val(),
                                    Study.analysis_units.attr_description.name: cls.gen_val()})
        study.add_collection_periods(cls.gen_val(), cls.record_lang_1,
                                     **{Study.collection_periods.attr_event.name: cls.gen_val()})
        study.add_study_area_countries(cls.gen_val(), cls.record_lang_1,
                                       **{Study.study_area_countries.attr_abbreviation.name: cls.gen_val()})
        # One universe marked included, one excluded.
        study.add_universes(cls.gen_val(), cls.record_lang_1,
                            **{Study.universes.attr_included.name: True})
        study.add_universes(cls.gen_val(), cls.record_lang_1,
                            **{Study.universes.attr_included.name: False})
        study.add_data_access(cls.gen_val(), cls.record_lang_1)
        study.add_data_access_descriptions(cls.gen_val(), cls.record_lang_1)
        study.add_file_names(cls.gen_val(), cls.record_lang_1)
        study.add_instruments(cls.gen_val(), cls.record_lang_1,
                              **{Study.instruments.attr_instrument_name.name: cls.gen_val()})
        study.add_study_groups(cls.gen_id(), cls.record_lang_1,
                               **{Study.study_groups.attr_name.name: cls.gen_val()})
        study.add_copyrights(cls.gen_val(), cls.record_lang_1)
        return study
    @classmethod
    def generate_dummy_variable(cls):
        """Generate and return a Variable with dummydata.

        :returns: variable with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Variable`
        """
        var = Variable()
        var.add_study_number(cls.gen_id())
        var.add_variable_name(cls.gen_id())
        var.add_question_identifiers(cls.gen_val())
        # Labels in two languages.
        var.add_variable_labels(cls.gen_val(), cls.record_lang_1)
        var.add_variable_labels(cls.gen_val(), cls.record_lang_2)
        # One code flagged missing, one not.
        var.add_codelist_codes(cls.gen_val(), cls.record_lang_1,
                               **{Variable.codelist_codes.attr_label.name: cls.gen_val(),
                                  Variable.codelist_codes.attr_missing.name: True})
        var.add_codelist_codes(cls.gen_val(), cls.record_lang_1,
                               **{Variable.codelist_codes.attr_label.name: cls.gen_val(),
                                  Variable.codelist_codes.attr_missing.name: False})
        return var
    @classmethod
    def generate_dummy_question(cls):
        """Generate and return a Question with dummydata.

        :returns: question with dummydata
        :rtype: :obj:`kuha_common.document_store.records.Question`
        """
        question = Question()
        question.add_study_number(cls.gen_id())
        question.add_question_identifier(cls.gen_id())
        question.add_variable_name(cls.gen_val())
        question.add_question_texts(cls.gen_val(), cls.record_lang_1)
        question.add_research_instruments(cls.gen_val(), cls.record_lang_1)
        question.add_codelist_references(cls.gen_val(), cls.record_lang_1)
        return question
    @classmethod
    def generate_dummy_studygroup(cls):
        """Generate and return a StudyGroup with dummydata.

        :returns: studygroup with dummydata.
        :rtype: :obj:`kuha_common.document_store.records.StudyGroup`
        """
        studygroup = StudyGroup()
        studygroup.add_study_group_identifier(cls.gen_id())
        studygroup.add_study_group_names(cls.gen_val(), cls.record_lang_1)
        studygroup.add_study_numbers(cls.gen_id())
        return studygroup
@staticmethod def _to_json_bytestring(value): return bytes(json_encode(value), encoding='utf8')
[docs] @staticmethod def run_until_complete(coro): """Run coroutine until it's completed :param coro: Coroutine function. """ asyncio.run(coro)
    def setUp(self):
        """Format testcase values and initialize event loop.

        Call asynchronous code synchronously::

            self.run_until_complete(coro())
        """
        super().setUp()
        # Lazily create a single shared event loop and install it as the
        # current loop for this thread on every test setup.
        if _STORE['eventloop'] is None:
            _STORE['eventloop'] = asyncio.new_event_loop()
        asyncio.set_event_loop(_STORE['eventloop'])
        # Per-test bookkeeping: started patchers, registered reset callables
        # and the slot used by await_and_store_result().
        self._patchers = []
        self._resets = []
        self._stored_result = None
        self._isolate_conf()
    def tearDown(self):
        """Stop patchers and restore module-level defaults touched by tests.
        """
        # Stop every mock patcher started via init_patcher().
        for patcher in self._patchers:
            patcher.stop()
        # Run registered reset callables (e.g. configuration restore).
        for reset in self._resets:
            reset()
        # Restore Document Store client class attributes to their defaults.
        client.JSONStreamClient.connect_timeout = client.DS_CLIENT_CONNECT_TIMEOUT
        client.JSONStreamClient.request_timeout = client.DS_CLIENT_REQUEST_TIMEOUT
        client.JSONStreamClient.max_clients = client.DS_CLIENT_MAX_CLIENTS
        # Restore server process count and clear the query base url.
        server._server_conf['server_process_count'] = server.DEFAULT_PROCESS_COUNT
        query.Query.base_url = None
        super().tearDown()
def _isolate_conf(self): stored = copy.deepcopy(conf._STORED) def reset(): setattr(conf, '_STORED', stored) self._resets.append(reset) return reset
[docs] async def await_and_store_result(self, coro): """Await coroutine and store returning result. Example:: self.run_until_complete(self.await_and_store_result(coro())) :param coro: Coroutine or Future to await """ self._stored_result = await coro
# Patchers
[docs] def init_patcher(self, patcher): """Initialize patcher, store for later use, return it. :param patcher: Patch to start. :type patcher: :obj:`unittest.mock._patch` :returns: MagicMock acting as patched object. :rtype: :class:`unittest.mock.MagicMock` """ _mock = patcher.start() self._patchers.append(patcher) return _mock
# Assertions
[docs] def assert_records_are_equal(self, first, second, msg=None): """Assert two Document Store records are equal. :param first: First record to compare. :param second: Second record to compare. :param msg: Optional message to output on assertion. """ if msg is None: msg = "Record instances are not equal. Expecting equal." self.assertEqual(first.export_dict(include_metadata=False, include_id=False), second.export_dict(include_metadata=False, include_id=False), msg=msg)
[docs] def assert_records_are_not_equal(self, first, second, msg=None): """Assert two Document Store records are not equal. :param first: First record to compare. :param second: Second record to compare. :param msg: Optional message to output on assertion. """ if msg is None: msg = "Record instances are equal. Expecting different instances." self.assertNotEqual(first.export_dict(include_metadata=False, include_id=False), second.export_dict(include_metadata=False, include_id=False), msg=msg)
    def assert_mock_meth_has_calls(self, mock_meth, call, *calls):
        """Assert mock_meth was called with arguments.

        This calls Mock.assert_has_calls and tests for call count. The actual
        benefit of using this method over the built-in assert_has_calls is
        that this method tries to pinpoint the actual call that was missing
        when assert_has_calls raised AssertionError. This is useful when
        mock_meth has had multiple calls. The built-in assert_has_calls will
        notify of all calls that the mock_meth has had, while this method
        will notify of the actual call that was missing.

        :param mock_meth: Mocked method that is target of testing.
        :param call: Call that should be found. Instance of
                     :class:`unittest.mock._Call` Repeat this argument to
                     test for multiple calls.
        :raises: :exc:`AssertionError` if calls not found.
        """
        exception = None
        calls = list((call,) + calls)
        # Call count must match the number of expected calls exactly.
        self.assertEqual(mock_meth.call_count, len(calls))
        try:
            mock_meth.assert_has_calls(calls)
        except AssertionError as exc:
            exception = exc
        if exception is None:
            return None
        # Try to pinpoint the actual call that was not found.
        # This will not work properly in all conditions
        # so we must make sure to raise the original assertionerror
        # unconditionally
        try:
            for single_call in calls:
                mock_meth.assert_has_calls([single_call])
        except AssertionError as exc:
            # Chain the pinpointed failure onto the original one.
            raise exc from exception
        raise exception
class KuhaEndToEndTestCase(KuhaUnitTestCase):
    """Base class for end-to-end tests.

    * HTTPClient for interacting with Document Store.
    * Assertion methods to check returning payload and status codes.
    """

    #: Headers used for JSON request bodies.
    JSON_HEADERS = {'Content-Type': 'application/json'}
    #: Headers used for form-encoded POST bodies.
    POST_FORM_HEADERS = {'Content-Type': 'application/x-www-form-urlencoded'}
    # NOTE(review): purpose of unsupported_args is not evident from this
    # file -- presumably overridden by subclasses; confirm before relying on it.
    unsupported_args = None
    #: tornado HTTPClient instance; created in setUpClass, closed in tearDownClass.
    http_client = None
    @classmethod
    def setUpClass(cls):
        """Skip end-to-end tests unless a Document Store URL is available.

        Reads the URL from the ``KUHA_DS_URL`` environment variable and adds
        it to configuration when not already configured. Also creates the
        shared synchronous HTTP client.

        :raises unittest.SkipTest: if ``KUHA_DS_URL`` is not set.
        """
        if 'KUHA_DS_URL' not in os.environ:
            raise unittest.SkipTest("KUHA_DS_URL environment variable is not set.")
        try:
            conf.get_conf('document_store_url')
        except AttributeError:
            # Not configured yet; take the value from the environment.
            conf.add_conf('document_store_url', os.environ['KUHA_DS_URL'])
        cls.http_client = HTTPClient()
        super(KuhaEndToEndTestCase, cls).setUpClass()
    @classmethod
    def tearDownClass(cls):
        """Close the HTTP client and reset the stored configuration namespace."""
        cls.http_client.close()
        conf._STORED['conf'] = conf.configargparse.Namespace()
        super(KuhaEndToEndTestCase, cls).tearDownClass()
@staticmethod def settings(): return conf.get_conf()
[docs] @staticmethod def get_record_url(rec_or_coll, _id=None): """Get URL to Document Store records or single record. :param rec_or_coll: record, record class or collection :param _id: Optional record ID. :type _id: str or None :returns: URL to Document Store collection or single record. :rtype: str """ collection = rec_or_coll.get_collection() if hasattr(rec_or_coll, 'get_collection') else rec_or_coll if _id is not None: return '/'.join([conf.get_conf('document_store_url'), collection, _id]) return '/'.join([conf.get_conf('document_store_url'), collection])
[docs] @staticmethod def get_query_url(rec_or_coll, query_type=None): """Get URL to Document Store query endpoint for ``collection`` :param rec_or_coll: Collection to query. :type rec_or_coll: str, record, or record class :param query_type: Optional query type :returns: URL to query endpoint. :rtype: str """ collection = rec_or_coll.get_collection() if hasattr(rec_or_coll, 'get_collection') else rec_or_coll url = '/'.join([conf.get_conf('document_store_url'), 'query', collection]) if not query_type: return url if not Query.is_valid_query_type(query_type): raise ValueError("Invalid query-type {}".format(query_type)) return url + '?query_type={}'.format(query_type)
# Communication with Document Store
    @classmethod
    def GET_to_document_store(cls, rec_or_coll, _id=None):  # pylint: disable=C0103
        """GET to Document Store returns record(s).

        :param rec_or_coll: record or collection to get.
        :param _id: Optional ObjectId. Will take precedence over ``rec_or_coll`` id.
        :returns: response body
        """
        if _id is None and hasattr(rec_or_coll, 'get_id'):
            try:
                _id = rec_or_coll.get_id()
            except TypeError:
                # Record instance without an assigned id; fetch the collection.
                pass
        url = cls.get_record_url(rec_or_coll, _id)
        response = cls.http_client.fetch(url)
        if not response.body:
            return None
        # Note that if body contains multiple streamed JSON records, json_decode will fail.
        return json_decode(response.body)
    @classmethod
    def POST_to_document_store(cls, record):  # pylint: disable=C0103
        """POST to Document Store creates record.

        :param record: Record to post.
        :returns: response body
        """
        url = cls.get_record_url(record.get_collection())
        # Export without metadata/id: the Document Store assigns those.
        response = cls.http_client.fetch(
            url, method='POST',
            body=json_encode(record.export_dict(include_metadata=False, include_id=False)),
            headers=cls.JSON_HEADERS)
        return json_decode(response.body)
    @classmethod
    def DELETE_to_document_store(cls, rec_or_coll=None, _id=None, hard_delete=True):  # pylint: disable=C0103
        """DELETE to Document Store deletes record(s).

        Call without arguments to delete all records from all collections.

        :param str or None rec_or_coll: Collection to delete from.
        :param str or None _id: ID of the record to delete.
        :param bool hard_delete: Set to False to do a logical delete instead.
        :returns: Response body or bodies wrapped in dict ({<collection>: <resp_body>})
        :raises ValueError: if ``_id`` is given without a collection.
        """
        if _id is not None and rec_or_coll is None:
            raise ValueError("Give collection with record id")

        def _url(rec_or_coll, _id=None):
            # Append the delete type ('hard' or 'soft') as a query parameter.
            delete_type = 'hard' if hard_delete else 'soft'
            return '%s?delete_type=%s' % (cls.get_record_url(rec_or_coll, _id), delete_type)

        if rec_or_coll is None:
            # No target given: delete from every known collection.
            rval = {}
            for coll in COLLECTIONS:
                resp = cls.http_client.fetch(_url(coll), method="DELETE")
                rval.update({coll: json_decode(resp.body)})
            return rval
        collection = rec_or_coll.get_collection() if\
            hasattr(rec_or_coll, 'get_collection') else\
            rec_or_coll
        url = _url(collection, _id)
        resp = cls.http_client.fetch(url, method="DELETE")
        return json_decode(resp.body)
    @classmethod
    def query_document_store(cls, rec_or_coll, query, query_type=None):
        """Execute query against Document Store query API.

        NOTE(review): the ``query`` parameter shadows the module-level
        ``query`` import inside this method.

        :param rec_or_coll: Collection to query.
        :type rec_or_coll: str or record class or record instance
        :param query: Query.
        :param query_type: Type of Query.
        :returns: query results
        :rtype: None if query returned no results, dict for results.
        """
        url = cls.get_query_url(rec_or_coll, query_type)
        response = cls.http_client.fetch(url, method="POST",
                                         body=json_encode(query),
                                         headers=cls.JSON_HEADERS)
        if not response.body:
            return None
        # Note that if body contains multiple streamed JSON records, json_decode will fail.
        return json_decode(response.body)
    @classmethod
    def get_collection_record_count(cls, rec_or_coll, exclude_deleted=True):
        """Return number of records for collection in Document Store.

        :param rec_or_coll: Document Store record, Document Store record class or collection.
        :param bool exclude_deleted: exclude logically deleted records from count.
        :returns: record count in Document Store.
        :rtype: int
        """
        collection = rec_or_coll.get_collection() if\
            hasattr(rec_or_coll, 'get_collection') else\
            rec_or_coll
        url = cls.get_query_url(collection, Query.query_type_count)
        query_dict = {}
        if exclude_deleted:
            # Filter out records whose metadata status marks them as deleted.
            rec = record_by_collection(collection)
            query_dict = {
                query.Query.k_filter: {
                    rec._metadata.attr_status.path: {
                        query.FilterKeyConstants.not_equal: REC_STATUS_DELETED}}}
        response = cls.http_client.fetch(url, method="POST",
                                         body=json_encode(query_dict),
                                         headers=cls.JSON_HEADERS)
        _dict = json_decode(response.body)
        return _dict['count']
[docs] def assert_document_store_is_empty(self, exclude_deleted=True): """Assert Document Store contains no records. :param bool exclude_deleted: exclude logically deleted records from count. :raises: :exc:`AssertionError` if Document Store has records. """ for coll in COLLECTIONS: count = self.get_collection_record_count(coll, exclude_deleted=exclude_deleted) self.assertEqual(count, 0)