#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2025 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
#
"""Mapping profiles for DDI 3.1."""
from kuha_common.document_store.mappings.exceptions import UnknownXMLRoot, MappingError
from kuha_common.document_store.mappings.xmlbase import (
MappedParams,
str_equals,
fixed_value,
element_remove_whitespaces,
get_preferred_publication_id_agency_pair,
)
from kuha_common.document_store.mappings.ddi.lifecycle import DDILifecycleParserBase
_XPATH_REL_CITATION_TITLE = './r:Citation/r:Title'
[docs]
class DDI31RecordParser(DDILifecycleParserBase):
"""Parse Document Store records from DDI 3.1 XML.
Check the root element. Expects either ddi:DDIInstance or s:StudyUnit.
Currently supports only a single s:StudyUnit element within the root.
:param root_element: XML root element.
:type root_element: :obj:`xml.etree.ElementTree.Element`
:raises: :exc:`UnknownXMLRoot` for unexpected root element.
:raises: :exc:`MappingError` if a ddi:DDIInstance root does not contain
exactly one s:StudyUnit element.
"""
#: XML namespaces. Maps the prefixes used in this class's XPath
#: expressions to the DDI 3.1 namespace URIs.
NS = {
'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
'xml': 'http://www.w3.org/XML/1998/namespace',
'xhtml': 'http://www.w3.org/1999/xhtml',
'ddi': 'ddi:instance:3_1',
's': 'ddi:studyunit:3_1',
'pd': 'ddi:physicaldataproduct:3_1',
'pi': 'ddi:physicalinstance:3_1',
'c': 'ddi:conceptualcomponent:3_1',
'l': 'ddi:logicalproduct:3_1',
'r': 'ddi:reusable:3_1',
'g': 'ddi:group:3_1',
'dc': 'ddi:datacollection:3_1',
'a': 'ddi:archive:3_1',
}
def _find_study_unit_element(self, root_element):
expected_studyunit_root = '{%s}StudyUnit' % (self.NS['s'],)
if self._is_DDIInstance(root_element):
study_unit_elements = list(root_element.iter('{%s}StudyUnit' % (self.NS['s'],)))
study_unit_count = len(study_unit_elements)
if study_unit_count > 1:
# Currently supports only a single s:StudyUnit in xml metadata.
raise MappingError("Unable to parse multiple StudyUnit elements")
if study_unit_count < 1:
raise MappingError("Unable to find StudyUnit element")
return study_unit_elements.pop()
elif root_element.tag == expected_studyunit_root:
return root_element
else:
raise UnknownXMLRoot(root_element.tag, self._DDIInstance_tag(), expected_studyunit_root)
def _iter_reference_values(self, xpath_to_parent, element, *elements):
xpath = '%s/r:ID' % (xpath_to_parent,)
elements = (element,) + elements
for ref_el in self._findall_from_elements(elements, xpath):
if ref_el is not None or ref_el.text not in ('', None):
yield ''.join(ref_el.itertext())
def _find_by_reference_value(self, ref_val, xpath, element=None):
element = element or self.root_element
return element.find('{base}[@id="{ref_id}"]'.format(base=xpath, ref_id=ref_val), self.NS)
def _get_spatial_coverage_from_study_unit(self, study_unit_element):
"""Get SpatialCoverage element from StudyUnit element.
The SpatialCoverage element may be an inline child element of
StudyUnit or referenced by SpatialCoverageReference.
:param study_unit_element: s:StudyUnit element
:type study_unit_element: :obj:`xml.etree.ElementTree.Element`
:returns: SpatialCoverage of the StudyUnit.
:rtype: :obj:`xml.etree.ElementTree.Element` or None
"""
ref_id_el = study_unit_element.find('./r:Coverage/r:SpatialCoverageReference/r:ID', self.NS)
candidate = self._find_by_reference(ref_id_el, './/r:SpatialCoverage')
if candidate is None:
candidate = study_unit_element.find('./r:Coverage/r:SpatialCoverage', self.NS)
return candidate
def _iter_physical_instances_from_study_unit(self, study_unit_element):
for ref_id_el in study_unit_element.findall('./s:PhysicalInstanceReference/r:ID', self.NS):
candidate = self._find_by_reference(ref_id_el, './/pi:PhysicalInstance')
if candidate is not None:
yield candidate
for physical_instance_el in study_unit_element.findall('./pi:PhysicalInstance', self.NS):
yield physical_instance_el
def _iter_archives_from_element(self, element):
archive_el = element.find('./a:Archive', self.NS)
if archive_el is not None:
yield archive_el
for archive_el in self._find_and_iter_referred_els('./s:ArchiveReference', './/a:Archive', element):
yield archive_el
def _iter_other_materials_from_study_unit(self, study_unit_element):
for oth_mat_el in self._findall('./r:OtherMaterial', study_unit_element):
yield oth_mat_el
def _iter_funding_informations_from_study_unit(self, study_unit_element):
yield from self._findall('./r:FundingInformation', study_unit_element)
def _iter_study_uris_as_mapped_params(self):
    """Generate Study Uris as :obj:`MappedParams`

    For every a:ArchiveSpecific the URI is looked up from the
    a:Collection (or, failing that, the a:Item) whose a:CallNumber
    equals the study number. Without an organization reference the bare
    URI is yielded. Otherwise one :obj:`MappedParams` is yielded per
    a:OrganizationName of the referenced organization, carrying the
    name as location and a matching r:Description as description.

    :returns: Generator yielding :obj:`MappedParams`
    """
    archives = self._iter_archives_from_element(self.study_unit_element)
    for archive_specific in self._findall_from_elements(archives, './a:ArchiveSpecific'):
        # First get the actual URI: a:Collection is preferred over a:Item.
        uri_el = archive_specific.find(
            "./a:Collection/[a:CallNumber='{}']/a:URI".format(self.study_number), self.NS
        )
        if uri_el is None:
            uri_el = archive_specific.find(
                "./a:Item/[a:CallNumber='{}']/a:URI".format(self.study_number), self.NS
            )
        uri = None
        if uri_el is not None:
            uri = uri_el.text
            uri_lang = self._get_xmllang(uri_el, default=self._study_unit_language)
        org_ref_id = archive_specific.find('./a:ArchiveOrganizationReference/r:ID', self.NS)
        if org_ref_id is None:
            # No organization: the URI alone is the result, if there is one.
            if uri_el is not None:
                bare_param = MappedParams(uri)
                bare_param.set_language(uri_lang)
                yield bare_param
            continue
        # Then get the organization if referenced.
        organization = self._find_by_reference(org_ref_id, './/a:Organization')
        if organization is None:
            # Unable to find referenced organization.
            continue
        for name_el in organization.findall('./a:OrganizationName', self.NS):
            param = MappedParams(uri)
            param.set_language(self._get_xmllang(name_el, default=self._study_unit_language))
            param.keyword_arguments.update({self._study_cls.study_uris.attr_location.name: name_el.text})
            # Try to find localized element.
            description_el = organization.find(
                "./r:Description[@xml:lang='{}']".format(param.get_language()), self.NS
            )
            if description_el is None and param.get_language() == self._study_unit_language:
                # If language equals the language of the StudyUnit,
                # accept Description without locale.
                description_el = organization.find('./r:Description', self.NS)
            if description_el is not None:
                param.keyword_arguments.update(
                    {self._study_cls.study_uris.attr_description.name: description_el.text}
                )
            yield param
