Source code for kuha_common.document_store.mappings.ddi.ddi31

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2025 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
#
"""Mapping profiles for DDI 3.1."""

from kuha_common.document_store.mappings.exceptions import UnknownXMLRoot, MappingError
from kuha_common.document_store.mappings.xmlbase import (
    MappedParams,
    str_equals,
    fixed_value,
    element_remove_whitespaces,
    get_preferred_publication_id_agency_pair,
)
from kuha_common.document_store.mappings.ddi.lifecycle import DDILifecycleParserBase


_XPATH_REL_CITATION_TITLE = './r:Citation/r:Title'


class DDI31RecordParser(DDILifecycleParserBase):
    """Parse Document Store records from DDI 3.1 XML.

    Check the root element. Expects either ddi:DDIInstance or
    s:StudyUnit. Currently supports only single s:StudyUnit element
    within the root.

    :param root_element: XML root element.
    :type root_element: :obj:`xml.etree.ElementTree.Element`
    :raises: :exc:`UnknownXMLRoot` for unexpected root element.
    :raises: :exc:`MappingError` if root contains more or less than
             exactly one s:StudyUnit child.
    """

    #: XML namespaces
    NS = {
        'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
        'xml': 'http://www.w3.org/XML/1998/namespace',
        'xhtml': 'http://www.w3.org/1999/xhtml',
        'ddi': 'ddi:instance:3_1',
        's': 'ddi:studyunit:3_1',
        'pd': 'ddi:physicaldataproduct:3_1',
        'pi': 'ddi:physicalinstance:3_1',
        'c': 'ddi:conceptualcomponent:3_1',
        'l': 'ddi:logicalproduct:3_1',
        'r': 'ddi:reusable:3_1',
        'g': 'ddi:group:3_1',
        'dc': 'ddi:datacollection:3_1',
        'a': 'ddi:archive:3_1',
    }

    def _find_study_unit_element(self, root_element):
        """Locate the single s:StudyUnit element to parse.

        :param root_element: XML root element.
        :type root_element: :obj:`xml.etree.ElementTree.Element`
        :returns: The s:StudyUnit element.
        :rtype: :obj:`xml.etree.ElementTree.Element`
        :raises: :exc:`MappingError` if a DDIInstance root does not contain
                 exactly one s:StudyUnit.
        :raises: :exc:`UnknownXMLRoot` if the root is neither a DDIInstance
                 nor an s:StudyUnit.
        """
        expected_studyunit_root = '{%s}StudyUnit' % (self.NS['s'],)
        if self._is_DDIInstance(root_element):
            study_unit_elements = list(root_element.iter(expected_studyunit_root))
            study_unit_count = len(study_unit_elements)
            if study_unit_count > 1:
                # Currently supports only a single s:StudyUnit in xml metadata.
                raise MappingError("Unable to parse multiple StudyUnit elements")
            if study_unit_count < 1:
                raise MappingError("Unable to find StudyUnit element")
            return study_unit_elements.pop()
        if root_element.tag == expected_studyunit_root:
            return root_element
        raise UnknownXMLRoot(root_element.tag, self._DDIInstance_tag(), expected_studyunit_root)

    def _iter_reference_values(self, xpath_to_parent, element, *elements):
        """Generate reference ID strings found under the given elements.

        Looks up ``<xpath_to_parent>/r:ID`` from every given element and
        yields the joined text content of each r:ID that has text.

        :param str xpath_to_parent: XPath to the parent of the r:ID element.
        :param element: First element to search from.
        :param elements: Additional elements to search from.
        :returns: Generator yielding reference ID strings.
        """
        xpath = '%s/r:ID' % (xpath_to_parent,)
        elements = (element,) + elements
        for ref_el in self._findall_from_elements(elements, xpath):
            # Both conditions must hold: the element exists AND it has text.
            # (Using ``or`` here would never filter empty IDs and would
            # dereference None.)
            if ref_el is not None and ref_el.text not in ('', None):
                yield ''.join(ref_el.itertext())

    def _find_by_reference_value(self, ref_val, xpath, element=None):
        """Find the first element matching ``xpath`` with ``@id == ref_val``.

        :param str ref_val: Value of the id attribute to look for.
        :param str xpath: XPath of the candidate elements.
        :param element: Element to search from. Defaults to the root element.
        :type element: :obj:`xml.etree.ElementTree.Element` or None
        :returns: Matching element, or None if not found.
        :rtype: :obj:`xml.etree.ElementTree.Element` or None
        """
        # Compare against None explicitly: an Element without children is
        # falsy, so ``element or root`` would silently discard it.
        if element is None:
            element = self.root_element
        return element.find('{base}[@id="{ref_id}"]'.format(base=xpath, ref_id=ref_val), self.NS)

    def _get_spatial_coverage_from_study_unit(self, study_unit_element):
        """Get SpatialCoverage element from StudyUnit element.

        The SpatialCoverage element may be an inline child element of
        StudyUnit or referenced by SpatialCoverageReference. The
        referenced element takes precedence over the inline one.

        :param study_unit_element: s:StudyUnit element
        :type study_unit_element: :obj:`xml.etree.ElementTree.Element`
        :returns: SpatialCoverage of the StudyUnit.
        :rtype: :obj:`xml.etree.ElementTree.Element` or None
        """
        ref_id_el = study_unit_element.find('./r:Coverage/r:SpatialCoverageReference/r:ID', self.NS)
        candidate = self._find_by_reference(ref_id_el, './/r:SpatialCoverage')
        if candidate is None:
            candidate = study_unit_element.find('./r:Coverage/r:SpatialCoverage', self.NS)
        return candidate

    def _iter_physical_instances_from_study_unit(self, study_unit_element):
        """Generate pi:PhysicalInstance elements of the StudyUnit.

        Yields referenced physical instances first, then inline ones.

        :param study_unit_element: s:StudyUnit element
        :type study_unit_element: :obj:`xml.etree.ElementTree.Element`
        :returns: Generator yielding pi:PhysicalInstance elements.
        """
        for ref_id_el in study_unit_element.findall('./s:PhysicalInstanceReference/r:ID', self.NS):
            candidate = self._find_by_reference(ref_id_el, './/pi:PhysicalInstance')
            if candidate is not None:
                yield candidate
        yield from study_unit_element.findall('./pi:PhysicalInstance', self.NS)

    def _iter_archives_from_element(self, element):
        """Generate a:Archive elements of the given element.

        Yields the inline a:Archive child (if any) first, then archives
        referred to by s:ArchiveReference.

        :param element: Element to search from.
        :type element: :obj:`xml.etree.ElementTree.Element`
        :returns: Generator yielding a:Archive elements.
        """
        archive_el = element.find('./a:Archive', self.NS)
        if archive_el is not None:
            yield archive_el
        yield from self._find_and_iter_referred_els('./s:ArchiveReference', './/a:Archive', element)

    def _iter_other_materials_from_study_unit(self, study_unit_element):
        """Generate r:OtherMaterial children of the StudyUnit."""
        yield from self._findall('./r:OtherMaterial', study_unit_element)

    def _iter_funding_informations_from_study_unit(self, study_unit_element):
        """Generate r:FundingInformation children of the StudyUnit."""
        yield from self._findall('./r:FundingInformation', study_unit_element)

    def _iter_study_uris_as_mapped_params(self):
        """Generate Study Uris as :obj:`MappedParams`

        :returns: Generator yielding :obj:`MappedParams`
        """
        archive_els = self._iter_archives_from_element(self.study_unit_element)
        for archive_spec_el in self._findall_from_elements(archive_els, './a:ArchiveSpecific'):
            # First get the actual URI.
            uri_el = archive_spec_el.find(
                "./a:Collection/[a:CallNumber='{}']/a:URI".format(self.study_number), self.NS
            )
            if uri_el is None:
                uri_el = archive_spec_el.find(
                    "./a:Item/[a:CallNumber='{}']/a:URI".format(self.study_number), self.NS
                )
            if uri_el is not None:
                uri = uri_el.text
                uri_lang = self._get_xmllang(uri_el, default=self._study_unit_language)
            else:
                uri = None
                # Never read when uri_el is None; bound for clarity only.
                uri_lang = None
            org_reference_id_el = archive_spec_el.find('./a:ArchiveOrganizationReference/r:ID', self.NS)
            if org_reference_id_el is None:
                # No organization: a bare URI is all there is to report.
                if uri_el is None:
                    continue
                param = MappedParams(uri)
                param.set_language(uri_lang)
                yield param
                continue
            # Then get the organization if referenced.
            org_el = self._find_by_reference(org_reference_id_el, './/a:Organization')
            if org_el is None:
                # Unable to find referenced organization.
                continue
            for org_name_el in org_el.findall('./a:OrganizationName', self.NS):
                param = MappedParams(uri)
                param.set_language(self._get_xmllang(org_name_el, default=self._study_unit_language))
                param.keyword_arguments.update({self._study_cls.study_uris.attr_location.name: org_name_el.text})
                # Try to find localized element.
                org_desc_el = org_el.find("./r:Description[@xml:lang='{}']".format(param.get_language()), self.NS)
                if org_desc_el is not None:
                    param.keyword_arguments.update(
                        {self._study_cls.study_uris.attr_description.name: org_desc_el.text}
                    )
                elif param.get_language() == self._study_unit_language:
                    # If language equals the language of the StudyUnit,
                    # accept Description without locale.
                    org_desc_el = org_el.find('./r:Description', self.NS)
                    if org_desc_el is not None:
                        param.keyword_arguments.update(
                            {self._study_cls.study_uris.attr_description.name: org_desc_el.text}
                        )
                yield param

    def _iter_universes_as_mapped_params(self):
        """Generate Universes as :obj:`MappedParams`.

        :returns: Generator yielding :obj:`MappedParams`
        """
        inc_to_bool = str_equals('true', True)
        for ref_id_el in self.study_unit_element.findall('./r:UniverseReference/r:ID', self.NS):
            universe_el = self._find_by_reference(ref_id_el, ".//c:Universe")
            if universe_el is None:
                continue
            included = inc_to_bool(universe_el.attrib.get('isInclusive'))
            for desc in universe_el.findall('./c:HumanReadable', self.NS):
                param = MappedParams(desc.text)
                param.set_language(self._get_xmllang(desc, default=self._study_unit_language))
                param.keyword_arguments.update({self._study_cls.universes.attr_included.name: included})
                yield param

    def _iter_identifiers_as_mapped_params(self):
        """Generate Identifiers as :obj:`MappedParams`

        Will not discard duplicates.

        :returns: Generator yielding :obj:`MappedParams`
        """
        # Cast to list because the archives are iterated through multiple times.
        archive_els = list(self._iter_archives_from_element(self.study_unit_element))

        def _param_from_els(els):
            for element in els:
                if element.text in [None, '']:
                    continue
                param = MappedParams(''.join(element.itertext()))
                param.set_language(self._get_xmllang(element, default=self._study_unit_language))
                yield param

        yield from _param_from_els(
            self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Collection/a:CallNumber')
        )
        yield from _param_from_els(
            self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Item/a:CallNumber')
        )
        yield from _param_from_els(
            self.study_unit_element.findall('./r:Citation/r:InternationalIdentifier', self.NS)
        )

    def _iter_mapped_params_from_geography_element(self, *geography_els):
        """Generate :obj:`MappedParams` from r:Level/r:Name of r:Geography elements.

        :param geography_els: r:Geography elements. None values are ignored.
        :returns: Generator yielding :obj:`MappedParams`
        """
        geography_els = (el for el in geography_els if el is not None)
        for name_el in self._findall_from_elements(geography_els, './r:Level/r:Name'):
            if name_el.text in [None, '']:
                continue
            params = MappedParams("".join(name_el.itertext()))
            params.set_language(self._get_xmllang(name_el, default=self._study_unit_language))
            yield params

    def _iter_mapped_params_from_geographyvalue_elements(self, geogvalue_els):
        """Generate :obj:`MappedParams` from r:GeographyValue elements.

        Each r:GeographyName becomes one value; the r:GeographyCode/r:Value
        of the same r:GeographyValue (if non-empty) is attached as the
        abbreviation.

        :param geogvalue_els: Iterable of r:GeographyValue elements.
        :returns: Generator yielding :obj:`MappedParams`
        """
        for geogvalue_el in geogvalue_els:
            code_el = geogvalue_el.find('./r:GeographyCode/r:Value', self.NS)
            code_value = code_el.text if code_el is not None and code_el.text not in [None, ''] else None
            for name_el in geogvalue_el.findall('./r:GeographyName', self.NS):
                params = MappedParams(''.join(name_el.itertext()))
                params.set_language(self._get_xmllang(name_el, default=self._study_unit_language))
                if code_value is not None:
                    params.keyword_arguments.update(
                        {self._study_cls.study_area_countries.attr_abbreviation.name: code_value}
                    )
                yield params

    def _iter_study_area_countries_as_mapped_params(self):
        """Generate Study area countries as :obj:`MappedParams`.

        Values come from the geographic structures referenced by the
        top/lowest level references of the SpatialCoverage (or their
        r:LevelName when no reference is given), followed by the
        geographic locations referenced by the SpatialCoverage.

        :returns: Generator yielding :obj:`MappedParams`
        """
        spatial_coverage_el = self._get_spatial_coverage_from_study_unit(self.study_unit_element)
        if spatial_coverage_el is None:
            return
        # TopLevelReference & LowestLevelReference are mandatory children of SpatialCoverage
        for parentlevel_ref_el in (
            spatial_coverage_el.find('./r:TopLevelReference', self.NS),
            spatial_coverage_el.find('./r:LowestLevelReference', self.NS),
        ):
            if parentlevel_ref_el is None:
                # Mandatory by schema, but tolerate invalid input instead of
                # failing with AttributeError.
                continue
            ref_id_el = parentlevel_ref_el.find('./r:LevelReference/r:ID', self.NS)
            if ref_id_el is None or ref_id_el.text in [None, '']:
                # No reference: fall back to the inline level name.
                level_name_el = parentlevel_ref_el.find('./r:LevelName', self.NS)
                if level_name_el is None or level_name_el.text in [None, '']:
                    continue
                params = MappedParams("".join(level_name_el.itertext()))
                params.set_language(self._study_unit_language)
                yield params
                continue
            geostruct_el = self._find_by_reference(ref_id_el, './/r:GeographicStructure')
            if geostruct_el is None:
                continue
            # Geography elements inline.
            yield from self._iter_mapped_params_from_geography_element(
                *geostruct_el.findall('./r:Geography', self.NS)
            )
            # Geography elements by reference.
            for geography_ref_id_el in geostruct_el.findall('./r:GeographyReference/r:ID', self.NS):
                if geography_ref_id_el.text in [None, '']:
                    continue
                yield from self._iter_mapped_params_from_geography_element(
                    self._find_by_reference(geography_ref_id_el, './/r:Geography')
                )
        for ref_id_el in spatial_coverage_el.findall('./r:GeographicLocationReference/r:ID', self.NS):
            if ref_id_el.text in [None, '']:
                continue
            yield from self._iter_mapped_params_from_geographyvalue_elements(
                self.root_element.findall(
                    './/r:GeographicLocation[@id="{ref_id}"]/r:Values/r:GeographyValue'.format(
                        ref_id=ref_id_el.text.strip()
                    ),
                    self.NS,
                )
            )

    def _iter_funding_agencies_as_mapped_params(self):
        """Generate Funding agencies as :obj:`MappedParams`.

        One value is generated for every a:OrganizationName of every
        organization referred to by r:AgencyOrganizationReference of each
        r:FundingInformation.

        :returns: Generator yielding :obj:`MappedParams`
        """
        for funding_info_el in self._iter_funding_informations_from_study_unit(self.study_unit_element):
            role, grant_numbers = self._get_role_and_grant_numbers_from_funding_info_el(funding_info_el)
            # Data model currently supports only a single grant number attribute.
            grant_number = grant_numbers.pop(0) if grant_numbers else None
            # itertext() returns an iterator of strings, so each element's
            # text must be joined before the elements are concatenated.
            description = (
                ''.join(''.join(elem.itertext()) for elem in self._findall('./r:Description', funding_info_el))
                or None
            )
            for org_name_el in self._findall_from_elements(
                self._find_and_iter_referred_els(
                    './r:AgencyOrganizationReference', './/a:Organization', funding_info_el
                ),
                './a:OrganizationName',
            ):
                params = MappedParams(''.join(org_name_el.itertext()))
                params.set_language(self._get_xmllang(org_name_el, self.root_language))
                params.keyword_arguments.update(
                    {
                        self._study_cls.funding_agencies.attr_grant_number.name: grant_number,
                        self._study_cls.funding_agencies.attr_role.name: role,
                        self._study_cls.funding_agencies.attr_description.name: description,
                    }
                )
                yield params

    def _iter_other_materials_for_related_publications(self):
        """Generate r:OtherMaterial elements typed as 'Related Publication'.

        Looks from the StudyUnit first. Only if none are found there and a
        DDIInstance element is available, looks from
        g:ResourcePackage/r:OtherMaterial of the DDIInstance.

        :returns: Generator yielding r:OtherMaterial elements.
        """
        found = False
        for oth_mat_el in self._iter_other_materials_from_study_unit(self.study_unit_element):
            if oth_mat_el.attrib.get('type') == 'Related Publication':
                yield oth_mat_el
                found = True
        if not found and self._ddiinstance_element is not None:
            for oth_mat_el in self._findall('./g:ResourcePackage/r:OtherMaterial', self._ddiinstance_element):
                if oth_mat_el.attrib.get('type') == 'Related Publication':
                    yield oth_mat_el

    def _iter_related_publications_as_mapped_params(self):
        """Generate Related publications as :obj:`MappedParams`.

        :returns: Generator yielding :obj:`MappedParams`
        """
        for oth_mat_el in self._iter_other_materials_for_related_publications():
            title_str_els = self._findall(_XPATH_REL_CITATION_TITLE, oth_mat_el)
            ext_url_ref_el = self._find('r:ExternalURLReference', oth_mat_el)
            uri = ''.join(ext_url_ref_el.itertext()) if ext_url_ref_el is not None else None
            simple_date_el = self._find('./r:Citation/r:PublicationDate/r:SimpleDate', oth_mat_el)
            distribution_date = ''.join(simple_date_el.itertext()) if simple_date_el is not None else None
            ids_agencys = [
                (''.join(id_el.itertext()), id_el.attrib.get('type'))
                for id_el in self._findall('./r:Citation/r:InternationalIdentifier', oth_mat_el)
            ]
            yield from self._iter_params_from_othmat_properties(
                title_str_els,
                uri=uri,
                distribution_date=distribution_date,
                id_agency_pair=get_preferred_publication_id_agency_pair(ids_agencys),
            )

    @property
    def _study_maps(self):
        """Mappings relative to s:StudyUnit as (add-method, mapper) pairs."""
        return [
            (self._study_cls.add_study_titles, self._map_multi(_XPATH_REL_CITATION_TITLE)),
            (
                self._study_cls.add_abstract,
                self._map_multi('./s:Abstract/r:Content').set_value_getter(element_remove_whitespaces),
            ),
            (
                self._study_cls.add_principal_investigators,
                self._map_multi('./r:Citation/r:Creator').add_attribute(
                    self._study_cls.principal_investigators.attr_organization.name,
                    self._map_single('.', 'affiliation'),
                ),
            ),
            (self._study_cls.add_publishers, self._map_multi('./r:Citation/r:Publisher')),
            (
                self._study_cls.add_publication_years,
                self._map_multi('./r:Citation/r:PublicationDate/r:SimpleDate').add_attribute(
                    self._study_cls.publication_years.attr_distribution_date.name, self._map_single('.')
                ),
            ),
            (
                self._study_cls.add_classifications,
                # DDI31 contains no @id, hence the fixed None value.
                # DDI31 contains no codelistname
                # DDI31 contains no codelisturn
                self._map_multi('./r:Coverage/r:TopicalCoverage/r:Subject')
                .set_value_conversion(fixed_value(None))
                .add_attribute(
                    self._study_cls.classifications.attr_system_name.name, self._map_single('.', 'codeListID')
                )
                .add_attribute(self._study_cls.classifications.attr_description.name, self._map_single('.')),
            ),
            (
                self._study_cls.add_keywords,
                # DDI31 contains no @id, hence the fixed None value.
                # DDI31 contains no codelistname
                # DDI31 contains no codelisturn
                self._map_multi('./r:Coverage/r:TopicalCoverage/r:Keyword')
                .set_value_conversion(fixed_value(None))
                .add_attribute(self._study_cls.keywords.attr_system_name.name, self._map_single('.', 'codeListID'))
                .add_attribute(self._study_cls.keywords.attr_description.name, self._map_single('.')),
            ),
            (
                self._study_cls.add_analysis_units,
                self._map_multi('./r:AnalysisUnit')
                .add_attribute(
                    self._study_cls.analysis_units.attr_system_name.name, self._map_single('.', 'codeListID')
                )
                .add_attribute(self._study_cls.analysis_units.attr_uri.name, self._map_single('.', 'codeListURN')),
            ),
        ]

    @property
    def studies(self):
        """Parse XML to create and populate :obj:`kuha_common.document_store.records.Study`.

        :returns: Generator to Populate Document Store Study record.
        """
        if self.study_number is None:
            self._parse_study_number()
        study = self._study_cls()
        study.add_study_number(self.study_number_identifier)
        if self._is_DDIInstance(self.root_element):
            # DDIInstance must be root of the document. If there is a DDIInstance, map its title into
            # Study.document_titles.
            # DDIInstance may only have a single r:Citation and r:Citation may only have a single
            # r:Title
            # Get r:Citation/r:AlternateTitle too.
            self._map_to_record(
                study,
                self.root_element,
                [
                    (
                        self._study_cls.add_document_titles,
                        self._map_single(_XPATH_REL_CITATION_TITLE, localizable=True),
                    ),
                    (self._study_cls.add_document_titles, self._map_multi('./r:Citation/r:AlternateTitle')),
                ],
            )
        self._map_to_record(
            study, self.study_unit_element, self._study_maps, default_language=self._study_unit_language
        )
        # DDI3 has references and is much too complex to use XMLMapper for all elements.
        # Use custom mappings for certain elements.
        # These are relative to dc:DataCollection
        data_coll_maps = [
            (
                self._study_cls.add_time_methods,
                self._map_multi('./dc:Methodology/dc:TimeMethod')
                .set_value_getter(self.child_text('r:UserID'))
                .add_attribute(
                    self._study_cls.time_methods.attr_description.name,
                    self._map_single('./r:Content', localizable=True),
                    provides_main_lang=True,
                )
                .add_attribute(
                    self._study_cls.time_methods.attr_system_name.name, self._map_single('./r:UserID', 'type')
                ),
            ),
            (
                self._study_cls.add_sampling_procedures,
                self._map_multi('./dc:Methodology/dc:SamplingProcedure')
                .set_value_getter(self.child_text('r:UserID'))
                .add_attribute(
                    self._study_cls.sampling_procedures.attr_description.name,
                    self._map_single('./r:Content', localizable=True),
                    provides_main_lang=True,
                )
                .add_attribute(
                    self._study_cls.sampling_procedures.attr_system_name.name,
                    self._map_single('./r:UserID', 'type'),
                ),
            ),
            (
                self._study_cls.add_collection_modes,
                self._map_multi('./dc:CollectionEvent/dc:ModeOfCollection')
                .set_value_getter(self.child_text('r:UserID'))
                .add_attribute(
                    self._study_cls.collection_modes.attr_description.name,
                    self._map_single('./r:Content', localizable=True),
                    provides_main_lang=True,
                )
                .add_attribute(
                    self._study_cls.collection_modes.attr_system_name.name, self._map_single('./r:UserID', 'type')
                ),
            ),
        ]
        for data_collection_element in self._iter_data_collections_from_element(self.study_unit_element):
            self._map_to_record(
                study, data_collection_element, data_coll_maps, default_language=self._study_unit_language
            )
        # These are relative to pi:PhysicalInstance
        physical_instance_maps = [
            (self._study_cls.add_file_names, self._map_multi('./pi:DataFileIdentification/pi:URI'))
        ]
        for physical_instance_element in self._iter_physical_instances_from_study_unit(self.study_unit_element):
            self._map_to_record(
                study, physical_instance_element, physical_instance_maps, default_language=self._study_unit_language
            )
        # These are relative to a:Archive
        archive_maps = [
            (self._study_cls.add_data_access, self._map_multi('./a:ArchiveSpecific/a:DefaultAccess/a:Restrictions')),
            (
                self._study_cls.add_data_access_descriptions,
                self._map_multi('./a:ArchiveSpecific/a:DefaultAccess/a:AccessConditions'),
            ),
        ]
        for archive_element in self._iter_archives_from_element(self.study_unit_element):
            self._map_to_record(study, archive_element, archive_maps, default_language=self._study_unit_language)
        # Elements that need reference resolution are populated through
        # dedicated generators instead of XPath mappers.
        for add_func, mapping in [
            (study.add_collection_periods, self._iter_collection_periods_as_mapped_params),
            (study.add_study_uris, self._iter_study_uris_as_mapped_params),
            (study.add_universes, self._iter_universes_as_mapped_params),
            (study.add_study_area_countries, self._iter_study_area_countries_as_mapped_params),
            (study.add_identifiers, self._iter_identifiers_as_mapped_params),
            (study.add_funding_agencies, self._iter_funding_agencies_as_mapped_params),
            (study.add_grant_numbers, self._iter_grant_numbers_as_mapped_params),
            (study.add_related_publications, self._iter_related_publications_as_mapped_params),
        ]:
            for params in mapping():
                add_func(*params.arguments, **params.keyword_arguments)
        yield study