Source code for kuha_common.document_store.mappings.ddi

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
#
# pylint: disable=C0302
"""Mapping profiles for DDI.

.. Note:: has strict dependency to :mod:`kuha_common.document_store.records`
"""
from kuha_common.document_store.records import (
    Study,
    Variable,
    Question,
    StudyGroup
)
from kuha_common.document_store.mappings.exceptions import (
    UnknownXMLRoot,
    MissingRequiredAttribute,
    MappingError
)
from kuha_common.document_store.mappings.xmlbase import (
    MappedParams,
    XMLParserBase,
    as_valid_identifier,
    str_equals,
    fixed_value,
    element_remove_whitespaces,
    element_strip_descendant_text,
)


class DDI122RecordParser(XMLParserBase):
    """Parse Document Store records from DDI 1.2.2. XML.

    :param root_element: XML root element. Its tag must equal
                         :attr:`_expected_root_tag`.
    :raises: :exc:`UnknownXMLRoot` for unexpected root element.
    """

    _expected_root_tag = 'codeBook'

    def __init__(self, root_element):
        if root_element.tag != self._expected_root_tag:
            raise UnknownXMLRoot(self._expected_root_tag, root_element.tag)
        super().__init__(root_element)

    def _parse_study_number(self):
        """Parse and store study number.

        Reads the first ``./stdyDscr/citation/titlStmt/IDNo`` mapping
        (declared as required) and stores its value to
        :attr:`study_number`.
        """
        _map = self._map_single('./stdyDscr/citation/titlStmt/IDNo', required=True)
        mapped = next(_map(self.root_element, self.root_language, self.NS))
        self.study_number = mapped.get_value()

    @staticmethod
    def _iter_params_from_sernames_serinfos(study_group_id, study_group_uri,
                                            study_group_default_lang,
                                            sername_elements, serinfo_elements):
        """Iterate MappedParams for Study.study_groups using
        serName and serInfo elements.

        Common method for DDI122RecordParser and DDI25RecordParser.

        :param study_group_id: Study Group ID
        :type study_group_id: str
        :param study_group_uri: Study Group URI
        :type study_group_uri: str
        :param study_group_default_lang: Default language of the Study Group
        :type study_group_default_lang: str
        :param sername_elements: Iterable yielding serName elements
        :type sername_elements: iterable
        :param serinfo_elements: Iterable yielding serInfo elements
        :type serinfo_elements: iterable
        :returns: generator yielding MappedParams for Study.study_groups
        :rtype: generator
        """
        xml_lang_attr = '{%s}lang' % (XMLParserBase.NS['xml'],)
        # {<lang>: <description>}
        lang_descriptions = {}
        for serinfo_el in serinfo_elements:
            lang = serinfo_el.attrib.get(xml_lang_attr, study_group_default_lang)
            lang_descriptions[lang] = element_remove_whitespaces(serinfo_el)
        for sername_el in sername_elements:
            params = MappedParams(study_group_id)
            lang = sername_el.attrib.get(xml_lang_attr, study_group_default_lang)
            params.set_language(lang)
            params.keyword_arguments.update({
                Study.study_groups.attr_name.name: "".join(sername_el.itertext()),
                Study.study_groups.attr_uri.name: study_group_uri,
                # Add description if same language. Pop() so we may
                # later add the ones that are left.
                Study.study_groups.attr_description.name: lang_descriptions.pop(lang, None)})
            yield params
        for lang, description in lang_descriptions.items():
            # Add descriptions which have no other values.
            params = MappedParams(study_group_id)
            params.set_language(lang)
            params.keyword_arguments.update({
                Study.study_groups.attr_description.name: description,
                Study.study_groups.attr_uri.name: study_group_uri
            })
            yield params

    def _iter_study_study_groups_as_mapped_params(self):
        """Iterate mapped study groups

        Use serStmt@ID to get MappedParams value for study_group.
        Use serName to get study_group name and serName@xml:lang to get language.
        Use serInfo to get study_group description and compare language in
        serInfo@xml:lang with serName@xml:lang, if they differ construct a new
        MappedParams with that language.

        DDI 2.5 documentation on <serStmt>:

            Series statement for the work at the appropriate level: marked-up
            document; marked-up document source; study; study description,
            other material; other material for study. The URI attribute is
            provided to point to a central Internet repository of series
            information. Repeat this field if the study is part of more than
            one series. Repetition of the internal content should be used to
            support multiple languages only.

            - https://ddialliance.org/Specification/DDI-Codebook/2.5/XMLSchema/field_level_documentation.html

        :returns: generator iterating mapped Study.study_groups
        """
        xml_lang_attr = '{%s}lang' % (self.NS['xml'],)
        default_language = self.root_language
        for serstmt_element in self._findall('./stdyDscr/citation/serStmt'):
            study_group_id = serstmt_element.get('ID', None)
            study_group_uri = serstmt_element.get('URI', None)
            study_group_default_lang = serstmt_element.attrib.get(xml_lang_attr,
                                                                  default_language)
            yield from self._iter_params_from_sernames_serinfos(
                study_group_id, study_group_uri, study_group_default_lang,
                self._findall('./serName', serstmt_element),
                self._findall('./serInfo', serstmt_element))

    def _iter_related_publications_as_mapped_params(self):
        """Iterate mapped related publications.

        For every ``./stdyDscr/othrStdyMat/relPubl`` element yield one
        MappedParams per child ``citation`` element, using the citation
        title as the value. If relPubl has no citation children, a single
        MappedParams with value None is yielded. The relPubl text (with
        descendant text stripped) is always set as the description.

        :returns: generator yielding MappedParams for
                  Study.related_publications
        """
        for relpubl_element in self._findall('./stdyDscr/othrStdyMat/relPubl'):
            description = element_strip_descendant_text(relpubl_element)
            lang = relpubl_element.attrib.get('{%s}lang' % (self.NS['xml'],),
                                              self.root_language)
            citation_elements = self._findall('./citation', relpubl_element)
            if not citation_elements:
                params = MappedParams(None)
                params.set_language(lang)
                params.keyword_arguments[
                    Study.related_publications.attr_description.name] = description
                yield params
                continue
            for citation_element in citation_elements:
                title_element = self._find('./titlStmt/titl', citation_element)
                # A citation without a title must not abort parsing of the
                # whole record. Fall back to None, the same value used when
                # relPubl has no citation at all.
                # NOTE(review): assumes _find returns None when nothing
                # matches, as the optional lookups below also suggest.
                title = (''.join(title_element.itertext())
                         if title_element is not None else None)
                params = MappedParams(title)
                params.set_language(lang)
                # distStmt may actually have multiple distDate-elements. Study model does not
                # support repetition inside contained element, so we can only support one
                # distDate for each relpubl.
                self._get_attr_and_set_param(
                    params,
                    Study.related_publications.attr_distribution_date.name,
                    self._find('./distStmt/distDate', citation_element),
                    'date')
                # citation may have multiple holdings-elements. Study model does not support
                # repetition inside contained element, so we can only support one uri for
                # each relpubl
                self._get_attr_and_set_param(
                    params,
                    Study.related_publications.attr_uri.name,
                    self._find('./holdings', citation_element),
                    'URI')
                params.keyword_arguments[
                    Study.related_publications.attr_description.name] = description
                yield params

    @property
    def _study_maps(self):
        """Mappings used to populate a Study record.

        :returns: list of (Study add-method, mapper) tuples, consumed by
                  ``_map_to_record`` in :attr:`studies`.
        :rtype: list
        """
        return [
            (Study.add_identifiers,
             self._map_multi('./stdyDscr/citation/titlStmt/IDNo')
             .add_attribute(Study.identifiers.attr_agency.name,
                            self._map_single('.', 'agency'))),
            (Study.add_study_titles,
             self._map_multi('./stdyDscr/citation/titlStmt/titl')),
            (Study.add_document_titles,
             self._map_multi('./docDscr/citation/titlStmt/titl')),
            (Study.add_parallel_titles,
             self._map_multi('./stdyDscr/citation/titlStmt/parTitl')),
            (Study.add_principal_investigators,
             self._map_multi('./stdyDscr/citation/rspStmt/AuthEnty')
             .add_attribute(Study.principal_investigators.attr_organization.name,
                            self._map_single('.', 'affiliation'))),
            (Study.add_publishers,
             self._map_multi('./docDscr/citation/prodStmt/producer')
             .add_attribute(Study.publishers.attr_abbreviation.name,
                            self._map_single('.', 'abbr'))),
            (Study.add_data_collection_copyrights,
             self._map_multi('./stdyDscr/citation/prodStmt/copyright')),
            (Study.add_document_uris,
             self._map_multi('./docDscr/citation/holdings', 'URI')
             .add_attribute(Study.document_uris.attr_location.name,
                            self._map_single('.', 'location'))
             .add_attribute(Study.document_uris.attr_description.name,
                            self._map_single('.'))),
            # NOTE(review): attribute names below are taken from
            # Study.document_uris although the target is study_uris;
            # presumably both fields share the same attribute names - confirm.
            (Study.add_study_uris,
             self._map_multi('./stdyDscr/citation/holdings', 'URI')
             .add_attribute(Study.document_uris.attr_location.name,
                            self._map_single('.', 'location'))
             .add_attribute(Study.document_uris.attr_description.name,
                            self._map_single('.'))),
            (Study.add_distributors,
             self._map_multi('./stdyDscr/citation/distStmt/distrbtr')
             .add_attribute(Study.distributors.attr_abbreviation.name,
                            self._map_single('.', 'abbr'))
             .add_attribute(Study.distributors.attr_uri.name,
                            self._map_single('.', 'URI'))),
            (Study.add_publication_dates,
             self._map_multi('./stdyDscr/citation/verStmt/version', 'date')),
            # NOTE(review): the third positional argument (False) presumably
            # makes the distDate xpath resolve from the document root instead
            # of from the prodDate element - confirm against xmlbase.
            (Study.add_publication_years,
             self._map_multi('./stdyDscr/citation/prodStmt/prodDate')
             .add_attribute(Study.publication_years.attr_distribution_date.name,
                            self._map_single('./stdyDscr/citation/distStmt/distDate', 'date'),
                            False)),
            (Study.add_abstract,
             self._map_multi('./stdyDscr/stdyInfo/abstract')
             .set_value_getter(element_remove_whitespaces)),
            (Study.add_classifications,
             self._map_multi('./stdyDscr/stdyInfo/subject/topcClas', 'ID')
             .add_attribute(Study.classifications.attr_system_name.name,
                            self._map_single('.', 'vocab'))
             .add_attribute(Study.classifications.attr_uri.name,
                            self._map_single('.', 'vocabURI'))
             .add_attribute(Study.classifications.attr_description.name,
                            self._map_single('.'))),
            (Study.add_keywords,
             self._map_multi('./stdyDscr/stdyInfo/subject/keyword', 'ID')
             .add_attribute(Study.keywords.attr_system_name.name,
                            self._map_single('.', 'vocab'))
             .add_attribute(Study.keywords.attr_uri.name,
                            self._map_single('.', 'vocabURI'))
             .add_attribute(Study.keywords.attr_description.name,
                            self._map_single('.'))),
            (Study.add_collection_periods,
             self._map_multi('./stdyDscr/stdyInfo/sumDscr/collDate', 'date')
             .add_attribute(Study.collection_periods.attr_event.name,
                            self._map_single('.', 'event'))),
            (Study.add_study_area_countries,
             self._map_multi('./stdyDscr/stdyInfo/sumDscr/nation')
             .add_attribute(Study.study_area_countries.attr_abbreviation.name,
                            self._map_single('.', 'abbr'))),
            (Study.add_geographic_coverages,
             self._map_multi('./stdyDscr/stdyInfo/sumDscr/geogCover')),
            (Study.add_analysis_units,
             self._map_multi('./stdyDscr/stdyInfo/sumDscr/anlyUnit/concept')
             .add_attribute(Study.analysis_units.attr_description.name,
                            self._map_single('/..')
                            .set_value_getter(element_strip_descendant_text))
             .add_attribute(Study.analysis_units.attr_system_name.name,
                            self._map_single('.', 'vocab'))
             # XML attribute names are case-sensitive: the attribute is
             # 'vocabURI', as used by every other concept mapping here.
             .add_attribute(Study.analysis_units.attr_uri.name,
                            self._map_single('.', 'vocabURI'))),
            (Study.add_universes,
             self._map_multi('./stdyDscr/stdyInfo/sumDscr/universe')
             .add_attribute(Study.universes.attr_included.name,
                            self._map_single('.', 'clusion')
                            .set_value_conversion(str_equals('I', default=True)))),
            (Study.add_data_kinds,
             self._map_multi('./stdyDscr/stdyInfo/sumDscr/dataKind')),
            (Study.add_time_methods,
             self._map_multi('./stdyDscr/method/dataColl/timeMeth/concept')
             .add_attribute(Study.time_methods.attr_description.name,
                            self._map_single('/..')
                            .set_value_getter(element_strip_descendant_text))
             .add_attribute(Study.time_methods.attr_system_name.name,
                            self._map_single('.', 'vocab'))
             .add_attribute(Study.time_methods.attr_uri.name,
                            self._map_single('.', 'vocabURI'))),
            (Study.add_sampling_procedures,
             self._map_multi('./stdyDscr/method/dataColl/sampProc/concept')
             .add_attribute(Study.sampling_procedures.attr_description.name,
                            self._map_single('/..')
                            .set_value_getter(element_strip_descendant_text))
             .add_attribute(Study.sampling_procedures.attr_system_name.name,
                            self._map_single('.', 'vocab'))
             .add_attribute(Study.sampling_procedures.attr_uri.name,
                            self._map_single('.', 'vocabURI'))),
            (Study.add_collection_modes,
             self._map_multi('./stdyDscr/method/dataColl/collMode/concept')
             .add_attribute(Study.collection_modes.attr_description.name,
                            self._map_single('/..')
                            .set_value_getter(element_strip_descendant_text))
             .add_attribute(Study.collection_modes.attr_system_name.name,
                            self._map_single('.', 'vocab'))
             # The URI comes from 'vocabURI'; reading 'vocab' here would
             # only duplicate the system name.
             .add_attribute(Study.collection_modes.attr_uri.name,
                            self._map_single('.', 'vocabURI'))),
            (Study.add_data_access,
             self._map_multi('./stdyDscr/dataAccs/useStmt/restrctn')),
            (Study.add_citation_requirements,
             self._map_multi('./stdyDscr/dataAccs/useStmt/citReq')),
            (Study.add_deposit_requirements,
             self._map_multi('./stdyDscr/dataAccs/useStmt/deposReq')),
            (Study.add_data_access_descriptions,
             self._map_multi('./stdyDscr/dataAccs/useStmt/conditions')),
            (Study.add_file_names,
             self._map_multi('./fileDscr/fileTxt/fileName')),
            (Study.add_instruments,
             self._map_multi('./stdyDscr/othrStdyMat/relMat/citation/titlStmt/IDNo')
             .add_attribute(Study.instruments.attr_instrument_name.name,
                            self._map_single('/../titl'))),
            (Study.add_copyrights,
             self._map_multi('./docDscr/citation/prodStmt/copyright'))
        ]

    @property
    def studies(self):
        """Parse XML to create and populate
        :obj:`kuha_common.document_store.records.Study`.

        :returns: Generator to Populate Document Store Study record.
        """
        if self.study_number is None:
            self._parse_study_number()
        study = Study()
        study.add_study_number(self.study_number_identifier)
        self._map_to_record(study, self.root_element, self._study_maps)
        # Study groups and related publications need custom iteration
        # and are therefore not part of _study_maps.
        for mapped_study_group in self._iter_study_study_groups_as_mapped_params():
            study.add_study_groups(*mapped_study_group.arguments,
                                   **mapped_study_group.keyword_arguments)
        for mapped_related_publications in self._iter_related_publications_as_mapped_params():
            study.add_related_publications(*mapped_related_publications.arguments,
                                           **mapped_related_publications.keyword_arguments)
        yield study

    @property
    def _variable_maps(self):
        """Mappings used to populate a Variable record.

        XPaths are relative to a ``var`` element.

        :returns: list of (Variable add-method, mapper) tuples.
        :rtype: list
        """
        return [
            (Variable.add_variable_name,
             self._map_single('.', 'name', required=True)
             .set_value_conversion(as_valid_identifier)),
            (Variable.add_question_identifiers,
             self._map_multi('./qstn', 'ID', localizable=False)
             .set_value_conversion(as_valid_identifier)),
            (Variable.add_variable_labels,
             self._map_multi('./labl')),
            (Variable.add_codelist_codes,
             self._map_multi('./catgry', 'ID')
             .add_attribute(Variable.codelist_codes.attr_label.name,
                            self._map_multi('./labl'),
                            provides_main_lang=True)
             .add_attribute(Variable.codelist_codes.attr_missing.name,
                            self._map_single('.', 'missing')
                            .set_value_conversion(str_equals('Y', False)))),
        ]

    @property
    def variables(self):
        """Parse XML to create and populate multiple
        :obj:`kuha_common.document_store.records.Variable` instances.

        :returns: Generator to populate multiple Document Store Variable records.
        """
        if self.study_number is None:
            self._parse_study_number()
        # Build the mappings once; they are the same for every var element.
        maps = self._variable_maps
        for var_element in self._findall('./dataDscr/var'):
            variable = Variable()
            variable.add_study_number(self.study_number_identifier)
            self._map_to_record(variable, var_element, maps)
            yield variable

    @property
    def _question_maps(self):
        """Mappings used to populate a Question record.

        XPaths are relative to a ``qstn`` element.

        :returns: list of (Question add-method, mapper) tuples.
        :rtype: list
        """
        return [
            (Question.add_question_identifier,
             self._map_single('.', 'ID', required=True)
             .set_value_conversion(as_valid_identifier)),
            (Question.add_question_texts,
             self._map_multi('./qstnLit'))
        ]

    @property
    def questions(self):
        """Parse XML to create and populate multiple
        :obj:`kuha_common.document_store.records.Question` instances.

        Every question gets the study-level research instruments and the
        ``code`` values of its enclosing ``var`` element.

        :returns: Generator to populate multiple Document Store Question records.
        """
        if self.study_number is None:
            self._parse_study_number()
        resinstru_map = (self._map_multi('./stdyDscr/method/dataColl/resInstru')
                         .set_value_getter(element_remove_whitespaces))
        research_instruments = [
            instru_params.arguments
            for instru_params in resinstru_map(self.root_element, self.root_language, self.NS)]
        maps = self._question_maps
        for var_element in self._findall('./dataDscr/var'):
            variable_name = as_valid_identifier(var_element.get('name'))
            code_map = self._map_multi('./code')
            codes = [code_params.arguments
                     for code_params in code_map(var_element, self.root_language, self.NS)]
            for qstn_element in self._findall('./qstn', var_element):
                question = Question()
                question.add_study_number(self.study_number_identifier)
                if variable_name:
                    question.add_variable_name(variable_name)
                for instru in research_instruments:
                    question.add_research_instruments(*instru)
                for _code in codes:
                    question.add_codelist_references(*_code)
                self._map_to_record(question, qstn_element, maps)
                yield question

    @property
    def _study_group_maps(self):
        """Mappings used to populate a StudyGroup record.

        XPaths are relative to a ``serStmt`` element.

        :returns: list of (StudyGroup add-method, mapper) tuples.
        :rtype: list
        """
        return [
            (StudyGroup.add_study_group_identifier,
             self._map_single('.', 'ID', required=True)
             .set_value_conversion(as_valid_identifier)),
            (StudyGroup.add_uris,
             self._map_single('.', 'URI', localizable=True)),
            (StudyGroup.add_study_group_names,
             self._map_multi('./serName')),
            (StudyGroup.add_descriptions,
             self._map_multi('./serInfo')
             .set_value_getter(element_remove_whitespaces))
        ]

    @property
    def study_groups(self):
        """Parse XML to create and populate multiple
        :obj:`kuha_common.document_store.records.StudyGroup` instances.

        All serStmt elements are processed before anything is yielded. A
        parsed study group is appended to the result only when no
        previously collected study group reports ``updates(study_group)``
        as true.

        :returns: Generator to populate multiple Document Store StudyGroup records.
        """
        study_groups = []

        def update_or_append_study_group(study_group):
            for primary_study_group in study_groups:
                if primary_study_group.updates(study_group):
                    return
            study_groups.append(study_group)

        if self.study_number is None:
            self._parse_study_number()
        maps = self._study_group_maps
        for serstmt_element in self._findall('./stdyDscr/citation/serStmt'):
            study_group = StudyGroup()
            study_group.add_study_numbers(self.study_number_identifier)
            self._map_to_record(study_group, serstmt_element, maps)
            update_or_append_study_group(study_group)
        yield from study_groups
