# Source code for kuha_common.document_store.mappings.ddi.ddi31

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2022 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
#
"""Mapping profiles for DDI codebook versions."""

from itertools import chain
from kuha_common.document_store.mappings.exceptions import (
    UnknownXMLRoot,
    MissingRequiredAttribute,
    MappingError
)
from kuha_common.document_store.mappings.xmlbase import (
    MappedParams,
    XMLParserBase,
    as_valid_identifier,
    str_equals,
    fixed_value,
    element_remove_whitespaces,
    get_preferred_publication_id_agency_pair
)


#: XPath to a citation title, relative to the element that holds the r:Citation child.
_XPATH_REL_CITATION_TITLE = './r:Citation/r:Title'


class DDI31RecordParser(XMLParserBase):
    """Parse Document Store records from DDI 3.1. XML

    Check the root element. Expects either ddi:DDIInstance or
    s:StudyUnit. Currently supports only single s:StudyUnit element
    within the root.

    :param root_element: XML root element.
    :type root_element: :obj:`xml.etree.ElementTree.Element`
    :raises: :exc:`UnknownXMLRoot` for unexpected root element.
    :raises: :exc:`MappingError` if root contains more or less than
             exactly one s:StudyUnit child.
    """

    #: XML namespaces
    NS = {'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
          'xml': 'http://www.w3.org/XML/1998/namespace',
          'xhtml': 'http://www.w3.org/1999/xhtml',
          'ddi': 'ddi:instance:3_1',
          's': 'ddi:studyunit:3_1',
          'pd': 'ddi:physicaldataproduct:3_1',
          'pi': 'ddi:physicalinstance:3_1',
          'c': 'ddi:conceptualcomponent:3_1',
          'l': 'ddi:logicalproduct:3_1',
          'r': 'ddi:reusable:3_1',
          'g': 'ddi:group:3_1',
          'dc': 'ddi:datacollection:3_1',
          'a': 'ddi:archive:3_1'}

    def __init__(self, root_element):
        # The StudyUnit is resolved (and the root validated) before the
        # base class initializer runs.
        self._find_study_unit_element(root_element)
        super().__init__(root_element)

    @classmethod
    def _DDIInstance_tag(cls):
        """Return the namespace-qualified tag of ddi:DDIInstance."""
        return '{%s}DDIInstance' % (cls.NS['ddi'],)

    @classmethod
    def _is_DDIInstance(cls, element):
        """Return True if ``element`` is a ddi:DDIInstance element."""
        return element.tag == cls._DDIInstance_tag()

    def _find_study_unit_element(self, root_element):
        """Locate the single s:StudyUnit and store it to :attr:`study_unit_element`.

        :param root_element: XML root element.
        :raises: :exc:`MappingError` if a ddi:DDIInstance root does not
                 contain exactly one s:StudyUnit.
        :raises: :exc:`UnknownXMLRoot` if root is neither ddi:DDIInstance
                 nor s:StudyUnit.
        """
        expected_studyunit_root = '{%s}StudyUnit' % (self.NS['s'],)
        if self._is_DDIInstance(root_element):
            study_unit_elements = list(root_element.iter(expected_studyunit_root))
            study_unit_count = len(study_unit_elements)
            if study_unit_count > 1:
                # Currently supports only a single s:StudyUnit in xml metadata.
                raise MappingError("Unable to parse multiple StudyUnit elements")
            if study_unit_count < 1:
                raise MappingError("Unable to find StudyUnit element")
            self.study_unit_element = study_unit_elements.pop()
        elif root_element.tag == expected_studyunit_root:
            self.study_unit_element = root_element
        else:
            raise UnknownXMLRoot(root_element.tag, self._DDIInstance_tag(), expected_studyunit_root)

    @property
    def _study_unit_language(self):
        """Get language of StudyUnit element.

        Returns :attr:`root_language` if StudyUnit does not declare
        a language.

        :returns: Language
        :rtype: str
        """
        return self._get_xmllang(self.study_unit_element, default=self.root_language)

    def _get_ddiinstance_el(self):
        """Return the root if it is a ddi:DDIInstance, else search for one below the root."""
        if self._is_DDIInstance(self.root_element):
            return self.root_element
        return self._find('.//ddi:DDIInstance')

    def _iter_reference_values(self, xpath_to_parent, element, *elements):
        """Yield stripped reference ids found at ``<xpath_to_parent>/r:ID``.

        Reference elements without text are skipped.

        :param str xpath_to_parent: xpath to the reference element holding r:ID.
        :param element: first element to search from.
        :param elements: further elements to search from.
        :returns: generator yielding reference id strings.
        """
        xpath = '%s/r:ID' % (xpath_to_parent,)
        elements = (element,) + elements
        for ref_el in self._findall_from_elements(elements, xpath):
            # Both conditions must hold; an empty r:ID cannot refer to anything.
            if ref_el is not None and ref_el.text not in ('', None):
                yield ''.join(ref_el.itertext()).strip()

    def _find_by_reference_value(self, ref_val, xpath, element=None):
        """Find the first element matching ``xpath`` whose @id equals ``ref_val``.

        :param str ref_val: referenced id.
        :param str xpath: xpath to candidate target elements.
        :param element: element to search from. Defaults to the root element.
        :returns: matching element or None.
        """
        # Compare against None explicitly: an Element without children is falsy.
        if element is None:
            element = self.root_element
        return element.find('{base}[@id="{ref_id}"]'.format(base=xpath, ref_id=ref_val), self.NS)

    def _find_by_reference(self, ref_el, xpath, element=None):
        """Resolve a reference id element to its target element.

        :param ref_el: r:ID element (may be None).
        :param str xpath: xpath to candidate target elements.
        :param element: element to search from. Defaults to the root element.
        :returns: referenced element, or None if ``ref_el`` is None, has
                  no text or the target is not found.
        """
        if ref_el is None or ref_el.text in ('', None):
            return None
        return self._find_by_reference_value(''.join(ref_el.itertext()).strip(), xpath, element=element)

    def _find_and_iter_referred_els(self, ref_xpath, target_xpath, *ref_elements, lookup_element=None):
        """Finds referred elements and yields them one by one.

        :param str ref_xpath: xpath to reference element
        :param str target_xpath: xpath to target element
        :param ref_elements: elements to search through for reference element
        :param lookup_element: find target from this element's children instead of root
        :returns: generator yielding referenced elements
        """
        if not ref_elements:
            return
        for ref_val in self._iter_reference_values(ref_xpath, *ref_elements):
            candidate = self._find_by_reference_value(ref_val, target_xpath, element=lookup_element)
            if candidate is not None:
                yield candidate

    def _get_study_number_from_study_unit_element(self, study_unit_element, raise_error_if_missing=True):
        """Get study number of a StudyUnit.

        Lookup order: archive Collection CallNumber, archive Item
        CallNumber, r:UserID of the StudyUnit.

        :param study_unit_element: s:StudyUnit element
        :param bool raise_error_if_missing: raise if no study number is found.
        :returns: study number or None
        :raises: :exc:`MissingRequiredAttribute` if not found and
                 ``raise_error_if_missing`` is True.
        """
        archive_els = list(self._iter_archives_from_study_unit(study_unit_element))
        for xpath in ('./a:ArchiveSpecific/a:Collection/a:CallNumber',
                      './a:ArchiveSpecific/a:Item/a:CallNumber'):
            for candidate in self._findall_from_elements(archive_els, xpath):
                if candidate is not None and candidate.text not in ['', None]:
                    return candidate.text
        candidate = study_unit_element.find('./r:UserID', self.NS)
        if candidate is not None and candidate.text not in ['', None]:
            return candidate.text
        if raise_error_if_missing:
            raise MissingRequiredAttribute('./r:UserID',
                                           './a:Archive/a:ArchiveSpecific/a:Collection/a:CallNumber',
                                           './a:Archive/a:ArchiveSpecific/a:Item/a:CallNumber',
                                           msg='Unable to find study number from %s, %s or %s')
        return None

    def _parse_study_number(self):
        """Set :attr:`study_number` from the StudyUnit element."""
        self.study_number = self._get_study_number_from_study_unit_element(self.study_unit_element)

    def _get_spatial_coverage_from_study_unit(self, study_unit_element):
        """Get SpatialCoverage element from StudyUnit element.

        The SpatialCoverage element may be an inline child element of
        StudyUnit or referenced by SpatialCoverageReference.

        :param study_unit_element: s:StudyUnit element
        :type study_unit_element: :obj:`xml.etree.ElementTree.Element`
        :returns: SpatialCoverage of the StudyUnit.
        :rtype: :obj:`xml.etree.ElementTree.Element` or None
        """
        ref_id_el = study_unit_element.find('./r:Coverage/r:SpatialCoverageReference/r:ID', self.NS)
        candidate = self._find_by_reference(ref_id_el, './/r:SpatialCoverage')
        if candidate is None:
            candidate = study_unit_element.find('./r:Coverage/r:SpatialCoverage', self.NS)
        return candidate

    def _iter_data_collections_from_study_unit(self, study_unit_element):
        """Yield referenced dc:DataCollection elements first, then inline ones."""
        for ref_id_el in study_unit_element.findall('./s:DataCollectionReference/r:ID', self.NS):
            candidate = self._find_by_reference(ref_id_el, './/dc:DataCollection')
            if candidate is not None:
                yield candidate
        yield from study_unit_element.findall('./dc:DataCollection', self.NS)

    def _iter_physical_instances_from_study_unit(self, study_unit_element):
        """Yield referenced pi:PhysicalInstance elements first, then inline ones."""
        for ref_id_el in study_unit_element.findall('./s:PhysicalInstanceReference/r:ID', self.NS):
            candidate = self._find_by_reference(ref_id_el, './/pi:PhysicalInstance')
            if candidate is not None:
                yield candidate
        yield from study_unit_element.findall('./pi:PhysicalInstance', self.NS)

    def _iter_archives_from_study_unit(self, study_unit_element):
        """Yield the inline a:Archive element (if any), then referenced ones."""
        archive_el = study_unit_element.find('./a:Archive', self.NS)
        if archive_el is not None:
            yield archive_el
        yield from self._find_and_iter_referred_els(
            './s:ArchiveReference', './/a:Archive', study_unit_element)

    def _iter_other_materials_from_study_unit(self, study_unit_element):
        """Yield r:OtherMaterial children of the StudyUnit."""
        yield from self._findall('./r:OtherMaterial', study_unit_element)

    def _iter_funding_informations_from_study_unit(self, study_unit_element):
        """Yield r:FundingInformation children of the StudyUnit."""
        yield from self._findall('./r:FundingInformation', study_unit_element)

    def _iter_collection_periods_as_mapped_params(self):
        """Generate collection periods as :obj:`MappedParams`

        Returns a generator which yields :obj:`MappedParams` instances
        containing collection periods.

        .. note:: DDI 3.1. supports only single DataCollectionDate for
                  each CollectionEvent. That is not enforced here.

        :returns: Generator yielding collection periods.
        """
        event_attr = self._study_cls.collection_periods.attr_event.name
        data_colls = self._iter_data_collections_from_study_unit(self.study_unit_element)
        for dc_date in self._findall_from_elements(data_colls,
                                                   './dc:CollectionEvent/dc:DataCollectionDate'):
            simple_date = dc_date.find('./r:SimpleDate', self.NS)
            if simple_date is not None:
                params = MappedParams(simple_date.text)
                params.set_language(self._study_unit_language)
                params.keyword_arguments.update({event_attr: 'single'})
                yield params
                continue
            # DataCollectionDate can contain either SimpleDate or StartDate and EndDate
            start_date = dc_date.find('./r:StartDate', self.NS)
            if start_date is not None:
                params = MappedParams(start_date.text)
                params.set_language(self._study_unit_language)
                params.keyword_arguments.update({event_attr: 'start'})
                yield params
            end_date = dc_date.find('./r:EndDate', self.NS)
            if end_date is not None:
                # It is a violation of the DDI31 standard to have an EndDate without
                # a StartDate. Kuha won't mind however.
                params = MappedParams(end_date.text)
                params.set_language(self._study_unit_language)
                params.keyword_arguments.update({event_attr: 'end'})
                yield params

    def _iter_study_uris_as_mapped_params(self):
        """Generate Study Uris as :obj:`MappedParams`

        :returns: Generator yielding :obj:`MappedParams`
        """
        location_attr = self._study_cls.study_uris.attr_location.name
        description_attr = self._study_cls.study_uris.attr_description.name
        archive_els = self._iter_archives_from_study_unit(self.study_unit_element)
        for archive_spec_el in self._findall_from_elements(archive_els, './a:ArchiveSpecific'):
            # First get the actual URI.
            uri_el = archive_spec_el.find("./a:Collection/[a:CallNumber='{}']/a:URI".
                                          format(self.study_number), self.NS)
            if uri_el is None:
                uri_el = archive_spec_el.find("./a:Item/[a:CallNumber='{}']/a:URI".
                                              format(self.study_number), self.NS)
            if uri_el is not None:
                uri = uri_el.text
                uri_lang = self._get_xmllang(uri_el, default=self._study_unit_language)
            else:
                uri = None
                uri_lang = None
            org_reference_id_el = archive_spec_el.find('./a:ArchiveOrganizationReference/r:ID', self.NS)
            if org_reference_id_el is None:
                if uri_el is None:
                    continue
                param = MappedParams(uri)
                param.set_language(uri_lang)
                yield param
                continue
            # Then get the organization if referenced.
            org_el = self._find_by_reference(org_reference_id_el, './/a:Organization')
            if org_el is None:
                # Unable to find referenced organization.
                continue
            for org_name_el in org_el.findall('./a:OrganizationName', self.NS):
                param = MappedParams(uri)
                param.set_language(self._get_xmllang(org_name_el, default=self._study_unit_language))
                param.keyword_arguments.update({location_attr: org_name_el.text})
                # Try to find localized element.
                org_desc_el = org_el.find("./r:Description[@xml:lang='{}']".format(param.get_language()),
                                          self.NS)
                if org_desc_el is not None:
                    param.keyword_arguments.update({description_attr: org_desc_el.text})
                elif param.get_language() == self._study_unit_language:
                    # If language equals the language of the StudyUnit,
                    # accept Description without locale.
                    org_desc_el = org_el.find('./r:Description', self.NS)
                    if org_desc_el is not None:
                        param.keyword_arguments.update({description_attr: org_desc_el.text})
                yield param

    def _iter_universes_as_mapped_params(self):
        """Generate Universes as :obj:`MappedParams`.

        :returns: Generator yielding :obj:`MappedParams`
        """
        inc_to_bool = str_equals('true', True)
        included_attr = self._study_cls.universes.attr_included.name
        for ref_id_el in self.study_unit_element.findall('./r:UniverseReference/r:ID', self.NS):
            universe_el = self._find_by_reference(ref_id_el, ".//c:Universe")
            if universe_el is None:
                continue
            included = inc_to_bool(universe_el.attrib.get('isInclusive'))
            for desc in universe_el.findall('./c:HumanReadable', self.NS):
                param = MappedParams(desc.text)
                param.set_language(self._get_xmllang(desc, default=self._study_unit_language))
                param.keyword_arguments.update({included_attr: included})
                yield param

    def _iter_identifiers_as_mapped_params(self):
        """Generate Identifiers as :obj:`MappedParams`

        Will not discard duplicates.

        :returns: Generator yielding :obj:`MappedParams`
        """
        # Cast to list because it is iterated through multiple times.
        archive_els = list(self._iter_archives_from_study_unit(self.study_unit_element))

        def _param_from_els(els):
            for element in els:
                if element.text in [None, '']:
                    continue
                param = MappedParams(''.join(element.itertext()))
                param.set_language(self._get_xmllang(element, default=self._study_unit_language))
                yield param

        yield from _param_from_els(
            self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Collection/a:CallNumber'))
        yield from _param_from_els(
            self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Item/a:CallNumber'))
        yield from _param_from_els(
            self.study_unit_element.findall('./r:Citation/r:InternationalIdentifier', self.NS))

    def _iter_mapped_params_from_geography_element(self, *geography_els):
        """Yield :obj:`MappedParams` for every non-empty r:Level/r:Name of given r:Geography elements.

        None values in ``geography_els`` are ignored.
        """
        geography_els = filter(lambda x: x is not None, geography_els)
        for name_el in self._findall_from_elements(geography_els, './r:Level/r:Name'):
            if name_el.text in [None, '']:
                continue
            params = MappedParams("".join(name_el.itertext()))
            params.set_language(self._get_xmllang(name_el, default=self._study_unit_language))
            yield params

    def _iter_mapped_params_from_geographyvalue_elements(self, geogvalue_els):
        """Yield :obj:`MappedParams` for every r:GeographyName of given r:GeographyValue elements.

        The r:GeographyCode/r:Value of the GeographyValue, if present,
        is attached as the abbreviation of each yielded name.
        """
        abbreviation_attr = self._study_cls.study_area_countries.attr_abbreviation.name
        for geogvalue_el in geogvalue_els:
            code_el = geogvalue_el.find('./r:GeographyCode/r:Value', self.NS)
            code_value = code_el.text if code_el is not None and code_el.text not in [None, ''] else None
            for name_el in geogvalue_el.findall('./r:GeographyName', self.NS):
                params = MappedParams(''.join(name_el.itertext()))
                params.set_language(self._get_xmllang(name_el, default=self._study_unit_language))
                if code_value is not None:
                    params.keyword_arguments.update({abbreviation_attr: code_value})
                yield params

    def _iter_study_area_countries_as_mapped_params(self):
        """Generate study area countries as :obj:`MappedParams`.

        Values are gathered from the levels referenced by
        r:TopLevelReference and r:LowestLevelReference and from the
        locations referenced by r:GeographicLocationReference.

        :returns: Generator yielding :obj:`MappedParams`
        """
        spatial_coverage_el = self._get_spatial_coverage_from_study_unit(self.study_unit_element)
        if spatial_coverage_el is None:
            return
        # TopLevelReference & LowestLevelReference are mandatory children of SpatialCoverage
        for parentlevel_ref_el in (spatial_coverage_el.find('./r:TopLevelReference', self.NS),
                                   spatial_coverage_el.find('./r:LowestLevelReference', self.NS)):
            if parentlevel_ref_el is None:
                # Tolerate invalid input that lacks a mandatory child.
                continue
            ref_id_el = parentlevel_ref_el.find('./r:LevelReference/r:ID', self.NS)
            if ref_id_el is None or ref_id_el.text in [None, '']:
                level_name_el = parentlevel_ref_el.find('./r:LevelName', self.NS)
                if level_name_el is None or level_name_el.text in [None, '']:
                    continue
                params = MappedParams("".join(level_name_el.itertext()))
                params.set_language(self._study_unit_language)
                yield params
                continue
            geostruct_el = self._find_by_reference(ref_id_el, './/r:GeographicStructure')
            if geostruct_el is None:
                continue
            # Geography elements inline.
            yield from self._iter_mapped_params_from_geography_element(
                *geostruct_el.findall('./r:Geography', self.NS))
            # Geography elements by reference.
            for geography_ref_id_el in geostruct_el.findall('./r:GeographyReference/r:ID', self.NS):
                if geography_ref_id_el.text in [None, '']:
                    continue
                yield from self._iter_mapped_params_from_geography_element(
                    self._find_by_reference(geography_ref_id_el, './/r:Geography'))
        for ref_id_el in spatial_coverage_el.findall('./r:GeographicLocationReference/r:ID', self.NS):
            if ref_id_el.text in [None, '']:
                continue
            yield from self._iter_mapped_params_from_geographyvalue_elements(self.root_element.findall(
                './/r:GeographicLocation[@id="{ref_id}"]/r:Values/r:GeographyValue'
                .format(ref_id=ref_id_el.text.strip()), self.NS))

    def _get_role_and_grant_numbers_from_funding_info_el(self, funding_info_el):
        """Return (role, grant_numbers) of a r:FundingInformation element.

        :returns: tuple of @role attribute value (or None) and list of
                  r:GrantNumber texts.
        """
        role = funding_info_el.attrib.get('role')
        grant_numbers = [''.join(grant_number_el.itertext())
                         for grant_number_el in self._findall('./r:GrantNumber', funding_info_el)]
        return role, grant_numbers

    def _iter_funding_agencies_as_mapped_params(self):
        """Generate funding agencies as :obj:`MappedParams`.

        One item is yielded for each a:OrganizationName of each
        organization referenced from r:FundingInformation.

        :returns: Generator yielding :obj:`MappedParams`
        """
        agencies = self._study_cls.funding_agencies
        for funding_info_el in self._iter_funding_informations_from_study_unit(self.study_unit_element):
            role, grant_numbers = self._get_role_and_grant_numbers_from_funding_info_el(
                funding_info_el)
            # Data model currently supports only a single grant number attribute.
            grant_number = grant_numbers[0] if grant_numbers else None
            # itertext() returns an iterator of strings; flatten the iterators
            # of every Description before joining.
            description = ''.join(chain.from_iterable(
                elem.itertext() for elem in self._findall('./r:Description', funding_info_el))) or None
            for org_name_el in self._findall_from_elements(
                    self._find_and_iter_referred_els('./r:AgencyOrganizationReference',
                                                     './/a:Organization',
                                                     funding_info_el),
                    './a:OrganizationName'):
                params = MappedParams(''.join(org_name_el.itertext()))
                params.set_language(self._get_xmllang(org_name_el, self.root_language))
                params.keyword_arguments.update({
                    agencies.attr_grant_number.name: grant_number,
                    agencies.attr_role.name: role,
                    agencies.attr_description.name: description})
                yield params

    def _iter_grant_numbers_as_mapped_params(self):
        """Generate grant numbers as :obj:`MappedParams`.

        :returns: Generator yielding :obj:`MappedParams`
        """
        for funding_info_el in self._iter_funding_informations_from_study_unit(self.study_unit_element):
            role, grant_numbers = self._get_role_and_grant_numbers_from_funding_info_el(funding_info_el)
            for grant_number in grant_numbers:
                params = MappedParams(grant_number)
                params.set_language(self._get_xmllang(funding_info_el, self.root_language))
                params.keyword_arguments.update({
                    self._study_cls.grant_numbers.attr_role.name: role
                })
                yield params

    def __iter_other_materials_for_related_publications(self):
        """Yield r:OtherMaterial elements having type 'Related Publication'.

        Looks from the StudyUnit first. If none is found there, falls
        back to g:ResourcePackage children of ddi:DDIInstance.
        """
        found = False
        for oth_mat_el in self._iter_other_materials_from_study_unit(self.study_unit_element):
            if oth_mat_el.attrib.get('type') == 'Related Publication':
                yield oth_mat_el
                found = True
        if found:
            return
        ddi_instance_el = self._get_ddiinstance_el()
        # Compare against None explicitly: Element truthiness reflects child count.
        if ddi_instance_el is not None:
            for oth_mat_el in self._findall('./g:ResourcePackage/r:OtherMaterial', ddi_instance_el):
                if oth_mat_el.attrib.get('type') == 'Related Publication':
                    yield oth_mat_el

    def _iter_params_from_othmat_properties(self, title_str_els, desc_str_els=None, uri=None,
                                            distribution_date=None, id_agency_pair=None):
        """Yield one related publication :obj:`MappedParams` per language.

        Titles and descriptions are grouped by their language. The
        remaining properties are repeated for every language.

        :param title_str_els: title elements.
        :param desc_str_els: description elements.
        :param uri: publication URI.
        :param distribution_date: publication distribution date.
        :param id_agency_pair: tuple of (identifier, agency) or None.
        :returns: Generator yielding :obj:`MappedParams`
        """
        desc_str_els = desc_str_els or []
        _id, agency = id_agency_pair if id_agency_pair else (None, None)
        langs = [self._get_xmllang(element, self._study_unit_language)
                 for element in chain.from_iterable([title_str_els, desc_str_els])]
        mapped_langs_values = {lang: {} for lang in langs}
        for title_str_el in title_str_els:
            mapped_langs_values[self._get_xmllang(title_str_el, self._study_unit_language)].update({
                'title': ''.join(title_str_el.itertext())})
        for desc_str_el in desc_str_els:
            mapped_langs_values[self._get_xmllang(desc_str_el, self._study_unit_language)].update({
                'desc': ''.join(desc_str_el.itertext())})
        publications = self._study_cls.related_publications
        for lang, values in mapped_langs_values.items():
            params = MappedParams(values.get('title'))
            params.set_language(lang)
            params.keyword_arguments.update({
                publications.attr_description.name: values.get('desc'),
                publications.attr_uri.name: uri,
                publications.attr_distribution_date.name: distribution_date,
                publications.attr_identifier.name: _id,
                publications.attr_identifier_agency.name: agency
            })
            yield params

    def _iter_related_publications_as_mapped_params(self):
        """Generate related publications as :obj:`MappedParams`.

        :returns: Generator yielding :obj:`MappedParams`
        """
        for oth_mat_el in self.__iter_other_materials_for_related_publications():
            title_str_els = self._findall(_XPATH_REL_CITATION_TITLE, oth_mat_el)
            ext_url_ref_el = self._find('r:ExternalURLReference', oth_mat_el)
            uri = ''.join(ext_url_ref_el.itertext()) if ext_url_ref_el is not None else None
            simple_date_el = self._find('./r:Citation/r:PublicationDate/r:SimpleDate', oth_mat_el)
            distribution_date = ''.join(simple_date_el.itertext()) if simple_date_el is not None else None
            ids_agencys = [(''.join(id_el.itertext()), id_el.attrib.get('type'))
                           for id_el in self._findall('./r:Citation/r:InternationalIdentifier', oth_mat_el)]
            yield from self._iter_params_from_othmat_properties(
                title_str_els, uri=uri, distribution_date=distribution_date,
                id_agency_pair=get_preferred_publication_id_agency_pair(ids_agencys))

    @property
    def _study_maps(self):
        """Mappings relative to s:StudyUnit."""
        return [
            (self._study_cls.add_study_titles,
             self._map_multi(_XPATH_REL_CITATION_TITLE)),
            (self._study_cls.add_abstract,
             self._map_multi('./s:Abstract/r:Content')
             .set_value_getter(element_remove_whitespaces)),
            (self._study_cls.add_principal_investigators,
             self._map_multi('./r:Citation/r:Creator')
             .add_attribute(self._study_cls.principal_investigators.attr_organization.name,
                            self._map_single('.', 'affiliation'))),
            (self._study_cls.add_publishers,
             self._map_multi('./r:Citation/r:Publisher')),
            (self._study_cls.add_publication_years,
             self._map_multi('./r:Citation/r:PublicationDate/r:SimpleDate')
             .add_attribute(self._study_cls.publication_years.attr_distribution_date.name,
                            self._map_single('.'))),
            (self._study_cls.add_classifications,
             self._map_multi('./r:Coverage/r:TopicalCoverage/r:Subject')
             .set_value_conversion(fixed_value(None))
             # DDI31 contains no @id.
             # DDI31 contains no codelistname
             # DDI31 contains no codelisturn
             .add_attribute(self._study_cls.classifications.attr_system_name.name,
                            self._map_single('.', 'codeListID'))
             .add_attribute(self._study_cls.classifications.attr_description.name,
                            self._map_single('.'))),
            (self._study_cls.add_keywords,
             self._map_multi('./r:Coverage/r:TopicalCoverage/r:Keyword')
             .set_value_conversion(fixed_value(None))
             # DDI31 contains no @id.
             # DDI31 contains no codelistname
             # DDI31 contains no codelisturn
             .add_attribute(self._study_cls.keywords.attr_system_name.name,
                            self._map_single('.', 'codeListID'))
             .add_attribute(self._study_cls.keywords.attr_description.name,
                            self._map_single('.'))),
            (self._study_cls.add_analysis_units,
             self._map_multi('./r:AnalysisUnit')
             .add_attribute(self._study_cls.analysis_units.attr_system_name.name,
                            self._map_single('.', 'codeListID'))
             .add_attribute(self._study_cls.analysis_units.attr_uri.name,
                            self._map_single('.', 'codeListURN')))
        ]

    @property
    def studies(self):
        """Parse XML to create and populate
        :obj:`kuha_common.document_store.records.Study`.

        :returns: Generator to Populate Document Store Study record.
        """
        if self.study_number is None:
            self._parse_study_number()
        study = self._study_cls()
        study.add_study_number(self.study_number_identifier)
        if self._is_DDIInstance(self.root_element):
            # DDIInstance must be root of the document. If there is a DDIInstance, map its title into
            # Study.document_titles.
            # DDIInstance may only have a single r:Citation and r:Citation may only have a single
            # r:Title
            # Get r:Citation/r:AlternateTitle too.
            self._map_to_record(study, self.root_element, [
                (self._study_cls.add_document_titles,
                 self._map_single(_XPATH_REL_CITATION_TITLE, localizable=True)),
                (self._study_cls.add_document_titles,
                 self._map_multi('./r:Citation/r:AlternateTitle'))
            ])
        self._map_to_record(study, self.study_unit_element, self._study_maps,
                            default_language=self._study_unit_language)
        # DDI3 has references and is much too complex to use XMLMapper for all elements.
        # Use custom mappings for certain elements.
        # These are relative to dc:DataCollection
        data_coll_maps = [
            (self._study_cls.add_time_methods,
             self._map_multi('./dc:Methodology/dc:TimeMethod')
             .set_value_getter(self.child_text('r:UserID'))
             .add_attribute(self._study_cls.time_methods.attr_description.name,
                            self._map_single('./r:Content', localizable=True),
                            provides_main_lang=True)
             .add_attribute(self._study_cls.time_methods.attr_system_name.name,
                            self._map_single('./r:UserID', 'type'))),
            (self._study_cls.add_sampling_procedures,
             self._map_multi('./dc:Methodology/dc:SamplingProcedure')
             .set_value_getter(self.child_text('r:UserID'))
             .add_attribute(self._study_cls.sampling_procedures.attr_description.name,
                            self._map_single('./r:Content', localizable=True),
                            provides_main_lang=True)
             .add_attribute(self._study_cls.sampling_procedures.attr_system_name.name,
                            self._map_single('./r:UserID', 'type'))),
            (self._study_cls.add_collection_modes,
             self._map_multi('./dc:CollectionEvent/dc:ModeOfCollection')
             .set_value_getter(self.child_text('r:UserID'))
             .add_attribute(self._study_cls.collection_modes.attr_description.name,
                            self._map_single('./r:Content', localizable=True),
                            provides_main_lang=True)
             .add_attribute(self._study_cls.collection_modes.attr_system_name.name,
                            self._map_single('./r:UserID', 'type'))),
        ]
        for data_collection_element in self._iter_data_collections_from_study_unit(self.study_unit_element):
            self._map_to_record(study, data_collection_element, data_coll_maps,
                                default_language=self._study_unit_language)
        # These are relative to pi:PhysicalInstance
        physical_instance_maps = [
            (self._study_cls.add_file_names,
             self._map_multi('./pi:DataFileIdentification/pi:URI'))
        ]
        for physical_instance_element in self._iter_physical_instances_from_study_unit(
                self.study_unit_element):
            self._map_to_record(study, physical_instance_element, physical_instance_maps,
                                default_language=self._study_unit_language)
        # These are relative to a:Archive
        archive_maps = [
            (self._study_cls.add_data_access,
             self._map_multi('./a:ArchiveSpecific/a:DefaultAccess/a:Restrictions')),
            (self._study_cls.add_data_access_descriptions,
             self._map_multi('./a:ArchiveSpecific/a:DefaultAccess/a:AccessConditions'))
        ]
        for archive_element in self._iter_archives_from_study_unit(self.study_unit_element):
            self._map_to_record(study, archive_element, archive_maps,
                                default_language=self._study_unit_language)
        for add_func, mapping in [
                (study.add_collection_periods, self._iter_collection_periods_as_mapped_params),
                (study.add_study_uris, self._iter_study_uris_as_mapped_params),
                (study.add_universes, self._iter_universes_as_mapped_params),
                (study.add_study_area_countries, self._iter_study_area_countries_as_mapped_params),
                (study.add_identifiers, self._iter_identifiers_as_mapped_params),
                (study.add_funding_agencies, self._iter_funding_agencies_as_mapped_params),
                (study.add_grant_numbers, self._iter_grant_numbers_as_mapped_params),
                (study.add_related_publications, self._iter_related_publications_as_mapped_params)]:
            for params in mapping():
                add_func(*params.arguments, **params.keyword_arguments)
        yield study

    @property
    def _variable_elements(self):
        """Variable elements generator.

        First look for logicalproducts defined as a child of
        study_unit_element. If none found, try to look them by
        references from all children of root_element.

        :returns: generator yielding Variable elements.
        """
        logicalproducts = self.study_unit_element.findall('./l:LogicalProduct', self.NS)
        if not logicalproducts:
            ref_id_els = self.study_unit_element.findall('.//s:LogicalProductReference/r:ID', self.NS)
            ref_id_els += self.study_unit_element.findall('.//pd:LogicalProductReference/r:ID', self.NS)
            # De-duplicate while keeping document order so that the order of
            # yielded variables is deterministic.
            for ref_id_el in dict.fromkeys(ref_id_els):
                logicalproduct_el = self._find_by_reference(ref_id_el, ".//l:LogicalProduct")
                if logicalproduct_el is not None:
                    logicalproducts.append(logicalproduct_el)
        for varscheme_ref_id_el in self._findall_from_elements(logicalproducts,
                                                               './/l:VariableSchemeReference/r:ID'):
            if varscheme_ref_id_el.text in [None, '']:
                continue
            ref_id = varscheme_ref_id_el.text.strip()
            yield from self.root_element.findall(".//l:VariableScheme[@id='{ref_id}']/l:Variable"
                                                 .format(ref_id=ref_id), self.NS)

    def _iter_code_elements_by_reference(self, ref_id):
        """Yield l:Code elements of the l:CodeScheme whose @id equals ``ref_id``."""
        yield from self.root_element.findall(".//l:CodeScheme[@id='{ref_id}']/l:Code"
                                             .format(ref_id=ref_id), self.NS)

    @property
    def _variable_maps(self):
        """Mappings relative to l:Variable."""
        return [
            (self._variable_cls.add_variable_name,
             self._map_single('./l:VariableName', required=True)
             .set_value_conversion(as_valid_identifier)),
            (self._variable_cls.add_variable_labels,
             self._map_multi('./r:Label')),
            (self._variable_cls.add_question_identifiers,
             self._map_multi('./l:QuestionReference/r:ID', localizable=False)
             .set_value_conversion(as_valid_identifier))
        ]

    def _add_codelist_codes_to_variable(self, variable, var_el):
        """Add codelist codes of ``var_el`` to ``variable``.

        Codes come from the l:CodeScheme referenced by the variable's
        l:CodeRepresentation. Values listed in @missingValue are flagged
        as missing. Missing values without a matching code are added
        last, without a label.

        :param variable: Variable record to populate.
        :param var_el: l:Variable element.
        """
        codeschemeref_id_el = var_el.find(
            './l:Representation/l:CodeRepresentation/r:CodeSchemeReference/r:ID', self.NS)
        if codeschemeref_id_el is None or codeschemeref_id_el.text in [None, '']:
            return
        missing_values = set(var_el.find('./l:Representation/l:CodeRepresentation', self.NS)
                             .attrib.get('missingValue', '').split())
        for code_el in self._iter_code_elements_by_reference(codeschemeref_id_el.text.strip()):
            value_el = code_el.find('./l:Value', self.NS)
            code_value = value_el.text.strip() if value_el is not None and \
                value_el.text not in [None, ''] else None
            missing_value = code_value in missing_values
            if missing_value:
                # Handled here; do not add it again after the loop.
                missing_values.remove(code_value)
            category_ref_id_el = code_el.find('./l:CategoryReference/r:ID', self.NS)
            if category_ref_id_el is None or category_ref_id_el.text in [None, '']:
                if code_value is not None:
                    variable.add_codelist_codes(code_value, self._study_unit_language,
                                                missing=missing_value)
                continue
            category_el = self._find_by_reference(category_ref_id_el, ".//l:Category")
            label_els = category_el.findall('./r:Label', self.NS) if category_el is not None else []
            if not label_els:
                if code_value is not None:
                    variable.add_codelist_codes(code_value, self._study_unit_language,
                                                missing=missing_value)
                continue
            for label_el in label_els:
                # label_text may be None or ''
                label_text = label_el.text.strip() if label_el.text is not None else None
                variable.add_codelist_codes(code_value,
                                            self._get_xmllang(label_el, default=self._study_unit_language),
                                            label=label_text,
                                            missing=missing_value)
        # Sorted to keep the output order deterministic.
        for missing_value in sorted(missing_values):
            variable.add_codelist_codes(missing_value, self._study_unit_language, missing=True)

    @property
    def variables(self):
        """Parse XML to create and populate
        :obj:`kuha_common.document_store.records.Variable`.

        :returns: Generator to Populate Document Store Variable records.
        """
        if self.study_number is None:
            self._parse_study_number()
        for var_el in self._variable_elements:
            variable = self._variable_cls()
            variable.add_study_number(self.study_number_identifier)
            self._map_to_record(variable, var_el, self._variable_maps)
            self._add_codelist_codes_to_variable(variable, var_el)
            yield variable

    @property
    def _question_maps(self):
        """Mappings relative to dc:QuestionItem."""
        return [
            (self._question_cls.add_question_identifier,
             self._map_single('./r:UserID', required=True)
             .set_value_conversion(as_valid_identifier)),
            (self._question_cls.add_question_texts,
             self._map_multi('./dc:QuestionText')
             .set_value_getter(element_remove_whitespaces))
        ]

    def _iter_question_elements_by_reference_elements(self, ref_id_elements):
        """Yield dc:QuestionItem elements referenced by given r:ID elements."""
        for ref_id_el in ref_id_elements:
            question_el = self._find_by_reference(ref_id_el, ".//dc:QuestionItem")
            if question_el is not None:
                yield question_el

    @property
    def questions(self):
        """Parse XML to create and populate
        :obj:`kuha_common.document_store.records.Question`.

        :returns: Generator to Populate Document Store Question records.
        """
        if self.study_number is None:
            self._parse_study_number()
        for var_el in self._variable_elements:
            for question_el in self._iter_question_elements_by_reference_elements(
                    var_el.findall('./l:QuestionReference/r:ID', self.NS)):
                question = self._question_cls()
                self._map_to_record(question, question_el, self._question_maps)
                question.add_study_number(self.study_number_identifier)
                var_name = var_el.find('./l:VariableName', self.NS)
                if var_name is not None and var_name.text not in [None, '']:
                    question.add_variable_name(as_valid_identifier(var_name.text.strip()))
                codelist_ref_el = question_el.find('./dc:CodeDomain/r:CodeSchemeReference/r:ID', self.NS)
                if codelist_ref_el is None or codelist_ref_el.text in [None, '']:
                    yield question
                    continue
                for code_el in self._iter_code_elements_by_reference(codelist_ref_el.text.strip()):
                    self._map_to_record(question, code_el,
                                        [(self._question_cls.add_codelist_references,
                                          self._map_multi('./l:Value'))])
                yield question

    @property
    def _study_group_elements(self):
        """Generator iterates group elements which contain g:StudyUnit children.

        :returns: Generator yielding g:Group elements.
        """
        g_study_unit_tag = '{%s}StudyUnit' % (self.NS['g'],)
        for group_el in self.root_element.findall('.//g:Group', self.NS):
            if g_study_unit_tag in [_.tag for _ in group_el]:
                yield group_el

    @property
    def _study_group_maps(self):
        """These are relative to g:Group"""
        return [
            (self._studygroup_cls.add_study_group_identifier,
             self._map_single('./r:UserID', required=True)
             .set_value_conversion(as_valid_identifier)),
            (self._studygroup_cls.add_study_group_names,
             self._map_multi(_XPATH_REL_CITATION_TITLE)),
            (self._studygroup_cls.add_descriptions,
             self._map_multi('./g:Abstract/r:Content')),
            (self._studygroup_cls.add_uris,
             self._map_single('.', 'externalReferenceDefaultURI', localizable=True))
        ]

    @property
    def study_groups(self):
        """Parse XML to create and populate
        :obj:`kuha_common.document_store.records.StudyGroup`.

        :returns: Generator to Populate Document Store StudyGroup records.
        """
        s_study_unit_tag = '{%s}StudyUnit' % (self.NS['s'],)
        for group_el in self._study_group_elements:
            study_group = self._studygroup_cls()
            self._map_to_record(study_group, group_el, self._study_group_maps)
            for g_study_unit_el in group_el.findall('./g:StudyUnit', self.NS):
                study_number = None
                if s_study_unit_tag in [_.tag for _ in g_study_unit_el]:
                    # s:StudyUnit is an inline child of g:StudyUnit.
                    study_number = self._get_study_number_from_study_unit_element(
                        g_study_unit_el.find('./s:StudyUnit', self.NS), raise_error_if_missing=False)
                else:
                    # s:StudyUnit is referenced from g:StudyUnit.
                    ref_id_el = g_study_unit_el.find('./g:Reference/r:ID', self.NS)
                    referenced_study_unit_el = self._find_by_reference(ref_id_el, ".//s:StudyUnit")
                    if referenced_study_unit_el is None:
                        continue
                    study_number = self._get_study_number_from_study_unit_element(
                        referenced_study_unit_el, raise_error_if_missing=False)
                if study_number is not None:
                    study_group.add_study_numbers(as_valid_identifier(study_number))
            yield study_group