#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2022 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
#
"""Mapping profiles for DDI codebook versions."""
from itertools import chain
from kuha_common.document_store.mappings.exceptions import (
UnknownXMLRoot,
MissingRequiredAttribute,
MappingError
)
from kuha_common.document_store.mappings.xmlbase import (
MappedParams,
XMLParserBase,
as_valid_identifier,
str_equals,
fixed_value,
element_remove_whitespaces,
get_preferred_publication_id_agency_pair
)
# Relative XPath to the title inside an r:Citation child of the element being
# mapped. Reused by the title mappings in this module.
_XPATH_REL_CITATION_TITLE = './r:Citation/r:Title'
class DDI31RecordParser(XMLParserBase):
    """Parse Document Store records from DDI 3.1. XML

    Check the root element. Expects either ddi:DDIInstance or s:StudyUnit.
    Currently supports only single s:StudyUnit element within the root.

    :param root_element: XML root element.
    :type root_element: :obj:`xml.etree.ElementTree.Element`
    :raises: :exc:`UnknownXMLRoot` for unexpected root element.
    :raises: :exc:`MappingError` if root contains more or less than exactly one
             s:StudyUnit child.
    """

    #: XML namespaces (prefix -> namespace URI) used by every XPath in this class.
    NS = {'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
          'xml': 'http://www.w3.org/XML/1998/namespace',
          'xhtml': 'http://www.w3.org/1999/xhtml',
          'ddi': 'ddi:instance:3_1',
          's': 'ddi:studyunit:3_1',
          'pd': 'ddi:physicaldataproduct:3_1',
          'pi': 'ddi:physicalinstance:3_1',
          'c': 'ddi:conceptualcomponent:3_1',
          'l': 'ddi:logicalproduct:3_1',
          'r': 'ddi:reusable:3_1',
          'g': 'ddi:group:3_1',
          'dc': 'ddi:datacollection:3_1',
          'a': 'ddi:archive:3_1'}
def __init__(self, root_element):
    """Locate the StudyUnit element and initialize the base parser.

    :param root_element: XML root element.
    :raises: :exc:`UnknownXMLRoot` or :exc:`MappingError` when the
             StudyUnit element cannot be resolved from the root.
    """
    # The root is validated (and study_unit_element stored) before the
    # base class initializer receives it.
    self._find_study_unit_element(root_element)
    super().__init__(root_element)
@classmethod
def _DDIInstance_tag(cls):
    """Return the namespace-qualified tag name of ddi:DDIInstance.

    :returns: tag in ElementTree ``{namespace}localname`` notation.
    :rtype: str
    """
    return '{{{namespace}}}DDIInstance'.format(namespace=cls.NS['ddi'])
@classmethod
def _is_DDIInstance(cls, element):
    """Tell whether ``element`` is a ddi:DDIInstance element.

    :param element: element whose tag is inspected.
    :returns: True if the element's tag equals the DDIInstance tag.
    :rtype: bool
    """
    ddi_instance_tag = cls._DDIInstance_tag()
    return element.tag == ddi_instance_tag
def _find_study_unit_element(self, root_element):
    """Resolve the s:StudyUnit element and store it to ``study_unit_element``.

    The root may be the s:StudyUnit itself, or a ddi:DDIInstance holding
    exactly one s:StudyUnit descendant.

    :param root_element: XML root element.
    :raises: :exc:`UnknownXMLRoot` if root is neither DDIInstance nor StudyUnit.
    :raises: :exc:`MappingError` if a DDIInstance root does not contain
             exactly one StudyUnit.
    """
    study_unit_tag = '{%s}StudyUnit' % (self.NS['s'],)
    if root_element.tag == study_unit_tag:
        self.study_unit_element = root_element
        return
    if not self._is_DDIInstance(root_element):
        raise UnknownXMLRoot(root_element.tag, self._DDIInstance_tag(), study_unit_tag)
    found = list(root_element.iter(study_unit_tag))
    if len(found) > 1:
        # Currently supports only a single s:StudyUnit in xml metadata.
        raise MappingError("Unable to parse multiple StudyUnit elements")
    if not found:
        raise MappingError("Unable to find StudyUnit element")
    self.study_unit_element = found[0]
@property
def _study_unit_language(self):
    """Language declared on the StudyUnit element.

    Falls back to :attr:`root_language` when the StudyUnit element
    does not declare a language.

    :returns: Language
    :rtype: str
    """
    fallback = self.root_language
    return self._get_xmllang(self.study_unit_element, default=fallback)
def _get_ddiinstance_el(self):
    """Return the ddi:DDIInstance element.

    :returns: the root element when it is a DDIInstance, otherwise the
              result of looking one up with ``.//ddi:DDIInstance``.
    """
    root = self.root_element
    return root if self._is_DDIInstance(root) else self._find('.//ddi:DDIInstance')
def _iter_reference_values(self, xpath_to_parent, element, *elements):
    """Yield reference identifiers found from reference elements.

    Looks up ``<xpath_to_parent>/r:ID`` from each given element and yields
    the text content of every r:ID that actually carries a value.

    :param str xpath_to_parent: xpath to the reference element (parent of r:ID).
    :param element: first element to search from.
    :param elements: further elements to search from.
    :returns: generator yielding reference identifier strings,
              stripped of surrounding whitespace.
    """
    xpath = '%s/r:ID' % (xpath_to_parent,)
    for ref_el in self._findall_from_elements((element,) + elements, xpath):
        # Both conditions must hold. With ``or`` a None element would be
        # dereferenced and empty r:ID elements would never be filtered out.
        if ref_el is not None and ref_el.text not in ('', None):
            # Strip like _find_by_reference does, so that identifiers padded
            # with whitespace still match the target's @id.
            yield ''.join(ref_el.itertext()).strip()