class DDI25RecordParser(DDI122RecordParser):
    """Parse Document Store records from DDI 2.5 XML.

    Inherits the mappings of :class:`DDI122RecordParser`. The lookup and
    mapping helpers are overridden so that every XPath they receive is
    first passed through :meth:`_prepend_xpath_steps`.
    """

    #: XML namespaces
    NS = {'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
          'ddi': 'ddi:codebook:2_5',
          'xml': 'http://www.w3.org/XML/1998/namespace'}

    _expected_root_tag = '{ddi:codebook:2_5}codeBook'

    @staticmethod
    def _prepend_xpath_steps(xpath):
        """Prefix every named step of ``xpath`` with ``ddi:``.

        Empty steps, ``.`` and ``..`` are kept untouched.

        :param xpath: Slash-separated XPath expression.
        :type xpath: str
        :returns: XPath with ``ddi:`` prepended to each named step.
        :rtype: str
        """
        untouched_steps = ('', '.', '..')
        qualified_steps = []
        for step in xpath.split('/'):
            if step in untouched_steps:
                qualified_steps.append(step)
            else:
                qualified_steps.append('ddi:%s' % (step,))
        return '/'.join(qualified_steps)

    def _map_single(self, xpath, from_attribute=None, required=False, localizable=False):
        """Call the inherited ``_map_single`` with a ddi-prefixed xpath."""
        namespaced_xpath = self._prepend_xpath_steps(xpath)
        return super()._map_single(namespaced_xpath,
                                   from_attribute=from_attribute,
                                   required=required,
                                   localizable=localizable)

    def _map_multi(self, xpath, from_attribute=None, localizable=True):
        """Call the inherited ``_map_multi`` with a ddi-prefixed xpath."""
        namespaced_xpath = self._prepend_xpath_steps(xpath)
        return super()._map_multi(namespaced_xpath,
                                  from_attribute=from_attribute,
                                  localizable=localizable)

    def _find(self, xpath, element=None):
        """Call the inherited ``_find`` with a ddi-prefixed xpath."""
        namespaced_xpath = self._prepend_xpath_steps(xpath)
        return super()._find(namespaced_xpath, element)

    def _findall(self, xpath, element=None):
        """Call the inherited ``_findall`` with a ddi-prefixed xpath."""
        namespaced_xpath = self._prepend_xpath_steps(xpath)
        return super()._findall(namespaced_xpath, element)