def _iter_universes_as_mapped_params(self):
    """Generate Universes as :obj:`MappedParams`.

    Each r:UniverseReference of the StudyUnit is resolved to a
    c:Universe; unresolved references are skipped. One
    :obj:`MappedParams` is yielded per c:HumanReadable, with the
    universe's ``isInclusive`` attribute converted into the included
    attribute.

    :returns: Generator yielding :obj:`MappedParams`
    """
    to_included = str_equals('true', True)
    for reference_id in self.study_unit_element.findall('./r:UniverseReference/r:ID', self.NS):
        universe = self._find_by_reference(reference_id, ".//c:Universe")
        if universe is not None:
            is_included = to_included(universe.attrib.get('isInclusive'))
            for readable_el in universe.findall('./c:HumanReadable', self.NS):
                universe_param = MappedParams(readable_el.text)
                universe_param.set_language(self._get_xmllang(readable_el, default=self._study_unit_language))
                universe_param.keyword_arguments.update(
                    {self._study_cls.universes.attr_included.name: is_included}
                )
                yield universe_param
def _iter_identifiers_as_mapped_params(self):
    """Generate Identifiers as :obj:`MappedParams`

    Identifiers are read, in this order, from the call numbers of
    a:Collection, the call numbers of a:Item and the
    r:InternationalIdentifier elements of the StudyUnit citation.
    Elements with empty text are skipped.

    Will not discard duplicates.

    :returns: Generator yielding :obj:`MappedParams`
    """
    # Materialize into a list because the archive elements are iterated
    # more than once.
    archive_els = list(self._iter_archives_from_element(self.study_unit_element))

    def _as_params(identifier_els):
        for identifier_el in identifier_els:
            if identifier_el.text in [None, '']:
                continue
            identifier = MappedParams(''.join(identifier_el.itertext()))
            identifier.set_language(self._get_xmllang(identifier_el, default=self._study_unit_language))
            yield identifier

    yield from _as_params(
        self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Collection/a:CallNumber')
    )
    yield from _as_params(
        self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Item/a:CallNumber')
    )
    yield from _as_params(
        self.study_unit_element.findall('./r:Citation/r:InternationalIdentifier', self.NS)
    )
