Source code for kuha_common.document_store.mappings.xmlbase

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Components to use for XML parsing & mapping to Document Store records.

Contains a base class to use for parsing XML to Document Store records.
Provides common functions useful in parsing XML data.
"""
import re
import copy
import xml.etree.ElementTree as ET

from kuha_common.document_store.records import (
    COLLECTIONS,
    Study,
    Variable,
    Question,
    StudyGroup
)
from kuha_common.document_store.mappings.exceptions import (
    MappingError,
    ParseError,
    InvalidMapperParameters,
    MissingRequiredAttribute,
    InvalidContent
)


class MappedParams:
    """Container for parameters that get passed to a record's add-method.

    :class:`XMLMapper` reads values from an XML record and stores
    them in an instance of this class. Positional parameters are kept
    in :attr:`arguments` (value first, optional language second) and
    named parameters in :attr:`keyword_arguments`, so the record's
    add-method can be called with tuple and dict unpacking.

    Example::

        mapped_params = MappedParams('study_identifier')
        mapped_params.set_language('en')
        mapped_params.keyword_arguments.update({'agency': 'archive'})
        study = Study()
        study.add_identifiers(*mapped_params.arguments,
                              **mapped_params.keyword_arguments)

    :param value: value used as the first argument
    :type value: str or None
    """

    def __init__(self, value):
        # Language, when set, becomes the second item of the tuple.
        self.arguments = (value,)
        self.keyword_arguments = {}

    def has_language(self):
        """Tell whether a language argument has been set.

        :returns: True if has language, False if not.
        :rtype: bool
        """
        argument_count = len(self.arguments)
        return argument_count == 2

    def set_language(self, language):
        """Store the language argument, replacing any previous one.

        :param language: Language to set.
        :type language: str
        """
        value = self.arguments[0]
        self.arguments = (value, language)

    def get_language(self):
        """Return the language argument.

        :returns: Language
        :rtype: str
        """
        return self.arguments[1]

    def get_value(self):
        """Return the value argument.

        :returns: value.
        :rtype: str or None
        """
        return self.arguments[0]

    def copy(self):
        """Create a new object carrying the same contents.

        The keyword arguments dictionary is copied, so changes to
        the copy's keyword arguments do not affect this object.

        :returns: copy of this :obj:`MappedParams`
        :rtype: :obj:`MappedParams`
        """
        duplicate = self.__class__(self.get_value())
        if self.has_language():
            duplicate.set_language(self.get_language())
        duplicate.keyword_arguments = self.keyword_arguments.copy()
        return duplicate

    def has_arguments(self):
        """Tell whether there is anything other than None to pass on.

        :returns: True if the value or any of the keyword argument
                  values is not None.
        :rtype: bool
        """
        if self.get_value() is not None:
            return True
        return any(val is not None for val in self.keyword_arguments.values())
class XMLMapper:
    """XMLMapper populates :obj:`MappedParams` instances from XML.

    :param xpath: XPath where to look for element containing value.
    :type xpath: str
    :param from_attribute: Look value from attribute of the element.
    :type from_attribute: str or None
    :param required: raises :exc:`MissingRequiredAttribute` if value
                     is not found.
    :type required: bool
    :param localizable: True if the value is localizable, False if not.
    :type localizable: bool
    """

    ATT_RELATION_PARENT = 'parent'
    ATT_RELATION_CHILD = 'child'
    ATT_RELATION_NONE = None
    # Placeholder appended to parent xpaths; formatted with the position
    # of the child element or stripped when no position is known.
    ATT_POSITION_STR = '[{position}]'

    def __init__(self, xpath, from_attribute=None, required=False, localizable=True):
        self.xpath = xpath
        self.from_attribute = from_attribute
        self.required = required
        self.localizable = localizable
        # List of (att_name, mapper, relation, provides_main_lang) tuples,
        # or None when attributes are disabled.
        self.attributes = []
        self._value_conversion = None
        self._value_getter = None
        self._param_getter = None

    def set_value_conversion(self, conv_func):
        """Set conversion callable.

        :note: `conv_func` must accept a string or None as a parameter
               and return the converted value.

        :param conv_func: Callable used for conversion.
        :type conv_func: callable.
        :returns: self
        """
        self._value_conversion = conv_func
        return self

    def set_value_getter(self, getter_func):
        """Set value getter callable.

        :note: `getter_func` must accept an XML element
               :obj:`xml.etree.ElementTree.Element` as a parameter
               and return the value.

        :param getter_func: Callable used for getting a value from
                            XML element.
        :type getter_func: callable.
        :returns: self
        """
        self._value_getter = getter_func
        return self

    def _set_param_getter(self, getter):
        self._param_getter = getter
        return self

    def expect_single_value(self):
        """This mapper will be expected to return a single value.

        :returns: self
        """
        self._set_param_getter(self.value_params)
        return self

    def expect_multiple_values(self):
        """This mapper will be expected to return multiple values.

        :returns: self
        """
        self._set_param_getter(self.values_params)
        return self

    def disable_attributes(self):
        """This mapper will not contain any attributes.

        :returns: self
        """
        self.attributes = None
        return self

    def iterate_attributes(self, *relations):
        r"""Iterate attributes to map.

        :param \*relations: optional parameters to iterate only
                            attributes with a certain relation.
        :type \*relations: str
        :returns: A generator yielding tuples of each attribute
                  in the format: (attribute_name, attribute_mapper,
                  attribute_provides_main_lang)
        :rtype: generator
        """
        if self.attributes is None:
            return
        if not relations:
            relations = (self.ATT_RELATION_CHILD,
                         self.ATT_RELATION_NONE,
                         self.ATT_RELATION_PARENT)
        for att_name, mapper, relation, provides_main_lang in self.attributes:
            if relation in relations:
                yield (att_name, mapper, provides_main_lang)

    def _has_parent_or_unrelated_attributes(self):
        """Convenience method for checking if mapper has parent
        or unrelated attributes.

        :returns: True if mapper has parent or unrelated attributes.
        :rtype: bool
        """
        for _ in self.iterate_attributes(self.ATT_RELATION_NONE,
                                         self.ATT_RELATION_PARENT):
            return True
        return False

    def as_params(self, element, default_language, xml_namespaces):
        """Use mapping to construct a :obj:`MappedParams` from XML element.

        Use mapper's :attr:`_value_getter` and :attr:`_value_conversion`
        to get value from XML element. Construct a :obj:`MappedParams`
        from the value. If mapping :attr:`localizable` is True add
        language from XML elements xml:lang attribute.

        :param element: XML element.
        :type element: :obj:`xml.etree.ElementTree.Element`
        :param default_language: default language if element has none.
        :type default_language: str
        :param xml_namespaces: XML namespaces for the element.
        :type xml_namespaces: dict
        :returns: mapped parameters ready to pass to records add-method.
        :rtype: :obj:`MappedParams`
        """
        if self._value_getter:
            value = self._value_getter(element)
        elif self.from_attribute:
            value = element.attrib.get(self.from_attribute)
        else:
            value = "".join(element.itertext())
        if value is not None and self._value_conversion is None:
            # Without a conversion, treat whitespace-only content as missing.
            value = value.strip()
            if value == "":
                value = None
        elif self._value_conversion:
            value = self._value_conversion(value)
        params = MappedParams(value)
        if self.localizable:
            lang = element.attrib.get('{%s}lang' % (xml_namespaces['xml'],),
                                      default_language)
            params.set_language(lang)
        return params

    def add_attribute(self, att_name, mapper, relative=True, provides_main_lang=False):
        """Add attribute to mapper.

        Counts the correct xpath if attribute's mapper's xpath is a
        parent element (starting with '/..'). Includes all needed
        information of the attribute to a list of tuples contained
        in :attr:`attributes`.

        :note: When the attribute refers to a parent element, the
               `mapper` instance's :attr:`xpath` is rewritten in place.

        :param att_name: attribute name
        :type att_name: str
        :param mapper: mapper instance for mapping value for the attribute.
        :type mapper: :obj:`XMLMapper`
        :param relative: Is the attribute map's xpath relative to this
                         element. Defaults to True.
        :type relative: bool
        :param provides_main_lang: Should the language of the attribute
                                   be used as a language when mapping
                                   this value. Defaults to False.
        :type provides_main_lang: bool
        :raises: :exc:`InvalidMapperParameters` for conflicting
                 parameters such as:

                 1. Calling this method on a mapper which has disabled
                    use of attributes.
                 2. Using :attr:`provides_main_lang` for a
                    non-localizable mapper.
                 3. Setting :attr:`relative` to False on a mapper whose
                    :attr:`xpath` refers to parent element.
        :returns: self
        """
        if self.attributes is None:
            raise InvalidMapperParameters("{} does not support attributes".format(self.xpath))
        if not self.localizable and provides_main_lang:
            raise InvalidMapperParameters("{} is not localizable and cannot use lang from attribute {}".
                                          format(self.xpath, att_name))
        if mapper.xpath.startswith('/..'):
            if not relative:
                raise InvalidMapperParameters("Attribute from {} must be relative to {} since it refers to parent.".
                                              format(mapper.xpath, self.xpath))
            # Parent element. Count relative xpath.
            relation = self.ATT_RELATION_PARENT
            rel_xpath_sections = self.xpath.split('/')
            if rel_xpath_sections[0] == '.':
                rel_xpath_sections.pop(0)
            att_xpath = mapper.xpath
            items = filter(None, att_xpath.split('/'))
            last_item = None
            for item in items:
                # Consume leading path steps: every '..' climbs one level
                # up from this mapper's xpath; the first named step ends
                # the climb and receives the position placeholder.
                att_xpath = att_xpath.replace('/{}'.format(item), '', 1)
                if item == '..':
                    rel_xpath_sections.pop()
                else:
                    last_item = item
                    break
            parent_xpath = '/'.join(rel_xpath_sections)
            if last_item:
                mapper.xpath = "./{parent_xpath}/{last_item}{position}{att_xpath}".\
                               format(parent_xpath=parent_xpath,
                                      last_item=last_item,
                                      position=self.ATT_POSITION_STR,
                                      att_xpath=att_xpath)
            else:
                mapper.xpath = "./{parent_xpath}{position}{att_xpath}".\
                               format(parent_xpath=parent_xpath,
                                      position=self.ATT_POSITION_STR,
                                      att_xpath=att_xpath)
        elif relative:
            relation = self.ATT_RELATION_CHILD
        else:
            relation = self.ATT_RELATION_NONE
        self.attributes.append((att_name, mapper, relation, provides_main_lang))
        return self

    def value_params(self, source_xml_element, default_language, xml_namespaces, position=None):
        """Generate single :obj:`MappedParams` object from source XML.

        :param source_xml_element: XML element.
        :type source_xml_element: :obj:`xml.etree.ElementTree.Element`
        :param default_language: Default language.
        :type default_language: str
        :param xml_namespaces: XML namespaces.
        :type xml_namespaces: dict
        :param position: Optional position for parent xpaths.
        :type position: int or None
        :returns: generator yielding :obj:`MappedParams`.
        :rtype: generator
        :raises: :exc:`MissingRequiredAttribute` if mapper's
                 :attr:`required` is True, but xpath provides no
                 element or the element provides no value.
        :raises: :exc:`MappingError` if getting the value from the
                 found element raises an exception.
        """
        xpath = self.xpath.format(position=position) if position else self.xpath
        element = source_xml_element.find(xpath, xml_namespaces)
        if element is None:
            if self.required:
                raise MissingRequiredAttribute(self.xpath)
            return
        try:
            main_params = self.as_params(element, default_language, xml_namespaces)
        except Exception as exc:
            attr_msg = ' attribute: %s' % (self.from_attribute,) if self.from_attribute is not None else ''
            raise MappingError('Unexpected exception when parsing content from element: %s xpath: %s %s' %
                               (element, self.xpath, attr_msg)) from exc
        if self.required and main_params.get_value() is None:
            raise MissingRequiredAttribute(self.xpath)
        yield main_params

    def values_params(self, source_xml_element, default_language, xml_namespaces):
        """Generate multiple :obj:`MappedParams` objects from source XML.

        The generated MappedParams will contain attributes as
        keyword_arguments.

        :param source_xml_element: XML element.
        :type source_xml_element: :obj:`xml.etree.ElementTree.Element`
        :param default_language: Default language.
        :type default_language: str
        :param xml_namespaces: XML namespaces.
        :type xml_namespaces: dict
        :returns: generator yielding :obj:`MappedParams`.
        :rtype: generator
        :raises: :exc:`MissingRequiredAttribute` if mapper's
                 :attr:`required` is True, but xpath provides no
                 element or the element provides no value.
        """
        elements = source_xml_element.findall(self.xpath, xml_namespaces)
        if self.required and not elements and self._has_parent_or_unrelated_attributes() is False:
            raise MissingRequiredAttribute(self.xpath)
        if not elements and self._has_parent_or_unrelated_attributes():
            # No element for the value itself, but parent or unrelated
            # attributes may still provide keyword arguments.
            main_params = MappedParams(None)
            main_params.set_language(default_language)
            additional = {}
            self._values_from_parent(source_xml_element,
                                     default_language,
                                     xml_namespaces,
                                     main_params=main_params,
                                     additional=additional)
            self._values_from_unrelated(source_xml_element,
                                        default_language,
                                        xml_namespaces,
                                        main_params=main_params,
                                        additional=additional)
            add_list = self._additional_params_as_list(main_params, additional)
            if self.required and main_params.has_arguments() is False:
                raise MissingRequiredAttribute(self.xpath)
            yield main_params
            yield from add_list
        for elem_position, element in enumerate(elements, start=1):
            main_params = self.as_params(element, default_language, xml_namespaces)
            additional = {}
            self._values_from_parent(source_xml_element,
                                     default_language,
                                     xml_namespaces,
                                     main_params=main_params,
                                     additional=additional,
                                     position=elem_position)
            self._values_from_unrelated(source_xml_element,
                                        default_language,
                                        xml_namespaces,
                                        main_params=main_params,
                                        additional=additional)
            self._values_from_children(element,
                                       default_language,
                                       xml_namespaces,
                                       main_params=main_params,
                                       additional=additional)
            add_list = self._additional_params_as_list(main_params, additional)
            if self.required and main_params.has_arguments() is False:
                raise MissingRequiredAttribute(self.xpath)
            yield main_params
            yield from add_list

    @staticmethod
    def _additional_params_as_list(main_params, additional):
        """Build extra :obj:`MappedParams` from repeated attribute values.

        :param main_params: parameters that are copied as the base of
                            every additional item.
        :type main_params: :obj:`MappedParams`
        :param additional: attribute name mapped to a list of
                           (mapped_params, provides_main_lang) tuples.
        :type additional: dict
        :returns: list of :obj:`MappedParams`, as long as the longest
                  list in `additional`.
        :rtype: list
        """
        if not additional:
            return []
        max_ = max(len(x) for x in additional.values())
        add_list = []
        latest = None
        for parameter, values in additional.items():
            for index in range(max_):
                try:
                    mapped_params, provides_main_lang = values[index]
                    latest = values[index]
                except IndexError:
                    # This attribute has fewer values than the longest
                    # one: repeat the latest seen value.
                    if latest is None:
                        raise
                    mapped_params, provides_main_lang = latest
                if index >= len(add_list):
                    add_list.append(main_params.copy())
                add_list[index].keyword_arguments.update({parameter: mapped_params.get_value()})
                if provides_main_lang:
                    add_list[index].set_language(mapped_params.get_language())
        return add_list

    @staticmethod
    def _process_param_iterator(param_iterator, parameter, main_params, additional, provides_main_lang):
        """Store the first iterated value to `main_params` and the
        rest to `additional`.
        """
        for att_index, att_param in enumerate(param_iterator):
            if att_index > 0:
                if parameter not in additional:
                    additional.update({parameter: []})
                additional[parameter].append((att_param, provides_main_lang))
                continue
            main_params.keyword_arguments[parameter] = att_param.get_value()
            if provides_main_lang:
                main_params.set_language(att_param.get_language())

    def _values_from_parent(self, *args, main_params, additional, position=None):
        """Map attributes whose values come from parent elements."""
        for parameter, attr_mapper, provides_main_lang in self.iterate_attributes(self.ATT_RELATION_PARENT):
            if position is None:
                # attribute from parent and no position for child.
                # manipulate xpath and _param_getter for one time run.
                attr_xpath = attr_mapper.xpath
                attr_param_getter = attr_mapper._param_getter
                attr_mapper.xpath = attr_mapper.xpath.replace(self.ATT_POSITION_STR, '')
                attr_mapper._set_param_getter(attr_mapper.values_params)
                try:
                    param_iterator = attr_mapper(*args)
                    self._process_param_iterator(param_iterator, parameter, main_params,
                                                 additional, provides_main_lang)
                finally:
                    # Always restore the mapper. It may be shared and
                    # reused, so an exception raised while mapping must
                    # not leave it with the temporary xpath and getter.
                    attr_mapper.xpath = attr_xpath
                    attr_mapper._set_param_getter(attr_param_getter)
                continue
            param_iterator = attr_mapper(*args, position=position)
            self._process_param_iterator(param_iterator, parameter, main_params,
                                         additional, provides_main_lang)

    def _values_from_unrelated(self, *args, main_params, additional):
        """Map attributes whose xpath is not relative to this mapper."""
        for parameter, attr_mapper, provides_main_lang in self.iterate_attributes(self.ATT_RELATION_NONE):
            param_iterator = attr_mapper(*args)
            self._process_param_iterator(param_iterator, parameter, main_params,
                                         additional, provides_main_lang)

    def _values_from_children(self, *args, main_params, additional):
        """Map attributes whose values come from child elements."""
        for parameter, attr_mapper, provides_main_lang in self.iterate_attributes(self.ATT_RELATION_CHILD):
            # Make sure the keyword is present even if the child
            # provides no value.
            main_params.keyword_arguments.update({parameter: None})
            param_iterator = attr_mapper(*args)
            self._process_param_iterator(param_iterator, parameter, main_params,
                                         additional, provides_main_lang)

    def __call__(self, *args, **kwargs):
        """Generate :obj:`MappedParams` using the configured param
        getter. Items without any arguments are skipped.
        """
        for params in self._param_getter(*args, **kwargs):
            if params.has_arguments():
                yield params
class XMLParserBase:
    """Base class where parsers get derived from.

    Declares the public API to be used in callers.

    **Input:**

      * from_file()
      * from_string()

    **Output:**

      * studies
      * variables
      * questions
      * study_groups
      * all
      * select(collection=None)

    Provides common functionality to be used within subclasses
    which map XML-data to Document Store records.

    Subclasses must implement necessary generators that generate
    document store records.

    Use in subclass::

        class XMLRecordParser(XMLParserBase):

            @property
            def studies(self):
                maps = [(Study.add_study_number,
                         self._map_single(xpath_to_study_number, required=True)),
                        (Study.add_study_titles,
                         self._map_multi(xpath_to_study_title))]
                for study_element in self.root_element.findall(xpath_to_study_element, self.NS):
                    study = Study()
                    self._map_to_record(study, study_element, maps)
                    yield study

    :param root_element: XML root.
    :type root_element: :obj:`xml.etree.ElementTree.Element`
    """

    #: XML namespaces
    NS = {'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
          'xml': 'http://www.w3.org/XML/1998/namespace'}

    #: Default language.
    default_language = 'en'

    #: Classes used in parsing. Override to customize.
    _study_cls = Study
    _variable_cls = Variable
    _question_cls = Question
    _studygroup_cls = StudyGroup

    def __init__(self, root_element):
        self._root_element = root_element
        # Maps identifier name to (value as in source, valid identifier).
        self._identifiers = {'study_number': (None, None)}

    @classmethod
    def _xpath(cls, xpath):
        """Override to manipulate xpath for element lookups"""
        return xpath

    @classmethod
    def _get_xmllang(cls, elem, default=None):
        return elem.attrib.get('{%s}lang' % (cls.NS['xml'],), default)

    # Implemented API

    @classmethod
    def _map_single(cls, xpath, from_attribute=None, required=False, localizable=False):
        """Returns an :obj:`XMLMapper` for a single value.

        :param xpath: xpath to find the element used for mapping.
        :type xpath: str
        :param from_attribute: Optional attribute name, if value
                               should come from elements attribute.
        :type from_attribute: str or None
        :param required: True if the element is required.
                         Default to False
        :type required: bool
        :param localizable: True if the element is localizable.
                            Default to False
        :type localizable: bool
        :returns: Initiated :obj:`XMLMapper`
        :rtype: :obj:`XMLMapper`
        """
        mapper = XMLMapper(cls._xpath(xpath),
                           from_attribute=from_attribute,
                           required=required,
                           localizable=localizable)
        mapper.disable_attributes().expect_single_value()
        return mapper

    @classmethod
    def _map_multi(cls, xpath, from_attribute=None, localizable=True):
        """Returns an :obj:`XMLMapper` for localizable multi value.

        :param xpath: xpath to find the element used for mapping.
        :type xpath: str
        :param from_attribute: Optional attribute name, if value
                               should come from elements attribute.
        :type from_attribute: str or None
        :param localizable: True if the element is localizable.
                            Default to True.
        :type localizable: bool
        :returns: Initiated :obj:`XMLMapper`
        :rtype: :obj:`XMLMapper`
        """
        mapper = XMLMapper(cls._xpath(xpath),
                           from_attribute=from_attribute,
                           required=False,
                           localizable=localizable)
        mapper.expect_multiple_values()
        return mapper

    def _map_to_record(self, record_instance, mapping_root, mapping, default_language=None):
        """Map elements to `record_instance` using `mapping`.

        :param record_instance: Document Store record instance.
        :type record_instance: :obj:`kuha_common.document_store.records.Study` or
                               :obj:`kuha_common.document_store.records.Variable` or
                               :obj:`kuha_common.document_store.records.Question` or
                               :obj:`kuha_common.document_store.records.StudyGroup`
        :param mapping_root: XML element considered root of the `mapping`.
        :type mapping_root: :obj:`xml.etree.ElementTree.Element`
        :param mapping: Mapping from mapping_root to Document Store record.
                        The list must contain tuples with two items.
                        First item of the tuple must be an instance method
                        of a Document Store record. The second item must be
                        a :obj:`XMLMapper` instance. By calling this mapping
                        instance with the `mapping_root` it must return
                        values that can be used as parameters to the
                        Document Store record instance function.
        :type mapping: list
        :param default_language: Optional default language.
                                 Defaults to :attr:`root_language`.
        :type default_language: str or None
        """
        default_language = default_language or self.root_language
        for add_func, element_map in mapping:
            for parameters in element_map(mapping_root, default_language, self.NS):
                add_func(record_instance, *parameters.arguments, **parameters.keyword_arguments)

    def _find(self, xpath, element=None):
        """Find first element matching `xpath`.

        :param xpath: xpath for lookup. Passed through :meth:`_xpath`.
        :type xpath: str
        :param element: Element to search from. Defaults to
                        :attr:`root_element`.
        :type element: :obj:`xml.etree.ElementTree.Element` or None
        :returns: result of the element's ``find()``.
        """
        # Compare against None explicitly: an Element without children
        # is falsy, so `element or root` would silently search the root.
        if element is None:
            element = self.root_element
        return element.find(self._xpath(xpath), self.NS)

    def _findall(self, xpath, element=None):
        """Find all elements matching `xpath`.

        :param xpath: xpath for lookup. Passed through :meth:`_xpath`.
        :type xpath: str
        :param element: Element to search from. Defaults to
                        :attr:`root_element`.
        :type element: :obj:`xml.etree.ElementTree.Element` or None
        :returns: result of the element's ``findall()``.
        """
        # See _find() for why the None check is explicit.
        if element is None:
            element = self.root_element
        return element.findall(self._xpath(xpath), self.NS)

    @classmethod
    def from_string(cls, xml_body):
        """Get parser that iteratively parses XML and generates
        populated Document Store record instances.

        :param xml_body: XML Document as a string. This may
                         come directly from HTTP request body.
        :type xml_body: str
        :returns: parser for iteratively parsing XML and generating
                  Document Store records.
        :rtype: :obj:`XMLParserBase`
        :raises: :exc:`ParseError` if the XML cannot be parsed.
        """
        try:
            root = ET.fromstring(xml_body)
        except ET.ParseError as exc:
            raise ParseError(exc) from exc
        return cls(root)

    @classmethod
    def from_file(cls, filepath):
        """Get parser that iteratively parses XML and generates
        populated Document Store record instances.

        :param filepath: Path for the XML file.
        :type filepath: str
        :returns: parser for iteratively parsing XML and generating
                  Document Store records.
        :rtype: :obj:`XMLParserBase`
        :raises: :exc:`ParseError` if the XML cannot be parsed.
        """
        try:
            tree = ET.parse(filepath)
        except ET.ParseError as exc:
            raise ParseError("Parsing XML from filepath '%s' results in ParseError: '%s'" %
                             (filepath, exc)) from exc
        root = tree.getroot()
        return cls(root)

    @classmethod
    def child_text(cls, xpath):
        """Returns a function which will lookup a child element from given xpath.

        The returned function takes a single element as a parameter
        which should be an :obj:`xml.etree.ElementTree.Element` or similar.
        When executed the function returns the child element's text
        contents or None if child element cannot be found.

        :param xpath: xpath to child. relative to parent.
        :returns: function which accepts the parent element as a parameter.
        :rtype: function
        """
        def _get_child_text(element):
            child = element.find(xpath, cls.NS)
            if child is None:
                return None
            return "".join(child.itertext())
        return _get_child_text

    @staticmethod
    def _get_attr_and_set_param(params, kw_key, elem, attr_key):
        """Helper method to lookup element attribute and set it
        as keyword argument to params

        Does nothing if `elem` is None or it does not have the
        attribute.

        :param params: Target parameter
        :type params: :obj:`kuha_common.document_store.mappings.xmlbase.MappedParams`
        :param kw_key: Target parameter keyword argument key
        :type kw_key: str
        :param elem: XML element to look for attribute
        :type elem: :obj:`xml.etree.ElementTree.Element` or None
        :param attr_key: XML element attribute key
        :type attr_key: str
        :returns: None
        """
        if elem is None:
            return
        attr = elem.get(attr_key)
        if attr is None:
            return
        params.keyword_arguments[kw_key] = attr

    @classmethod
    def _findall_from_elements(cls, elements, xpath):
        """Calls findall for each element and yields result.

        Helper to reduce nested loops when looping elements.

        :param elements: iterable containing XML elements.
        :param xpath: xpath used to loop single element.
        :returns: Generator to iterate elements with xpath
        """
        for element in elements:
            yield from element.findall(xpath, cls.NS)

    @property
    def root_element(self):
        """Get root element.

        :returns: Root element
        :rtype: :obj:`xml.etree.ElementTree.Element`
        """
        return self._root_element

    @property
    def root_language(self):
        """Get language of the root element.

        If root does not have a language,
        returns :attr:`self.default_language`.

        :returns: root element language.
        :rtype: str
        """
        return self.root_element.attrib.get('{%s}lang' % self.NS.get('xml'),
                                            self.default_language)

    @property
    def study_number(self):
        """Get study number as formatted in source XML.

        :seealso: :attr:`self.study_number_identifier`

        :returns: Study number from source XML.
        :rtype: str
        """
        return self._identifiers.get('study_number')[0]

    @study_number.setter
    def study_number(self, study_number):
        """Set study number

        :note: study number must be a string although
               the naming suggests an integer/float would do.

        :param study_number: Study number.
        :type study_number: str
        :raises: :exc:`InvalidContent` if the study number cannot be
                 converted to a non-empty identifier.
        """
        valid = as_valid_identifier(study_number)
        if not valid:
            raise InvalidContent("Invalid study number: '%s'" % study_number)
        self._identifiers['study_number'] = (study_number, valid)

    @property
    def study_number_identifier(self):
        """Get study number converted as a valid Document Store
        identifier.

        :returns: Study number as valid Document Store identifier.
        :rtype: str
        """
        return self._identifiers.get('study_number')[1]

    # // Implemented API

    # Iterators which generate Document Store records
    # must be implemented in subclass.

    @property
    def studies(self):
        """Studies generator. Must be implemented in subclass.

        :returns: Generator which yields Document Store studies.
        """
        raise NotImplementedError("Implement in subclass")

    @property
    def variables(self):
        """Variables generator. Must be implemented in subclass.

        :returns: Generator which yields Document Store variables.
        """
        raise NotImplementedError("Implement in subclass")

    @property
    def questions(self):
        """Questions generator. Must be implemented in subclass.

        :returns: Generator which yields Document Store questions.
        """
        raise NotImplementedError("Implement in subclass")

    @property
    def study_groups(self):
        """Study groups generator. Must be implemented in subclass.

        :returns: Generator which yields Document Store study groups.
        """
        raise NotImplementedError("Implement in subclass")

    @property
    def all(self):
        """Iterate all records found from source XML.

        Records are yielded in the order: studies, study groups,
        variables, questions.

        :returns: Generator which yields Document Store records.
        :rtype: Generator
        """
        yield from self.studies
        yield from self.study_groups
        yield from self.variables
        yield from self.questions

    def select(self, collection=None):
        """Returns a selective parser.

        Call with a Document Store collection as parameter
        to select records only for certain collection.

        .. Note:: The returned attributes are defined in subclasses,
                  so they may or may not be generators.

        :param collection: Document Store collection to select only
                           records belonging to this collection.
        :type collection: str or None
        :returns: Generator which yields Document Store records.
        :rtype: Generator
        :raises: :exc:`ValueError` if `collection` is not found
                 from ``COLLECTIONS``.
        """
        if not collection:
            return self.all
        if collection not in COLLECTIONS:
            raise ValueError("%s is not a valid Document Store collection" % (collection,))
        return getattr(self, collection)
def as_valid_identifier(candidate):
    """Convert candidate to a string usable as an identifier.

    Every run of characters outside ``a-zA-Z0-9``, together with the
    characters directly following it, is cleaned up: ``ä``/``Ä``
    become ``ae``, ``ö``/``Ö`` become ``oe``, whitespace becomes
    ``_`` and anything else outside ``a-zA-Z0-9_.-`` is removed.
    Finally leading characters outside ``a-zA-Z0-9`` are dropped.

    .. note:: The validation regex itself is defined in Document
              Store. Should it be moved to kuha_common?

    :param candidate: string to convert.
    :type candidate: str
    :returns: identifier which conforms the rules of validation.
    :rtype: str
    """
    # Applied in this order to each matched fragment.
    substitutions = (
        (r'[äÄ]', 'ae'),
        (r'[öÖ]', 'oe'),
        (r'[\s]', '_'),
        (r'[^a-zA-Z0-9_.-]', ''),
    )

    def _sanitize(match_object):
        fragment = match_object.group()
        for pattern, replacement in substitutions:
            fragment = re.sub(pattern, replacement, fragment, flags=re.UNICODE)
        return fragment

    sanitized = re.sub(r'[^a-zA-Z0-9]+[a-zA-Z0-9?_()-.]*', _sanitize, candidate)
    return re.sub(r'^[^a-zA-Z0-9]+', '', sanitized, count=1)
def str_equals(correct, default=None):
    """Build a conversion function that compares strings for equality.

    The returned function converts both its parameter and `correct`
    with :func:`str` before comparing them. It is meant for checking
    if a string found from element value or element attribute equals
    to `correct`.

    :param correct: comparison string.
    :type correct: str
    :param default: Returned by the comparison function when its
                    parameter is None.
    :type default: str
    :returns: function which accepts a single parameter for
              comparison. Returns True or False, or `default`
              if the parameter is None.
    :rtype: function
    """
    def _compare(value):
        return default if value is None else str(correct) == str(value)
    return _compare
def fixed_value(fixed):
    """Build a conversion function that ignores its input.

    :param fixed: Use this value
    :returns: function which accepts a single argument value.
              The function always returns `fixed`.
    :rtype: function
    """
    def _always_fixed(value):  # pylint: disable=unused-argument
        # The looked up value is intentionally discarded.
        return fixed
    return _always_fixed
def element_remove_whitespaces(element):
    """Conversion function to get element text without extra whitespace.

    Iterates element's inner text using
    :meth:`xml.etree.ElementTree.Element.itertext` which covers this
    element and all subelements. Every line of each text fragment is
    stripped and the lines are joined with a single space. Fragments
    are then concatenated with one separating space, and trailing
    whitespace is removed from the result.

    :param element: Element from which to get text.
    :type element: :obj:`xml.etree.ElementTree.Element`
    :returns: Element's inner text without extra whitespace.
    :rtype: str
    """
    collected = ""
    trailing_space = False
    for fragment in element.itertext():
        flattened = " ".join(line.strip() for line in fragment.split("\n"))
        # A separator is needed only between fragments, and only if
        # the previous fragment did not already end with a space.
        needs_separator = collected != "" and not trailing_space
        separator = " " if needs_separator else ""
        collected = collected + separator + flattened.lstrip()
        trailing_space = flattened.endswith(" ")
    return collected.rstrip() if trailing_space else collected
def element_strip_descendant_text(element, tags_to_remove=None):
    """Conversion function to remove inner elements and their contents.

    Works on a deep copy, so the given `element` is left untouched.
    Only direct children of the element are considered for removal.

    .. note:: ElementTree keeps the text following a child in that
              child's ``tail``, so that text goes away together
              with a removed child.

    :param element: Element for lookup.
    :type element: :obj:`xml.etree.ElementTree.Element`
    :param tags_to_remove: Optional list of tags to remove from
                           results text content. If given, will only
                           remove elements that match tags that are
                           found from this list. If not given, will
                           remove all child elements. Tags that are
                           given in list but not found from
                           ``element`` children will be silently
                           ignored.
    :type tags_to_remove: list
    :returns: Element's inner text without text from removed
              children and without extra whitespace.
    :rtype: str
    """
    pruned = copy.deepcopy(element)
    # Collect first, remove afterwards: the element must not be
    # modified while it is being iterated.
    unwanted = [child for child in pruned
                if tags_to_remove is None or child.tag in tags_to_remove]
    for child in unwanted:
        pruned.remove(child)
    return element_remove_whitespaces(pruned)
def element_strip_descendant_text_by_tag(tag, *tags):
    """Conversion function to remove inner elements by tag name.

    Acts as a closure that formats the ``tags_to_remove`` parameter
    and proxies to :func:`element_strip_descendant_text`.

    :param str tag: Tag name to remove. Repeat for multiple tags.
    :returns: function that accepts an element and proxies to
              :func:`element_strip_descendant_text`
    """
    tags_to_remove = (tag,) + tags

    def _strip_by_tag(element):
        return element_strip_descendant_text(element, tags_to_remove)
    return _strip_by_tag
def get_preferred_publication_id_agency_pair(ids_agencys):
    """Get preferred id + agency two-tuple.

    Due to limitations in Kuha data model, a Study.related_publication
    item can only contain a single identifier + agency pair. This
    function picks the preferred pair by consulting the agency values.

    Preference by priority:

      1. DOI
      2. Handle
      3. URN
      4. ARK
      5. <first ID with agency>
      6. <first ID without agency>

    A list with a single item is returned as that item without any
    inspection. An empty list gives ``(None, None)``.

    :param list ids_agencys: list of id + agency two-tuples
    :returns: id + agency two-tuple.
    :rtype: tuple
    """
    if len(ids_agencys) == 1:
        return ids_agencys[0]
    # Remember only the first id seen for each agency.
    first_id_by_agency = {}
    for identifier, agency in ids_agencys:
        if agency == 'DOI':
            return identifier, agency
        first_id_by_agency.setdefault(agency, identifier)
    id_without_agency = first_id_by_agency.pop(None, None)
    if not first_id_by_agency:
        return (id_without_agency, None)
    # The None key was popped above, so None is a safe "not found" marker.
    chosen = next((agency for agency in ('Handle', 'URN', 'ARK')
                   if agency in first_id_by_agency), None)
    if chosen is None:
        chosen = next(iter(first_id_by_agency))
    return (first_id_by_agency[chosen], chosen)