def _find_by_reference_value(self, ref_val, xpath, element=None):
    """Find the element matching ``xpath`` whose ``id`` attribute equals ``ref_val``.

    :param str ref_val: referenced identifier.
    :param str xpath: xpath to the target element, without the id predicate.
    :param element: element to search from. Defaults to the root element.
    :returns: first matching element or None.
    """
    # Compare against None explicitly: an Element without children is falsy,
    # so ``element or root`` would silently replace a valid lookup element.
    if element is None:
        element = self.root_element
    return element.find('{base}[@id="{ref_id}"]'.format(base=xpath, ref_id=ref_val),
                        self.NS)
def _find_by_reference(self, ref_el, xpath, element=None):
    """Find the element referred to by the r:ID element ``ref_el``.

    :param ref_el: r:ID element holding the referenced identifier, or None.
    :param str xpath: xpath to the target element.
    :param element: element to search from instead of the root.
    :returns: referenced element, or None if ``ref_el`` is missing, has no
              text or nothing matches.
    """
    has_reference = ref_el is not None and ref_el.text not in ('', None)
    if not has_reference:
        return None
    ref_val = ''.join(ref_el.itertext()).strip()
    return self._find_by_reference_value(ref_val, xpath, element=element)
def _find_and_iter_referred_els(self, ref_xpath, target_xpath, *ref_elements, lookup_element=None):
    """Resolve references and yield the referred elements one by one.

    :param str ref_xpath: xpath to reference element
    :param str target_xpath: xpath to target element
    :param ref_elements: elements to search through for reference element
    :param lookup_element: find target from this element's children instead of root
    :returns: generator yielding referenced elements
    """
    if not ref_elements:
        # Nothing to search references from.
        return
    for ref_val in self._iter_reference_values(ref_xpath, *ref_elements):
        referred = self._find_by_reference_value(ref_val, target_xpath, element=lookup_element)
        if referred is not None:
            yield referred
def _get_study_number_from_study_unit_element(self, study_unit_element, raise_error_if_missing=True):
    """Get the study number of a StudyUnit element.

    Candidates are tried in this order, the first one with text wins:
    a:Collection/a:CallNumber of the archives, a:Item/a:CallNumber of the
    archives, and finally r:UserID of the StudyUnit itself.

    :param study_unit_element: s:StudyUnit element.
    :param bool raise_error_if_missing: raise instead of returning None
                                        when no study number is found.
    :returns: study number, or None if not found and raising is disabled.
    :raises: :exc:`MissingRequiredAttribute` if not found and
             ``raise_error_if_missing`` is True.
    """
    def _has_text(candidate):
        return candidate is not None and candidate.text not in ['', None]

    # Materialized because the archives are searched twice.
    archive_els = list(self._iter_archives_from_study_unit(study_unit_element))
    for call_number_xpath in ('./a:ArchiveSpecific/a:Collection/a:CallNumber',
                              './a:ArchiveSpecific/a:Item/a:CallNumber'):
        for call_number_el in self._findall_from_elements(archive_els, call_number_xpath):
            if _has_text(call_number_el):
                return call_number_el.text
    user_id_el = study_unit_element.find('./r:UserID', self.NS)
    if _has_text(user_id_el):
        return user_id_el.text
    if not raise_error_if_missing:
        return None
    raise MissingRequiredAttribute('./r:UserID',
                                   './a:Archive/a:ArchiveSpecific/a:Collection/a:CallNumber',
                                   './a:Archive/a:ArchiveSpecific/a:Item/a:CallNumber',
                                   msg='Unable to find study number from %s, %s or %s')
def _parse_study_number(self):
    """Resolve the study number of the StudyUnit and store it to ``study_number``.

    :raises: :exc:`MissingRequiredAttribute` if no study number is found.
    """
    study_number = self._get_study_number_from_study_unit_element(self.study_unit_element)
    self.study_number = study_number
def _get_spatial_coverage_from_study_unit(self, study_unit_element):
    """Get SpatialCoverage element of a StudyUnit element.

    A SpatialCoverage referenced via SpatialCoverageReference takes
    precedence; otherwise the inline child of r:Coverage is used.

    :param study_unit_element: s:StudyUnit element
    :type study_unit_element: :obj:`xml.etree.ElementTree.Element`
    :returns: SpatialCoverage of the StudyUnit.
    :rtype: :obj:`xml.etree.ElementTree.Element` or None
    """
    ref_id_el = study_unit_element.find('./r:Coverage/r:SpatialCoverageReference/r:ID', self.NS)
    referenced = self._find_by_reference(ref_id_el, './/r:SpatialCoverage')
    if referenced is not None:
        return referenced
    return study_unit_element.find('./r:Coverage/r:SpatialCoverage', self.NS)
def _iter_data_collections_from_study_unit(self, study_unit_element):
    """Yield dc:DataCollection elements of a StudyUnit.

    Referenced data collections (s:DataCollectionReference) come first,
    followed by inline dc:DataCollection children.

    :param study_unit_element: s:StudyUnit element.
    :returns: generator yielding dc:DataCollection elements.
    """
    ref_id_els = study_unit_element.findall('./s:DataCollectionReference/r:ID', self.NS)
    referenced = (self._find_by_reference(ref_id_el, './/dc:DataCollection')
                  for ref_id_el in ref_id_els)
    yield from (data_coll_el for data_coll_el in referenced if data_coll_el is not None)
    yield from study_unit_element.findall('./dc:DataCollection', self.NS)
def _iter_physical_instances_from_study_unit(self, study_unit_element):
    """Yield pi:PhysicalInstance elements of a StudyUnit.

    Referenced physical instances (s:PhysicalInstanceReference) come first,
    followed by inline pi:PhysicalInstance children.

    :param study_unit_element: s:StudyUnit element.
    :returns: generator yielding pi:PhysicalInstance elements.
    """
    ref_id_els = study_unit_element.findall('./s:PhysicalInstanceReference/r:ID', self.NS)
    referenced = (self._find_by_reference(ref_id_el, './/pi:PhysicalInstance')
                  for ref_id_el in ref_id_els)
    yield from (instance_el for instance_el in referenced if instance_el is not None)
    yield from study_unit_element.findall('./pi:PhysicalInstance', self.NS)