[docs]class DDI31RecordParser(XMLParserBase): """Parse Document Store records from DDI 3.1. XML Check the root element. Expects either ddi:DDIInstance or s:StudyUnit. Currently supports only single s:StudyUnit element within the root. :param root_element: XML root element. :type root_element: :obj:`xml.etree.ElementTree.Element` :raises: :exc:`UnknownXMLRoot` for unexpected root element. :raises: :exc:`MappingError` if root contains more or less that exactly one s:StudyUnit child. """ #: XML namespaces NS = {'xsi': 'http://www.w3.org/2001/XMLSchema-instance', 'xml': 'http://www.w3.org/XML/1998/namespace', 'xhtml': 'http://www.w3.org/1999/xhtml', 'ddi': 'ddi:instance:3_1', 's': 'ddi:studyunit:3_1', 'pd': 'ddi:physicaldataproduct:3_1', 'pi': 'ddi:physicalinstance:3_1', 'c': 'ddi:conceptualcomponent:3_1', 'l': 'ddi:logicalproduct:3_1', 'r': 'ddi:reusable:3_1', 'g': 'ddi:group:3_1', 'dc': 'ddi:datacollection:3_1', 'a': 'ddi:archive:3_1'} def __init__(self, root_element): expected_ddiinstance_root = '{%s}DDIInstance' % (self.NS['ddi'],) expected_studyunit_root = '{%s}StudyUnit' % (self.NS['s'],) if root_element.tag == expected_ddiinstance_root: study_unit_elements = list(root_element.iter('{%s}StudyUnit' % (self.NS['s'],))) study_unit_count = len(study_unit_elements) if study_unit_count > 1: # Currently supports only a single s:StudyUnit in xml metadata. raise MappingError("Unable to parse multiple StudyUnit elements") if study_unit_count < 1: raise MappingError("Unable to find StudyUnit element") self.study_unit_element = study_unit_elements.pop() elif root_element.tag == expected_studyunit_root: self.study_unit_element = root_element else: raise UnknownXMLRoot(' or '.join([expected_ddiinstance_root, expected_studyunit_root]), root_element.tag) super().__init__(root_element) @property def _study_unit_language(self): """Get language of StudyUnit element. Returns :attr:`root_language` if StudyUnit does not declare a language. 
:returns: Language :rtype: str """ return self.study_unit_element.attrib.get('{%s}lang' % (self.NS['xml'],), self.root_language) def _get_study_number_from_study_unit_element(self, study_unit_element, raise_error_if_missing=True): archive_els = list(self._iter_archives_from_study_unit(study_unit_element)) for candidate in self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Collection/a:CallNumber'): if candidate is not None and candidate.text not in ['', None]: return candidate.text for candidate in self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Item/a:CallNumber'): if candidate is not None and candidate.text not in ['', None]: return candidate.text candidate = study_unit_element.find('./r:UserID', self.NS) if candidate is not None and candidate.text not in ['', None]: return candidate.text if raise_error_if_missing: raise MissingRequiredAttribute('./r:UserID', './a:Archive/a:ArchiveSpecific/a:Collection/a:CallNumber', './a:Archive/a:ArchiveSpecific/a:Item/a:CallNumber', msg='Unable to find study number from %s, %s or %s') return None def _get_spatial_coverage_from_study_unit(self, study_unit_element): """Get SpatialCoverage element from StudyUnit element. The SpatialCoverage element may be an inline child element of StudyUnit or referenced by SpatialCoverageReference. :param study_unit_element: s:StudyUnit element :type study_unit_element: :obj:`xml.etree.ElementTree.Element` :returns: SpatialCoverage of the StudyUnit. 
:rtype: :obj:`xml.etree.ElementTree.Element` or None """ ref_id_el = study_unit_element.find('./r:Coverage/r:SpatialCoverageReference/r:ID', self.NS) if ref_id_el is not None and ref_id_el.text not in ['', None]: return self.root_element.find('.//r:SpatialCoverage[@id="{ref_id}"]' .format(ref_id=ref_id_el.text.strip()), self.NS) return study_unit_element.find('./r:Coverage/r:SpatialCoverage', self.NS) def _iter_data_collections_from_study_unit(self, study_unit_element): for ref_id_el in study_unit_element.findall('./s:DataCollectionReference/r:ID', self.NS): if ref_id_el.text in [None, '']: continue yield self.root_element.find('.//dc:DataCollection[@id="{ref_id}"]' .format(ref_id=ref_id_el.text.strip()), self.NS) for data_coll_el in study_unit_element.findall('./dc:DataCollection', self.NS): yield data_coll_el def _iter_physical_instances_from_study_unit(self, study_unit_element): for ref_id_el in study_unit_element.findall('./s:PhysicalInstanceReference/r:ID', self.NS): if ref_id_el.text in [None, '']: continue yield self.root_element.find('.//pi:PhysicalInstance[@id="{ref_id}"]' .format(ref_id=ref_id_el.text.strip()), self.NS) for physical_instance_el in study_unit_element.findall('./pi:PhysicalInstance', self.NS): yield physical_instance_el def _iter_archives_from_study_unit(self, study_unit_element): archive_el = study_unit_element.find('./a:Archive', self.NS) if archive_el is not None: yield archive_el for ref_id_el in study_unit_element.findall('./s:ArchiveReference/r:ID', self.NS): if ref_id_el.text in [None, '']: continue yield self.root_element.find('.//a:Archive[@id="{ref_id}"]' .format(ref_id=ref_id_el.text.strip()), self.NS) def _iter_collection_periods_as_mapped_params(self): """Generate collection periods as :obj:`MappedParams` Returns a generator which yields :obj:`MappedParams` instances containing collection periods. .. note:: DDI 3.1. supports only single DataCollectionDate for each CollectionEvent. That is not enforced here. 
:returns: Generator yielding collection periods. """ data_colls = self._iter_data_collections_from_study_unit(self.study_unit_element) for dc_date in self._findall_from_elements(data_colls, './dc:CollectionEvent/dc:DataCollectionDate'): simple_date = dc_date.find('./r:SimpleDate', self.NS) if simple_date is not None: params = MappedParams(simple_date.text) params.set_language(self._study_unit_language) params.keyword_arguments.update({Study.collection_periods.attr_event.name: 'single'}) yield params continue # DataCollectionDate can contain either SimpleDate or StartDate and EndDate start_date = dc_date.find('./r:StartDate', self.NS) if start_date is not None: params = MappedParams(start_date.text) params.set_language(self._study_unit_language) params.keyword_arguments.update({Study.collection_periods.attr_event.name: 'start'}) yield params end_date = dc_date.find('./r:EndDate', self.NS) if end_date is not None: # It is a violation of the DDI31 standard to have an EndDate without # a StartDate. Kuha won't mind however. params = MappedParams(end_date.text) params.set_language(self._study_unit_language) params.keyword_arguments.update({Study.collection_periods.attr_event.name: 'end'}) yield params def _iter_study_uris_as_mapped_params(self): """Generate Study Uris as :obj:`MappedParams` :returns: Generator yielding :obj:`MappedParams` """ archive_els = self._iter_archives_from_study_unit(self.study_unit_element) for archive_spec_el in self._findall_from_elements(archive_els, './a:ArchiveSpecific'): # First get the actual URI. uri_el = archive_spec_el.find("./a:Collection/[a:CallNumber='{}']/a:URI". format(self.study_number), self.NS) if uri_el is None: uri_el = archive_spec_el.find("./a:Item/[a:CallNumber='{}']/a:URI". 
format(self.study_number), self.NS) if uri_el is not None: uri = uri_el.text uri_lang = uri_el.attrib.get('{%s}lang' % (self.NS['xml'],), self._study_unit_language) else: uri = None org_reference_id_el = archive_spec_el.find('./a:ArchiveOrganizationReference/r:ID', self.NS) if org_reference_id_el is None: if uri_el is None: continue param = MappedParams(uri) param.set_language(uri_lang) yield param continue # Then get the organization if referenced. xpath = ".//a:Organization[@id='{}']".format(org_reference_id_el.text.strip()) org_el = self.root_element.find(xpath, self.NS) for org_name_el in org_el.findall('./a:OrganizationName', self.NS): param = MappedParams(uri) param.set_language(org_name_el.attrib.get('{%s}lang' % (self.NS['xml'],), self._study_unit_language)) param.keyword_arguments.update({Study.document_uris.attr_location.name: org_name_el.text}) # Try to find localized element. org_desc_el = org_el.find("./r:Description[@xml:lang='{}']".format(param.get_language()), self.NS) if org_desc_el is not None: param.keyword_arguments.update({Study.document_uris.attr_description.name: org_desc_el.text}) elif param.get_language() == self._study_unit_language: # If language equals the language of the StudyUnit, # accept Description without locale. org_desc_el = org_el.find('./r:Description', self.NS) if org_desc_el is not None: param.keyword_arguments.update({Study.document_uris.attr_description.name: org_desc_el.text}) yield param def _iter_universes_as_mapped_params(self): """Generate Universes as :obj:`MappedParams`. :returns: Generator yielding :obj:`MappedParams` """ inc_to_bool = str_equals('true', True) for ref_id_el in self.study_unit_element.findall('./r:UniverseReference/r:ID', self.NS): if ref_id_el.text in [None, '']: continue universe_el = self.root_element.find(".//c:Universe[@id='{ref_id}']". 
format(ref_id=ref_id_el.text.strip()), self.NS) if universe_el is None: continue included = inc_to_bool(universe_el.attrib.get('isInclusive')) for desc in universe_el.findall('./c:HumanReadable', self.NS): param = MappedParams(desc.text) param.set_language(desc.attrib.get('{%s}lang' % (self.NS['xml'],), self._study_unit_language)) param.keyword_arguments.update({Study.universes.attr_included.name: included}) yield param def _iter_identifiers_as_mapped_params(self): """Generate Identifiers as :obj:`MappedParams` Will not discard duplicates. :returns: Generator yielding :obj:`MappedParams` """ # Cast to list because iterabing multiple times througt archive_els = list(self._iter_archives_from_study_unit(self.study_unit_element)) def _param_from_els(els): for element in els: if element.text in [None, '']: continue param = MappedParams(element.text) param.set_language(element.attrib.get('{%s}lang' % (self.NS['xml'],), self._study_unit_language)) yield param for param in _param_from_els( self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Collection/a:CallNumber')): yield param for param in _param_from_els( self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Item/a:CallNumber')): yield param for param in _param_from_els( self.study_unit_element.findall('./r:Citation/r:InternationalIdentifier', self.NS)): yield param def _iter_mapped_params_from_geography_element(self, *geography_els): geography_els = filter(lambda x: x is not None, geography_els) for name_el in self._findall_from_elements(geography_els, './r:Level/r:Name'): if name_el.text in [None, '']: continue params = MappedParams("".join(name_el.itertext())) params.set_language(name_el.attrib.get('{%s}lang' % (self.NS['xml'],), self._study_unit_language)) yield params def _iter_mapped_params_from_geographyvalue_elements(self, geogvalue_els): for geogvalue_el in geogvalue_els: code_el = geogvalue_el.find('./r:GeographyCode/r:Value', self.NS) code_value = code_el.text if code_el is not None and 
code_el.text not in [None, ''] else None for name_el in geogvalue_el.findall('./r:GeographyName', self.NS): params = MappedParams(''.join(name_el.itertext())) params.set_language(name_el.attrib.get('{%s}lang' % (self.NS['xml'],), self._study_unit_language)) if code_value is not None: params.keyword_arguments.update( {Study.study_area_countries.attr_abbreviation.name: code_value}) yield params def _iter_study_area_countries_as_mapped_params(self): spatial_coverage_el = self._get_spatial_coverage_from_study_unit(self.study_unit_element) if spatial_coverage_el is None: return # TopLevelReference & LowestLevelReference are mandatory children of SpatialCoverage for parentlevel_ref_el in (spatial_coverage_el.find('./r:TopLevelReference', self.NS), spatial_coverage_el.find('./r:LowestLevelReference', self.NS)): ref_id_el = parentlevel_ref_el.find('./r:LevelReference/r:ID', self.NS) if ref_id_el is None or ref_id_el.text in [None, '']: level_name_el = parentlevel_ref_el.find('./r:LevelName', self.NS) if level_name_el is None or level_name_el.text in [None, '']: continue params = MappedParams("".join(level_name_el.itertext())) params.set_language(self._study_unit_language) yield params continue geostruct_el = self.root_element.find('.//r:GeographicStructure[@id="{ref_id}"]' .format(ref_id=ref_id_el.text.strip()), self.NS) if geostruct_el is None: continue # Geography elements inline. for params in self._iter_mapped_params_from_geography_element( *geostruct_el.findall('./r:Geography', self.NS)): yield params # Geography elements by reference. 
for ref_id_el in geostruct_el.findall('./r:GeographyReference/r:ID', self.NS): if ref_id_el.text in [None, '']: continue for params in self._iter_mapped_params_from_geography_element(self.root_element.find( './/r:Geography[@id="{ref_id}"]'.format(ref_id=ref_id_el.text.strip()), self.NS)): yield params for ref_id_el in spatial_coverage_el.findall('./r:GeographicLocationReference/r:ID', self.NS): if ref_id_el.text in [None, '']: continue for params in self._iter_mapped_params_from_geographyvalue_elements(self.root_element.findall( './/r:GeographicLocation[@id="{ref_id}"]/r:Values/r:GeographyValue' .format(ref_id=ref_id_el.text.strip()), self.NS)): yield params @property def _study_maps(self): return [ (Study.add_study_titles, self._map_multi('./r:Citation/r:Title')), (Study.add_abstract, self._map_multi('./s:Abstract/r:Content'). set_value_getter(element_remove_whitespaces)), (Study.add_principal_investigators, self._map_multi('./r:Citation/r:Creator'). add_attribute(Study.principal_investigators.attr_organization.name, self._map_single('.', 'affiliation'))), (Study.add_publishers, self._map_multi('./r:Citation/r:Publisher')), (Study.add_publication_years, self._map_multi('./r:Citation/r:PublicationDate/r:SimpleDate'). add_attribute(Study.publication_years.attr_distribution_date.name, self._map_single('.'))), (Study.add_classifications, self._map_multi('./r:Coverage/r:TopicalCoverage/r:Subject'). set_value_conversion(fixed_value(None)). # DDI31 contains no @id. # DDI31 contains no codelistname # DDI31 contains no codelisturn add_attribute(Study.classifications.attr_system_name.name, self._map_single('.', 'codeListID')). add_attribute(Study.classifications.attr_description.name, self._map_single('.'))), (Study.add_keywords, self._map_multi('./r:Coverage/r:TopicalCoverage/r:Keyword'). set_value_conversion(fixed_value(None)). # DDI31 contains no @id. 
# DDI31 contains no codelistname # DDI31 contains no codelisturn add_attribute(Study.keywords.attr_system_name.name, self._map_single('.', 'codeListID')). add_attribute(Study.keywords.attr_description.name, self._map_single('.'))), (Study.add_analysis_units, self._map_multi('./r:AnalysisUnit'). add_attribute(Study.analysis_units.attr_system_name.name, self._map_single('.', 'codeListID')). add_attribute(Study.analysis_units.attr_uri.name, self._map_single('.', 'codeListURN'))) ] @property def studies(self): """Parse XML to create and populate :obj:`kuha_common.document_store.records.Study`. :returns: Generator to Populate Document Store Study record. """ if self.study_number is None: self.study_number = self._get_study_number_from_study_unit_element(self.study_unit_element) study = Study() study.add_study_number(self.study_number_identifier) if self.root_element.tag == '{%s}DDIInstance' % (self.NS['ddi'],): # DDIInstance must be root of the document. If there is a DDIInstance, map its title into # Study.document_titles. # DDIInstance may only have a single r:Citation and r:Citation may only have a single # r:Title # Get r:Citation/r:AlternateTitle too. self._map_to_record(study, self.root_element, [ (Study.add_document_titles, self._map_single('./r:Citation/r:Title', localizable=True)), (Study.add_document_titles, self._map_multi('./r:Citation/r:AlternateTitle')) ]) self._map_to_record(study, self.study_unit_element, self._study_maps, default_language=self._study_unit_language) # DDI3 has references and is much too complex to use XMLMapper for all elements. # Use custom mappings for certain elements. # These are relative to dc:DataCollection data_coll_maps = [ (Study.add_time_methods, self._map_multi('./dc:Methodology/dc:TimeMethod'). set_value_getter(self.child_text('r:UserID')). add_attribute(Study.time_methods.attr_description.name, self._map_single('./r:Content', localizable=True), provides_main_lang=True). 
add_attribute(Study.time_methods.attr_system_name.name, self._map_single('./r:UserID', 'type'))), (Study.add_sampling_procedures, self._map_multi('./dc:Methodology/dc:SamplingProcedure'). set_value_getter(self.child_text('r:UserID')). add_attribute(Study.sampling_procedures.attr_description.name, self._map_single('./r:Content', localizable=True), provides_main_lang=True). add_attribute(Study.sampling_procedures.attr_system_name.name, self._map_single('./r:UserID', 'type'))), (Study.add_collection_modes, self._map_multi('./dc:CollectionEvent/dc:ModeOfCollection'). set_value_getter(self.child_text('r:UserID')). add_attribute(Study.collection_modes.attr_description.name, self._map_single('./r:Content', localizable=True), provides_main_lang=True). add_attribute(Study.collection_modes.attr_system_name.name, self._map_single('./r:UserID', 'type'))), ] for data_collection_element in self._iter_data_collections_from_study_unit(self.study_unit_element): self._map_to_record(study, data_collection_element, data_coll_maps, default_language=self._study_unit_language) # These are relative to pi:PhysicalInstance physical_instance_maps = [ (Study.add_file_names, self._map_multi('./pi:DataFileIdentification/pi:URI')) ] for physical_instance_element in self._iter_physical_instances_from_study_unit(self.study_unit_element): self._map_to_record(study, physical_instance_element, physical_instance_maps, default_language=self._study_unit_language) # These are relative to a:Archive archive_maps = [ (Study.add_data_access, self._map_multi('./a:ArchiveSpecific/a:DefaultAccess/a:Restrictions')), (Study.add_data_access_descriptions, self._map_multi( './a:ArchiveSpecific/a:DefaultAccess/a:AccessConditions')) ] for archive_element in self._iter_archives_from_study_unit(self.study_unit_element): self._map_to_record(study, archive_element, archive_maps, default_language=self._study_unit_language) for add_func, mapping in [(study.add_collection_periods, 
self._iter_collection_periods_as_mapped_params), (study.add_study_uris, self._iter_study_uris_as_mapped_params), (study.add_universes, self._iter_universes_as_mapped_params), (study.add_study_area_countries, self._iter_study_area_countries_as_mapped_params), (study.add_identifiers, self._iter_identifiers_as_mapped_params)]: for params in mapping(): add_func(*params.arguments, **params.keyword_arguments) yield study @property def _variable_elements(self): """Variable elements generator. First look for logicalproducts defined as a child of study_unit_element. If none found, try to look them by references from all children of root_element. :returns: generator yielding Variable elements. """ logicalproducts = self.study_unit_element.findall('./l:LogicalProduct', self.NS) if logicalproducts == []: ref_id_els = self.study_unit_element.findall('.//s:LogicalProductReference/r:ID', self.NS) ref_id_els += self.study_unit_element.findall('.//pd:LogicalProductReference/r:ID', self.NS) for ref_id_el in set(ref_id_els): if ref_id_el.text not in [None, '']: ref_id = ref_id_el.text.strip() logicalproduct_el = self.root_element.find(".//l:LogicalProduct[@id='{ref_id}']" .format(ref_id=ref_id), self.NS) if logicalproduct_el is not None: logicalproducts.append(logicalproduct_el) for varscheme_ref_id_el in self._findall_from_elements(logicalproducts, './/l:VariableSchemeReference/r:ID'): if varscheme_ref_id_el.text in [None, '']: continue ref_id = varscheme_ref_id_el.text.strip() for variable_el in self.root_element.findall(".//l:VariableScheme[@id='{ref_id}']/l:Variable" .format(ref_id=ref_id), self.NS): yield variable_el def _iter_code_elements_by_reference(self, ref_id): for code_el in self.root_element.findall(".//l:CodeScheme[@id='{ref_id}']/l:Code" .format(ref_id=ref_id), self.NS): yield code_el @property def _variable_maps(self): return [ (Variable.add_variable_name, self._map_single('./l:VariableName', required=True) .set_value_conversion(as_valid_identifier)), 
(Variable.add_variable_labels, self._map_multi('./r:Label')), (Variable.add_question_identifiers, self._map_multi('./l:QuestionReference/r:ID', localizable=False) .set_value_conversion(as_valid_identifier)) ] def _add_codelist_codes_to_variable(self, variable, var_el): codeschemeref_id_el = var_el.find('./l:Representation/l:CodeRepresentation/r:CodeSchemeReference/r:ID', self.NS) if codeschemeref_id_el is None or codeschemeref_id_el.text in [None, '']: return missing_values = set(var_el.find('./l:Representation/l:CodeRepresentation', self.NS).attrib.get('missingValue', '').split()) for code_el in self._iter_code_elements_by_reference(codeschemeref_id_el.text.strip()): value_el = code_el.find('./l:Value', self.NS) code_value = value_el.text.strip() if value_el is not None and value_el.text not in [None, ''] else None missing_value = code_value in missing_values if missing_value: missing_values.remove(code_value) category_ref_id_el = code_el.find('./l:CategoryReference/r:ID', self.NS) if category_ref_id_el is None or category_ref_id_el.text in [None, '']: if code_value is not None: variable.add_codelist_codes(code_value, self._study_unit_language, missing=missing_value) continue category_el = self.root_element.find(".//l:Category[@id='{ref_id}']" .format(ref_id=category_ref_id_el.text.strip()), self.NS) label_els = category_el.findall('./r:Label', self.NS) if category_el is not None else [] if label_els == []: if code_value is not None: variable.add_codelist_codes(code_value, self._study_unit_language, missing=missing_value) continue for label_el in label_els: # label_text may be None or '' label_text = label_el.text.strip() if label_el.text is not None else None variable.add_codelist_codes(code_value, label_el.attrib.get('{%s}lang' % (self.NS['xml'],), self._study_unit_language), label=label_text, missing=missing_value) for missing_value in missing_values: variable.add_codelist_codes(missing_value, self._study_unit_language, missing=True) @property def 
variables(self): """Parse XML to create and populate :obj:`kuha_common.document_store.records.Variable`. :returns: Generator to Populate Document Store Variable records. """ if self.study_number is None: self.study_number = self._get_study_number_from_study_unit_element(self.study_unit_element) for var_el in self._variable_elements: variable = Variable() variable.add_study_number(self.study_number_identifier) self._map_to_record(variable, var_el, self._variable_maps) self._add_codelist_codes_to_variable(variable, var_el) yield variable @property def _question_maps(self): return [ (Question.add_question_identifier, self._map_single('./r:UserID', required=True). set_value_conversion(as_valid_identifier)), (Question.add_question_texts, self._map_multi('./dc:QuestionText'). set_value_getter(element_remove_whitespaces)) ] def _iter_question_elements_by_reference_elements(self, ref_id_elements): for ref_id_el in ref_id_elements: if ref_id_el.text in [None, '']: continue question_el = self.root_element.find(".//dc:QuestionItem[@id='{ref_id}']" .format(ref_id=ref_id_el.text.strip()), self.NS) if question_el is not None: yield question_el @property def questions(self): """Parse XML to create and populate :obj:`kuha_common.document_store.records.Question`. :returns: Generator to Populate Document Store Question records. 
""" if self.study_number is None: self.study_number = self._get_study_number_from_study_unit_element(self.study_unit_element) for var_el in self._variable_elements: for question_el in self._iter_question_elements_by_reference_elements( var_el.findall('./l:QuestionReference/r:ID', self.NS)): question = Question() self._map_to_record(question, question_el, self._question_maps) question.add_study_number(self.study_number_identifier) var_name = var_el.find('./l:VariableName', self.NS) if var_name is not None and var_name.text not in [None, '']: question.add_variable_name(as_valid_identifier(var_name.text.strip())) codelist_ref_el = question_el.find('./dc:CodeDomain/r:CodeSchemeReference/r:ID', self.NS) if codelist_ref_el is None or codelist_ref_el.text in [None, '']: yield question continue for code_el in self._iter_code_elements_by_reference(codelist_ref_el.text.strip()): self._map_to_record(question, code_el, [(Question.add_codelist_references, self._map_multi('./l:Value'))]) yield question @property def _study_group_elements(self): """Generator iterates group elements which contain g:StudyUnit children. :returns: Generator yielding g:Group elements. """ for group_el in self.root_element.findall('.//g:Group', self.NS): if '{%s}StudyUnit' % (self.NS['g'],) in [_.tag for _ in group_el]: yield group_el @property def _study_group_maps(self): """These are relative to g:Group""" return [ (StudyGroup.add_study_group_identifier, self._map_single('./r:UserID', required=True) .set_value_conversion(as_valid_identifier)), (StudyGroup.add_study_group_names, self._map_multi('./r:Citation/r:Title')), (StudyGroup.add_descriptions, self._map_multi('./g:Abstract/r:Content')), (StudyGroup.add_uris, self._map_single('.', 'externalReferenceDefaultURI', localizable=True)) ] @property def study_groups(self): """Parse XML to create and populate :obj:`kuha_common.document_store.records.StudyGroup`. :returns: Generator to Populate Document Store StudyGroup records. 
""" for group_el in self._study_group_elements: study_group = StudyGroup() self._map_to_record(study_group, group_el, self._study_group_maps) for g_study_unit_el in group_el.findall('./g:StudyUnit', self.NS): study_number = None if '{%s}StudyUnit' % (self.NS['s'],) in [_.tag for _ in g_study_unit_el]: study_number = self._get_study_number_from_study_unit_element( g_study_unit_el.find('./s:StudyUnit', self.NS), raise_error_if_missing=False) else: ref_id_el = g_study_unit_el.find('./g:Reference/r:ID', self.NS) if ref_id_el is None or ref_id_el.text in [None, '']: continue referenced_study_unit_el = self.root_element.find(".//s:StudyUnit[@id='{ref_id}']" .format(ref_id=ref_id_el.text.strip()), self.NS) if referenced_study_unit_el is None: continue study_number = self._get_study_number_from_study_unit_element(referenced_study_unit_el, raise_error_if_missing=False) if study_number is not None: study_group.add_study_numbers(as_valid_identifier(study_number)) yield study_group