#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Simple validation for dictionary representation of
document store records.
Validate study record dictionary:
>>> from kuha_common.document_store.records import Study
>>> from kuha_document_store.validation import validate
>>> validate(Study.get_collection(), Study().export_dict(include_metadata=False))
Traceback (most recent call last):
[...]
def validate(collection, document, raise_error=True, update=False):
kuha_document_store.validation.RecordValidationError: ('Validation of studies failed',
{'study_number': ['null value not allowed']}
)
"""
import copy

from cerberus import Validator
from kuha_common.document_store.constants import REC_FIELDNAME_LANGUAGE
from kuha_common.document_store.records import RecordBase
# Rule names used as keys inside the schema items built by this module.
# NOTE(review): presumably these are cerberus rule names, except
# KEY_NO_DUPLICATES, whose name matches the custom rule method
# RecordValidator._validate_no_duplicates defined in this module.
KEY_REGEX = 'regex'
KEY_TYPE = 'type'
KEY_REQUIRED = 'required'
KEY_SCHEMA = 'schema'
KEY_EMPTY = 'empty'
KEY_NULLABLE = 'nullable'
KEY_NO_DUPLICATES = 'no_duplicates'
KEY_ALLOWED = 'allowed'
# Pattern for identifier values: one or more ASCII alphanumerics followed by
# any number of characters from the second character class.
# NOTE(review): inside the second class ")-." is a character range, so "*",
# "+" and "," are accepted in addition to ")", "-" and ".". Confirm whether
# that is intended or whether a literal "-" was meant.
REGEX_IDENTIFIER = r"[a-zA-Z0-9]+[a-zA-Z0-9?_()-.]*"
# Values used with the KEY_TYPE rule.
TYPE_STRING = 'string'
TYPE_DICT = 'dict'
TYPE_BOOLEAN = 'boolean'
TYPE_LIST = 'list'
def identifier_schema_item(fieldname):
    """Build a schema item for a mandatory identifier field.

    The rules demand a required, non-empty string that matches
    :const:`REGEX_IDENTIFIER`.

    :param str fieldname: Name of the field the rules apply to.
    :returns: Single-item mapping from fieldname to its validation rules.
    :rtype: dict
    """
    rules = {
        KEY_TYPE: TYPE_STRING,
        KEY_REQUIRED: True,
        KEY_EMPTY: False,
        KEY_REGEX: REGEX_IDENTIFIER,
    }
    return {fieldname: rules}
def default_schema_item(fieldname, nullable=False, required=False):
    """Build the fallback schema item: a plain string field.

    :param str fieldname: Name of the field the rules apply to.
    :param bool nullable: Value stored under the nullable rule.
    :param bool required: Value stored under the required rule.
    :returns: Single-item mapping from fieldname to its validation rules.
    :rtype: dict
    """
    rules = {
        KEY_TYPE: TYPE_STRING,
        KEY_REQUIRED: required,
        KEY_NULLABLE: nullable,
    }
    return {fieldname: rules}
def bool_schema_item(fieldname):
    """Build a schema item for a boolean field.

    :param str fieldname: Name of the field the rule applies to.
    :returns: Single-item mapping from fieldname to its validation rules.
    :rtype: dict
    """
    rules = {KEY_TYPE: TYPE_BOOLEAN}
    return {fieldname: rules}
def str_enum_item(fieldname, allowed_values, nullable=False, required=False):
    """Build a schema item for a string restricted to a set of values.

    :param str fieldname: Name of the field the rules apply to.
    :param allowed_values: Values stored under the allowed rule.
    :param bool nullable: Value stored under the nullable rule.
    :param bool required: Value stored under the required rule.
    :returns: Single-item mapping from fieldname to its validation rules.
    :rtype: dict
    """
    rules = {
        KEY_TYPE: TYPE_STRING,
        KEY_ALLOWED: allowed_values,
        KEY_NULLABLE: nullable,
        KEY_REQUIRED: required,
    }
    return {fieldname: rules}
def uniquelist_schema_item(fieldname):
    """Build a schema item for a nullable list of unique strings.

    :param str fieldname: Name of the field the rules apply to.
    :returns: Single-item mapping from fieldname to its validation rules.
    :rtype: dict
    """
    member_rules = {KEY_TYPE: TYPE_STRING}
    list_rules = {
        KEY_TYPE: TYPE_LIST,
        KEY_SCHEMA: member_rules,
        KEY_NO_DUPLICATES: True,
        KEY_NULLABLE: True,
    }
    return {fieldname: list_rules}
def uniquelist_study_numbers_schema_item(fieldname):
    """Build a schema item for a nullable list of unique identifiers.

    Same as a unique string list, but every member must additionally
    match :const:`REGEX_IDENTIFIER`.

    :param str fieldname: Name of the field the rules apply to.
    :returns: Single-item mapping from fieldname to its validation rules.
    :rtype: dict
    """
    member_rules = {
        KEY_TYPE: TYPE_STRING,
        KEY_REGEX: REGEX_IDENTIFIER,
    }
    list_rules = {
        KEY_TYPE: TYPE_LIST,
        KEY_SCHEMA: member_rules,
        KEY_NO_DUPLICATES: True,
        KEY_NULLABLE: True,
    }
    return {fieldname: list_rules}
def container_schema_item(fieldname, subschema):
    """Build a schema item for a list of dictionaries.

    :param str fieldname: Name of the field the rules apply to.
    :param dict subschema: Schema stored (by reference) as the schema
                           of each dictionary in the list.
    :returns: Single-item mapping from fieldname to its validation rules.
    :rtype: dict
    """
    member_rules = {KEY_TYPE: TYPE_DICT, KEY_SCHEMA: subschema}
    list_rules = {KEY_TYPE: TYPE_LIST, KEY_SCHEMA: member_rules}
    return {fieldname: list_rules}
def dict_schema_item(fieldname, subschema):
    """Build a schema item for a single dictionary field.

    :param str fieldname: Name of the field the rules apply to.
    :param dict subschema: Schema stored (by reference) as the schema
                           of the dictionary.
    :returns: Single-item mapping from fieldname to its validation rules.
    :rtype: dict
    """
    rules = {KEY_TYPE: TYPE_DICT, KEY_SCHEMA: subschema}
    return {fieldname: rules}
class RecordValidator(Validator):
    """Subclass :class:`cerberus.Validator` to customize validation.

    JSON does not support sets. Therefore a rule to validate
    list items for uniqueness is needed.

    For the sake of simplicity in raising and handling validation errors
    this class also overrides :meth:`cerberus.Validator.validate`.
    """

    @staticmethod
    def _contains_duplicates(items):
        """Tell whether any two members of a list are equal.

        :param list items: List to inspect.
        :returns: True if at least one member occurs more than once.
        :rtype: bool
        """
        try:
            return len(set(items)) != len(items)
        except TypeError:
            # Members such as dicts or lists are unhashable and cannot be
            # put into a set. Submitted documents may contain them, so
            # compare by equality instead of letting the TypeError escape.
            seen = []
            for item in items:
                if item in seen:
                    return True
                seen.append(item)
            return False

    # NOTE: the docstring below must end with the rule-schema sentence
    # followed by nothing but the literal; cerberus reads that trailing
    # literal to validate the rule's arguments (see cerberus docs on
    # custom rules).
    def _validate_no_duplicates(self, no_duplicates, field, value):
        """Test uniqueness of list items.

        Registers an error for field if value is not a list, or if
        no_duplicates is true and the list holds equal members.
        Unhashable members are compared by equality.

        The rule's arguments are validated against this schema:
        {'type': 'boolean'}
        """
        if not isinstance(value, list):
            self._error(field, "Value must be list")
        elif no_duplicates and self._contains_duplicates(value):
            self._error(field, "Value must be unique within list")

    def validate(self, document, **kwargs):
        r"""Override :meth:`cerberus.Validator.validate`

        Handle unvalidated _id-field here to simplify error message flow
        and enable validation messages.

        If document is to be updated it is allowed to have
        an _id field. If document is being inserted it is an
        error to have an _id field.

        :note: The _id key is removed from the passed-in document
               before the document is validated.

        :param document: Document to be validated.
        :type document: dict
        :param \*\*kwargs: keyword arguments passed to
                           :meth:`cerberus.Validator.validate`.
                           Here it is only checked if keyword argument
                           update is present and True.
        :return: True if validation passes, False if not.
        :rtype: bool
        """
        # Pop the id so that it is not validated against the schema.
        # A missing key and a falsy id value are treated alike below.
        has_id = document.pop(RecordBase._id.path, None)
        validated = super().validate(document, **kwargs)
        # Id cannot be manually set. If this is insert, regard it as an error.
        # For updates it's allowed for roundtrips.
        update = kwargs.get('update')
        if not update and has_id:
            validated = False
            self._error(RecordBase._id.path, "Cannot manually set id")
        return validated
class RecordValidationError(Exception):
    """Raised on validation errors.

    The exception arguments are ``(msg, validation_errors)``.

    :param collection: Collection that got validated.
    :type collection: str
    :param validation_errors: Validation errors from :attr:`cerberus.Validator.errors`.
                              These are stored in :attr:`RecordValidationError.validation_errors`
                              for later processing.
    :type validation_errors: dict
    :param msg: Optional message. A default message naming the
                collection is used when this is missing or empty.
    :type msg: str
    :returns: :obj:`RecordValidationError`
    """

    def __init__(self, collection, validation_errors, msg=None):
        if not msg:
            # Wrap in a one-element tuple so that a tuple-valued collection
            # is formatted as a single value instead of raising TypeError.
            msg = "Validation of %s failed" % (collection,)
        super().__init__(msg, validation_errors)
        self.collection = collection
        self.validation_errors = validation_errors
        self.msg = msg
class RecordValidationSchema:
    r"""Create validation schema from records in
    :mod:`kuha_common.document_store.records` to validate
    user-submitted data.

    Schema items are built dynamically by consulting record's field types.

        * For single value fields the type is string and null values are not accepted.
        * For localizable fields it is required to have a
          :const:`kuha_common.document_store.constants.REC_FIELDNAME_LANGUAGE` attribute.
        * Field attributes are strings and they may be null.
        * Subfield values are strings and they may be null.
        * Fallback to string, not null.

    Record's metadata is accepted as input but not required.

    :note: :attr:`kuha_common.document_store.RecordBase._metadata` and
           :attr:`kuha_common.document_store.RecordBase._id` are also validated
           at database level.

    :seealso: :meth:`kuha_document_store.database.RecordsCollection.get_validator`

    Every dynamically built schema item may be overridden by a custom schema item
    given as a parameter for class constructor.

    :param record_class: class which holds record attributes.
    :type record_class: :class:`kuha_common.document_store.records.Study` or
                        :class:`kuha_common.document_store.records.Variable` or
                        :class:`kuha_common.document_store.records.Question` or
                        :class:`kuha_common.document_store.records.StudyGroup`
    :param base_schema: Schema items the record schema is built on top of.
                        The items are copied; the given object is not modified.
    :type base_schema: dict
    :param \*args: Custom schema items to override dynamically built schema items.
                   Keys are field paths; a dot separates a field from its subfield.
    :returns: :obj:`RecordValidationSchema`
    """

    def __init__(self, record_class, base_schema, *args):
        # Container items are extended in place later on, so take a deep
        # copy: a shallow one would leak those updates into base_schema
        # and into every other schema built from the same base.
        self.schema = copy.deepcopy(dict(base_schema))
        self.customize_schema = {}
        for custom_schema in args:
            for path, rules in custom_schema.items():
                self._recurse_and_update_customize_schema(path.split('.'), rules)
        for _field in record_class.iterate_record_fields():
            # Items yielded by the record class carry the field at index 1.
            self._add_record_field(_field[1])
        self.collection_name = record_class.get_collection()

    def _add_record_field(self, field):
        """Add schema item(s) for a single record field."""
        if field.single_value:
            self._add_schema_item(field.path)
            return
        if field.localizable:
            self._add_container_schema_item(field.path, REC_FIELDNAME_LANGUAGE, required=True)
        for att in field.attrs:
            self._add_container_schema_item(field.path, att, nullable=True)
        if field.sub_name:
            self._add_container_schema_item(field.path, field.sub_name.name, nullable=True)
        else:
            self._add_schema_item(field.path)

    def _add_schema_item(self, fieldname):
        """Set the item for fieldname: custom if given, else default string."""
        if fieldname in self.customize_schema:
            self.schema.update({fieldname: self.customize_schema[fieldname]})
        else:
            self.schema.update(default_schema_item(fieldname))

    def _add_container_schema_item(self, fieldname, subfieldname, nullable=False, required=False):
        """Add subfieldname to the container item of fieldname.

        The container item is created on first use and extended on
        subsequent calls. A custom schema for the subfield takes
        precedence over the default string item.
        """
        custom_schema = self.customize_schema.get(fieldname, {}).get(subfieldname)
        if custom_schema:
            subschema = {subfieldname: custom_schema}
        else:
            subschema = default_schema_item(subfieldname, nullable, required)
        if fieldname not in self.schema:
            self.schema.update(container_schema_item(fieldname, subschema))
        else:
            self.schema[fieldname][KEY_SCHEMA][KEY_SCHEMA].update(subschema)

    def _recurse_and_update_customize_schema(self, keys, schema):
        """Merge schema into the customize_schema node addressed by keys.

        Missing intermediate nodes are created as empty dictionaries.
        """
        node = self.customize_schema
        for key in keys:
            node = node.setdefault(key, {})
        node.update(schema)

    def get_schema(self):
        """Get Schema.

        :returns: Validation schema supported by :mod:`cerberus`
        :rtype: dict
        """
        return self.schema
def validate(rec_val_schema, document, raise_error=True, update=False):
    """Validate document against collection schema.

    :param :obj:`RecordValidationSchema` rec_val_schema: Record validation schema
                                                         to validate against.
    :param dict document: Document to validate. Document is a dictionary representation
                          of a document store record.
    :param bool raise_error: Should a :exc:`RecordValidationError` be raised if
                             validation fails.
    :param bool update: Validate for an update/replace operation of an
                        existing record?
    :returns: True if document passed validation, False if fails.
    :rtype: bool
    :raises: :exc:`RecordValidationError` if raise_error is True and
             document fails validation.
    """
    schema = rec_val_schema.get_schema()
    record_validator = RecordValidator(schema)
    passed = record_validator.validate(document, update=update)
    if not passed and raise_error:
        raise RecordValidationError(rec_val_schema.collection_name,
                                    record_validator.errors)
    return bool(passed)