def _iter_archives_from_study_unit(self, study_unit_element):
    """Yield a:Archive elements of a StudyUnit.

    The first inline a:Archive child (if any) is yielded first, followed by
    archives referenced via s:ArchiveReference.

    :param study_unit_element: s:StudyUnit element.
    :returns: generator yielding a:Archive elements.
    """
    inline_archive_el = study_unit_element.find('./a:Archive', self.NS)
    if inline_archive_el is not None:
        yield inline_archive_el
    yield from self._find_and_iter_referred_els(
        './s:ArchiveReference', './/a:Archive', study_unit_element)
def _iter_other_materials_from_study_unit(self, study_unit_element):
    """Yield r:OtherMaterial children of a StudyUnit.

    :param study_unit_element: s:StudyUnit element.
    :returns: generator yielding r:OtherMaterial elements.
    """
    yield from self._findall('./r:OtherMaterial', study_unit_element)
def _iter_funding_informations_from_study_unit(self, study_unit_element):
    """Yield r:FundingInformation children of a StudyUnit.

    :param study_unit_element: s:StudyUnit element.
    :returns: generator yielding r:FundingInformation elements.
    """
    for funding_info_el in self._findall('./r:FundingInformation', study_unit_element):
        yield funding_info_el
def _iter_collection_periods_as_mapped_params(self):
    """Generate collection periods as :obj:`MappedParams`

    Each dc:DataCollectionDate yields either one 'single' period (from
    r:SimpleDate) or 'start' and/or 'end' periods (from r:StartDate and
    r:EndDate).

    .. note:: DDI 3.1. supports only single DataCollectionDate
              for each CollectionEvent. That is not enforced here.

    :returns: Generator yielding collection periods.
    """
    def _as_params(date_el, event):
        params = MappedParams(date_el.text)
        params.set_language(self._study_unit_language)
        params.keyword_arguments.update({self._study_cls.collection_periods.attr_event.name: event})
        return params

    data_colls = self._iter_data_collections_from_study_unit(self.study_unit_element)
    for dc_date in self._findall_from_elements(data_colls, './dc:CollectionEvent/dc:DataCollectionDate'):
        simple_date = dc_date.find('./r:SimpleDate', self.NS)
        if simple_date is not None:
            # DataCollectionDate can contain either SimpleDate or StartDate and EndDate
            yield _as_params(simple_date, 'single')
            continue
        # It is a violation of the DDI31 standard to have an EndDate without
        # a StartDate. Kuha won't mind however.
        for child_xpath, event in (('./r:StartDate', 'start'), ('./r:EndDate', 'end')):
            date_el = dc_date.find(child_xpath, self.NS)
            if date_el is not None:
                yield _as_params(date_el, event)
def _iter_study_uris_as_mapped_params(self):
    """Generate Study Uris as :obj:`MappedParams`

    For every a:ArchiveSpecific the URI is taken from the a:Collection (or,
    failing that, a:Item) whose a:CallNumber equals the study number. When
    the ArchiveSpecific references an organization, one item is yielded per
    a:OrganizationName, carrying the name as location and a description in
    the same language when available.

    :returns: Generator yielding :obj:`MappedParams`
    """
    archive_els = self._iter_archives_from_study_unit(self.study_unit_element)
    for archive_spec_el in self._findall_from_elements(archive_els, './a:ArchiveSpecific'):
        # First get the actual URI.
        uri_el = archive_spec_el.find(
            "./a:Collection/[a:CallNumber='{}']/a:URI".format(self.study_number), self.NS)
        if uri_el is None:
            uri_el = archive_spec_el.find(
                "./a:Item/[a:CallNumber='{}']/a:URI".format(self.study_number), self.NS)
        uri, uri_lang = None, None
        if uri_el is not None:
            uri = uri_el.text
            uri_lang = self._get_xmllang(uri_el, default=self._study_unit_language)
        org_reference_id_el = archive_spec_el.find('./a:ArchiveOrganizationReference/r:ID', self.NS)
        if org_reference_id_el is None:
            # No organization: yield the bare URI, if there is one.
            if uri_el is not None:
                param = MappedParams(uri)
                param.set_language(uri_lang)
                yield param
            continue
        # Then get the organization if referenced.
        org_el = self._find_by_reference(org_reference_id_el, './/a:Organization')
        if org_el is None:
            # Unable to find referenced organization.
            continue
        for org_name_el in org_el.findall('./a:OrganizationName', self.NS):
            param = MappedParams(uri)
            param.set_language(self._get_xmllang(org_name_el, default=self._study_unit_language))
            param.keyword_arguments.update({self._study_cls.study_uris.attr_location.name: org_name_el.text})
            # Try to find localized element.
            org_desc_el = org_el.find("./r:Description[@xml:lang='{}']".format(param.get_language()), self.NS)
            if org_desc_el is None and param.get_language() == self._study_unit_language:
                # If language equals the language of the StudyUnit,
                # accept Description without locale.
                org_desc_el = org_el.find('./r:Description', self.NS)
            if org_desc_el is not None:
                param.keyword_arguments.update({
                    self._study_cls.study_uris.attr_description.name: org_desc_el.text})
            yield param
def _iter_universes_as_mapped_params(self):
    """Generate Universes as :obj:`MappedParams`.

    Resolves every r:UniverseReference of the StudyUnit and yields one item
    per c:HumanReadable of the referenced c:Universe. The universe's
    ``isInclusive`` attribute is passed through ``str_equals('true', True)``
    and stored as the 'included' attribute.

    :returns: Generator yielding :obj:`MappedParams`
    """
    inc_to_bool = str_equals('true', True)
    included_attr = self._study_cls.universes.attr_included
    ref_id_els = self.study_unit_element.findall('./r:UniverseReference/r:ID', self.NS)
    for ref_id_el in ref_id_els:
        universe_el = self._find_by_reference(ref_id_el, ".//c:Universe")
        if universe_el is not None:
            included = inc_to_bool(universe_el.attrib.get('isInclusive'))
            for desc in universe_el.findall('./c:HumanReadable', self.NS):
                param = MappedParams(desc.text)
                param.set_language(self._get_xmllang(desc, default=self._study_unit_language))
                param.keyword_arguments.update({included_attr.name: included})
                yield param
