Source code for kuha_common.document_store.mappings.ddi.lifecycle

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2025 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
#
"""Common base for DDI Lifecycle (version 3.x) mapping profiles"""
from itertools import chain
from kuha_common.document_store.mappings.exceptions import (
    UnknownXMLRoot,
    MappingError,
)
from kuha_common.document_store.mappings.xmlbase import (
    XMLParserBase,
    MappedParams,
    MissingRequiredAttribute,
    as_valid_identifier,
    element_remove_whitespaces,
    fixed_value,
    str_equals,
    get_preferred_publication_id_agency_pair,
)


_MSG_IMPL_SUBCLASS = "Implement in subclass"


class DDILifecycleParserBase(XMLParserBase):
    """Common base class for DDI Lifecycle (3.x) XML parsers.

    Holds the lookups shared by the version specific parsers: locating the
    study number, collection periods, grant numbers, variables, questions and
    study groups. Subclasses must implement the reference resolving hooks
    (:meth:`_find_study_unit_element`, :meth:`_iter_reference_values`,
    :meth:`_find_by_reference_value` and :meth:`_iter_archives_from_element`),
    which raise :exc:`NotImplementedError` here.
    """

    def __init__(self, root_element):
        # study_unit_element is resolved before calling the parent constructor,
        # exactly as in the original statement order.
        self.study_unit_element = self._find_study_unit_element(root_element)
        # Runtime cache to optimize getters
        self._elements_cache = {}
        super().__init__(root_element)

    @classmethod
    def _DDIInstance_tag(cls):
        """Return the namespace qualified tag name of ddi:DDIInstance."""
        return "{%s}DDIInstance" % (cls.NS["ddi"],)

    @classmethod
    def _is_DDIInstance(cls, element):
        """Return True if ``element`` is a ddi:DDIInstance element."""
        return element.tag == cls._DDIInstance_tag()

    @property
    def _ddiinstance_element(self):
        """Get the DDIInstance element.

        The result (which may be None) is stored in the runtime cache so the
        lookup is done only once per parser instance.

        :returns: root element if it is a DDIInstance, otherwise the result
                  of looking up ``.//ddi:DDIInstance``.
        """
        xpath = ".//ddi:DDIInstance"
        if xpath in self._elements_cache:
            return self._elements_cache[xpath]
        if self._is_DDIInstance(self.root_element):
            ddiinstance_el = self.root_element
        else:
            ddiinstance_el = self._find(xpath)
        self._elements_cache[xpath] = ddiinstance_el
        return ddiinstance_el

    @property
    def _study_unit_language(self):
        """Get language of StudyUnit element.

        Returns :attr:`root_language` if StudyUnit does not declare
        a language.

        :returns: Language
        :rtype: str
        """
        return self._get_xmllang(self.study_unit_element, default=self.root_language)

    def _find_study_unit_element(self, root_element):
        """Locate the StudyUnit element. Implement in subclass."""
        raise NotImplementedError(_MSG_IMPL_SUBCLASS)

    def _iter_reference_values(self, xpath_to_parent, element, *elements):
        """Iterate reference values found from elements. Implement in subclass."""
        raise NotImplementedError(_MSG_IMPL_SUBCLASS)

    def _find_by_reference_value(self, ref_val, xpath, element=None):
        """Find element matching a reference value. Implement in subclass."""
        raise NotImplementedError(_MSG_IMPL_SUBCLASS)

    def _find_by_reference(self, ref_el, xpath, element=None):
        """Find the element that ``ref_el`` refers to.

        :param ref_el: reference element, may be None.
        :param str xpath: xpath to target element.
        :param element: passed on to :meth:`_find_by_reference_value`.
        :returns: referenced element, or None if ``ref_el`` is None or its
                  text is empty.
        """
        if ref_el is None or ref_el.text in ("", None):
            return None
        ref_val = "".join(ref_el.itertext()).strip()
        return self._find_by_reference_value(ref_val, xpath, element=element)

    def _find_and_iter_referred_els(self, ref_xpath, target_xpath, *ref_elements, lookup_element=None):
        """Finds referred elements and yields them one by one.

        :param str ref_xpath: xpath to reference element
        :param str target_xpath: xpath to target element
        :param ref_elements: elements to search through for reference element
        :param lookup_element: find target from this element's children instead of root
        :returns: generator yielding referenced elements
        """
        if not ref_elements:
            # Nothing to search from; _iter_reference_values requires at
            # least one element.
            return
        for ref_val in self._iter_reference_values(ref_xpath, *ref_elements):
            candidate = self._find_by_reference_value(ref_val, target_xpath, element=lookup_element)
            if candidate is not None:
                yield candidate

    def _iter_archives_from_element(self, element):
        """Iterate Archive elements of ``element``. Implement in subclass."""
        raise NotImplementedError(_MSG_IMPL_SUBCLASS)

    def _iter_funding_informations_from_element(self, study_unit_element):
        """Yield r:FundingInformation children of ``study_unit_element``."""
        yield from self._findall("./r:FundingInformation", study_unit_element)

    def _get_study_number_from_study_unit_element(self, study_unit_element, raise_error_if_missing=True):
        """Get study number from a StudyUnit element.

        Lookup order; the first non-empty text wins:

        1. a:ArchiveSpecific/a:Collection/a:CallNumber of the archives
        2. a:ArchiveSpecific/a:Item/a:CallNumber of the archives
        3. r:UserID of the StudyUnit

        :param study_unit_element: StudyUnit element to look from.
        :param bool raise_error_if_missing: raise instead of returning None
                                            when nothing is found.
        :returns: study number or None.
        :raises MissingRequiredAttribute: if study number is not found and
                                          ``raise_error_if_missing`` is True.
        """
        archive_els = list(self._iter_archives_from_element(study_unit_element))
        for xpath in (
            "./a:ArchiveSpecific/a:Collection/a:CallNumber",
            "./a:ArchiveSpecific/a:Item/a:CallNumber",
        ):
            for candidate in self._findall_from_elements(archive_els, xpath):
                if candidate is not None and candidate.text not in ["", None]:
                    return candidate.text
        candidate = study_unit_element.find("./r:UserID", self.NS)
        if candidate is not None and candidate.text not in ["", None]:
            return candidate.text
        if raise_error_if_missing:
            raise MissingRequiredAttribute(
                "./r:UserID",
                "./a:Archive/a:ArchiveSpecific/a:Collection/a:CallNumber",
                "./a:Archive/a:ArchiveSpecific/a:Item/a:CallNumber",
                msg="Unable to find study number from %s, %s or %s",
            )
        return None

    def _parse_study_number(self):
        """Set :attr:`study_number` from the StudyUnit element."""
        self.study_number = self._get_study_number_from_study_unit_element(self.study_unit_element)

    def _iter_data_collections_from_element(self, element):
        """Yield DataCollection elements of ``element``.

        Referenced data collections (s:DataCollectionReference) are yielded
        first, followed by dc:DataCollection children of ``element``.
        """
        for ref_id_el in element.findall("./s:DataCollectionReference/r:ID", self.NS):
            candidate = self._find_by_reference(ref_id_el, ".//dc:DataCollection")
            if candidate is not None:
                yield candidate
        yield from element.findall("./dc:DataCollection", self.NS)

    def _iter_collection_periods_as_mapped_params(self):
        """Generate collection periods as :obj:`MappedParams`

        Returns a generator which yields :obj:`MappedParams` instances
        containing collection periods.

        .. note:: DDI 3.1. supports only single DataCollectionDate for each
                  CollectionEvent. That is not enforced here.

        :returns: Generator yielding collection periods.
        """

        def _as_params(date_el, event):
            params = MappedParams(date_el.text)
            params.set_language(self._study_unit_language)
            params.keyword_arguments.update({self._study_cls.collection_periods.attr_event.name: event})
            return params

        data_colls = self._iter_data_collections_from_element(self.study_unit_element)
        for dc_date in self._findall_from_elements(data_colls, "./dc:CollectionEvent/dc:DataCollectionDate"):
            # DataCollectionDate can contain either SimpleDate or StartDate and EndDate
            simple_date = dc_date.find("./r:SimpleDate", self.NS)
            if simple_date is not None:
                yield _as_params(simple_date, "single")
                continue
            start_date = dc_date.find("./r:StartDate", self.NS)
            if start_date is not None:
                yield _as_params(start_date, "start")
            end_date = dc_date.find("./r:EndDate", self.NS)
            if end_date is not None:
                # It is a violation of the DDI31 standard to have an EndDate without
                # a StartDate. Kuha won't mind however.
                yield _as_params(end_date, "end")

    def _get_role_and_grant_numbers_from_funding_info_el(self, funding_info_el):
        """Get role attribute and grant numbers from a FundingInformation element.

        :returns: two-tuple (role, list of grant number strings)
        """
        role = funding_info_el.attrib.get("role")
        grant_numbers = [
            "".join(grant_number_el.itertext()) for grant_number_el in self._findall("./r:GrantNumber", funding_info_el)
        ]
        return role, grant_numbers

    def _iter_grant_numbers_as_mapped_params(self):
        """Generate grant numbers of the StudyUnit as :obj:`MappedParams`."""
        for funding_info_el in self._iter_funding_informations_from_element(self.study_unit_element):
            role, grant_numbers = self._get_role_and_grant_numbers_from_funding_info_el(funding_info_el)
            for grant_number in grant_numbers:
                params = MappedParams(grant_number)
                params.set_language(self._get_xmllang(funding_info_el, self.root_language))
                params.keyword_arguments.update({self._study_cls.grant_numbers.attr_role.name: role})
                yield params

    def _iter_params_from_othmat_properties(
        self, title_str_els, desc_str_els=None, uri=None, distribution_date=None, id_agency_pair=None
    ):
        """Generate related publication :obj:`MappedParams`, one per language.

        Titles and descriptions are grouped by their language. If a language
        has several titles (or descriptions), the last one is used.

        :param title_str_els: iterable of title string elements.
        :param desc_str_els: iterable of description string elements.
        :param uri: value for the uri attribute.
        :param distribution_date: value for the distribution date attribute.
        :param id_agency_pair: two-tuple (identifier, agency) or None.
        :returns: generator yielding :obj:`MappedParams`
        """
        # Both iterables are traversed twice below. Materialize them so
        # one-shot iterators (generators) do not silently lose their items.
        title_str_els = list(title_str_els)
        desc_str_els = list(desc_str_els) if desc_str_els else []
        _id, agency = id_agency_pair if id_agency_pair else (None, None)
        mapped_langs_values = {
            self._get_xmllang(element, self._study_unit_language): {} for element in chain(title_str_els, desc_str_els)
        }
        for title_str_el in title_str_els:
            lang = self._get_xmllang(title_str_el, self._study_unit_language)
            mapped_langs_values[lang]["title"] = "".join(title_str_el.itertext())
        for desc_str_el in desc_str_els:
            lang = self._get_xmllang(desc_str_el, self._study_unit_language)
            mapped_langs_values[lang]["desc"] = "".join(desc_str_el.itertext())
        for lang, values in mapped_langs_values.items():
            params = MappedParams(values.get("title"))
            params.set_language(lang)
            params.keyword_arguments.update(
                {
                    self._study_cls.related_publications.attr_description.name: values.get("desc"),
                    self._study_cls.related_publications.attr_uri.name: uri,
                    self._study_cls.related_publications.attr_distribution_date.name: distribution_date,
                    self._study_cls.related_publications.attr_identifier.name: _id,
                    self._study_cls.related_publications.attr_identifier_agency.name: agency,
                }
            )
            yield params

    @property
    def _variable_elements(self):
        """Variable elements generator.

        First look for logicalproducts defined as a child of
        study_unit_element. If none found, try to look them by references
        from all children of root_element.

        :returns: generator yielding Variable elements.
        """
        logicalproducts = self.study_unit_element.findall("./l:LogicalProduct", self.NS)
        if not logicalproducts:
            ref_id_els = self.study_unit_element.findall(".//s:LogicalProductReference/r:ID", self.NS)
            ref_id_els += self.study_unit_element.findall(".//pd:LogicalProductReference/r:ID", self.NS)
            # Deduplicate while keeping document order. A plain set() would
            # make the order of yielded variables arbitrary.
            for ref_id_el in dict.fromkeys(ref_id_els):
                logicalproduct_el = self._find_by_reference(ref_id_el, ".//l:LogicalProduct")
                if logicalproduct_el is not None:
                    logicalproducts.append(logicalproduct_el)
        for varscheme_ref_id_el in self._findall_from_elements(logicalproducts, ".//l:VariableSchemeReference/r:ID"):
            if varscheme_ref_id_el.text in [None, ""]:
                continue
            ref_id = varscheme_ref_id_el.text.strip()
            yield from self.root_element.findall(f".//l:VariableScheme[@id='{ref_id}']/l:Variable", self.NS)

    def _iter_code_elements_by_reference(self, ref_id):
        """Yield l:Code elements of the l:CodeScheme whose id is ``ref_id``."""
        yield from self.root_element.findall(f".//l:CodeScheme[@id='{ref_id}']/l:Code", self.NS)

    @property
    def _variable_maps(self):
        """Mappings relative to l:Variable."""
        return [
            (
                self._variable_cls.add_variable_name,
                self._map_single("./l:VariableName", required=True).set_value_conversion(as_valid_identifier),
            ),
            (self._variable_cls.add_variable_labels, self._map_multi("./r:Label")),
            (
                self._variable_cls.add_question_identifiers,
                self._map_multi("./l:QuestionReference/r:ID", localizable=False).set_value_conversion(
                    as_valid_identifier
                ),
            ),
        ]

    def _add_codelist_codes_to_variable(self, variable, var_el):
        """Add codelist codes to ``variable`` from the code scheme referenced by ``var_el``.

        Codes whose value is listed in the ``missingValue`` attribute of
        l:CodeRepresentation are added with ``missing=True``. Values listed in
        ``missingValue`` that do not match any code are added last, without a
        label, in the order they are declared in the attribute.

        :param variable: record to add codes to.
        :param var_el: l:Variable element.
        """
        codeschemeref_id_el = var_el.find(
            "./l:Representation/l:CodeRepresentation/r:CodeSchemeReference/r:ID",
            self.NS,
        )
        if codeschemeref_id_el is None or codeschemeref_id_el.text in [None, ""]:
            return
        code_repr_el = var_el.find("./l:Representation/l:CodeRepresentation", self.NS)
        # Insertion-ordered and unique. Iterating a set of strings would give
        # a different order on every run because of hash randomization.
        missing_values = dict.fromkeys(code_repr_el.attrib.get("missingValue", "").split())
        for code_el in self._iter_code_elements_by_reference(codeschemeref_id_el.text.strip()):
            value_el = code_el.find("./l:Value", self.NS)
            code_value = value_el.text.strip() if value_el is not None and value_el.text not in [None, ""] else None
            missing_value = code_value in missing_values
            if missing_value:
                # Consumed; only unmatched values remain for the final loop.
                del missing_values[code_value]
            category_ref_id_el = code_el.find("./l:CategoryReference/r:ID", self.NS)
            if category_ref_id_el is None or category_ref_id_el.text in [None, ""]:
                if code_value is not None:
                    variable.add_codelist_codes(code_value, self._study_unit_language, missing=missing_value)
                continue
            category_el = self._find_by_reference(category_ref_id_el, ".//l:Category")
            label_els = category_el.findall("./r:Label", self.NS) if category_el is not None else []
            if not label_els:
                if code_value is not None:
                    variable.add_codelist_codes(code_value, self._study_unit_language, missing=missing_value)
                continue
            for label_el in label_els:
                # label_text may be None or ''
                label_text = label_el.text.strip() if label_el.text is not None else None
                variable.add_codelist_codes(
                    code_value,
                    self._get_xmllang(label_el, default=self._study_unit_language),
                    label=label_text,
                    missing=missing_value,
                )
        for unmatched_value in missing_values:
            variable.add_codelist_codes(unmatched_value, self._study_unit_language, missing=True)

    @property
    def variables(self):
        """Parse XML to create and populate
        :obj:`kuha_common.document_store.records.Variable`.

        :returns: Generator to Populate Document Store Variable records.
        """
        if self.study_number is None:
            self._parse_study_number()
        for var_el in self._variable_elements:
            variable = self._variable_cls()
            variable.add_study_number(self.study_number_identifier)
            self._map_to_record(variable, var_el, self._variable_maps)
            self._add_codelist_codes_to_variable(variable, var_el)
            yield variable

    @property
    def _question_maps(self):
        """Mappings relative to dc:QuestionItem."""
        return [
            (
                self._question_cls.add_question_identifier,
                self._map_single("./r:UserID", required=True).set_value_conversion(as_valid_identifier),
            ),
            (
                self._question_cls.add_question_texts,
                self._map_multi("./dc:QuestionText").set_value_getter(element_remove_whitespaces),
            ),
        ]

    def _iter_question_elements_by_reference_elements(self, ref_id_elements):
        """Yield dc:QuestionItem elements referenced by ``ref_id_elements``.

        References that cannot be resolved are skipped.
        """
        for ref_id_el in ref_id_elements:
            question_el = self._find_by_reference(ref_id_el, ".//dc:QuestionItem")
            if question_el is not None:
                yield question_el

    @property
    def questions(self):
        """Parse XML to create and populate
        :obj:`kuha_common.document_store.records.Question`.

        :returns: Generator to Populate Document Store Question records.
        """
        if self.study_number is None:
            self._parse_study_number()
        for var_el in self._variable_elements:
            for question_el in self._iter_question_elements_by_reference_elements(
                var_el.findall("./l:QuestionReference/r:ID", self.NS)
            ):
                question = self._question_cls()
                self._map_to_record(question, question_el, self._question_maps)
                question.add_study_number(self.study_number_identifier)
                var_name = var_el.find("./l:VariableName", self.NS)
                if var_name is not None and var_name.text not in [None, ""]:
                    question.add_variable_name(as_valid_identifier(var_name.text.strip()))
                codelist_ref_el = question_el.find("./dc:CodeDomain/r:CodeSchemeReference/r:ID", self.NS)
                if codelist_ref_el is None or codelist_ref_el.text in [None, ""]:
                    # No code scheme to look up; the question is complete.
                    yield question
                    continue
                for code_el in self._iter_code_elements_by_reference(codelist_ref_el.text.strip()):
                    self._map_to_record(
                        question,
                        code_el,
                        [
                            (
                                self._question_cls.add_codelist_references,
                                self._map_multi("./l:Value"),
                            )
                        ],
                    )
                yield question

    @property
    def _study_group_elements(self):
        """Generator iterates group elements which contain g:StudyUnit children.

        :returns: Generator yielding g:Group elements.
        """
        g_study_unit_tag = "{%s}StudyUnit" % (self.NS["g"],)
        for group_el in self.root_element.findall(".//g:Group", self.NS):
            if any(child.tag == g_study_unit_tag for child in group_el):
                yield group_el

    @property
    def _study_group_maps(self):
        """These are relative to g:Group"""
        return [
            (
                self._studygroup_cls.add_study_group_identifier,
                self._map_single("./r:UserID", required=True).set_value_conversion(as_valid_identifier),
            ),
            (
                self._studygroup_cls.add_study_group_names,
                self._map_multi("./r:Citation/r:Title"),
            ),
            (
                self._studygroup_cls.add_descriptions,
                self._map_multi("./g:Abstract/r:Content"),
            ),
            (
                self._studygroup_cls.add_uris,
                self._map_single(".", "externalReferenceDefaultURI", localizable=True),
            ),
        ]

    @property
    def study_groups(self):
        """Parse XML to create and populate
        :obj:`kuha_common.document_store.records.StudyGroup`.

        :returns: Generator to Populate Document Store StudyGroup records.
        """
        for group_el in self._study_group_elements:
            study_group = self._studygroup_cls()
            self._map_to_record(study_group, group_el, self._study_group_maps)
            for g_study_unit_el in group_el.findall("./g:StudyUnit", self.NS):
                # Prefer an inline s:StudyUnit child, otherwise resolve the
                # g:Reference. Unresolvable references are skipped.
                study_unit_el = g_study_unit_el.find("./s:StudyUnit", self.NS)
                if study_unit_el is None:
                    ref_id_el = g_study_unit_el.find("./g:Reference/r:ID", self.NS)
                    study_unit_el = self._find_by_reference(ref_id_el, ".//s:StudyUnit")
                    if study_unit_el is None:
                        continue
                study_number = self._get_study_number_from_study_unit_element(
                    study_unit_el, raise_error_if_missing=False
                )
                if study_number is not None:
                    study_group.add_study_numbers(as_valid_identifier(study_number))
            yield study_group