def _iter_mapped_params_from_geography_element(self, *geography_els):
    """Generate :obj:`MappedParams` from the level names of r:Geography elements.

    :param geography_els: r:Geography elements. None values are ignored.
    :returns: Generator yielding one :obj:`MappedParams` for every
              r:Level/r:Name with non-empty text.
    """
    present_els = (geography_el for geography_el in geography_els if geography_el is not None)
    for level_name_el in self._findall_from_elements(present_els, './r:Level/r:Name'):
        if level_name_el.text not in [None, '']:
            name_param = MappedParams("".join(level_name_el.itertext()))
            name_param.set_language(self._get_xmllang(level_name_el, default=self._study_unit_language))
            yield name_param
def _iter_mapped_params_from_geographyvalue_elements(self, geogvalue_els):
    """Generate :obj:`MappedParams` from r:GeographyValue elements.

    One :obj:`MappedParams` is yielded per r:GeographyName. A non-empty
    r:GeographyCode/r:Value of the same r:GeographyValue is attached as
    the abbreviation attribute.

    :param geogvalue_els: Iterable of r:GeographyValue elements.
    :returns: Generator yielding :obj:`MappedParams`
    """
    for value_el in geogvalue_els:
        code_el = value_el.find('./r:GeographyCode/r:Value', self.NS)
        abbreviation = None
        if code_el is not None and code_el.text not in [None, '']:
            abbreviation = code_el.text
        for geography_name_el in value_el.findall('./r:GeographyName', self.NS):
            country_param = MappedParams(''.join(geography_name_el.itertext()))
            country_param.set_language(
                self._get_xmllang(geography_name_el, default=self._study_unit_language)
            )
            if abbreviation is not None:
                country_param.keyword_arguments.update(
                    {self._study_cls.study_area_countries.attr_abbreviation.name: abbreviation}
                )
            yield country_param