def _iter_identifiers_as_mapped_params(self):
    """Generate Identifiers as :obj:`MappedParams`

    Identifiers come from the archives' a:Collection/a:CallNumber and
    a:Item/a:CallNumber elements and from the StudyUnit's
    r:Citation/r:InternationalIdentifier elements, in that order.
    Will not discard duplicates.

    :returns: Generator yielding :obj:`MappedParams`
    """
    def _params_from_els(identifier_els):
        for identifier_el in identifier_els:
            if identifier_el.text not in [None, '']:
                param = MappedParams(''.join(identifier_el.itertext()))
                param.set_language(self._get_xmllang(identifier_el, default=self._study_unit_language))
                yield param

    # Cast to list because the archives are iterated through multiple times.
    archive_els = list(self._iter_archives_from_study_unit(self.study_unit_element))
    for call_number_xpath in ('./a:ArchiveSpecific/a:Collection/a:CallNumber',
                              './a:ArchiveSpecific/a:Item/a:CallNumber'):
        yield from _params_from_els(self._findall_from_elements(archive_els, call_number_xpath))
    yield from _params_from_els(
        self.study_unit_element.findall('./r:Citation/r:InternationalIdentifier', self.NS))
def _iter_mapped_params_from_geography_element(self, *geography_els):
    """Yield :obj:`MappedParams` for the level names of r:Geography elements.

    :param geography_els: r:Geography elements; None entries are ignored.
    :returns: generator yielding one item per non-empty r:Level/r:Name.
    """
    present_els = [geography_el for geography_el in geography_els if geography_el is not None]
    for name_el in self._findall_from_elements(present_els, './r:Level/r:Name'):
        if name_el.text not in [None, '']:
            params = MappedParams("".join(name_el.itertext()))
            params.set_language(self._get_xmllang(name_el, default=self._study_unit_language))
            yield params
def _iter_mapped_params_from_geographyvalue_elements(self, geogvalue_els):
    """Yield :obj:`MappedParams` for the names of r:GeographyValue elements.

    The text of r:GeographyCode/r:Value, when present and non-empty, is
    attached to every name of the same GeographyValue as the abbreviation
    attribute.

    :param geogvalue_els: iterable of r:GeographyValue elements.
    :returns: generator yielding one item per r:GeographyName.
    """
    for geogvalue_el in geogvalue_els:
        code_value = None
        code_el = geogvalue_el.find('./r:GeographyCode/r:Value', self.NS)
        if code_el is not None and code_el.text not in [None, '']:
            code_value = code_el.text
        for name_el in geogvalue_el.findall('./r:GeographyName', self.NS):
            params = MappedParams(''.join(name_el.itertext()))
            params.set_language(self._get_xmllang(name_el, default=self._study_unit_language))
            if code_value is not None:
                params.keyword_arguments.update(
                    {self._study_cls.study_area_countries.attr_abbreviation.name: code_value})
            yield params
def _iter_study_area_countries_as_mapped_params(self):
    """Generate study area countries as :obj:`MappedParams`.

    Values are collected from the StudyUnit's SpatialCoverage:

    * r:TopLevelReference and r:LowestLevelReference: level names of the
      referenced r:GeographicStructure (inline or referenced r:Geography),
      or the plain r:LevelName when no structure is referenced.
    * r:GeographicLocationReference: names (and codes) of the referenced
      r:GeographicLocation values.

    :returns: Generator yielding :obj:`MappedParams`
    """
    spatial_coverage_el = self._get_spatial_coverage_from_study_unit(self.study_unit_element)
    if spatial_coverage_el is None:
        return
    for level_xpath in ('./r:TopLevelReference', './r:LowestLevelReference'):
        parentlevel_ref_el = spatial_coverage_el.find(level_xpath, self.NS)
        if parentlevel_ref_el is None:
            # TopLevelReference & LowestLevelReference are mandatory children
            # of SpatialCoverage, but tolerate documents that omit them
            # instead of failing with an AttributeError.
            continue
        level_ref_id_el = parentlevel_ref_el.find('./r:LevelReference/r:ID', self.NS)
        if level_ref_id_el is None or level_ref_id_el.text in [None, '']:
            # No referenced structure: fall back to the plain level name.
            level_name_el = parentlevel_ref_el.find('./r:LevelName', self.NS)
            if level_name_el is None or level_name_el.text in [None, '']:
                continue
            params = MappedParams("".join(level_name_el.itertext()))
            params.set_language(self._study_unit_language)
            yield params
            continue
        geostruct_el = self._find_by_reference(level_ref_id_el, './/r:GeographicStructure')
        if geostruct_el is None:
            continue
        # Geography elements inline.
        yield from self._iter_mapped_params_from_geography_element(
            *geostruct_el.findall('./r:Geography', self.NS))
        # Geography elements by reference.
        for geography_ref_id_el in geostruct_el.findall('./r:GeographyReference/r:ID', self.NS):
            if geography_ref_id_el.text in [None, '']:
                continue
            yield from self._iter_mapped_params_from_geography_element(
                self._find_by_reference(geography_ref_id_el, './/r:Geography'))
    for location_ref_id_el in spatial_coverage_el.findall('./r:GeographicLocationReference/r:ID', self.NS):
        if location_ref_id_el.text in [None, '']:
            continue
        yield from self._iter_mapped_params_from_geographyvalue_elements(self.root_element.findall(
            './/r:GeographicLocation[@id="{ref_id}"]/r:Values/r:GeographyValue'
            .format(ref_id=location_ref_id_el.text.strip()),
            self.NS))
def _get_role_and_grant_numbers_from_funding_info_el(self, funding_info_el):
    """Read role and grant numbers from a r:FundingInformation element.

    :param funding_info_el: r:FundingInformation element.
    :returns: 2-tuple of the ``role`` attribute (None if absent) and a list
              with the text content of every r:GrantNumber child.
    :rtype: tuple
    """
    grant_numbers = [''.join(grant_number_el.itertext())
                     for grant_number_el in self._findall('./r:GrantNumber', funding_info_el)]
    return funding_info_el.attrib.get('role'), grant_numbers