[docs] class DDILifecycle32ParserBase(DDILifecycleParserBase): """Common base for DDI 3.2 & DDI 3.3 DDI 3.2 changes how references work when comparing to DDI 3.1. It also introduces FragmentInstances. """ _XPATH_REL_DESC_CONTENT = './r:Description/r:Content' _XPATH_REL_TYPEOFOBJECT = './r:TypeOfObject' @property def _att_cv_name(self): raise NotImplementedError(_MSG_IMPL_SUBCLASS) @property def _att_cv_urn(self): raise NotImplementedError(_MSG_IMPL_SUBCLASS) def _find_study_unit_element(self, root_element): expected_fragmentinstance_root = '{%s}FragmentInstance' % (self.NS['ddi'],) expected_studyunit_root = '{%s}StudyUnit' % (self.NS['s'],) if self._is_DDIInstance(root_element): study_unit_elements = list(root_element.iter('{%s}StudyUnit' % (self.NS['s'],))) study_unit_count = len(study_unit_elements) if study_unit_count > 1: # Currently supports only a single s:StudyUnit in xml metadata. raise MappingError("Unable to parse multiple StudyUnit elements") if study_unit_count < 1: raise MappingError("Unable to find StudyUnit element") return study_unit_elements.pop() if root_element.tag == expected_fragmentinstance_root: study_unit_elements = list( self._find_and_iter_referred_els( './ddi:TopLevelReference', './/s:StudyUnit', root_element, lookup_element=root_element ) ) study_unit_count = len(study_unit_elements) if study_unit_count > 1: # Currently supports only a single s:StudyUnit in xml metadata. raise MappingError("Unable to parse multiple StudyUnit elements") if study_unit_count < 1: raise MappingError("Unable to find StudyUnit element") return study_unit_elements.pop() if root_element.tag == expected_studyunit_root: return root_element raise UnknownXMLRoot( root_element.tag, expected_fragmentinstance_root, self._DDIInstance_tag(), expected_studyunit_root ) def _get_reference_urn_from_element(self, element): """Look for URN or (Agency, ID, Version) triplet from element's children. 
Returns it as an urn (urn:ddi:<agency>:<id>:<version>) :param element: Look up this element's children. :returns: URN or empty string if no URN is found. :rtype: str """ urn_el = self._find('./r:URN', element) if urn_el is not None and urn_el.text not in ('', None): return ''.join(urn_el.itertext()) # Look for Agency & ID & Version parts = '' for xpath in ('./r:Agency', './r:ID', './r:Version'): part_el = self._find(xpath, element) if part_el is None: return '' parts += ':%s' % (''.join(part_el.itertext()),) return f'urn:ddi{parts}' def _iter_reference_values(self, xpath_to_parent, element, *elements): elements = (element,) + elements for candidate_el in self._findall_from_elements(elements, xpath_to_parent): # Look for URN first urn_str = self._get_reference_urn_from_element(candidate_el) if urn_str == '': continue yield urn_str def _find_by_reference_value(self, ref_val, xpath, element=None): candidate_el = self._find(f'{xpath}/[r:URN="{ref_val}"]', element) if candidate_el is not None: return candidate_el agency_val, id_val, version_val = ref_val.split(':')[-3:] for candidate_el in self._findall(f'{xpath}/[r:ID="{id_val}"]', element=element): candidate_agency_el = self._find('./r:Agency', element=candidate_el) if candidate_agency_el is None or ''.join(candidate_agency_el.itertext()) != agency_val: continue candidate_version_el = self._find('./r:Version', element=candidate_el) if candidate_version_el is None or ''.join(candidate_version_el.itertext()) != version_val: continue return candidate_el def _iter_description_and_lang(self, element, default_lang=None): for content_el in self._findall(self._XPATH_REL_DESC_CONTENT, element): yield ''.join(content_el.itertext()), self._get_xmllang(content_el, default=default_lang) def _get_spatial_coverage_from_element(self, element): # StudyUnit may have 0 - 1 Coverage elements. # Coverage may have 0 - 1 SpatialCoverage/SpatialCoverageReference elements. # So StudyUnit may only have 0 - 1 SpatialCoverage elements. 
ref_el = self._find_by_reference( self._find('./r:Coverage/r:SpatialCoverageReference/r:URN', element), './/r:SpatialCoverage' ) return ref_el if ref_el is not None else self._find('./r:Coverage/r:SpatialCoverage', element) def _iter_study_area_countries_from_spatcov_el(self, spatial_coverage_el): countrycodes_content_els = [ {'countrycode': ''.join(cc_el.itertext()), 'content_els': []} for cc_el in self._findall('./r:CountryCode', spatial_coverage_el) ] content_index = 0 def _add_content_els(content_els): nonlocal content_index if content_els == []: return if len(countrycodes_content_els) <= content_index: countrycodes_content_els.append({'countrycode': None, 'content_els': content_els}) else: countrycodes_content_els[content_index]['content_els'] = content_els content_index += 1 for geographic_location_ref_el in spatial_coverage_el.findall('./r:GeographicLocationReference/r:URN', self.NS): # This is the primary lookup location as it supports multiple locations. geographic_location_el = self._find_by_reference(geographic_location_ref_el, './/r:GeographicLocation') _add_content_els(self._findall(self._XPATH_REL_DESC_CONTENT, geographic_location_el)) _add_content_els(self._findall(self._XPATH_REL_DESC_CONTENT, spatial_coverage_el)) for countrycode_content_els in countrycodes_content_els: if countrycode_content_els['content_els'] == []: params = MappedParams(None) params.set_language(self._get_xmllang(spatial_coverage_el, default=self.root_language)) params.keyword_arguments.update( { self._study_cls.study_area_countries.attr_abbreviation.name: countrycode_content_els[ 'countrycode' ] } ) yield params continue for content_el in countrycode_content_els['content_els']: params = MappedParams(''.join(content_el.itertext())) params.set_language(self._get_xmllang(content_el, default=self.root_language)) params.keyword_arguments.update( { self._study_cls.study_area_countries.attr_abbreviation.name: countrycode_content_els[ 'countrycode' ] } ) yield params def 
_get_topical_coverage_from_element(self, element): ref_el = self._find_by_reference( self._find('./r:Coverage/r:TopicalCoverageReference/r:URN', element), './/r:TopicalCoverage' ) return ref_el if ref_el is not None else self._find('./r:Coverage/r:TopicalCoverage', element) def _get_conceptual_components_from_element(self, element): cc_els = [] for cc_el in self._find_and_iter_referred_els( './r:ConceptualComponentReference', './/c:ConceptualComponent', element ): cc_els.append(cc_el) for cc_el in self._findall('./c:ConceptualComponent', element): if cc_el is not None: cc_els.append(cc_el) return cc_els def _iter_archives_from_element(self, element): yield from self._findall('./a:Archive', element) yield from self._find_and_iter_referred_els('./r:ArchiveReference', './/a:Archive', element) def _iter_data_collections_from_element(self, element): yield from self._findall('./dc:DataCollection', element) yield from self._find_and_iter_referred_els('./r:DataCollectionReference', './/dc:DataCollection', element) def _iter_physical_instances_from_element(self, element): yield from self._findall('./pi:PhysicalInstance', element) yield from self._find_and_iter_referred_els('./r:PhysicalInstanceReference', './/pi:PhysicalInstance', element) def _iter_methodologys_from_study_unit(self, study_unit_element): data_colls = list(self._iter_data_collections_from_element(study_unit_element)) for data_coll_el in data_colls: methodology_el = self._find('./dc:Methodology', data_coll_el) if methodology_el is not None: yield methodology_el yield from self._find_and_iter_referred_els('./dc:MethodologyReference', './/dc:Methodology', *data_colls) def _iter_groups_from_element(self, element): yield from self._findall('./g:Group', element) yield from self._find_and_iter_referred_els('./r:GroupReference', './/g:Group', element) def _iter_study_area_countries_as_mapped_params(self): spatial_coverage_el = self._get_spatial_coverage_from_element(self.study_unit_element) if spatial_coverage_el is 
None: return [] return self._iter_study_area_countries_from_spatcov_el(spatial_coverage_el) def _iter_identifiers_as_mapped_params(self): for element in self._findall('./r:UserID', self.study_unit_element): if element.get('typeOfUserID') == 'StudyNumber' and element.text not in ["", None]: # This is the primary and excludes all other locations params = MappedParams(''.join(element.itertext())) params.set_language(self._study_unit_language) yield params # Return in generator stops iteration return archive_els = list(self._iter_archives_from_element(self.study_unit_element)) for element in chain.from_iterable( ( self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Collection/a:CallNumber'), self._findall_from_elements(archive_els, './a:ArchiveSpecific/a:Item/a:CallNumber'), ) ): value = ''.join(element.itertext()) if not value: continue params = MappedParams(value) params.set_language(self._study_unit_language) yield params for inter_ident_el in self._findall('./r:Citation/r:InternationalIdentifier', self.study_unit_element): params = MappedParams(''.join(self._find('./r:IdentifierContent', element=inter_ident_el).itertext())) params.set_language(self._study_unit_language) params.keyword_arguments.update( { self._study_cls.identifiers.attr_agency.name: ''.join( self._find('./r:ManagingAgency', element=inter_ident_el).itertext() ) } ) yield params def _iter_principal_investigators_as_mapped_params(self): def _from_organization(string_el): params = MappedParams(None) params.keyword_arguments.update( {self._study_cls.principal_investigators.attr_organization.name: ''.join(string_el.itertext())} ) params.set_language(self._get_xmllang(string_el, default=self._study_unit_language)) return params def _from_individual(string_el): params = MappedParams(''.join(string_el.itertext())) params.set_language(self._get_xmllang(string_el, default=self._study_unit_language)) return params for creator_name_el in self._findall('./r:Citation/r:Creator/r:CreatorName', 
self.study_unit_element): for string_el in self._findall('./r:String', creator_name_el): params = MappedParams(''.join(string_el.itertext())) params.set_language(self._get_xmllang(string_el, default=self._study_unit_language)) self._get_attr_and_set_param( params, self._study_cls.principal_investigators.attr_organization.name, creator_name_el, 'affiliation', ) yield params for ref_el in self._findall('./r:Citation/r:Creator/r:CreatorReference', self.study_unit_element): ref_type = ''.join(self._find(self._XPATH_REL_TYPEOFOBJECT, ref_el).itertext()) xpath_to_referenced_el, xpath_to_string_el, getter = { 'Organization': ( './/a:Organization', './a:OrganizationIdentification/a:OrganizationName/r:String', _from_organization, ), 'Individual': ( './/a:Individual', './a:IndividualIdentification/a:IndividualName/a:FullName/r:String', _from_individual, ), }[ref_type] referenced_el = self._find_by_reference(self._find('./r:URN', ref_el), xpath_to_referenced_el) if referenced_el is None: continue for string_el in self._findall(xpath_to_string_el, referenced_el): yield getter(string_el) def _iter_classifications_as_mapped_params(self): topcov_el = self._get_topical_coverage_from_element(self.study_unit_element) for subject_el in self._findall('./r:Subject', topcov_el): params = MappedParams(None) params.set_language(self._get_xmllang(subject_el, default=self._study_unit_language)) params.keyword_arguments.update( { self._study_cls.classifications.attr_description.name: ''.join(subject_el.itertext()), self._study_cls.classifications.attr_system_name.name: subject_el.get(self._att_cv_name, None), self._study_cls.classifications.attr_uri.name: subject_el.get(self._att_cv_urn, None), } ) yield params def _iter_keywords_as_mapped_params(self): topcov_el = self._get_topical_coverage_from_element(self.study_unit_element) for subject_el in self._findall('./r:Keyword', topcov_el): params = MappedParams(None) params.set_language(self._get_xmllang(subject_el, 
default=self._study_unit_language)) params.keyword_arguments.update( { self._study_cls.classifications.attr_description.name: ''.join(subject_el.itertext()), self._study_cls.classifications.attr_system_name.name: subject_el.get(self._att_cv_name, None), self._study_cls.classifications.attr_uri.name: subject_el.get(self._att_cv_urn, None), } ) yield params @property def _study_maps(self): return { 'StudyUnit': [ (self._study_cls.add_study_titles, self._map_multi('./r:Citation/r:Title/r:String')), (self._study_cls.add_parallel_titles, self._map_multi('./r:Citation/r:AlternateTitle/r:String')), (self._study_cls.add_abstract, self._map_multi('./r:Abstract/r:Content')), ( self._study_cls.add_publication_years, self._map_multi('./r:Citation/r:PublicationDate/r:SimpleDate').add_attribute( self._study_cls.publication_years.attr_distribution_date.name, self._map_single('.') ), ), ( self._study_cls.add_analysis_units, self._map_multi('./r:AnalysisUnit') .set_value_conversion(fixed_value(None)) .add_attribute(self._study_cls.analysis_units.attr_description.name, self._map_single('.')) .add_attribute( self._study_cls.analysis_units.attr_system_name.name, self._map_single('.', self._att_cv_name) ) .add_attribute( self._study_cls.analysis_units.attr_uri.name, self._map_single('.', self._att_cv_urn) ), ), (self._study_cls.add_data_collection_copyrights, self._map_multi('./r:Citation/r:Copyright/r:String')), ], 'DDIInstance': [ (self._study_cls.add_document_titles, self._map_multi('./r:Citation/r:Title/r:String')), (self._study_cls.add_copyrights, self._map_multi('./r:Citation/r:Copyright/r:String')), ], } def _iter_study_uris_as_mapped_params(self): """Generate Study URIs :obj:`MappedParams` There is no single element to hold the URI that points to the study description web resource. Lookup multiple locations in the following order: 1. 
.//ddi:StudyUnit/a:Archive/a:ArchiveSpecific/a:Collection/a:URI * will also lookup archive-element by reference * a:Collection/a:CallNumber CDATA must match study_number 2. .//ddi:StudyUnit/a:Archive/a:ArchiveSpecific/a:Item/a:URI * will also lookup archive-element by reference * a:Item/a:CallNumber CDATA must match study_number 3. .//ddi:StudyUnit/r:UserID * typeOfUserID-attribute must be one of ['DOI', 'URL', 'URLServiceProvider'] """ def _dict_append(_dct): def _append(lang, key, value): valid_keys = ('description', 'location') if key not in valid_keys: raise ValueError( "Invalid key '%s'. Expecting one of %s" % (key, ', '.join("'%s'" % (x,) for x in valid_keys)) ) if lang not in _dct: _dct[lang] = {key: value} elif key not in _dct[lang]: _dct[lang].update({key: value}) else: _dct[lang][key] += ' ' + value return _append def _from_organization(org_el): langs_attrs = {} appender = _dict_append(langs_attrs) for org_name_str_el in self._findall('./a:OrganizationIdentification/a:OrganizationName/r:String', org_el): cur_lang = self._get_xmllang(org_name_str_el, self._study_unit_language) cur_str = ''.join(org_name_str_el.itertext()) appender(cur_lang, 'location', cur_str) for cur_str, cur_lang in self._iter_description_and_lang(org_el, default_lang=self._study_unit_language): appender(cur_lang, 'description', cur_str) return langs_attrs def _from_individual(ind_el): langs_attrs = {} appender = _dict_append(langs_attrs) for ind_name_str_el in self._findall( './a:IndividualIdentification/a:IndividualName/a:FullName/r:String', ind_el ): appender( self._get_xmllang(ind_name_str_el, self._study_unit_language), 'location', ''.join(ind_name_str_el.itertext()), ) for cur_str, cur_lang in self._iter_description_and_lang(ind_el, default_lang=self._study_unit_language): appender(cur_lang, 'description', cur_str) return langs_attrs for archive_spec_el in self._findall_from_elements( self._iter_archives_from_element(self.study_unit_element), './a:ArchiveSpecific' ): uri_str = 
None uri_el = self._find(f"./a:Collection/[a:CallNumber='{self.study_number}']/r:URI", archive_spec_el) if uri_el is None: uri_el = self._find(f"./a:Item/[a:CallNumber='{self.study_number}']/r:URI", archive_spec_el) if uri_el is not None: uri_str = ''.join(uri_el.itertext()) arch_org_ref_el = self._find('./a:ArchiveOrganizationReference', archive_spec_el) if arch_org_ref_el is None: if uri_el is not None: params = MappedParams(uri_str) params.set_language(self._study_unit_language) yield params continue ref_obj_type = ''.join( self._find('./a:ArchiveOrganizationReference/r:TypeOfObject', archive_spec_el).itertext() ) target_xpath, getter = { 'Organization': ('.//a:Organization', _from_organization), 'Individual': ('.//a:Individual', _from_individual), }[ref_obj_type] for referred_el in self._find_and_iter_referred_els( './a:ArchiveOrganizationReference', target_xpath, archive_spec_el ): for lang, attrs in getter(referred_el).items(): params = MappedParams(uri_str) params.set_language(lang) params.keyword_arguments.update( { self._study_cls.study_uris.attr_location.name: attrs.get('location'), self._study_cls.study_uris.attr_description.name: attrs.get('description'), } ) yield params for userid_el in self._findall('./r:UserID', self.study_unit_element): if userid_el.get('typeOfUserID') in ('DOI', 'URL', 'URLServiceProvider'): params = MappedParams(''.join(userid_el.itertext())) params.set_language(self._get_xmllang(userid_el, default=self._study_unit_language)) yield params def _iter_universes_from_conceptual_components(self, conceptual_component_elements): uni_scheme_els = [] for uni_scheme_el in self._find_and_iter_referred_els( './r:UniverseSchemeReference', './/c:UniverseScheme', *conceptual_component_elements ): uni_scheme_els.append(uni_scheme_el) for uni_scheme_el in self._findall_from_elements(conceptual_component_elements, './c:UniverseScheme'): if uni_scheme_el is not None: uni_scheme_els.append(uni_scheme_el) for uni_el in 
self._find_and_iter_referred_els('./r:UniverseReference', './/c:Universe', *uni_scheme_els): yield uni_el for uni_el in self._findall_from_elements(uni_scheme_els, './c:Universe'): if uni_el is not None: yield uni_el def _iter_universes_as_mapped_params(self): # ConceptualComponent / ConceptualComponentReference * # UniverseScheme / UniverseSchemeReference * # Universe / UniverseReference * # Actually even UniverseScheme may refer another UniverseScheme. Not going to recurse that deep in this point. inc_to_bool = str_equals('true', True) for uni_el in self._iter_universes_from_conceptual_components( self._get_conceptual_components_from_element(self.study_unit_element) ): included = inc_to_bool(uni_el.get('isInclusive')) for desc, lang in self._iter_description_and_lang(uni_el, default_lang=self._study_unit_language): params = MappedParams(desc) params.set_language(lang) params.keyword_arguments.update({self._study_cls.universes.attr_included.name: included}) yield params def _iter_publishers_as_mapped_params(self): for publisher_string_el in self._findall( './r:Citation/r:Publisher/r:PublisherName/r:String', self.study_unit_element ): params = MappedParams(''.join(publisher_string_el.itertext())) params.set_language(self._get_xmllang(publisher_string_el, default=self._study_unit_language)) yield params for publisher_ref_el in self._findall('./r:Citation/r:Publisher/r:PublisherReference', self.study_unit_element): xpath_to_referenced_el, xpath_to_string_el = { 'Organization': ('.//a:Organization', './a:OrganizationIdentification/a:OrganizationName/r:String'), 'Individual': ('.//a:Individual', './a:IndividualIdentification/a:IndividualName/a:FullName/r:String'), }[''.join(self._find(self._XPATH_REL_TYPEOFOBJECT, publisher_ref_el).itertext())] referenced_el = self._find_by_reference(self._find('./r:URN', publisher_ref_el), xpath_to_referenced_el) if referenced_el is None: continue for string_el in self._findall(xpath_to_string_el, element=referenced_el): params = 
MappedParams(''.join(string_el.itertext())) params.set_language(self._get_xmllang(string_el, default=self._study_unit_language)) yield params def __iter_cdata_and_lang_from_element_as_params(self, element, xpath, *xpaths): """Helper combines common functionality of data_access lookup""" for content_el in self._findall_by_priority(xpath, *xpaths, element=element): params = MappedParams(element_remove_whitespaces(content_el)) params.set_language(self._get_xmllang(content_el, default=self._study_unit_language)) yield params def _iter_data_access_as_mapped_params(self): for archive_el in self._iter_archives_from_element(self.study_unit_element): yield from self.__iter_cdata_and_lang_from_element_as_params( archive_el, # Primary lookup xpath './a:ArchiveSpecific/a:Item/a:Access/r:Description/r:Content', # Second lookup xpath './a:ArchiveSpecific/a:DefaultAccess/a:Restrictions/r:Content', # Third lookup xpath './a:ArchiveSpecific/a:Collection/a:DefaultAccess/a:Restrictions/r:Content', ) def _iter_data_access_descriptions_as_mapped_params(self): for archive_el in self._iter_archives_from_element(self.study_unit_element): yield from self.__iter_cdata_and_lang_from_element_as_params( archive_el, # Primary lookup xpath './a:ArchiveSpecific/a:DefaultAccess/a:AccessConditions/r:Content', # Secondary lookup xpath './a:ArchiveSpecific/a:Collection/a:DefaultAccess/a:AccessConditions/r:Content', ) def _iter_citation_requirements_as_mapped_params(self): for archive_el in self._iter_archives_from_element(self.study_unit_element): yield from self.__iter_cdata_and_lang_from_element_as_params( archive_el, # Primary './a:ArchiveSpecific/a:DefaultAccess/a:CitationRequirement/r:Content', # Secondary './a:ArchiveSpecific/a:Collection/a:DefaultAccess/a:CitationRequirement/r:Content', ) def _iter_deposit_requirements_as_mapped_params(self): for archive_el in self._iter_archives_from_element(self.study_unit_element): yield from self.__iter_cdata_and_lang_from_element_as_params( archive_el, # 
Primary './a:ArchiveSpecific/a:DefaultAccess/a:DepositRequirement/r:Content', # Secondary './a:ArchiveSpecific/a:Collection/a:DefaultAccess/a:DepositRequirement/r:Content', ) def _get_info_from_typeof_el(self, type_of_el): if type_of_el is not None: value = element_remove_whitespaces(type_of_el) cv_name = type_of_el.get(self._att_cv_name) cv_urn = type_of_el.get(self._att_cv_urn) else: value = None cv_name = None cv_urn = None return value, cv_name, cv_urn def _iter_time_methods_as_mapped_params(self): for time_method in self._findall_from_elements( self._iter_methodologys_from_study_unit(self.study_unit_element), './dc:TimeMethod' ): type_of_el = self._find('./dc:TypeOfTimeMethod', time_method) descs_langs = list(self._iter_description_and_lang(time_method, default_lang=self._study_unit_language)) if (type_of_el, descs_langs) == (None, []): continue value, cv_name, cv_urn = self._get_info_from_typeof_el(type_of_el) if not descs_langs: params = MappedParams(value) params.set_language(self._study_unit_language) params.keyword_arguments.update( { self._study_cls.time_methods.attr_system_name.name: cv_name, self._study_cls.time_methods.attr_uri.name: cv_urn, } ) yield params for desc, lang in descs_langs: params = MappedParams(value) params.set_language(lang) params.keyword_arguments.update( { self._study_cls.time_methods.attr_description.name: desc, self._study_cls.time_methods.attr_system_name.name: cv_name, self._study_cls.time_methods.attr_uri.name: cv_urn, } ) yield params def _iter_sampling_procedures_as_mapped_params(self): for samp_proc_el in self._findall_from_elements( self._iter_methodologys_from_study_unit(self.study_unit_element), './dc:SamplingProcedure' ): type_of_el = self._find('./dc:TypeOfSamplingProcedure', samp_proc_el) descs_langs = list(self._iter_description_and_lang(samp_proc_el, default_lang=self._study_unit_language)) if (type_of_el, descs_langs) == (None, []): continue value, cv_name, cv_urn = self._get_info_from_typeof_el(type_of_el) if not 
descs_langs: params = MappedParams(value) params.set_language(self._study_unit_language) params.keyword_arguments.update( { self._study_cls.sampling_procedures.attr_system_name.name: cv_name, self._study_cls.sampling_procedures.attr_uri.name: cv_urn, } ) yield params for desc, lang in descs_langs: params = MappedParams(value) params.set_language(lang) params.keyword_arguments.update( { self._study_cls.sampling_procedures.attr_description.name: desc, self._study_cls.sampling_procedures.attr_system_name.name: cv_name, self._study_cls.sampling_procedures.attr_uri.name: cv_urn, } ) yield params def _iter_collection_modes_as_mapped_params(self): for mode_of_coll_el in self._findall_from_elements( self._iter_data_collections_from_element(self.study_unit_element), './dc:CollectionEvent/dc:ModeOfCollection', ): type_of_el = self._find('./dc:TypeOfModeOfCollection', mode_of_coll_el) descs_langs = list(self._iter_description_and_lang(mode_of_coll_el, default_lang=self._study_unit_language)) if (type_of_el, descs_langs) == (None, []): continue value, cv_name, cv_urn = self._get_info_from_typeof_el(type_of_el) if not descs_langs: params = MappedParams(value) params.set_language(self._study_unit_language) params.keyword_arguments.update( { self._study_cls.collection_modes.attr_system_name.name: cv_name, self._study_cls.collection_modes.attr_uri.name: cv_urn, } ) yield params for desc, lang in descs_langs: params = MappedParams(value) params.set_language(lang) params.keyword_arguments.update( { self._study_cls.collection_modes.attr_description.name: desc, self._study_cls.collection_modes.attr_system_name.name: cv_name, self._study_cls.collection_modes.attr_uri.name: cv_urn, } ) yield params def _iter_file_names_as_mapped_params(self): for physical_instance_el in self._iter_physical_instances_from_element(self.study_unit_element): # CESSDA instructs to maintain datafile languages in citation/language, but there is no way # to identify the file in that element. 
Making guesses here. filelangs = [] for language_el in self._findall('./r:Citation/r:Language', physical_instance_el): filelangs.append(''.join(language_el.itertext())) for datafileuri_el in self._findall('./pi:DataFileIdentification/pi:DataFileURI', physical_instance_el): params = MappedParams(''.join(datafileuri_el.itertext())) if not filelangs: params.set_language(self._study_unit_language) else: params.set_language(filelangs.pop(0)) yield params def __iter_params_from_group_properties(self, title_str_els, content_str_els, id_els, uri): """Helper digs out parameters from Group element properties""" ids = [''.join(id_el.itertext()) for id_el in id_els] if id_els != [] else [None] langs = [ self._get_xmllang(element, self._study_unit_language) for element in chain.from_iterable([title_str_els, content_str_els]) ] # Dict comprehension results in unique keys even if langs has duplicates. mapped_langs_values = {lang: {} for lang in langs} for title_str_el in title_str_els: mapped_langs_values[self._get_xmllang(title_str_el, self._study_unit_language)].update( {'title': ''.join(title_str_el.itertext())} ) for content_str_el in content_str_els: mapped_langs_values[self._get_xmllang(content_str_el, self._study_unit_language)].update( {'desc': ''.join(content_str_el.itertext())} ) for _id in ids: for lang, values in mapped_langs_values.items(): params = MappedParams(_id) params.set_language(lang) params.keyword_arguments.update( { self._study_cls.study_groups.attr_name.name: values.get('title'), self._study_cls.study_groups.attr_description.name: values.get('desc'), self._study_cls.study_groups.attr_uri.name: uri, } ) yield params def _iter_study_groups_as_mapped_params(self): if self._ddiinstance_element is None: return for group_el in self._iter_groups_from_element(self._ddiinstance_element): title_str_els = self._findall('./r:Citation/r:Title/r:String', group_el) content_str_els = self._findall('./r:Abstract/r:Content', group_el) id_els = 
self._findall('./r:Citation/r:InternationalIdentifier/r:IdentifierContent', group_el) uri = group_el.get('externalReferenceDefaultURI') yield from self.__iter_params_from_group_properties(title_str_els, content_str_els, id_els, uri) def _iter_other_materials_for_document_uris(self): # DDI3.2 OtherMaterial element is a direct child of StudyUnit # while DDI3.3 wraps it inside OtherMaterialScheme. # Implement this in subclass. raise NotImplementedError(_MSG_IMPL_SUBCLASS) def _iter_document_uris_as_mapped_params(self): for oth_mat_el in self._iter_other_materials_for_document_uris(): descriptions = list(self._iter_description_and_lang(oth_mat_el, default_lang=self._study_unit_language)) for url_el in self._findall('./r:ExternalURLReference', oth_mat_el): url = ''.join(url_el.itertext()) if not descriptions: param = MappedParams(url) param.set_language(self._get_xmllang(url, self._study_unit_language)) yield param continue for desc, lang in descriptions: param = MappedParams(url) param.set_language(lang) param.keyword_arguments.update({self._study_cls.document_uris.attr_description.name: desc}) yield param def _iter_other_materials_for_related_publications(self): # DDI3.2 OtherMaterial element is a direct child of StudyUnit # while DDI3.3 wraps it inside OtherMaterialScheme. # Implement this in subclass. raise NotImplementedError(_MSG_IMPL_SUBCLASS) def _iter_related_publications_as_mapped_params(self): for oth_mat_el in self._iter_other_materials_for_related_publications(): title_str_els = self._findall('./r:Citation/r:Title/r:String', oth_mat_el) desc_str_els = self._findall(self._XPATH_REL_DESC_CONTENT, oth_mat_el) # Kuha Study model supports only single url for a publication. Take the first one. ext_url_ref_el = self._find('./r:ExternalURLReference', oth_mat_el) uri = ''.join(ext_url_ref_el.itertext()) if ext_url_ref_el is not None else None # Kuha Study model supports only single date for publication. Take the first one. 
simple_date_el = self._find('./r:Citation/r:PublicationDate/r:SimpleDate', oth_mat_el) distribution_date = ''.join(simple_date_el.itertext()) if simple_date_el is not None else None ids_agencys = [] for id_el in self._findall('./r:Citation/r:InternationalIdentifier', oth_mat_el): ids_agencys.append( ( ''.join(self._find('./r:IdentifierContent', id_el).itertext()), ''.join(self._find('./r:ManagingAgency', id_el).itertext()), ) ) yield from self._iter_params_from_othmat_properties( title_str_els, desc_str_els=desc_str_els, uri=uri, distribution_date=distribution_date, id_agency_pair=get_preferred_publication_id_agency_pair(ids_agencys), ) def _get_role_and_grant_numbers_from_funding_info_el(self, funding_info_el): funder_role_el = self._find('./r:FunderRole', funding_info_el) role = ''.join(funder_role_el.itertext()) if funder_role_el is not None else None grant_numbers = [] for grant_number_el in self._findall('./r:GrantNumber', funding_info_el): grant_numbers.append(''.join(grant_number_el.itertext())) return role, grant_numbers def _iter_funding_agencies_as_mapped_params(self): for funding_info_el in self._iter_funding_informations_from_element(self.study_unit_element): role, grant_numbers = self._get_role_and_grant_numbers_from_funding_info_el(funding_info_el) grant_number = grant_numbers.pop(0) if grant_numbers else None description = ( ' '.join( [''.join(elem.itertext()) for elem in self._findall(self._XPATH_REL_DESC_CONTENT, funding_info_el)] ) or None ) for agency_org_ref_el in self._findall('./r:AgencyOrganizationReference', funding_info_el): xpath_to_referenced_el, xpath_to_string_el = { 'Organization': ('.//a:Organization', './a:OrganizationIdentification/a:OrganizationName/r:String'), 'Individual': ( './/a:Individual', './a:IndividualIdentification/a:IndividualName/a:FullName/r:String', ), }[''.join(self._find(self._XPATH_REL_TYPEOFOBJECT, agency_org_ref_el).itertext())] urn_str = self._get_reference_urn_from_element(agency_org_ref_el) referenced_el = 
self._find_by_reference_value(urn_str, xpath_to_referenced_el) if referenced_el is None: # Did not find referenced element from this DDI XML. continue for str_el in self._findall(xpath_to_string_el, referenced_el): params = MappedParams(''.join(str_el.itertext())) params.set_language(self._get_xmllang(str_el, self.root_language)) params.keyword_arguments.update( { self._study_cls.funding_agencies.attr_grant_number.name: grant_number, self._study_cls.funding_agencies.attr_role.name: role, self._study_cls.funding_agencies.attr_description.name: description, } ) yield params def _iter_instruments_as_mapped_params_from_instrument_els(self, instrument_els): for instrument_el in instrument_els: type_of_instru_el = self._find('./dc:TypeOfInstrument', instrument_el) type_of_instru_value = None if type_of_instru_el is None else ''.join(type_of_instru_el.itertext()) for string_el in self._findall('./dc:InstrumentName/r:String', instrument_el): params = MappedParams(type_of_instru_value) params.set_language(self._get_xmllang(string_el, default=self._study_unit_language)) params.keyword_arguments.update( {self._study_cls.instruments.attr_instrument_name.name: ''.join(string_el.itertext())} ) yield params def _iter_instruments_as_mapped_params(self): raise NotImplementedError(_MSG_IMPL_SUBCLASS) @property def studies(self): if self.study_number is None: self._parse_study_number() study = self._study_cls() study.add_study_number(self.study_number_identifier) self._map_to_record( study, self.study_unit_element, self._study_maps['StudyUnit'], default_language=self._study_unit_language ) if self._ddiinstance_element is not None: self._map_to_record( study, self._ddiinstance_element, self._study_maps['DDIInstance'], default_language=self._study_unit_language, ) for add_func, mapping in [ (study.add_collection_periods, self._iter_collection_periods_as_mapped_params), (study.add_principal_investigators, self._iter_principal_investigators_as_mapped_params), (study.add_classifications, 
self._iter_classifications_as_mapped_params), (study.add_keywords, self._iter_keywords_as_mapped_params), (study.add_study_uris, self._iter_study_uris_as_mapped_params), (study.add_universes, self._iter_universes_as_mapped_params), (study.add_study_area_countries, self._iter_study_area_countries_as_mapped_params), (study.add_publishers, self._iter_publishers_as_mapped_params), (study.add_identifiers, self._iter_identifiers_as_mapped_params), (study.add_data_access, self._iter_data_access_as_mapped_params), (study.add_data_access_descriptions, self._iter_data_access_descriptions_as_mapped_params), (study.add_citation_requirements, self._iter_citation_requirements_as_mapped_params), (study.add_deposit_requirements, self._iter_deposit_requirements_as_mapped_params), (study.add_document_uris, self._iter_document_uris_as_mapped_params), (study.add_time_methods, self._iter_time_methods_as_mapped_params), (study.add_sampling_procedures, self._iter_sampling_procedures_as_mapped_params), (study.add_collection_modes, self._iter_collection_modes_as_mapped_params), (study.add_instruments, self._iter_instruments_as_mapped_params), (study.add_file_names, self._iter_file_names_as_mapped_params), (study.add_study_groups, self._iter_study_groups_as_mapped_params), (study.add_related_publications, self._iter_related_publications_as_mapped_params), (study.add_funding_agencies, self._iter_funding_agencies_as_mapped_params), (study.add_grant_numbers, self._iter_grant_numbers_as_mapped_params), ]: for params in mapping(): add_func(*params.arguments, **params.keyword_arguments) yield study