def _iter_study_area_countries_as_mapped_params(self):
spatial_coverage_el = self._get_spatial_coverage_from_study_unit(self.study_unit_element)
if spatial_coverage_el is None:
return
# TopLevelReference & LowestLevelReference are mandatory children of SpatialCoverage
for parentlevel_ref_el in (
spatial_coverage_el.find('./r:TopLevelReference', self.NS),
spatial_coverage_el.find('./r:LowestLevelReference', self.NS),
):
ref_id_el = parentlevel_ref_el.find('./r:LevelReference/r:ID', self.NS)
if ref_id_el is None or ref_id_el.text in [None, '']:
level_name_el = parentlevel_ref_el.find('./r:LevelName', self.NS)
if level_name_el is None or level_name_el.text in [None, '']:
continue
params = MappedParams("".join(level_name_el.itertext()))
params.set_language(self._study_unit_language)
yield params
continue
geostruct_el = self._find_by_reference(ref_id_el, './/r:GeographicStructure')
if geostruct_el is None:
continue
# Geography elements inline.
for params in self._iter_mapped_params_from_geography_element(
*geostruct_el.findall('./r:Geography', self.NS)
):
yield params
# Geography elements by reference.
for ref_id_el in geostruct_el.findall('./r:GeographyReference/r:ID', self.NS):
if ref_id_el.text in [None, '']:
continue
for params in self._iter_mapped_params_from_geography_element(
self._find_by_reference(ref_id_el, './/r:Geography')
):
yield params
for ref_id_el in spatial_coverage_el.findall('./r:GeographicLocationReference/r:ID', self.NS):
if ref_id_el.text in [None, '']:
continue
for params in self._iter_mapped_params_from_geographyvalue_elements(
self.root_element.findall(
'.//r:GeographicLocation[@id="{ref_id}"]/r:Values/r:GeographyValue'.format(
ref_id=ref_id_el.text.strip()
),
self.NS,
)
):
yield params
def _iter_funding_agencies_as_mapped_params(self):
for funding_info_el in self._iter_funding_informations_from_study_unit(self.study_unit_element):
role, grant_numbers = self._get_role_and_grant_numbers_from_funding_info_el(funding_info_el)
# Data model currently supports only a single grant number attribute.
grant_number = grant_numbers.pop(0) if grant_numbers else None
description = (
''.join([elem.itertext() for elem in self._findall('./r:Description', funding_info_el)]) or None
)
for org_name_el in self._findall_from_elements(
self._find_and_iter_referred_els(
'./r:AgencyOrganizationReference', './/a:Organization', funding_info_el
),
'./a:OrganizationName',
):
params = MappedParams(''.join(org_name_el.itertext()))
params.set_language(self._get_xmllang(org_name_el, self.root_language))
params.keyword_arguments.update(
{
self._study_cls.funding_agencies.attr_grant_number.name: grant_number,
self._study_cls.funding_agencies.attr_role.name: role,
self._study_cls.funding_agencies.attr_description.name: description,
}
)
yield params
def _iter_other_materials_for_related_publications(self):
found = False
for oth_mat_el in self._iter_other_materials_from_study_unit(self.study_unit_element):
if oth_mat_el.attrib.get('type') == 'Related Publication':
yield oth_mat_el
found = True
if not found and self._ddiinstance_element is not None:
for oth_mat_el in self._findall('./g:ResourcePackage/r:OtherMaterial', self._ddiinstance_element):
if oth_mat_el.attrib.get('type') == 'Related Publication':
yield oth_mat_el
def _iter_related_publications_as_mapped_params(self):
    """Generate Related Publications as :obj:`MappedParams`.

    Title elements, external URL, publication date and the preferred
    (identifier, agency) pair are gathered from every related
    publication r:OtherMaterial and handed over to
    :meth:`_iter_params_from_othmat_properties`.

    :returns: Generator yielding whatever
              :meth:`_iter_params_from_othmat_properties` yields.
    """
    for publication_el in self._iter_other_materials_for_related_publications():
        title_els = self._findall(_XPATH_REL_CITATION_TITLE, publication_el)
        url_ref_el = self._find('r:ExternalURLReference', publication_el)
        uri = None
        if url_ref_el is not None:
            uri = ''.join(url_ref_el.itertext())
        date_el = self._find('./r:Citation/r:PublicationDate/r:SimpleDate', publication_el)
        distribution_date = None
        if date_el is not None:
            distribution_date = ''.join(date_el.itertext())
        # Pair every identifier with the value of its 'type' attribute.
        id_agency_pairs = [
            (''.join(identifier_el.itertext()), identifier_el.attrib.get('type'))
            for identifier_el in self._findall('./r:Citation/r:InternationalIdentifier', publication_el)
        ]
        yield from self._iter_params_from_othmat_properties(
            title_els,
            uri=uri,
            distribution_date=distribution_date,
            id_agency_pair=get_preferred_publication_id_agency_pair(id_agency_pairs),
        )