def _iter_funding_agencies_as_mapped_params(self):
    """Generate funding agencies as :obj:`MappedParams`.

    For every r:FundingInformation of the StudyUnit, yields one item per
    a:OrganizationName of the organizations referenced by
    r:AgencyOrganizationReference. Each item carries the funding
    information's role, first grant number and description.

    :returns: Generator yielding :obj:`MappedParams`
    """
    for funding_info_el in self._iter_funding_informations_from_study_unit(self.study_unit_element):
        role, grant_numbers = self._get_role_and_grant_numbers_from_funding_info_el(
            funding_info_el)
        # Data model currently supports only a single grant number attribute.
        grant_number = grant_numbers[0] if grant_numbers else None
        # itertext() returns an iterator of strings; flatten the iterators of
        # all Description elements before joining (joining the iterators
        # themselves raises TypeError).
        description = ''.join(chain.from_iterable(
            desc_el.itertext() for desc_el in self._findall('./r:Description', funding_info_el))) or None
        org_els = self._find_and_iter_referred_els('./r:AgencyOrganizationReference',
                                                   './/a:Organization',
                                                   funding_info_el)
        for org_name_el in self._findall_from_elements(org_els, './a:OrganizationName'):
            params = MappedParams(''.join(org_name_el.itertext()))
            params.set_language(self._get_xmllang(org_name_el, self.root_language))
            params.keyword_arguments.update({
                self._study_cls.funding_agencies.attr_grant_number.name: grant_number,
                self._study_cls.funding_agencies.attr_role.name: role,
                self._study_cls.funding_agencies.attr_description.name: description})
            yield params
def _iter_grant_numbers_as_mapped_params(self):
    """Generate grant numbers as :obj:`MappedParams`.

    Yields one item per r:GrantNumber of every r:FundingInformation of the
    StudyUnit. The language comes from the FundingInformation element
    (root language as fallback) and its role is attached as an attribute.

    :returns: Generator yielding :obj:`MappedParams`
    """
    funding_info_els = self._iter_funding_informations_from_study_unit(self.study_unit_element)
    for funding_info_el in funding_info_els:
        role, grant_numbers = self._get_role_and_grant_numbers_from_funding_info_el(funding_info_el)
        for grant_number in grant_numbers:
            params = MappedParams(grant_number)
            language = self._get_xmllang(funding_info_el, self.root_language)
            params.set_language(language)
            role_attributes = {self._study_cls.grant_numbers.attr_role.name: role}
            params.keyword_arguments.update(role_attributes)
            yield params
def __iter_other_materials_for_related_publications(self):
    """Yield r:OtherMaterial elements whose type is 'Related Publication'.

    The StudyUnit's own OtherMaterial elements are preferred. Only when
    none of them is a related publication, the OtherMaterial elements of
    the DDIInstance's g:ResourcePackage children are searched.

    :returns: generator yielding r:OtherMaterial elements.
    """
    def _is_related_publication(oth_mat_el):
        return oth_mat_el.attrib.get('type') == 'Related Publication'

    found = False
    for oth_mat_el in self._iter_other_materials_from_study_unit(self.study_unit_element):
        if _is_related_publication(oth_mat_el):
            yield oth_mat_el
            found = True
    if found:
        return
    ddi_instance_el = self._get_ddiinstance_el()
    # Test for None explicitly: truth-testing an Element reflects its child
    # count and is deprecated.
    if ddi_instance_el is not None:
        for oth_mat_el in self._findall('./g:ResourcePackage/r:OtherMaterial',
                                        ddi_instance_el):
            if _is_related_publication(oth_mat_el):
                yield oth_mat_el
def _iter_params_from_othmat_properties(self, title_str_els, desc_str_els=None, uri=None,
                                        distribution_date=None, id_agency_pair=None):
    """Yield related publication :obj:`MappedParams`, one per language.

    Titles and descriptions are grouped by their language (StudyUnit
    language as fallback). The remaining properties are repeated as such
    for every language.

    :param title_str_els: title elements.
    :param desc_str_els: description elements, optional.
    :param uri: URI of the publication.
    :param distribution_date: distribution date of the publication.
    :param id_agency_pair: 2-tuple of (identifier, agency), optional.
    :returns: generator yielding :obj:`MappedParams`
    """
    def _lang_of(str_el):
        return self._get_xmllang(str_el, self._study_unit_language)

    desc_str_els = desc_str_els or []
    if id_agency_pair:
        _id, agency = id_agency_pair
    else:
        _id, agency = None, None
    # First pass fixes the language order: title languages, then any
    # language that only has a description.
    values_by_lang = {}
    for str_el in chain(title_str_els, desc_str_els):
        values_by_lang.setdefault(_lang_of(str_el), {})
    for title_str_el in title_str_els:
        values_by_lang[_lang_of(title_str_el)]['title'] = ''.join(title_str_el.itertext())
    for desc_str_el in desc_str_els:
        values_by_lang[_lang_of(desc_str_el)]['desc'] = ''.join(desc_str_el.itertext())
    related_publications = self._study_cls.related_publications
    for lang, values in values_by_lang.items():
        params = MappedParams(values.get('title'))
        params.set_language(lang)
        params.keyword_arguments.update({
            related_publications.attr_description.name: values.get('desc'),
            related_publications.attr_uri.name: uri,
            related_publications.attr_distribution_date.name: distribution_date,
            related_publications.attr_identifier.name: _id,
            related_publications.attr_identifier_agency.name: agency
        })
        yield params
