Source code for kuha_document_store.validation

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Simple validation for dictionary representation of
document store records.

Validate study record dictionary:

    >>> from kuha_common.document_store.records import Study
    >>> from kuha_document_store.validation import validate
    >>> validate(Study.get_collection(), Study().export_dict(include_metadata=False))
    Traceback (most recent call last):
    [...]
        def validate(collection, document, raise_error=True, update=False):
    kuha_document_store.validation.RecordValidationError: ('Validation of studies failed',
        {'study_number': ['null value not allowed']}
    )

"""
import copy

from cerberus import Validator

from kuha_common.document_store.constants import REC_FIELDNAME_LANGUAGE
from kuha_common.document_store.records import RecordBase


# Rule names used as keys of the schema items built in this module.
KEY_REGEX = 'regex'
KEY_TYPE = 'type'
KEY_REQUIRED = 'required'
KEY_SCHEMA = 'schema'
KEY_EMPTY = 'empty'
KEY_NULLABLE = 'nullable'
# Custom rule, implemented by RecordValidator._validate_no_duplicates().
KEY_NO_DUPLICATES = 'no_duplicates'
KEY_ALLOWED = 'allowed'

# Identifier: starts with an ASCII letter or digit, followed by any number
# of ASCII letters, digits or one of ``? _ ( ) . -``.
# The hyphen is deliberately the LAST character of the class. Written as
# ``)-.`` it formed a character range from ')' to '.', which silently
# accepted '*', '+' and ',' as well.
REGEX_IDENTIFIER = r"[a-zA-Z0-9]+[a-zA-Z0-9?_().-]*"

# Type names used as values of the KEY_TYPE rule.
TYPE_STRING = 'string'
TYPE_DICT = 'dict'
TYPE_BOOLEAN = 'boolean'
TYPE_LIST = 'list'

def identifier_schema_item(fieldname):
    """Build a schema item for an identifier field.

    The field is required, must be a non-empty string and must match
    :const:`REGEX_IDENTIFIER`.

    :param str fieldname: Name of the field.
    :returns: Single-item mapping from fieldname to its rules.
    :rtype: dict
    """
    rules = {
        KEY_TYPE: TYPE_STRING,
        KEY_REQUIRED: True,
        KEY_EMPTY: False,
        KEY_REGEX: REGEX_IDENTIFIER,
    }
    return {fieldname: rules}


def default_schema_item(fieldname, nullable=False, required=False):
    """Build a schema item for a plain string field.

    :param str fieldname: Name of the field.
    :param bool nullable: Value stored for the ``nullable`` rule.
    :param bool required: Value stored for the ``required`` rule.
    :returns: Single-item mapping from fieldname to its rules.
    :rtype: dict
    """
    rules = {
        KEY_TYPE: TYPE_STRING,
        KEY_REQUIRED: required,
        KEY_NULLABLE: nullable,
    }
    return {fieldname: rules}


def bool_schema_item(fieldname):
    """Build a schema item for a boolean field.

    :param str fieldname: Name of the field.
    :returns: Single-item mapping from fieldname to its rules.
    :rtype: dict
    """
    rules = {KEY_TYPE: TYPE_BOOLEAN}
    return {fieldname: rules}


def str_enum_item(fieldname, allowed_values, nullable=False, required=False):
    """Build a schema item for a string field restricted to given values.

    :param str fieldname: Name of the field.
    :param allowed_values: Value stored for the ``allowed`` rule.
    :param bool nullable: Value stored for the ``nullable`` rule.
    :param bool required: Value stored for the ``required`` rule.
    :returns: Single-item mapping from fieldname to its rules.
    :rtype: dict
    """
    rules = {
        KEY_TYPE: TYPE_STRING,
        KEY_ALLOWED: allowed_values,
        KEY_NULLABLE: nullable,
        KEY_REQUIRED: required,
    }
    return {fieldname: rules}


def uniquelist_schema_item(fieldname):
    """Build a schema item for a nullable list of unique strings.

    :param str fieldname: Name of the field.
    :returns: Single-item mapping from fieldname to its rules.
    :rtype: dict
    """
    member_rules = {KEY_TYPE: TYPE_STRING}
    rules = {
        KEY_TYPE: TYPE_LIST,
        KEY_SCHEMA: member_rules,
        KEY_NO_DUPLICATES: True,
        KEY_NULLABLE: True,
    }
    return {fieldname: rules}


def uniquelist_study_numbers_schema_item(fieldname):
    """Build a schema item for a nullable list of unique identifiers.

    Same as a unique list of strings, but every list member must also
    match :const:`REGEX_IDENTIFIER`.

    :param str fieldname: Name of the field.
    :returns: Single-item mapping from fieldname to its rules.
    :rtype: dict
    """
    member_rules = {
        KEY_TYPE: TYPE_STRING,
        KEY_REGEX: REGEX_IDENTIFIER,
    }
    rules = {
        KEY_TYPE: TYPE_LIST,
        KEY_SCHEMA: member_rules,
        KEY_NO_DUPLICATES: True,
        KEY_NULLABLE: True,
    }
    return {fieldname: rules}


def container_schema_item(fieldname, subschema):
    """Build a schema item for a list of dictionaries.

    :param str fieldname: Name of the field.
    :param dict subschema: Schema applied to every dictionary in the list.
        The object is embedded as is, not copied.
    :returns: Single-item mapping from fieldname to its rules.
    :rtype: dict
    """
    member_rules = {KEY_TYPE: TYPE_DICT, KEY_SCHEMA: subschema}
    rules = {KEY_TYPE: TYPE_LIST, KEY_SCHEMA: member_rules}
    return {fieldname: rules}


def dict_schema_item(fieldname, subschema):
    """Build a schema item for a single dictionary field.

    :param str fieldname: Name of the field.
    :param dict subschema: Schema applied to the dictionary.
        The object is embedded as is, not copied.
    :returns: Single-item mapping from fieldname to its rules.
    :rtype: dict
    """
    rules = {KEY_TYPE: TYPE_DICT, KEY_SCHEMA: subschema}
    return {fieldname: rules}


class RecordValidator(Validator):
    """Subclass :class:`cerberus.Validator` to customize validation.

    JSON does not support sets. Therefore a rule to validate list
    items for uniqueness is needed.

    For the sake of simplicity in raising and handling validation errors
    this class also overrides :meth:`cerberus.Validator.validate`.
    """

    @staticmethod
    def _contains_duplicates(items):
        """Tell whether any two items of a list compare equal.

        :param list items: List to inspect.
        :returns: True if at least one item occurs more than once.
        :rtype: bool
        """
        try:
            return len(set(items)) != len(items)
        except TypeError:
            # Unhashable items (e.g. dicts or lists in submitted JSON)
            # cannot go into a set. Compare pairwise instead of letting
            # the TypeError escape from the validation run.
            seen = []
            for item in items:
                if item in seen:
                    return True
                seen.append(item)
            return False

    # NOTE: cerberus reads the schema of the rule's argument from the end
    # of the docstring below. Keep the last two docstring lines intact and
    # do not add text after them.
    def _validate_no_duplicates(self, no_duplicates, field, value):
        """Test uniqueness of list items.

        The rule's arguments are validated against this schema:
        {'type': 'boolean'}
        """
        if not isinstance(value, list):
            self._error(field, "Value must be list")
        elif no_duplicates and self._contains_duplicates(value):
            self._error(field, "Value must be unique within list")

    def validate(self, document, **kwargs):
        r"""Override :meth:`cerberus.Validator.validate`

        Handle unvalidated _id-field here to simplify error message flow
        and enable validation messages.

        If document is to be updated it is allowed to have an _id field.
        If document is being inserted it is an error to have an _id field.

        :note: The _id field is removed from the given document in place
               before the document is validated.

        :param document: Document to be validated.
        :type document: dict
        :param \*\*kwargs: keyword arguments passed to
                           :meth:`cerberus.Validator.validate`. Here it is
                           only checked if keyword argument ``update`` is
                           present and True.
        :return: True if validation passes, False if not.
        :rtype: bool
        """
        has_id = document.pop(RecordBase._id.path, None)
        validated = super().validate(document, **kwargs)
        # Id cannot be manually set. If this is insert, regard it as an error.
        # For updates it's allowed for roundtrips.
        # The error is recorded only after super().validate() has run, so it
        # ends up among the errors of this validation run.
        update = kwargs.get('update')
        if not update and has_id:
            validated = False
            self._error(RecordBase._id.path, "Cannot manually set id")
        return validated
class RecordValidationError(Exception):
    """Raised on validation errors.

    :param collection: Collection that got validated.
    :type collection: str
    :param validation_errors: Validation errors from
                              :attr:`cerberus.Validator.errors`.
                              These are stored in
                              :attr:`RecordValidationError.validation_errors`
                              for later processing.
    :type validation_errors: dict
    :param msg: Optional message. A default message naming the
                collection is used when this is empty or None.
    :type msg: str
    :returns: :obj:`RecordValidationError`
    """

    def __init__(self, collection, validation_errors, msg=None):
        if not msg:
            msg = "Validation of %s failed" % collection
        # Both the message and the errors become part of the exception's
        # args, so they show up in tracebacks.
        super().__init__(msg, validation_errors)
        self.collection = collection
        self.validation_errors = validation_errors
        self.msg = msg
class RecordValidationSchema:
    r"""Create validation schema from records in
    :mod:`kuha_common.document_store.records` to validate user-submitted data.

    Schema items are built dynamically by consulting record's field types.

        * For single value fields the type is string and null values
          are not accepted.
        * For localizable fields it is required to have a
          :const:`kuha_common.document_store.constants.REC_FIELDNAME_LANGUAGE`
          attribute.
        * Field attributes are strings and they may be null.
        * Subfield values are strings and they may be null.
        * Fallback to string, not null.

    Record's metadata is accepted as input but not required.

    :note: :attr:`kuha_common.document_store.RecordBase._metadata` and
           :attr:`kuha_common.document_store.RecordBase._id` are
           also validated at database level.

    :seealso: :meth:`kuha_document_store.database.RecordsCollection.get_validator`

    Every dynamically built schema item may be overridden by a custom schema
    item given as a parameter for class constructor.

    :param record_class: class which holds record attributes.
    :type record_class: :class:`kuha_common.document_store.records.Study` or
                        :class:`kuha_common.document_store.records.Variable` or
                        :class:`kuha_common.document_store.records.Question` or
                        :class:`kuha_common.document_store.records.StudyGroup`
    :param base_schema: Schema items the built schema starts from.
                        It is copied; the given object is never modified.
    :type base_schema: dict
    :param \*args: Custom schema items to override dynamically
                   built schema items. Keys are field paths; a dot in a key
                   separates a field from its subfield.
    :returns: :obj:`RecordValidationSchema`
    """

    def __init__(self, record_class, base_schema, *args):
        # Format with base schema. Deep copy it: container items that are
        # already present get extended in place by
        # _add_container_schema_item(), and a shallow copy would leak those
        # changes into the caller's base_schema.
        self.schema = copy.deepcopy(dict(base_schema))
        self.customize_schema = {}
        for custom_schema in args:
            for path, schema_item in custom_schema.items():
                self._recurse_and_update_customize_schema(
                    path.split('.'), schema_item
                )
        for _field in record_class.iterate_record_fields():
            # Only the second member of each yielded item is consumed here.
            self._add_record_field(_field[1])
        self.collection_name = record_class.get_collection()

    def _add_record_field(self, field):
        """Add schema item(s) for a single record field."""
        if field.single_value:
            self._add_schema_item(field.path)
            return
        if field.localizable:
            self._add_container_schema_item(field.path,
                                            REC_FIELDNAME_LANGUAGE,
                                            required=True)
        for att in field.attrs:
            self._add_container_schema_item(field.path, att, nullable=True)
        if field.sub_name:
            # NOTE(review): subfields are added as nullable, although this
            # class was earlier documented to treat them as not nullable.
            # Confirm which one is intended.
            self._add_container_schema_item(field.path, field.sub_name.name,
                                            nullable=True)
        else:
            self._add_schema_item(field.path)

    def _add_schema_item(self, fieldname):
        """Set the schema item of a field: custom item if given, else default."""
        if fieldname in self.customize_schema:
            self.schema.update({fieldname: self.customize_schema[fieldname]})
        else:
            self.schema.update(default_schema_item(fieldname))

    def _add_container_schema_item(self, fieldname, subfieldname,
                                   nullable=False, required=False):
        """Add a subfield to the container schema item of a field.

        The container item is created on first use and extended in place on
        subsequent calls. A custom schema item for the subfield takes
        precedence over the default one.
        """
        custom_schema = self.customize_schema.get(fieldname, {}).get(subfieldname)
        if custom_schema:
            sub_item = {subfieldname: custom_schema}
        else:
            sub_item = default_schema_item(subfieldname, nullable, required)
        if fieldname in self.schema:
            self.schema[fieldname][KEY_SCHEMA][KEY_SCHEMA].update(sub_item)
        else:
            self.schema.update(container_schema_item(fieldname, sub_item))

    def _recurse_and_update_customize_schema(self, keys, schema):
        """Merge schema into the nested customize_schema at the path of keys."""
        node = self.customize_schema
        for key in keys:
            node = node.setdefault(key, {})
        node.update(schema)

    def get_schema(self):
        """Get Schema.

        :returns: Validation schema supported by :mod:`cerberus`
        :rtype: dict
        """
        return self.schema
def validate(rec_val_schema, document, raise_error=True, update=False):
    """Validate document against collection schema.

    :param :obj:`RecordValidationSchema` rec_val_schema: Record validation
                                                         schema to validate
                                                         against.
    :param dict document: Document to validate. Document is a dictionary
                          representation of a document store record.
    :param bool raise_error: Should a :exc:`RecordValidationError` be raised
                             if validation fails.
    :param bool update: Validate for an update/replace operation of an
                        existing record?
    :returns: True if document passed validation, False if fails.
    :rtype: bool
    :raises: :exc:`RecordValidationError` if raise_error is
             True and document fails validation.
    """
    # A fresh validator per call, built from the schema object's schema.
    validator = RecordValidator(rec_val_schema.get_schema())
    if validator.validate(document, update=update):
        return True
    if raise_error:
        raise RecordValidationError(rec_val_schema.collection_name,
                                    validator.errors)
    return False