@property
def _study_maps(self):
    """Mappings that are evaluated relative to the s:StudyUnit element.

    :returns: List of (add-method of the study class, mapping) pairs.
    """
    study_cls = self._study_cls
    titles = self._map_multi(_XPATH_REL_CITATION_TITLE)
    abstracts = self._map_multi('./s:Abstract/r:Content').set_value_getter(element_remove_whitespaces)
    principal_investigators = self._map_multi('./r:Citation/r:Creator').add_attribute(
        study_cls.principal_investigators.attr_organization.name, self._map_single('.', 'affiliation')
    )
    publishers = self._map_multi('./r:Citation/r:Publisher')
    publication_years = self._map_multi('./r:Citation/r:PublicationDate/r:SimpleDate').add_attribute(
        study_cls.publication_years.attr_distribution_date.name, self._map_single('.')
    )
    # DDI31 contains no @id, no codelistname and no codelisturn, so the
    # value is fixed to None and the element text becomes the description.
    classifications = (
        self._map_multi('./r:Coverage/r:TopicalCoverage/r:Subject')
        .set_value_conversion(fixed_value(None))
        .add_attribute(study_cls.classifications.attr_system_name.name, self._map_single('.', 'codeListID'))
        .add_attribute(study_cls.classifications.attr_description.name, self._map_single('.'))
    )
    # Same limitations as with classifications.
    keywords = (
        self._map_multi('./r:Coverage/r:TopicalCoverage/r:Keyword')
        .set_value_conversion(fixed_value(None))
        .add_attribute(study_cls.keywords.attr_system_name.name, self._map_single('.', 'codeListID'))
        .add_attribute(study_cls.keywords.attr_description.name, self._map_single('.'))
    )
    analysis_units = (
        self._map_multi('./r:AnalysisUnit')
        .add_attribute(study_cls.analysis_units.attr_system_name.name, self._map_single('.', 'codeListID'))
        .add_attribute(study_cls.analysis_units.attr_uri.name, self._map_single('.', 'codeListURN'))
    )
    return [
        (study_cls.add_study_titles, titles),
        (study_cls.add_abstract, abstracts),
        (study_cls.add_principal_investigators, principal_investigators),
        (study_cls.add_publishers, publishers),
        (study_cls.add_publication_years, publication_years),
        (study_cls.add_classifications, classifications),
        (study_cls.add_keywords, keywords),
        (study_cls.add_analysis_units, analysis_units),
    ]