def _iter_related_publications_as_mapped_params(self):
    """Generate related publications as :obj:`MappedParams`.

    Reads title, external URL, publication date and international
    identifiers from every 'Related Publication' r:OtherMaterial.

    :returns: Generator yielding :obj:`MappedParams`
    """
    def _text_or_none(element):
        return ''.join(element.itertext()) if element is not None else None

    for oth_mat_el in self.__iter_other_materials_for_related_publications():
        title_str_els = self._findall(_XPATH_REL_CITATION_TITLE, oth_mat_el)
        uri = _text_or_none(self._find('r:ExternalURLReference', oth_mat_el))
        distribution_date = _text_or_none(
            self._find('./r:Citation/r:PublicationDate/r:SimpleDate', oth_mat_el))
        # (identifier, agency) candidates; the agency is the @type of the identifier.
        ids_agencys = [(''.join(id_el.itertext()), id_el.attrib.get('type'))
                       for id_el in self._findall('./r:Citation/r:InternationalIdentifier', oth_mat_el)]
        yield from self._iter_params_from_othmat_properties(
            title_str_els, uri=uri, distribution_date=distribution_date,
            id_agency_pair=get_preferred_publication_id_agency_pair(ids_agencys))
@property
def _study_maps(self):
    """Mappings used to populate a Study record from s:StudyUnit.

    :returns: list of 2-tuples pairing a Study add-method with the mapper
              producing its arguments. XPaths are relative to s:StudyUnit.
    :rtype: list
    """
    study_cls = self._study_cls
    titles = self._map_multi(_XPATH_REL_CITATION_TITLE)
    abstract = (self._map_multi('./s:Abstract/r:Content')
                .set_value_getter(element_remove_whitespaces))
    principal_investigators = (
        self._map_multi('./r:Citation/r:Creator')
        .add_attribute(study_cls.principal_investigators.attr_organization.name,
                       self._map_single('.', 'affiliation')))
    publishers = self._map_multi('./r:Citation/r:Publisher')
    publication_years = (
        self._map_multi('./r:Citation/r:PublicationDate/r:SimpleDate')
        .add_attribute(study_cls.publication_years.attr_distribution_date.name,
                       self._map_single('.')))
    # DDI31 contains no @id, no codelistname and no codelisturn for
    # subjects or keywords.
    classifications = (
        self._map_multi('./r:Coverage/r:TopicalCoverage/r:Subject')
        .set_value_conversion(fixed_value(None))
        .add_attribute(study_cls.classifications.attr_system_name.name,
                       self._map_single('.', 'codeListID'))
        .add_attribute(study_cls.classifications.attr_description.name,
                       self._map_single('.')))
    keywords = (
        self._map_multi('./r:Coverage/r:TopicalCoverage/r:Keyword')
        .set_value_conversion(fixed_value(None))
        .add_attribute(study_cls.keywords.attr_system_name.name,
                       self._map_single('.', 'codeListID'))
        .add_attribute(study_cls.keywords.attr_description.name,
                       self._map_single('.')))
    analysis_units = (
        self._map_multi('./r:AnalysisUnit')
        .add_attribute(study_cls.analysis_units.attr_system_name.name,
                       self._map_single('.', 'codeListID'))
        .add_attribute(study_cls.analysis_units.attr_uri.name,
                       self._map_single('.', 'codeListURN')))
    return [
        (study_cls.add_study_titles, titles),
        (study_cls.add_abstract, abstract),
        (study_cls.add_principal_investigators, principal_investigators),
        (study_cls.add_publishers, publishers),
        (study_cls.add_publication_years, publication_years),
        (study_cls.add_classifications, classifications),
        (study_cls.add_keywords, keywords),
        (study_cls.add_analysis_units, analysis_units),
    ]
@property
def studies(self):
    """Parse XML to create and populate :obj:`kuha_common.document_store.records.Study`.

    The record is populated in this order: document titles (DDIInstance
    root only), StudyUnit mappings, DataCollection, PhysicalInstance and
    Archive mappings, and finally the custom generators of this class.

    :returns: Generator to Populate Document Store Study record.
    """
    if self.study_number is None:
        self._parse_study_number()
    study_cls = self._study_cls
    study = study_cls()
    study.add_study_number(self.study_number_identifier)
    if self._is_DDIInstance(self.root_element):
        # DDIInstance must be root of the document. If there is a DDIInstance, map its title into
        # Study.document_titles.
        # DDIInstance may only have a single r:Citation and r:Citation may only have a single
        # r:Title
        # Get r:Citation/r:AlternateTitle too.
        document_title_maps = [
            (study_cls.add_document_titles, self._map_single(_XPATH_REL_CITATION_TITLE, localizable=True)),
            (study_cls.add_document_titles, self._map_multi('./r:Citation/r:AlternateTitle'))
        ]
        self._map_to_record(study, self.root_element, document_title_maps)
    self._map_to_record(study, self.study_unit_element, self._study_maps,
                        default_language=self._study_unit_language)

    # DDI3 has references and is much too complex to use XMLMapper for all elements.
    # Use custom mappings for certain elements.
    def _userid_described_map(xpath, attr_holder):
        # Value from child r:UserID, description from r:Content and
        # system name from the @type of r:UserID.
        return (self._map_multi(xpath).
                set_value_getter(self.child_text('r:UserID')).
                add_attribute(attr_holder.attr_description.name,
                              self._map_single('./r:Content', localizable=True),
                              provides_main_lang=True).
                add_attribute(attr_holder.attr_system_name.name,
                              self._map_single('./r:UserID', 'type')))

    # These are relative to dc:DataCollection
    data_coll_maps = [
        (study_cls.add_time_methods,
         _userid_described_map('./dc:Methodology/dc:TimeMethod', study_cls.time_methods)),
        (study_cls.add_sampling_procedures,
         _userid_described_map('./dc:Methodology/dc:SamplingProcedure', study_cls.sampling_procedures)),
        (study_cls.add_collection_modes,
         _userid_described_map('./dc:CollectionEvent/dc:ModeOfCollection', study_cls.collection_modes)),
    ]
    # These are relative to pi:PhysicalInstance
    physical_instance_maps = [
        (study_cls.add_file_names, self._map_multi('./pi:DataFileIdentification/pi:URI'))
    ]
    # These are relative to a:Archive
    archive_maps = [
        (study_cls.add_data_access, self._map_multi('./a:ArchiveSpecific/a:DefaultAccess/a:Restrictions')),
        (study_cls.add_data_access_descriptions, self._map_multi(
            './a:ArchiveSpecific/a:DefaultAccess/a:AccessConditions'))
    ]
    for iter_elements, maps in (
            (self._iter_data_collections_from_study_unit, data_coll_maps),
            (self._iter_physical_instances_from_study_unit, physical_instance_maps),
            (self._iter_archives_from_study_unit, archive_maps)):
        for element in iter_elements(self.study_unit_element):
            self._map_to_record(study, element, maps,
                                default_language=self._study_unit_language)
    custom_mappings = [
        (study.add_collection_periods, self._iter_collection_periods_as_mapped_params),
        (study.add_study_uris, self._iter_study_uris_as_mapped_params),
        (study.add_universes, self._iter_universes_as_mapped_params),
        (study.add_study_area_countries, self._iter_study_area_countries_as_mapped_params),
        (study.add_identifiers, self._iter_identifiers_as_mapped_params),
        (study.add_funding_agencies, self._iter_funding_agencies_as_mapped_params),
        (study.add_grant_numbers, self._iter_grant_numbers_as_mapped_params),
        (study.add_related_publications, self._iter_related_publications_as_mapped_params),
    ]
    for add_func, iter_mapped_params in custom_mappings:
        for params in iter_mapped_params():
            add_func(*params.arguments, **params.keyword_arguments)
    yield study
