#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2019 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Components to use for XML parsing & mapping to Document Store records.
Contains a base class to use for parsing XML to Document Store records.
Provides common functions useful in parsing XML data.
"""
import re
import copy
import xml.etree.ElementTree as ET
from kuha_common.document_store.records import (
COLLECTIONS,
Study,
Variable,
Question,
StudyGroup
)
from kuha_common.document_store.mappings.exceptions import (
MappingError,
ParseError,
InvalidMapperParameters,
MissingRequiredAttribute,
InvalidContent
)
class MappedParams:
    """Container for parameters that are passed to a record's add-method.

    :class:`XMLMapper` reads values from an XML record and stores them
    in an instance of this class. The positional part is kept in
    :attr:`arguments` as ``(value,)`` or ``(value, language)`` and the
    named part in :attr:`keyword_arguments`. Callers unpack both when
    invoking the add-method of a record.

    Example::

        mapped_params = MappedParams('study_identifier')
        mapped_params.set_language('en')
        mapped_params.keyword_arguments.update({'agency': 'archive'})
        study = Study()
        study.add_identifiers(*mapped_params.arguments, **mapped_params.keyword_arguments)

    :param value: value used as the first argument
    :type value: str or None
    """

    def __init__(self, value):
        self.keyword_arguments = {}
        self.arguments = (value,)

    def has_language(self):
        """Tell whether a language argument has been set.

        :returns: True if :attr:`arguments` holds a value and a language,
                  False if not.
        :rtype: bool
        """
        return len(self.arguments) == 2

    def set_language(self, language):
        """Set the language argument, replacing any previous language.

        :param language: Language to set.
        :type language: str
        """
        current_value = self.arguments[0]
        self.arguments = (current_value, language)

    def get_language(self):
        """Get the language argument.

        Raises :exc:`IndexError` if no language has been set.

        :returns: Language
        :rtype: str
        """
        return self.arguments[1]

    def get_value(self):
        """Get the value argument.

        :returns: value.
        :rtype: str or None
        """
        return self.arguments[0]

    def copy(self):
        """Create a new object of the same class with the same contents.

        The keyword arguments dictionary is copied shallowly, so adding
        or replacing keys in the copy does not affect the original.

        :returns: copy of this :obj:`MappedParams`
        :rtype: :obj:`MappedParams`
        """
        duplicate = self.__class__(self.get_value())
        if self.has_language():
            duplicate.set_language(self.get_language())
        duplicate.keyword_arguments = self.keyword_arguments.copy()
        return duplicate

    def has_arguments(self):
        """Tell whether the object carries anything worth passing on.

        :returns: True if the value is not None or at least one keyword
                  argument has a value other than None.
        :rtype: bool
        """
        if self.get_value() is not None:
            return True
        return any(kw_value is not None for kw_value in self.keyword_arguments.values())
class XMLMapper:
    """XMLMapper populates :obj:`MappedParams` instances from XML.

    A mapper instance is callable. Before calling it, choose the lookup
    strategy with :meth:`expect_single_value` or
    :meth:`expect_multiple_values`. Calling the mapper yields
    :obj:`MappedParams` that carry a value or at least one keyword argument.

    :param xpath: XPath where to look for element containing value.
    :type xpath: str
    :param from_attribute: Look value from attribute of the element.
    :type from_attribute: str or None
    :param required: raises :exc:`MissingRequiredAttribute` if value is not found.
    :type required: bool
    :param localizable: True if the value is localizable, False if not.
    :type localizable: bool
    """

    ATT_RELATION_PARENT = 'parent'
    ATT_RELATION_CHILD = 'child'
    ATT_RELATION_NONE = None
    ATT_POSITION_STR = '[{position}]'

    def __init__(self, xpath, from_attribute=None, required=False, localizable=True):
        self.xpath = xpath
        self.from_attribute = from_attribute
        self.required = required
        self.localizable = localizable
        # List of (att_name, mapper, relation, provides_main_lang) tuples.
        # Set to None by disable_attributes().
        self.attributes = []
        self._value_conversion = None
        self._value_getter = None
        self._param_getter = None

    def set_value_conversion(self, conv_func):
        """Set conversion callable.

        :note: `conv_func` must accept a string or None as a
               parameter and return the converted value.

        :param conv_func: Callable used for conversion.
        :type conv_func: callable.
        :returns: self
        """
        self._value_conversion = conv_func
        return self

    def set_value_getter(self, getter_func):
        """Set value getter callable.

        :note: `getter_func` must accept an XML element
               :obj:`xml.etree.ElementTree.Element` as a
               parameter and return the value.

        :param getter_func: Callable used for getting a value from XML element.
        :type getter_func: callable.
        :returns: self
        """
        self._value_getter = getter_func
        return self

    def _set_param_getter(self, getter):
        """Set the callable that :meth:`__call__` uses to produce params.

        :returns: self
        """
        self._param_getter = getter
        return self

    def expect_single_value(self):
        """This mapper will be expected to return a single value.

        :returns: self
        """
        self._set_param_getter(self.value_params)
        return self

    def expect_multiple_values(self):
        """This mapper will be expected to return multiple values.

        :returns: self
        """
        self._set_param_getter(self.values_params)
        return self

    def disable_attributes(self):
        """This mapper will not contain any attributes.

        :returns: self
        """
        self.attributes = None
        return self

    def iterate_attributes(self, *relations):
        r"""Iterate attributes to map.

        Yields nothing if attributes have been disabled.

        :param \*relations: optional parameters to iterate only attributes
                            with a certain relation.
        :type \*relations: str
        :returns: A generator yielding tuples of each attribute in the format:
                  (attribute_name, attribute_mapper, attribute_provides_main_lang)
        :rtype: generator
        """
        if self.attributes is None:
            return
        if not relations:
            relations = (self.ATT_RELATION_CHILD, self.ATT_RELATION_NONE, self.ATT_RELATION_PARENT)
        for att_name, mapper, relation, provides_main_lang in self.attributes:
            if relation in relations:
                yield (att_name, mapper, provides_main_lang)

    def _has_parent_or_unrelated_attributes(self):
        """Convenience method for checking if mapper
        has parent or unrelated attributes.

        :returns: True if mapper has parent or unrelated attributes.
        :rtype: bool
        """
        for _ in self.iterate_attributes(self.ATT_RELATION_NONE, self.ATT_RELATION_PARENT):
            return True
        return False

    def as_params(self, element, default_language, xml_namespaces):
        """Use mapping to construct a :obj:`MappedParams` from XML element.

        The raw value comes from :attr:`_value_getter` if one is set, else
        from the attribute named by :attr:`from_attribute`, else from the
        concatenated inner text of the element. Without a
        :attr:`_value_conversion` the value is stripped and an empty result
        becomes None. With a conversion the raw value (possibly None) is
        passed to the conversion unstripped.

        If mapping :attr:`localizable` is True add language from XML elements
        xml:lang attribute, falling back to `default_language`.

        :param element: XML element.
        :type element: :obj:`xml.etree.ElementTree.Element`
        :param default_language: default language if element has none.
        :type default_language: str
        :param xml_namespaces: XML namespaces for the element.
                               Must contain the key ``'xml'`` when the mapper
                               is localizable.
        :type xml_namespaces: dict
        :returns: mapped parameters ready to pass to records add-method.
        :rtype: :obj:`MappedParams`
        """
        if self._value_getter:
            value = self._value_getter(element)
        elif self.from_attribute:
            value = element.attrib.get(self.from_attribute)
        else:
            value = "".join(element.itertext())
        if value is not None and self._value_conversion is None:
            value = value.strip()
            if value == "":
                value = None
        elif self._value_conversion:
            value = self._value_conversion(value)
        params = MappedParams(value)
        if self.localizable:
            lang = element.attrib.get('{%s}lang' % (xml_namespaces['xml'],), default_language)
            params.set_language(lang)
        return params

    def add_attribute(self, att_name, mapper, relative=True, provides_main_lang=False):
        """Add attribute to mapper.

        Counts the correct xpath if attribute's mapper's xpath is a parent element (starting with
        '/..'). In that case the xpath of `mapper` is rewritten in place so that it is relative
        to the same root as this mapper and contains a ``[{position}]`` placeholder.
        Includes all needed information of the attribute to a list of tuples contained in
        :attr:`attributes`.

        :param att_name: attribute name
        :type att_name: str
        :param mapper: mapper instance for mapping value for the attribute.
        :type mapper: :obj:`XMLMapper`
        :param relative: Is the attribute map's xpath relative to this element.
                         Defaults to True.
        :type relative: bool
        :param provides_main_lang: Should the language of the attribute be used
                                   as a language when mapping this value. Defaults to False.
        :type provides_main_lang: bool
        :raises: :exc:`InvalidMapperParameters` for conflicting parameters such as:

                 1. Calling this method on a mapper which has disabled use of attributes.
                 2. Using :attr:`provides_main_lang` for a non-localizable mapper.
                 3. Setting :attr:`relative` to False on a mapper whose :attr:`xpath` refers
                    to parent element.

        :returns: self
        """
        if self.attributes is None:
            raise InvalidMapperParameters("{} does not support attributes".format(self.xpath))
        if not self.localizable and provides_main_lang:
            raise InvalidMapperParameters("{} is not localizable and cannot use lang from attribute {}".
                                          format(self.xpath, att_name))
        if mapper.xpath.startswith('/..'):
            if not relative:
                raise InvalidMapperParameters("Attribute from {} must be relative to {} since it refers to parent.".
                                              format(mapper.xpath, self.xpath))
            # Parent element. Count relative xpath.
            relation = self.ATT_RELATION_PARENT
            rel_xpath_sections = self.xpath.split('/')
            if rel_xpath_sections[0] == '.':
                rel_xpath_sections.pop(0)
            att_xpath = mapper.xpath
            items = filter(None, att_xpath.split('/'))
            last_item = None
            for item in items:
                # Consume the leading step of the attribute xpath.
                att_xpath = att_xpath.replace('/{}'.format(item), '', 1)
                if item == '..':
                    # Each '..' climbs one level up from this mapper's element.
                    rel_xpath_sections.pop()
                else:
                    # First real step after the climb. The position
                    # placeholder is attached to it.
                    last_item = item
                    break
            parent_xpath = '/'.join(rel_xpath_sections)
            if last_item:
                mapper.xpath = "./{parent_xpath}/{last_item}{position}{att_xpath}".\
                               format(parent_xpath=parent_xpath,
                                      last_item=last_item,
                                      position=self.ATT_POSITION_STR,
                                      att_xpath=att_xpath)
            else:
                mapper.xpath = "./{parent_xpath}{position}{att_xpath}".\
                               format(parent_xpath=parent_xpath,
                                      position=self.ATT_POSITION_STR,
                                      att_xpath=att_xpath)
        elif relative:
            relation = self.ATT_RELATION_CHILD
        else:
            relation = self.ATT_RELATION_NONE
        self.attributes.append((att_name, mapper, relation, provides_main_lang))
        return self

    def value_params(self, source_xml_element, default_language, xml_namespaces, position=None):
        """Generate single :obj:`MappedParams` object from source XML.

        Yields at most one object. Attributes added with
        :meth:`add_attribute` are not consulted here.

        :param source_xml_element: XML element.
        :type source_xml_element: :obj:`xml.etree.ElementTree.Element`
        :param default_language: Default language.
        :type default_language: str
        :param xml_namespaces: XML namespaces.
        :type xml_namespaces: dict
        :param position: Optional position for parent xpaths. When given, it
                         is substituted for the ``{position}`` placeholder of
                         :attr:`xpath`.
        :type position: int or None
        :returns: generator yielding :obj:`MappedParams`.
        :rtype: generator
        :raises: :exc:`MissingRequiredAttribute` if mapper's :attr:`required` is True, but
                 xpath provides no element or the element provides no value.
        :raises: :exc:`MappingError` if getting the value from the element
                 raises any exception.
        """
        xpath = self.xpath.format(position=position) if position else self.xpath
        element = source_xml_element.find(xpath, xml_namespaces)
        if element is None:
            if self.required:
                raise MissingRequiredAttribute(self.xpath)
            return
        try:
            main_params = self.as_params(element, default_language, xml_namespaces)
        except Exception as exc:
            attr_msg = ' attribute: %s' % (self.from_attribute,) if self.from_attribute is not None else ''
            raise MappingError('Unexpected exception when parsing content from element: %s xpath: %s %s'
                               % (element, self.xpath, attr_msg)) from exc
        if self.required and main_params.get_value() is None:
            raise MissingRequiredAttribute(self.xpath)
        yield main_params

    def values_params(self, source_xml_element, default_language, xml_namespaces):
        """Generate multiple :obj:`MappedParams` objects from source XML.

        The generated MappedParams will contain attributes as keyword_arguments.
        When the xpath matches no element but the mapper has parent or
        unrelated attributes, params with a None value are generated from
        those attributes alone.

        :param source_xml_element: XML element.
        :type source_xml_element: :obj:`xml.etree.ElementTree.Element`
        :param default_language: Default language.
        :type default_language: str
        :param xml_namespaces: XML namespaces.
        :type xml_namespaces: dict
        :returns: generator yielding :obj:`MappedParams`.
        :rtype: generator
        :raises: :exc:`MissingRequiredAttribute` if mapper's :attr:`required` is True, but
                 xpath provides no element or the element provides no value.
        """
        elements = source_xml_element.findall(self.xpath, xml_namespaces)
        if self.required and not elements and self._has_parent_or_unrelated_attributes() is False:
            raise MissingRequiredAttribute(self.xpath)
        if not elements and self._has_parent_or_unrelated_attributes():
            main_params = MappedParams(None)
            main_params.set_language(default_language)
            additional = {}
            self._values_from_parent(source_xml_element, default_language, xml_namespaces,
                                     main_params=main_params, additional=additional)
            self._values_from_unrelated(source_xml_element, default_language, xml_namespaces,
                                        main_params=main_params, additional=additional)
            add_list = self._additional_params_as_list(main_params, additional)
            if self.required and main_params.has_arguments() is False:
                raise MissingRequiredAttribute(self.xpath)
            yield main_params
            for add_params in add_list:
                yield add_params
        for elem_position, element in enumerate(elements, start=1):
            main_params = self.as_params(element, default_language, xml_namespaces)
            additional = {}
            self._values_from_parent(source_xml_element, default_language, xml_namespaces,
                                     main_params=main_params, additional=additional, position=elem_position)
            self._values_from_unrelated(source_xml_element, default_language, xml_namespaces,
                                        main_params=main_params, additional=additional)
            self._values_from_children(element, default_language, xml_namespaces,
                                       main_params=main_params, additional=additional)
            add_list = self._additional_params_as_list(main_params, additional)
            if self.required and main_params.has_arguments() is False:
                raise MissingRequiredAttribute(self.xpath)
            yield main_params
            for add_params in add_list:
                yield add_params

    @staticmethod
    def _additional_params_as_list(main_params, additional):
        """Build extra :obj:`MappedParams` for attributes with several values.

        `additional` maps a keyword-argument name to a list of
        ``(mapped_params, provides_main_lang)`` tuples holding the second
        and later values of that attribute. The result has as many items
        as the longest of those lists. Each item is a copy of `main_params`
        with the keyword arguments replaced index by index. An attribute
        with fewer values repeats its last value.

        :returns: list of :obj:`MappedParams`, empty if `additional` is empty.
        :rtype: list
        """
        if not additional:
            return []
        max_ = max(len(values) for values in additional.values())
        add_list = []
        latest = None
        for parameter, values in additional.items():
            for index in range(max_):
                try:
                    latest = values[index]
                except IndexError:
                    # This attribute ran out of values. Reuse its last one.
                    if latest is None:
                        raise
                mapped_params, provides_main_lang = latest
                if index >= len(add_list):
                    # The first attribute processed grows the list one copy at a time.
                    add_list.append(main_params.copy())
                add_list[index].keyword_arguments.update({parameter: mapped_params.get_value()})
                if provides_main_lang:
                    add_list[index].set_language(mapped_params.get_language())
        return add_list

    @staticmethod
    def _process_param_iterator(param_iterator, parameter, main_params, additional, provides_main_lang):
        """Store attribute values produced by `param_iterator`.

        The first value is set straight into `main_params` as keyword
        argument `parameter`. Later values are collected into
        ``additional[parameter]`` as ``(params, provides_main_lang)`` tuples.
        """
        for att_index, att_param in enumerate(param_iterator):
            if att_index > 0:
                if parameter not in additional:
                    additional.update({parameter: []})
                additional[parameter].append((att_param, provides_main_lang))
                continue
            main_params.keyword_arguments[parameter] = att_param.get_value()
            if provides_main_lang:
                main_params.set_language(att_param.get_language())

    def _values_from_parent(self, *args, main_params, additional, position=None):
        """Collect values of attributes that are looked up from a parent element.

        :param args: positional arguments passed on to the attribute mapper
                     (source element, default language, XML namespaces).
        :param main_params: params that receive the first value of each attribute.
        :param additional: dict that receives the remaining values.
        :param position: position of the current element, or None when
                         there is no element to relate to.
        """
        for parameter, attr_mapper, provides_main_lang in self.iterate_attributes(self.ATT_RELATION_PARENT):
            if position is None:
                # attribute from parent and no position for child.
                # manipulate xpath and _param_getter for one time run.
                attr_xpath = attr_mapper.xpath
                attr_param_getter = attr_mapper._param_getter
                attr_mapper.xpath = attr_mapper.xpath.replace(self.ATT_POSITION_STR, '')
                attr_mapper._set_param_getter(attr_mapper.values_params)
                try:
                    param_iterator = attr_mapper(*args)
                    self._process_param_iterator(param_iterator, parameter, main_params,
                                                 additional, provides_main_lang)
                finally:
                    # The iterator is lazy, so errors surface while it is
                    # consumed. Restore the mapper even then, otherwise a
                    # reused mapper would keep the stripped xpath and the
                    # temporary param getter.
                    attr_mapper.xpath = attr_xpath
                    attr_mapper._set_param_getter(attr_param_getter)
                continue
            param_iterator = attr_mapper(*args, position=position)
            self._process_param_iterator(param_iterator, parameter, main_params, additional, provides_main_lang)

    def _values_from_unrelated(self, *args, main_params, additional):
        """Collect values of attributes whose xpath is not relative to this mapper."""
        for parameter, attr_mapper, provides_main_lang in self.iterate_attributes(self.ATT_RELATION_NONE):
            param_iterator = attr_mapper(*args)
            self._process_param_iterator(param_iterator, parameter, main_params, additional, provides_main_lang)

    def _values_from_children(self, *args, main_params, additional):
        """Collect values of attributes that are looked up relative to the element.

        Every child attribute is first set to None in `main_params`, so the
        keyword argument is present even when the attribute mapper yields nothing.
        """
        for parameter, attr_mapper, provides_main_lang in self.iterate_attributes(self.ATT_RELATION_CHILD):
            main_params.keyword_arguments.update({parameter: None})
            param_iterator = attr_mapper(*args)
            self._process_param_iterator(param_iterator, parameter, main_params, additional, provides_main_lang)

    def __call__(self, *args, **kwargs):
        """Run the configured param getter and yield params that have arguments.

        One of :meth:`expect_single_value` or :meth:`expect_multiple_values`
        must have been called before the mapper is called.
        """
        for params in self._param_getter(*args, **kwargs):
            if params.has_arguments():
                yield params
class XMLParserBase:
    """Base class where parsers get derived from.

    Declares the public API to be used in callers.

    **Input:**

    * from_file()
    * from_string()

    **Output:**

    * studies
    * variables
    * questions
    * study_groups
    * all
    * select(collection=None)

    Provides common functionality to be used within subclasses which
    map XML-data to Document Store records. Subclasses must implement
    necessary generators that generate document store records.

    Use in subclass::

        class XMLRecordParser(XMLParserBase):

            @property
            def studies(self):
                maps = [(Study.add_study_number, self._map_single(xpath_to_study_number, required=True)),
                        (Study.add_study_titles, self._map_multi(xpath_to_study_title))]
                for study_element in self.root_element.findall(xpath_to_study_element, self.NS):
                    study = Study()
                    self._map_to_record(study, study_element, maps)
                    yield study

    :param root_element: XML root.
    :type root_element: :obj:`xml.etree.ElementTree.Element`
    """

    #: XML namespaces
    NS = {'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
          'xml': 'http://www.w3.org/XML/1998/namespace'}
    #: Default language.
    default_language = 'en'
    #: Classes used in parsing. Override to customize.
    _study_cls = Study
    _variable_cls = Variable
    _question_cls = Question
    _studygroup_cls = StudyGroup

    def __init__(self, root_element):
        self._root_element = root_element
        # Maps identifier name to (value as in source XML, value as valid identifier).
        self._identifiers = {'study_number': (None, None)}

    @classmethod
    def _xpath(cls, xpath):
        """Override to manipulate xpath for element lookups"""
        return xpath

    @classmethod
    def _get_xmllang(cls, elem, default=None):
        """Get the xml:lang attribute of `elem`, or `default` if it has none."""
        return elem.attrib.get('{%s}lang' % (cls.NS['xml'],), default)

    # Implemented API

    @classmethod
    def _map_single(cls, xpath, from_attribute=None, required=False, localizable=False):
        """Returns an :obj:`XMLMapper` for a single value.

        The returned mapper has attributes disabled.

        :param xpath: xpath to find the element used for mapping.
        :type xpath: str
        :param from_attribute: Optional attribute name, if value should come from elements attribute.
        :type from_attribute: str or None
        :param required: True if the element is required. Default to False
        :type required: bool
        :param localizable: True if the element is localizable. Default to False
        :type localizable: bool
        :returns: Initiated :obj:`XMLMapper`
        :rtype: :obj:`XMLMapper`
        """
        mapper = XMLMapper(cls._xpath(xpath), from_attribute=from_attribute,
                           required=required, localizable=localizable)
        mapper.disable_attributes().expect_single_value()
        return mapper

    @classmethod
    def _map_multi(cls, xpath, from_attribute=None, localizable=True):
        """Returns an :obj:`XMLMapper` for localizable multi value.

        :param xpath: xpath to find the element used for mapping.
        :type xpath: str
        :param from_attribute: Optional attribute name, if value should come from elements attribute.
        :type from_attribute: str or None
        :param localizable: True if the element is localizable. Default to True.
        :type localizable: bool
        :returns: Initiated :obj:`XMLMapper`
        :rtype: :obj:`XMLMapper`
        """
        mapper = XMLMapper(cls._xpath(xpath), from_attribute=from_attribute, required=False,
                           localizable=localizable)
        mapper.expect_multiple_values()
        return mapper

    def _map_to_record(self, record_instance, mapping_root, mapping, default_language=None):
        """Map elements to `record_instance` using `mapping`.

        :param record_instance: Document Store record instance.
        :type record_instance: :obj:`kuha_common.document_store.records.Study` or
                               :obj:`kuha_common.document_store.records.Variable` or
                               :obj:`kuha_common.document_store.records.Question` or
                               :obj:`kuha_common.document_store.records.StudyGroup`
        :param mapping_root: XML element considered root of the `mapping`.
        :type mapping_root: :obj:`xml.etree.ElementTree.Element`
        :param mapping: Mapping from mapping_root to Document Store record.
                        The list must contain tuples with two items.
                        First item of the tuple must be an instance method
                        of a Document Store record. The second item must be
                        a :obj:`XMLMapper` instance.
                        By calling this mapping instance with the `mapping_root`
                        it must return values that can be used as parameters to
                        the Document Store record instance function.
        :type mapping: list
        :param default_language: Optional default language. Defaults to :attr:`root_language`.
        :type default_language: str or None
        """
        default_language = default_language or self.root_language
        for add_func, element_map in mapping:
            for parameters in element_map(mapping_root, default_language, self.NS):
                add_func(record_instance, *parameters.arguments, **parameters.keyword_arguments)

    def _find(self, xpath, element=None):
        """Find the first element matching `xpath`.

        :param xpath: xpath for lookup. Passed through :meth:`_xpath`.
        :type xpath: str
        :param element: Element to search from. Defaults to :attr:`root_element`.
        :type element: :obj:`xml.etree.ElementTree.Element` or None
        :returns: matching element or None.
        """
        # Compare against None explicitly: an Element without children is
        # falsy, so "element or root" would search the root for leaf elements.
        if element is None:
            element = self.root_element
        return element.find(self._xpath(xpath), self.NS)

    def _findall(self, xpath, element=None):
        """Find all elements matching `xpath`.

        :param xpath: xpath for lookup. Passed through :meth:`_xpath`.
        :type xpath: str
        :param element: Element to search from. Defaults to :attr:`root_element`.
        :type element: :obj:`xml.etree.ElementTree.Element` or None
        :returns: list of matching elements.
        :rtype: list
        """
        # See _find() for why this is not "element or self.root_element".
        if element is None:
            element = self.root_element
        return element.findall(self._xpath(xpath), self.NS)

    @classmethod
    def from_string(cls, xml_body):
        """Get parser that iteratively parses XML and generates
        populated Document Store record instances.

        :param xml_body: XML Document as a string.
                         This may come directly from HTTP request body.
        :type xml_body: str
        :returns: parser for iteratively parsing XML and generating Document Store records.
        :rtype: :obj:`XMLParserBase`
        :raises: :exc:`ParseError` if `xml_body` cannot be parsed as XML.
        """
        try:
            root = ET.fromstring(xml_body)
        except ET.ParseError as exc:
            raise ParseError(exc) from exc
        return cls(root)

    @classmethod
    def from_file(cls, filepath):
        """Get parser that iteratively parses XML and generates
        populated Document Store record instances.

        :param filepath: Path for the XML file.
        :type filepath: str
        :returns: parser for iteratively parsing XML and generating Document Store records.
        :rtype: :obj:`XMLParserBase`
        :raises: :exc:`ParseError` if the file contents cannot be parsed as XML.
        """
        try:
            tree = ET.parse(filepath)
        except ET.ParseError as exc:
            raise ParseError("Parsing XML from filepath '%s' results in ParseError: '%s'"
                             % (filepath, exc)) from exc
        root = tree.getroot()
        return cls(root)

    @classmethod
    def child_text(cls, xpath):
        """Returns a function which will lookup a child element
        from given xpath. The returned function takes a single element
        as a parameter which should be an :obj:`xml.etree.ElementTree.Element`
        or similar. When executed the function returns the child element's
        text contents or None if child element cannot be found.

        :param xpath: xpath to child. relative to parent.
        :returns: function which accepts the parent element as a parameter.
        :rtype: function
        """
        def _get_child_text(element):
            child = element.find(xpath, cls.NS)
            if child is None:
                return None
            return "".join(child.itertext())
        return _get_child_text

    @staticmethod
    def _get_attr_and_set_param(params, kw_key, elem, attr_key):
        """Helper method to lookup element attribute and set it as keyword argument to params

        Does nothing if `elem` is None or the attribute is not present.

        :param params: Target parameter
        :type params: :obj:`kuha_common.document_store.mappings.xmlbase.MappedParams`
        :param kw_key: Target parameter keyword argument key
        :type kw_key: str
        :param elem: XML element to look for attribute
        :type elem: :obj:`xml.etree.ElementTree.Element` or None
        :param attr_key: XML element attribute key
        :type attr_key: str
        :returns: None
        """
        if elem is None:
            return
        attr = elem.get(attr_key)
        if attr is None:
            return
        params.keyword_arguments[kw_key] = attr

    @classmethod
    def _findall_from_elements(cls, elements, xpath):
        """Calls findall for each element and yields result.
        Helper to reduce nested loops when looping elements.

        :param elements: iterable containing XML elements.
        :param xpath: xpath used to loop single element.
        :returns: Generator to iterate elements with xpath
        """
        for element in elements:
            for item in element.findall(xpath, cls.NS):
                yield item

    @property
    def root_element(self):
        """Get root element.

        :returns: Root element
        :rtype: :obj:`xml.etree.ElementTree.Element`
        """
        return self._root_element

    @property
    def root_language(self):
        """Get language of the root element. If root does not have a language,
        returns :attr:`self.default_language`.

        :returns: root element language.
        :rtype: str
        """
        return self.root_element.attrib.get('{%s}lang' % self.NS.get('xml'), self.default_language)

    @property
    def study_number(self):
        """Get study number as formatted in source XML.

        :seealso: :attr:`self.study_number_identifier`

        :returns: Study number from source XML.
        :rtype: str
        """
        return self._identifiers.get('study_number')[0]

    @study_number.setter
    def study_number(self, study_number):
        """Set study number

        :note: study number must be a string although the naming
               suggests an integer/float would do.

        :param study_number: Study number.
        :type study_number: str
        :raises: :exc:`InvalidContent` if the study number converts to an
                 empty identifier.
        """
        valid = as_valid_identifier(study_number)
        if not valid:
            raise InvalidContent("Invalid study number: '%s'" % study_number)
        self._identifiers['study_number'] = (study_number, valid)

    @property
    def study_number_identifier(self):
        """Get study number converted as a valid Document Store identifier.

        :returns: Study number as valid Document Store identifier.
        :rtype: str
        """
        return self._identifiers.get('study_number')[1]

    # // Implemented API

    # Iterators which generate Document Store records
    # must be implemented in subclass.

    @property
    def studies(self):
        """Studies generator. Must be implemented in subclass.

        :returns: Generator which yields Document Store studies.
        """
        raise NotImplementedError("Implement in subclass")

    @property
    def variables(self):
        """Variables generator. Must be implemented in subclass.

        :returns: Generator which yields Document Store variables.
        """
        raise NotImplementedError("Implement in subclass")

    @property
    def questions(self):
        """Questions generator. Must be implemented in subclass.

        :returns: Generator which yields Document Store questions.
        """
        raise NotImplementedError("Implement in subclass")

    @property
    def study_groups(self):
        """Study groups generator. Must be implemented in subclass.

        :returns: Generator which yields Document Store study groups.
        """
        raise NotImplementedError("Implement in subclass")

    @property
    def all(self):
        """Iterate all records found from source XML.

        Records are yielded in the order: studies, study groups,
        variables, questions.

        :returns: Generator which yields Document Store records.
        :rtype: Generator
        """
        for study in self.studies:
            yield study
        for study_group in self.study_groups:
            yield study_group
        for variable in self.variables:
            yield variable
        for question in self.questions:
            yield question

    def select(self, collection=None):
        """Returns a selective parser. Call with
        a Document Store collection as parameter to select
        records only for certain collection.

        .. Note:: The returned attributes are defined in
                  subclasses, so they may or may not be
                  generators.

        :param collection: Document Store collection to select only
                           records belonging to this collection.
        :type collection: str or None
        :returns: Generator which yields Document Store records.
        :rtype: Generator
        :raises: :exc:`ValueError` if `collection` is not one of the
                 Document Store collections.
        """
        if not collection:
            return self.all
        if collection not in COLLECTIONS:
            raise ValueError("%s is not a valid Document Store collection" % (collection,))
        return getattr(self, collection)
def as_valid_identifier(candidate):
    """Convert candidate to a string usable as an identifier.

    Every run of characters outside ``a-z``, ``A-Z`` and ``0-9`` is
    normalized together with the characters that directly follow it:
    ``ä``/``Ä`` become ``ae``, ``ö``/``Ö`` become ``oe``, whitespace
    becomes ``_``, and any remaining character that is not a letter
    ``a-z``/``A-Z``, a digit, ``_``, ``.`` or ``-`` is dropped. Finally,
    leading characters that are not ASCII letters or digits are removed.

    .. note:: The validation regex for identifiers is defined in
              Document Store. Should it be moved to kuha_common?

    :param candidate: string to convert.
    :type candidate: str
    :returns: identifier which conforms the rules of validation.
              May be an empty string.
    :rtype: str
    """
    # Applied in this order to every matched section of the candidate.
    substitutions = (
        (r'[äÄ]', 'ae'),
        (r'[öÖ]', 'oe'),
        (r'[\s]', '_'),
        (r'[^a-zA-Z0-9_.-]', ''),
    )

    def replace_special_characters(match_object):
        section = match_object.group()
        for pattern, replacement in substitutions:
            section = re.sub(pattern, replacement, section, flags=re.UNICODE)
        return section

    normalized = re.sub(r'[^a-zA-Z0-9]+[a-zA-Z0-9?_()-.]*', replace_special_characters, candidate)
    return re.sub(r'^[^a-zA-Z0-9]+', '', normalized, count=1)
def str_equals(correct, default=None):
    """Build a conversion function that tests a value for string equality.

    The returned function converts both its parameter and `correct`
    with :func:`str` and compares the results. It is meant to be used
    on a string found from element value or element attribute.

    :param correct: comparison string.
    :type correct: str
    :param default: value returned by the comparison function when its
                    parameter is None.
    :type default: str
    :returns: function which accepts a single parameter for
              comparison. Returns True or False, or `default`
              if the parameter is None.
    :rtype: function
    """
    def _equals(value):
        return default if value is None else str(value) == str(correct)
    return _equals
def fixed_value(fixed):
    """Build a conversion function that ignores its input.

    :param fixed: Use this value
    :returns: function which accepts a single argument `value`.
              Whatever it is called with, the function returns `fixed`.
    :rtype: function
    """
    def _fixed_value(value):
        # 'value' exists only to satisfy the conversion function
        # signature and is deliberately unused.
        return fixed
    return _fixed_value
def element_remove_whitespaces(element):
    """Get element's inner text with line breaks and indentation collapsed.

    Iterates element's inner text using
    :meth:`xml.etree.ElementTree.Element.itertext`
    which iterates over this element and all subelements.
    Each piece of text is split on newlines, every line is stripped and
    the lines are joined with a single space. Pieces are then joined
    with a single space, unless the previous piece already ended with one.
    Whitespace at the end of the result is removed.

    :note: Consecutive newlines inside one piece of text produce
           empty lines, which end up as consecutive spaces.

    :param element: Element from which to get text.
    :type element: :obj:`xml.etree.ElementTree.Element`
    :returns: Element's inner text without extra whitespace.
    :rtype: str
    """
    collected = ""
    trailing_space = False
    for chunk in element.itertext():
        chunk = " ".join(line.strip() for line in chunk.split("\n"))
        # A separator is needed only between two pieces of text, and only
        # when the previous piece did not already end with a space.
        separator = "" if trailing_space or collected == "" else " "
        collected = collected + separator + chunk.lstrip()
        trailing_space = chunk.endswith(" ")
    return collected.rstrip() if trailing_space else collected
def element_strip_descendant_text(element, tags_to_remove=None):
    """Conversion function to get element text without inner elements
    and their contents.

    Works on a deep copy, so the given ``element`` is not modified.
    Only direct children of the element are considered for removal.

    :note: The text that follows a removed child (its tail) is removed
           with the child.

    :param element: Element for lookup.
    :type element: :obj:`xml.etree.ElementTree.Element`
    :param tags_to_remove: Optional list of tags to remove from
        results text content. If given, will only remove elements that
        match tags that are found from this list. If not given, will
        remove all child elements. Tags that are given in list but not
        found from ``element`` children will be silently ignored.
    :type tags_to_remove: list
    :returns: Element's inner text without text from removed children
              and without extra whitespace.
    :rtype: str
    """
    clone = copy.deepcopy(element)
    # Iterate over a snapshot since children are removed during the loop.
    for sub_element in list(clone):
        if tags_to_remove is not None and sub_element.tag not in tags_to_remove:
            continue
        clone.remove(sub_element)
    return element_remove_whitespaces(clone)
def element_strip_descendant_text_by_tag(tag, *tags):
    """Conversion function to remove inner elements by tag name.

    Collects the given tag names into the ``tags_to_remove``
    parameter and returns a function that proxies to
    :func:`element_strip_descendant_text` with it.

    :param str tag: Tag name to remove. Repeat for multiple tags.
    :returns: function that accepts an element and proxies to
              :func:`element_strip_descendant_text`
    """
    tags_to_remove = (tag,) + tags

    def _strip_by_tag(element):
        return element_strip_descendant_text(element, tags_to_remove)
    return _strip_by_tag
def get_preferred_publication_id_agency_pair(ids_agencys):
    """Get preferred id + agency two-tuple.

    Due to limitations in Kuha data model, a Study.related_publication
    item can only contain a single identifier + agency pair. This function
    will search the preferred id + agency pair by consulting the agency values.

    Preference by priority:

    1. DOI
    2. Handle
    3. URN
    4. ARK
    5. <first ID with agency>
    6. <first ID without agency>

    A list with exactly one item is returned as that item without
    looking at its agency. An empty list gives ``(None, None)``.

    :param list ids_agencys: list of id + agency two-tuples
    :returns: id + agency two-tuple.
    :rtype: tuple
    """
    if len(ids_agencys) == 1:
        return ids_agencys[0]
    # First id seen for each agency, in order of appearance.
    first_id_by_agency = {}
    for identifier, agency in ids_agencys:
        if agency == 'DOI':
            return identifier, agency
        first_id_by_agency.setdefault(agency, identifier)
    id_without_agency = first_id_by_agency.pop(None, None)
    if not first_id_by_agency:
        return (id_without_agency, None)
    for preferred in ('Handle', 'URN', 'ARK'):
        if preferred in first_id_by_agency:
            return (first_id_by_agency[preferred], preferred)
    first_agency = next(iter(first_id_by_agency))
    return (first_id_by_agency[first_agency], first_agency)