@property
def studies(self):
    """Parse XML to create and populate :obj:`kuha_common.document_store.records.Study`.

    The study number is parsed first if it is not yet known. The record
    is then populated from the DDIInstance root (document titles), the
    StudyUnit, its data collections, physical instances and archives,
    and finally from the custom generators that resolve references.

    :returns: Generator to Populate Document Store Study record.
    """
    if self.study_number is None:
        self._parse_study_number()
    study_cls = self._study_cls
    study = study_cls()
    study.add_study_number(self.study_number_identifier)

    if self._is_DDIInstance(self.root_element):
        # DDIInstance must be root of the document. If there is a DDIInstance, map its title into
        # Study.document_titles.
        # DDIInstance may only have a single r:Citation and r:Citation may only have a single
        # r:Title
        # Get r:Citation/r:AlternateTitle too.
        document_title_maps = [
            (study_cls.add_document_titles, self._map_single(_XPATH_REL_CITATION_TITLE, localizable=True)),
            (study_cls.add_document_titles, self._map_multi('./r:Citation/r:AlternateTitle')),
        ]
        self._map_to_record(study, self.root_element, document_title_maps)

    self._map_to_record(
        study, self.study_unit_element, self._study_maps, default_language=self._study_unit_language
    )

    # DDI3 has references and is much too complex to use XMLMapper for all elements.
    # Use custom mappings for certain elements.

    def _user_id_map(add_func, field, xpath):
        # Value comes from r:UserID, description from r:Content and the
        # system name from the 'type' attribute of r:UserID.
        mapping = (
            self._map_multi(xpath)
            .set_value_getter(self.child_text('r:UserID'))
            .add_attribute(
                field.attr_description.name,
                self._map_single('./r:Content', localizable=True),
                provides_main_lang=True,
            )
            .add_attribute(field.attr_system_name.name, self._map_single('./r:UserID', 'type'))
        )
        return (add_func, mapping)

    # These are relative to dc:DataCollection
    data_coll_maps = [
        _user_id_map(study_cls.add_time_methods, study_cls.time_methods, './dc:Methodology/dc:TimeMethod'),
        _user_id_map(
            study_cls.add_sampling_procedures,
            study_cls.sampling_procedures,
            './dc:Methodology/dc:SamplingProcedure',
        ),
        _user_id_map(
            study_cls.add_collection_modes,
            study_cls.collection_modes,
            './dc:CollectionEvent/dc:ModeOfCollection',
        ),
    ]
    for data_collection_el in self._iter_data_collections_from_element(self.study_unit_element):
        self._map_to_record(
            study, data_collection_el, data_coll_maps, default_language=self._study_unit_language
        )

    # These are relative to pi:PhysicalInstance
    physical_instance_maps = [
        (study_cls.add_file_names, self._map_multi('./pi:DataFileIdentification/pi:URI')),
    ]
    for physical_instance_el in self._iter_physical_instances_from_study_unit(self.study_unit_element):
        self._map_to_record(
            study, physical_instance_el, physical_instance_maps, default_language=self._study_unit_language
        )

    # These are relative to a:Archive
    archive_maps = [
        (study_cls.add_data_access, self._map_multi('./a:ArchiveSpecific/a:DefaultAccess/a:Restrictions')),
        (
            study_cls.add_data_access_descriptions,
            self._map_multi('./a:ArchiveSpecific/a:DefaultAccess/a:AccessConditions'),
        ),
    ]
    for archive_el in self._iter_archives_from_element(self.study_unit_element):
        self._map_to_record(study, archive_el, archive_maps, default_language=self._study_unit_language)

    # Custom generators paired with the record method that consumes them.
    custom_mappings = [
        (study.add_collection_periods, self._iter_collection_periods_as_mapped_params),
        (study.add_study_uris, self._iter_study_uris_as_mapped_params),
        (study.add_universes, self._iter_universes_as_mapped_params),
        (study.add_study_area_countries, self._iter_study_area_countries_as_mapped_params),
        (study.add_identifiers, self._iter_identifiers_as_mapped_params),
        (study.add_funding_agencies, self._iter_funding_agencies_as_mapped_params),
        (study.add_grant_numbers, self._iter_grant_numbers_as_mapped_params),
        (study.add_related_publications, self._iter_related_publications_as_mapped_params),
    ]
    for add_to_study, iter_mapped_params in custom_mappings:
        for mapped in iter_mapped_params():
            add_to_study(*mapped.arguments, **mapped.keyword_arguments)
    yield study