@property
def _variable_elements(self):
    """Variable elements generator.

    First look for logicalproducts defined as a child of study_unit_element.
    If none found, try to look them by references from all children of
    root_element. Referenced logicalproducts are handled in document order
    of their references, each logicalproduct only once.

    :returns: generator yielding Variable elements.
    """
    logicalproducts = self.study_unit_element.findall('./l:LogicalProduct', self.NS)
    if not logicalproducts:
        ref_id_els = self.study_unit_element.findall('.//s:LogicalProductReference/r:ID', self.NS)
        ref_id_els += self.study_unit_element.findall('.//pd:LogicalProductReference/r:ID', self.NS)
        # Iterate the list itself: a set of elements hashes by identity,
        # which makes the order of the variables non-deterministic while
        # removing no duplicates. Deduplicate the resolved logicalproducts
        # instead, so several references to one product yield it once.
        for ref_id_el in ref_id_els:
            logicalproduct_el = self._find_by_reference(ref_id_el, ".//l:LogicalProduct")
            if logicalproduct_el is None:
                continue
            if any(logicalproduct_el is seen_el for seen_el in logicalproducts):
                continue
            logicalproducts.append(logicalproduct_el)
    for varscheme_ref_id_el in self._findall_from_elements(logicalproducts, './/l:VariableSchemeReference/r:ID'):
        if varscheme_ref_id_el.text in [None, '']:
            continue
        ref_id = varscheme_ref_id_el.text.strip()
        yield from self.root_element.findall(".//l:VariableScheme[@id='{ref_id}']/l:Variable"
                                             .format(ref_id=ref_id), self.NS)
def _iter_code_elements_by_reference(self, ref_id):
    """Yield l:Code elements of the l:CodeScheme identified by ``ref_id``.

    :param str ref_id: value of the CodeScheme's ``id`` attribute.
    :returns: generator yielding l:Code elements.
    """
    code_xpath = ".//l:CodeScheme[@id='{ref_id}']/l:Code".format(ref_id=ref_id)
    yield from self.root_element.findall(code_xpath, self.NS)
@property
def _variable_maps(self):
    """Mappings used to populate a Variable record from l:Variable.

    :returns: list of 2-tuples pairing a Variable add-method with the
              mapper producing its arguments.
    :rtype: list
    """
    variable_cls = self._variable_cls
    variable_name = (self._map_single('./l:VariableName', required=True)
                     .set_value_conversion(as_valid_identifier))
    variable_labels = self._map_multi('./r:Label')
    question_identifiers = (self._map_multi('./l:QuestionReference/r:ID', localizable=False)
                            .set_value_conversion(as_valid_identifier))
    return [
        (variable_cls.add_variable_name, variable_name),
        (variable_cls.add_variable_labels, variable_labels),
        (variable_cls.add_question_identifiers, question_identifiers),
    ]
def _add_codelist_codes_to_variable(self, variable, var_el):
    """Add codelist codes of a l:Variable element to a Variable record.

    Codes come from the l:CodeScheme referenced by the variable's
    l:CodeRepresentation. A code is flagged missing when its value is
    listed in the representation's ``missingValue`` attribute. Listed
    missing values that match no code are added last, as missing codes
    without label.

    :param variable: Variable record to populate.
    :param var_el: l:Variable element.
    """
    codeschemeref_id_el = var_el.find('./l:Representation/l:CodeRepresentation/r:CodeSchemeReference/r:ID',
                                      self.NS)
    if codeschemeref_id_el is None or codeschemeref_id_el.text in [None, '']:
        return
    code_repr_el = var_el.find('./l:Representation/l:CodeRepresentation', self.NS)
    # Whitespace separated list; values get removed once matched to a code.
    unmatched_missing_values = set(code_repr_el.attrib.get('missingValue', '').split())
    for code_el in self._iter_code_elements_by_reference(codeschemeref_id_el.text.strip()):
        code_value = None
        value_el = code_el.find('./l:Value', self.NS)
        if value_el is not None and value_el.text not in [None, '']:
            code_value = value_el.text.strip()
        is_missing = code_value in unmatched_missing_values
        unmatched_missing_values.discard(code_value)
        # Labels come from the referenced category, when it can be resolved.
        label_els = []
        category_ref_id_el = code_el.find('./l:CategoryReference/r:ID', self.NS)
        if category_ref_id_el is not None and category_ref_id_el.text not in [None, '']:
            category_el = self._find_by_reference(category_ref_id_el, ".//l:Category")
            if category_el is not None:
                label_els = category_el.findall('./r:Label', self.NS)
        if not label_els:
            # No labels: add the bare code, if it has a value.
            if code_value is not None:
                variable.add_codelist_codes(code_value, self._study_unit_language, missing=is_missing)
            continue
        for label_el in label_els:
            # label_text may be None or ''
            label_text = None if label_el.text is None else label_el.text.strip()
            variable.add_codelist_codes(code_value,
                                        self._get_xmllang(label_el, default=self._study_unit_language),
                                        label=label_text,
                                        missing=is_missing)
    for unmatched_value in unmatched_missing_values:
        variable.add_codelist_codes(unmatched_value, self._study_unit_language, missing=True)
