#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Simple validation for dictionary representation of
document store records.

:note: This module has a strict dependency on
    :mod:`kuha_common.document_store.records`

Validate study record dictionary:

>>> from kuha_common.document_store.records import Study
>>> from kuha_document_store.validation import validate
>>> validate(Study.get_collection(), Study().export_dict(include_metadata=False))
Traceback (most recent call last):
    [...]
    def validate(collection, document, raise_error=True, update=False):
kuha_document_store.validation.RecordValidationError: ('Validation of studies failed',
{'study_number': ['null value not allowed']}
)
"""
from cerberus import Validator
from kuha_common.document_store.constants import REC_FIELDNAME_LANGUAGE
from kuha_common.document_store.records import (
RecordBase,
Study,
Variable,
Question,
StudyGroup
)
# Rule names used as keys when building cerberus schema items below.
KEY_REGEX = 'regex'
KEY_TYPE = 'type'
KEY_REQUIRED = 'required'
KEY_SCHEMA = 'schema'
KEY_EMPTY = 'empty'
KEY_NULLABLE = 'nullable'
# Custom rule; implemented by RecordValidator._validate_no_duplicates.
KEY_NO_DUPLICATES = 'no_duplicates'
# Pattern applied to identifier fields: one or more alphanumerics followed
# by any number of characters from the second character class.
# NOTE(review): inside the second class, ``)-.`` is parsed as a *range* from
# ')' to '.', so it also admits '*', '+' and ','. If a literal hyphen was
# intended, move '-' to the end of the class -- confirm against stored
# identifiers before changing, since tightening the pattern rejects data.
REGEX_IDENTIFIER = r"[a-zA-Z0-9]+[a-zA-Z0-9?_()-.]*"
# Type names used as values for the KEY_TYPE rule.
TYPE_STRING = 'string'
TYPE_DICT = 'dict'
TYPE_BOOLEAN = 'boolean'
TYPE_LIST = 'list'
def _identifier_schema_item(fieldname):
    """Build a schema item for an identifier field.

    The field is a required, non-empty string that must match
    :const:`REGEX_IDENTIFIER`.

    :param fieldname: Name of the field the rules apply to.
    :type fieldname: str
    :returns: Single-key schema mapping ``fieldname`` to its rules.
    :rtype: dict
    """
    rules = {
        KEY_TYPE: TYPE_STRING,
        KEY_REQUIRED: True,
        KEY_EMPTY: False,
        KEY_REGEX: REGEX_IDENTIFIER,
    }
    return {fieldname: rules}
def _default_schema_item(fieldname, nullable=False, required=False):
    """Build the fallback schema item: a plain string field.

    :param fieldname: Name of the field the rules apply to.
    :type fieldname: str
    :param nullable: Value stored under the ``nullable`` rule.
    :type nullable: bool
    :param required: Value stored under the ``required`` rule.
    :type required: bool
    :returns: Single-key schema mapping ``fieldname`` to its rules.
    :rtype: dict
    """
    rules = {
        KEY_TYPE: TYPE_STRING,
        KEY_REQUIRED: required,
        KEY_NULLABLE: nullable,
    }
    return {fieldname: rules}
def _bool_schema_item(fieldname):
    """Build a schema item for a boolean field.

    :param fieldname: Name of the field the rule applies to.
    :type fieldname: str
    :returns: Single-key schema mapping ``fieldname`` to a boolean type rule.
    :rtype: dict
    """
    rules = {KEY_TYPE: TYPE_BOOLEAN}
    return {fieldname: rules}
def _uniquelist_schema_item(fieldname):
    """Build a schema item for a nullable list of unique strings.

    :param fieldname: Name of the field the rules apply to.
    :type fieldname: str
    :returns: Single-key schema mapping ``fieldname`` to its rules.
    :rtype: dict
    """
    # Rules applied to every item of the list.
    item_rules = {KEY_TYPE: TYPE_STRING}
    list_rules = {
        KEY_TYPE: TYPE_LIST,
        KEY_SCHEMA: item_rules,
        KEY_NO_DUPLICATES: True,
        KEY_NULLABLE: True,
    }
    return {fieldname: list_rules}
def _uniquelist_study_numbers_schema_item(fieldname):
    """Build a schema item for a nullable list of unique identifiers.

    Same as a unique string list, except that every item must also
    match :const:`REGEX_IDENTIFIER`.

    :param fieldname: Name of the field the rules apply to.
    :type fieldname: str
    :returns: Single-key schema mapping ``fieldname`` to its rules.
    :rtype: dict
    """
    # Rules applied to every item of the list.
    item_rules = {
        KEY_TYPE: TYPE_STRING,
        KEY_REGEX: REGEX_IDENTIFIER,
    }
    list_rules = {
        KEY_TYPE: TYPE_LIST,
        KEY_SCHEMA: item_rules,
        KEY_NO_DUPLICATES: True,
        KEY_NULLABLE: True,
    }
    return {fieldname: list_rules}
def _container_schema_item(fieldname, subschema):
    """Build a schema item for a list of dictionaries.

    :param fieldname: Name of the container field.
    :type fieldname: str
    :param subschema: Schema that each dictionary in the list is
                      validated against.
    :type subschema: dict
    :returns: Single-key schema mapping ``fieldname`` to its rules.
    :rtype: dict
    """
    # Rules applied to every dictionary in the list.
    entry_rules = {
        KEY_TYPE: TYPE_DICT,
        KEY_SCHEMA: subschema,
    }
    container_rules = {
        KEY_TYPE: TYPE_LIST,
        KEY_SCHEMA: entry_rules,
    }
    return {fieldname: container_rules}
def _metadata_schema():
    """Build the schema item for record metadata.

    The metadata is a dictionary whose created, updated and cmm_type
    attributes each use the default string schema item.

    :returns: Single-key schema mapping the metadata path to its rules.
    :rtype: dict
    """
    metadata = RecordBase._metadata
    attributes = (
        metadata.attr_created,
        metadata.attr_updated,
        metadata.attr_cmm_type,
    )
    items = {}
    for attribute in attributes:
        items.update(_default_schema_item(attribute.name))
    metadata_rules = {
        KEY_TYPE: TYPE_DICT,
        KEY_SCHEMA: items,
    }
    return {metadata.path: metadata_rules}
class RecordValidator(Validator):
    """Subclass :class:`cerberus.Validator` to customize validation.

    JSON does not support sets. Therefore a rule to validate
    list items for uniqueness is needed.

    For the sake of simplicity in raising and handling validation errors
    this class also overrides :meth:`cerberus.Validator.validate`.
    """

    @staticmethod
    def _contains_duplicates(items):
        """Tell whether a list holds the same item more than once.

        :param items: List to inspect.
        :type items: list
        :returns: True if at least two items compare equal.
        :rtype: bool
        """
        try:
            return len(set(items)) != len(items)
        except TypeError:
            # Unhashable items (dicts, lists) cannot go into a set. Such
            # values can reach this rule from user-submitted documents, so
            # fall back to equality comparison instead of crashing.
            seen = []
            for item in items:
                if item in seen:
                    return True
                seen.append(item)
            return False

    # NOTE: cerberus reads the rule's argument schema from the tail of the
    # docstring below; nothing may follow the schema literal.
    def _validate_no_duplicates(self, no_duplicates, field, value):
        """Test uniqueness of list item.

        The rule's arguments are validated against this schema:
        {'type': 'boolean'}
        """
        if not isinstance(value, list):
            self._error(field, "Value must be list")
        elif no_duplicates and self._contains_duplicates(value):
            self._error(field, "Value must be unique within list")

    def validate(self, document, **kwargs):
        r"""Override :meth:`cerberus.Validator.validate`

        Handle unvalidated _id-field here to simplify error message flow
        and enable validation messages.

        If document is to be updated it is allowed to have
        an _id field. If document is being inserted it is an
        error to have an _id field.

        :note: The _id field is removed from ``document`` in place
               before the remaining fields are validated.

        :param document: Document to be validated.
        :type document: dict
        :param \*\*kwargs: keyword arguments passed to
                           :meth:`cerberus.Validator.validate`.
                           Here it is only checked if keyword argument
                           ``update`` is present and True.
        :return: True if validation passes, False if not.
        :rtype: bool
        """
        # Take the id out first: it is handled here, not by the schema.
        has_id = document.pop(RecordBase._id.path, None)
        validated = super().validate(document, **kwargs)
        # Id cannot be manually set. If this is insert, regard it as an error.
        # For updates it's allowed for roundtrips.
        update = kwargs.get('update')
        if not update and has_id:
            validated = False
            self._error(RecordBase._id.path, "Cannot manually set id")
        return validated
class RecordValidationError(Exception):
    """Exception signalling that a record failed validation.

    The exception arguments are the message followed by the
    validation errors.

    :param collection: Collection that got validated.
    :type collection: str
    :param validation_errors: Validation errors from
                              :attr:`cerberus.Validator.errors`. Kept in
                              :attr:`RecordValidationError.validation_errors`
                              for later processing.
    :type validation_errors: dict
    :param msg: Optional message. A default naming the collection is
                used when omitted or empty.
    :type msg: str
    :returns: :obj:`RecordValidationError`
    """

    def __init__(self, collection, validation_errors, msg=None):
        # Any falsy message is replaced by the default one.
        message = msg or "Validation of %s failed" % collection
        super().__init__(message, validation_errors)
        self.msg = message
        self.validation_errors = validation_errors
        self.collection = collection
class RecordValidationSchema:
    r"""Create validation schema from records in
    :mod:`kuha_common.document_store.records` to validate
    user-submitted data.

    Schema items are built dynamically by consulting record's field types.

    * For single value fields the type is string and null values
      are not accepted.
    * For localizable fields it is required to have a
      :const:`kuha_common.document_store.constants.REC_FIELDNAME_LANGUAGE`
      attribute.
    * Field attributes are strings and they may be null.
    * Subfield values are strings and not nullable.
    * Fallback to string, not null.

    Record's metadata is accepted as input but not required.

    :note: :attr:`kuha_common.document_store.RecordBase._metadata` and
           :attr:`kuha_common.document_store.RecordBase._id` are also
           validated at database level.

    :seealso: :meth:`kuha_document_store.database.RecordsCollection.get_validator`

    Every dynamically built schema item may be overridden by a custom
    schema item given as a parameter for class constructor.

    :param record_class: class which holds record attributes.
    :type record_class: :class:`kuha_common.document_store.records.Study` or
                        :class:`kuha_common.document_store.records.Variable` or
                        :class:`kuha_common.document_store.records.Question` or
                        :class:`kuha_common.document_store.records.StudyGroup`
    :param \*args: Custom schema items to override dynamically built
                   schema items.
    :returns: :obj:`RecordValidationSchema`
    """

    def __init__(self, record_class, *args):
        # The metadata schema is the starting point; record fields are
        # added on top of it.
        self.schema = _metadata_schema()
        self.customize_schema = {}
        # Custom items are keyed by dotted paths; store them as nested
        # dictionaries so they can be looked up per field and subfield.
        for overrides in args:
            for dotted_path, item in overrides.items():
                self._recurse_and_update_customize_schema(
                    dotted_path.split('.'), item
                )
        for field_entry in record_class.iterate_record_fields():
            self._add_record_field(field_entry[1])

    def _add_record_field(self, field):
        """Add schema items for one record field.

        :param field: Record field yielded by the record class.
        """
        if field.single_value:
            self._add_schema_item(field.path)
            return
        path = field.path
        if field.localizable:
            # Localizable fields must carry a language attribute.
            self._add_container_schema_item(
                path, REC_FIELDNAME_LANGUAGE, required=True
            )
        for attribute in field.attrs:
            self._add_container_schema_item(path, attribute, nullable=True)
        if not field.sub_name:
            # No subfield: the field gets a plain schema item.
            self._add_schema_item(path)
        else:
            self._add_container_schema_item(path, field.sub_name.name)

    def _add_schema_item(self, fieldname):
        """Add a top-level schema item, preferring a custom one.

        :param fieldname: Name of the field.
        :type fieldname: str
        """
        if fieldname not in self.customize_schema:
            self.schema.update(_default_schema_item(fieldname))
            return
        self.schema[fieldname] = self.customize_schema[fieldname]

    def _add_container_schema_item(self, fieldname, subfieldname,
                                   nullable=False, required=False):
        """Add a subfield item to a container field's schema.

        The container item is created on first use; later calls extend
        its subschema. A custom item for the subfield takes precedence
        over the default one.

        :param fieldname: Name of the container field.
        :type fieldname: str
        :param subfieldname: Name of the subfield inside the container.
        :type subfieldname: str
        :param nullable: Passed to the default schema item.
        :type nullable: bool
        :param required: Passed to the default schema item.
        :type required: bool
        """
        custom_item = self.customize_schema.get(fieldname, {}).get(subfieldname)
        if custom_item:
            sub_item = {subfieldname: custom_item}
        else:
            sub_item = _default_schema_item(subfieldname, nullable, required)
        if fieldname in self.schema:
            self.schema[fieldname][KEY_SCHEMA][KEY_SCHEMA].update(sub_item)
        else:
            self.schema.update(_container_schema_item(fieldname, sub_item))

    def _recurse_and_update_customize_schema(self, keys, schema):
        """Merge a custom schema item into the nested custom schema.

        :param keys: Path components leading to the item.
        :type keys: list
        :param schema: Rules to merge at the end of the path.
        :type schema: dict
        """
        node = self.customize_schema
        for key in keys:
            # Descend, creating missing levels on the way.
            node = node.setdefault(key, {})
        node.update(schema)

    def get_schema(self):
        """Get Schema.

        :returns: Validation schema supported by :mod:`cerberus`
        :rtype: dict
        """
        return self.schema
# Validation schemas keyed by collection name. Each record class is paired
# with the custom schema items that override its dynamically built defaults.
SCHEMAS = {
    record_class.collection: RecordValidationSchema(record_class, *custom_items)
    for record_class, custom_items in (
        (Study, (
            _identifier_schema_item(Study.study_number.path),
            _uniquelist_schema_item(Study.persistent_identifiers.path),
            _bool_schema_item(Study.universes.attr_included.path),
        )),
        (Variable, (
            _identifier_schema_item(Variable.variable_name.path),
            _identifier_schema_item(Variable.study_number.path),
            _bool_schema_item(Variable.codelist_codes.attr_missing.path),
            _uniquelist_schema_item(Variable.question_identifiers.path),
        )),
        (Question, (
            _identifier_schema_item(Question.study_number.path),
            _identifier_schema_item(Question.question_identifier.path),
            _default_schema_item(Question.variable_name.path, nullable=True),
        )),
        (StudyGroup, (
            _identifier_schema_item(StudyGroup.study_group_identifier.path),
            _uniquelist_study_numbers_schema_item(StudyGroup.study_numbers.path),
        )),
    )
}
def validate(collection, document, raise_error=True, update=False):
    """Validate document against collection schema.

    :param collection: Collection the document belongs to.
    :type collection: str
    :param document: Document to validate. Document is a dictionary
                     representation of a document store record.
    :type document: dict
    :param raise_error: Should a :exc:`RecordValidationError` be raised if
                        validation fails.
    :type raise_error: bool
    :param update: Validate for an update/replace operation of an
                   existing record?
    :type update: bool
    :returns: True if document passed validation, False if fails.
    :rtype: bool
    :raises: :exc:`RecordValidationError` if raise_error is True and
             document fails validation.
    """
    validator = RecordValidator(SCHEMAS[collection].get_schema())
    passed = validator.validate(document, update=update)
    if passed:
        return True
    if not raise_error:
        return False
    # Hand the collected errors to the caller for later processing.
    raise RecordValidationError(collection, validator.errors)