@property
def variables(self):
    """Parse XML to create and populate :obj:`kuha_common.document_store.records.Variable`.

    One record is produced for every element of :attr:`_variable_elements`.

    :returns: Generator to Populate Document Store Variable records.
    """
    if self.study_number is None:
        self._parse_study_number()
    for variable_el in self._variable_elements:
        record = self._variable_cls()
        record.add_study_number(self.study_number_identifier)
        self._map_to_record(record, variable_el, self._variable_maps)
        self._add_codelist_codes_to_variable(record, variable_el)
        yield record
@property
def _question_maps(self):
    """Mappings used to populate a Question record from dc:QuestionItem.

    :returns: list of 2-tuples pairing a Question add-method with the
              mapper producing its arguments.
    :rtype: list
    """
    question_cls = self._question_cls
    question_identifier = (self._map_single('./r:UserID', required=True)
                           .set_value_conversion(as_valid_identifier))
    question_texts = (self._map_multi('./dc:QuestionText')
                      .set_value_getter(element_remove_whitespaces))
    return [
        (question_cls.add_question_identifier, question_identifier),
        (question_cls.add_question_texts, question_texts),
    ]
def _iter_question_elements_by_reference_elements(self, ref_id_elements):
    """Yield dc:QuestionItem elements referred to by r:ID elements.

    :param ref_id_elements: iterable of r:ID reference elements.
    :returns: generator yielding the dc:QuestionItem elements that were found.
    """
    resolved = (self._find_by_reference(ref_id_el, ".//dc:QuestionItem")
                for ref_id_el in ref_id_elements)
    yield from (question_el for question_el in resolved if question_el is not None)
@property
def questions(self):
    """Parse XML to create and populate :obj:`kuha_common.document_store.records.Question`.

    One record is produced for every question referenced from a variable.
    The record receives the variable's name and the code values of the
    code scheme referenced by the question's dc:CodeDomain, if any.

    :returns: Generator to Populate Document Store Question records.
    """
    if self.study_number is None:
        self._parse_study_number()
    for var_el in self._variable_elements:
        question_ref_id_els = var_el.findall('./l:QuestionReference/r:ID', self.NS)
        for question_el in self._iter_question_elements_by_reference_elements(question_ref_id_els):
            question = self._question_cls()
            self._map_to_record(question, question_el, self._question_maps)
            question.add_study_number(self.study_number_identifier)
            var_name = var_el.find('./l:VariableName', self.NS)
            if var_name is not None and var_name.text not in [None, '']:
                question.add_variable_name(as_valid_identifier(var_name.text.strip()))
            codelist_ref_el = question_el.find('./dc:CodeDomain/r:CodeSchemeReference/r:ID', self.NS)
            if codelist_ref_el is not None and codelist_ref_el.text not in [None, '']:
                for code_el in self._iter_code_elements_by_reference(codelist_ref_el.text.strip()):
                    self._map_to_record(question, code_el,
                                        [(self._question_cls.add_codelist_references,
                                          self._map_multi('./l:Value'))])
            yield question
@property
def _study_group_elements(self):
    """Generator iterates group elements which contain g:StudyUnit children.

    :returns: Generator yielding g:Group elements.
    """
    group_study_unit_tag = '{%s}StudyUnit' % (self.NS['g'],)
    for group_el in self.root_element.findall('.//g:Group', self.NS):
        if any(child_el.tag == group_study_unit_tag for child_el in group_el):
            yield group_el
@property
def _study_group_maps(self):
    """Mappings used to populate a StudyGroup record.

    These are relative to g:Group

    :returns: list of 2-tuples pairing a StudyGroup add-method with the
              mapper producing its arguments.
    :rtype: list
    """
    studygroup_cls = self._studygroup_cls
    identifier = (self._map_single('./r:UserID', required=True)
                  .set_value_conversion(as_valid_identifier))
    names = self._map_multi(_XPATH_REL_CITATION_TITLE)
    descriptions = self._map_multi('./g:Abstract/r:Content')
    uris = self._map_single('.', 'externalReferenceDefaultURI', localizable=True)
    return [
        (studygroup_cls.add_study_group_identifier, identifier),
        (studygroup_cls.add_study_group_names, names),
        (studygroup_cls.add_descriptions, descriptions),
        (studygroup_cls.add_uris, uris),
    ]
@property
def study_groups(self):
    """Parse XML to create and populate :obj:`kuha_common.document_store.records.StudyGroup`.

    One record is produced for every g:Group that has g:StudyUnit children.
    Study numbers are collected from the s:StudyUnit of each g:StudyUnit,
    which is either an inline child or referenced via g:Reference. Study
    units without a study number are skipped silently.

    :returns: Generator to Populate Document Store StudyGroup records.
    """
    inline_study_unit_tag = '{%s}StudyUnit' % (self.NS['s'],)
    for group_el in self._study_group_elements:
        study_group = self._studygroup_cls()
        self._map_to_record(study_group, group_el, self._study_group_maps)
        for g_study_unit_el in group_el.findall('./g:StudyUnit', self.NS):
            if any(child_el.tag == inline_study_unit_tag for child_el in g_study_unit_el):
                study_unit_el = g_study_unit_el.find('./s:StudyUnit', self.NS)
            else:
                ref_id_el = g_study_unit_el.find('./g:Reference/r:ID', self.NS)
                study_unit_el = self._find_by_reference(ref_id_el, ".//s:StudyUnit")
                if study_unit_el is None:
                    # Referenced StudyUnit is not part of this document.
                    continue
            study_number = self._get_study_number_from_study_unit_element(
                study_unit_el, raise_error_if_missing=False)
            if study_number is not None:
                study_group.add_study_numbers(as_valid_identifier(study_number))
        yield study_group