Source code for kuha_document_store.database

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Database module provides access to MongoDB database.

MongoDB Database is accessed through this module.
The module also provides convenience methods for easy access
and manipulation via Document Store records defined in
:mod:`kuha_common.document_store.records`

Database can be used directly, via records or with
JSON representation of records.

:note: This module has strict dependency to
       :mod:`kuha_common.document_store.records`
"""
# stdlib
import json
from urllib.parse import quote_plus
# 3rd party libs
import motor
# Motor uses pymongo under the hood, and raises pymongo errors
import pymongo.errors
from pymongo import (
    ReturnDocument,
    DESCENDING,
    ASCENDING
)
import bson.json_util
from bson.objectid import ObjectId
import bson.errors
import dateutil.parser

# kuha_common
from kuha_common.document_store.records import (
    Study,
    Variable,
    Question,
    StudyGroup,
    RecordBase,
    record_by_collection,
    datetime_to_datestamp,
    datestamp_to_datetime,
    datetime_now
)
from kuha_common.document_store.constants import (
    MDB_ISODATE,
    MDB_EXISTS,
    MDB_REGEX,
    MDB_TYPE,
    MDB_AND,
    MDB_TYPE_DATE,
    MDB_FIELDNAME_ID
)
# self
from kuha_document_store.validation import (
    validate,
    RecordValidationError
)


def mongodburi(host_port, *hosts_ports, database=None, credentials=None, options=None):
    """Create and return a mongodb connection string in the form of a MongoURI.

    The standard URI connection scheme has the form:

        mongodb://[username:password@]host1[:port1][,...hostN[:portN]]][/[database][?options]]

    - https://docs.mongodb.com/manual/reference/connection-string/

    :param str host_port: Host and port of a mongod instance.
    :param hosts_ports: Additional host:port strings of replica instances.
    :param str database: Optional database.
    :param tuple credentials: Optional credentials (user, pwd).
    :param list options: Optional options as a list of tuples
                         [(opt_key1, opt_val1), (opt_key2, opt_val2)]
    :returns: MongoURI connection string.
    :rtype: str
    """
    template = 'mongodb://{credentials}{replicas}{database}{options}'
    # A '/' must separate the host list from the '?options' part even
    # when no database is given.
    database = '/{}'.format(database or '') if database or options else ''
    # Percent-encode BOTH username and password: either may contain
    # characters (':', '@', '/', '%') that are reserved in a MongoURI.
    credentials = '%s:%s@' % (quote_plus(credentials[0]),
                              quote_plus(credentials[1])) if credentials else ''
    options = '?' + '&'.join(['%s=%s' % x for x in options]) if options else ''
    replicas = ','.join((host_port,) + hosts_ports)
    return template.format(credentials=credentials, replicas=replicas,
                           database=database, options=options)
def _isodate_hook(dict_):
    """JSON object hook converting isodate strings to datetime objects.

    MongoDB's extended JSON represents dates as since-epoch values;
    this codebase prefers isodates, so dictionaries carrying the
    ``MDB_ISODATE`` key are parsed into :obj:`datetime.datetime`.
    All other dictionaries are delegated to the default BSON hook.
    """
    if MDB_ISODATE not in dict_:
        return bson.json_util.object_hook(dict_)
    return dateutil.parser.parse(dict_[MDB_ISODATE])


def _json_decode(json_object, pre_decode='utf-8'):
    """Decode a JSON payload into a python dictionary.

    :param json_object: JSON payload; bytes when `pre_decode` is set.
    :param pre_decode: charset used to decode a bytes payload.
                       A falsy value skips the decoding step.
    :returns: decoded object with isodates parsed into datetimes.
    :rtype: dict
    """
    text = json_object.decode(pre_decode) if pre_decode else json_object
    return json.loads(text, object_hook=_isodate_hook)


def _recurse_and_convert(_dict, keys, conversion, allow_none=False):
    """Do in-place conversion of dictionary object by recursing to `keys`.

    Walks `_dict` down the path given by `keys`; the value found at the
    final key is replaced by ``conversion(value)``.

    :param _dict: object to convert.
    :type _dict: dict
    :param keys: dictionary keys to recurse into. The value found at the
                 end of the recursion is submitted to `conversion`.
    :type keys: list
    :param conversion: callable which will do the conversion and return
                       the converted value.
    :type conversion: callable
    :param allow_none: if True a missing key is ignored silently instead
                       of raising :exc:`KeyError`.
    :type allow_none: bool
    """
    if not keys:
        # Nothing to recurse into; matches the no-op of an empty path.
        return
    node = _dict
    for key in keys[:-1]:
        try:
            node = node[key]
        except KeyError:
            if allow_none:
                return
            raise
    leaf = keys[-1]
    try:
        current = node[leaf]
    except KeyError:
        if allow_none:
            return
        raise
    node[leaf] = conversion(current)
class RecordsCollection:
    """Database collection.

    :note: Relational Database term *table* is called a
           *collection* in MongoDB.

    Contains properties for Document Store collections.
    Has strict dependency to :mod:`kuha_common.document_store.records`

    :param record_class: Class of a record that belongs to this collection.
    :type record_class: :class:`kuha_common.document_store.records.Study` or
                        :class:`kuha_common.document_store.records.Variable` or
                        :class:`kuha_common.document_store.records.Question` or
                        :class:`kuha_common.document_store.records.StudyGroup`
    :param indexes_unique: declare unique indexes.
    :type indexes_unique: list or None
    :param indexes: additional indexes
    :type indexes: list or None
    :param validators: additional validators
    :type validators: list or None
    :returns: :obj:`RecordsCollection`
    """

    #: List common isodate fields
    isodate_fields = [
        RecordBase._metadata.attr_created.path,
        RecordBase._metadata.attr_updated.path
    ]

    #: Fields containing MongoDB ObjectIDs
    object_id_fields = [
        RecordBase._id.path
    ]

    #: Declare updated field as index.
    index_updated = [
        (RecordBase._metadata.attr_updated.path, DESCENDING)
    ]

    def __init__(self, record_class, indexes_unique=None, indexes=None, validators=None):
        self.collection_name = record_class.get_collection()
        self.indexes_unique = indexes_unique
        # Every collection indexes the metadata 'updated' field.
        self.indexes = [
            self.index_updated
        ]
        if indexes:
            # NOTE(review): append() adds `indexes` as a single nested
            # element rather than extending the list — verify against the
            # consumer whether extend() was intended.
            self.indexes.append(indexes)
        # Baseline validators shared by all collections: metadata must
        # carry created/updated dates and the exact cmm_type of the record.
        self.validators = [
            {RecordBase._metadata.attr_created.path: {MDB_EXISTS: True, MDB_TYPE: MDB_TYPE_DATE}},
            {RecordBase._metadata.attr_updated.path: {MDB_EXISTS: True, MDB_TYPE: MDB_TYPE_DATE}},
            {RecordBase._metadata.attr_cmm_type.path: {MDB_EXISTS: True, MDB_REGEX: "^{}$".format(record_class.cmm_type)}}
        ]
        if validators:
            self.validators.extend(validators)

    @classmethod
    def _encode_isodates(cls, _dict):
        # In-place conversion of datetime objects into datestamp strings
        # for every known isodate field. Missing fields are tolerated.
        for path in cls.isodate_fields:
            sects = path.split('.')
            _recurse_and_convert(
                _dict, sects, datetime_to_datestamp, allow_none=True
            )

    @classmethod
    def _decode_isodates(cls, _dict):
        # In-place conversion of datestamp strings into datetime objects
        # for every known isodate field. Missing fields are tolerated.
        for path in cls.isodate_fields:
            sects = path.split('.')
            _recurse_and_convert(
                _dict, sects, datestamp_to_datetime, allow_none=True
            )

    @classmethod
    def bson_to_json(cls, _dict):
        """Encode BSON dictionary to JSON.

        Encodes special type of dictionary that comes from MongoDB
        queries to JSON representation. Also converts datetimes to strings.

        :param _dict: Source object containing BSON.
        :type _dict: dict
        :returns: Source object converted to JSON.
        :rtype: str
        """
        cls._encode_isodates(_dict)
        return bson.json_util.dumps(_dict)

    def get_validator(self):
        """Get defined database-level validators.

        :note: All validators are combined with AND operator.

        :returns: Database level validators to be used on DB setup.
        :rtype: dict
        """
        return {MDB_AND: self.validators}

    async def process_json_for_upsert(self, json_document, old_metadata=None):
        """Preprocess JSON for insert/update operations.

        Decodes JSON to Python dictionary. Validates the result.
        Creates metadata for the document if the document has none,
        otherwise uses the submitted metadata. Decodes submitted metadata
        datestamps to datetime objects.

        :param json_document: JSON representation of a record.
        :type json_document: str
        :param old_metadata: old metadata if updating existing record.
        :type old_metadata: dict or None
        :returns: Document ready to be submitted to database.
        :rtype: dict
        """
        update = bool(old_metadata)
        doc = _json_decode(json_document)
        validate(self.collection_name, doc, update=update)
        if RecordBase._metadata.path in doc:
            # Submitted metadata. Use it.
            self._decode_isodates(doc)
        else:
            if update:
                # Reuse the stored metadata, but stamp a fresh
                # 'updated' datetime before attaching it.
                old_metadata[RecordBase._metadata.path][RecordBase._metadata.attr_updated.name] = datetime_now()
                doc.update({
                    RecordBase._metadata.path: old_metadata[RecordBase._metadata.path]
                })
            else:
                # New record: generate fresh metadata from the record class.
                record = record_by_collection(self.collection_name)()
                doc.update(record.export_metadata_dict())
        return doc
#: Define Record Collections RECORD_COLLECTIONS = [ RecordsCollection( Study, indexes_unique=[[(Study.study_number.path, DESCENDING)]], validators=[{Study.study_number.path: {MDB_EXISTS: True}}] ), RecordsCollection( Variable, indexes_unique=[[(Variable.study_number.path, ASCENDING), (Variable.variable_name.path, DESCENDING)]], validators=[{Variable.study_number.path: {MDB_EXISTS: True}}, {Variable.variable_name.path: {MDB_EXISTS: True}}] ), RecordsCollection( Question, indexes_unique=[[(Question.study_number.path, ASCENDING), (Question.question_identifier.path, ASCENDING)]], validators=[{Question.study_number.path: {MDB_EXISTS: True}}, {Question.question_identifier.path: {MDB_EXISTS: True}}] ), RecordsCollection( StudyGroup, indexes_unique=[ [(StudyGroup.study_group_identifier.path, DESCENDING)]], validators=[ {StudyGroup.study_group_identifier.path: {MDB_EXISTS: True}}] ) ]
class Database:
    """MongoDB database.

    Provides access to low-level database operations. For fine access
    control uses two database credentials, one for read-only operations,
    one for write operations. Chooses the correct credentials to
    authenticate based on the operation to be performed.

    :note: Does not authenticate or connect to the database before
           actually performing operations that need connecting. Therefore
           connection/authentication issues will raise when performing
           operations and not when initiating the database.

    :param settings: settings for database connections
    :type settings: :obj:`argparse.Namespace`
    :returns: :obj:`Database`
    """

    def __init__(self, settings):
        self._name = settings.database_name
        endpoint = '%s:%s' % (settings.database_host, settings.database_port)
        # Motor does not support multiple users on the same client, so a
        # separate URI (and later a separate client) is kept per role.
        self.__reader_uri = mongodburi(
            endpoint, database=self._name,
            credentials=(settings.database_user_reader,
                         settings.database_pass_reader))
        self.__editor_uri = mongodburi(
            endpoint, database=self._name,
            credentials=(settings.database_user_editor,
                         settings.database_pass_editor))
        self._reader_client = None
        self._editor_client = None

    async def _for_editing(self, collection):
        # Lazily create the write-credentials client on first use.
        if self._editor_client is None:
            self._editor_client = motor.motor_tornado.MotorClient(self.__editor_uri)
            # The URI embeds credentials; drop the reference once consumed.
            self.__editor_uri = None
        return self._editor_client[self._name][collection]

    async def _for_reading(self, collection):
        # Lazily create the read-credentials client on first use.
        if self._reader_client is None:
            self._reader_client = motor.motor_tornado.MotorClient(self.__reader_uri)
            self.__reader_uri = None
        return self._reader_client[self._name][collection]

    def close(self):
        """Close open sockets to database."""
        if self._reader_client is not None:
            self._reader_client.close()
            self._reader_client = None
        if self._editor_client is not None:
            self._editor_client.close()
            self._editor_client = None

    # READ OPERATIONS

    async def query_single(self, collection_name, query, fields=None, callback=None):
        """Query for a single database document.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param query: Database query.
        :type query: dict
        :param fields: Fields to select. None selects all.
        :type fields: list or None
        :param callback: Result callback. Called with result as parameter.
                         If None this method will return the result.
        :type callback: function or None
        :returns: A single document, or None if no matching document is
                  found or if a callback is given.
        :rtype: dict or None
        """
        coll = await self._for_reading(collection_name)
        document = await coll.find_one(query, projection=fields)
        if not callback:
            return document
        await callback(document)

    async def query_multiple(self, collection_name, query, callback, fields=None,
                             skip=0, sort_by=None, sort_order=1, limit=0):
        """Query for multiple database documents.

        :note: has mandatory callback parameter.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param query: Database query.
        :type query: dict
        :param callback: Result callback. Called once with each document.
        :type callback: function that receives a single record result.
        :param fields: Fields to select. None selects all.
        :type fields: list or None
        :param skip: Skip documents from the beginning of query.
        :type skip: int
        :param sort_by: Sort by field.
        :type sort_by: str
        :param sort_order: Sort order. MongoDB uses 1 for ascending and
                           -1 for descending.
        :type sort_order: int
        :param limit: Limit the number of returned documents.
                      0 returns all documents.
        :type limit: int
        """
        coll = await self._for_reading(collection_name)
        cursor = coll.find(query, projection=fields, skip=skip, limit=limit)
        if sort_by:
            cursor = cursor.sort(sort_by, sort_order)
        async for document in cursor:
            await callback(document)

    async def query_distinct(self, collection_name, fieldname, filter_=None):
        """Query for distinct values in collection field.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param fieldname: Field to query for distinct values.
        :type fieldname: str
        :param filter_: Optional filter to use with query.
        :type filter_: dict or None
        :returns: distinct values.
        :rtype: list
        """
        coll = await self._for_reading(collection_name)
        return await coll.distinct(fieldname, filter=filter_)

    async def count(self, collection_name, filter_=None):
        """Query for document count.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param filter_: Optional filter to use for query.
        :type filter_: dict or None
        :returns: Count of documents.
        :rtype: int
        """
        coll = await self._for_reading(collection_name)
        return await coll.count_documents(filter=filter_ or {})

    # WRITE OPERATIONS

    async def insert(self, collection_name, document):
        """Insert single document to database.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param document: Document to insert.
        :type document: dict
        :returns: Insert result
        :rtype: :obj:`pymongo.results.InsertOneResult`
        """
        coll = await self._for_editing(collection_name)
        return await coll.insert_one(document)

    async def replace(self, collection_name, oid, document):
        """Replace single document in database.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param oid: MongoDB object ID as string.
        :type oid: str
        :param document: Document to store.
        :type document: dict
        :returns: Update result.
        :rtype: :obj:`pymongo.results.UpdateResult`
        """
        coll = await self._for_editing(collection_name)
        return await coll.replace_one({MDB_FIELDNAME_ID: ObjectId(oid)}, document)

    async def insert_or_replace(self, collection_name, query, document):
        """Insert or replace a single document in database.

        Uses special MongoDB method which will replace an existing
        document if one is found via query. Otherwise it will insert
        a new document.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param query: Database query.
        :type query: dict
        :param document: Document to store.
        :type document: dict
        :returns: The document that was stored.
        :rtype: dict
        """
        coll = await self._for_editing(collection_name)
        return await coll.find_one_and_replace(
            query, document,
            upsert=True,
            return_document=ReturnDocument.AFTER
        )

    # DELETE OPERATIONS

    async def delete_one(self, collection_name, query):
        """Delete single document.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param query: Database query.
        :type query: dict
        :returns: Delete result
        :rtype: :obj:`pymongo.results.DeleteResult`
        """
        coll = await self._for_editing(collection_name)
        return await coll.delete_one(query)

    async def delete_many(self, collection_name, query):
        """Delete multiple documents.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param query: Database query.
        :type query: dict
        :returns: Delete result
        :rtype: :obj:`pymongo.results.DeleteResult`
        """
        coll = await self._for_editing(collection_name)
        return await coll.delete_many(query)
class DocumentStoreDatabase(Database):
    """Subclass of :class:`Database`

    Provides specialized methods extending the functionality of
    :class:`Database`. Combines database operations with properties of
    RecordsCollection. Defines exceptions that, when raised, the
    HTTP-response operation can continue.
    """

    #: These are exceptions that may be raised
    #: in normal database operation, so they are not
    #: exceptions that should terminate the HTTP-response
    #: process. As such, the caller may want to catch these errors.
    recoverable_errors = (
        pymongo.errors.WriteError,
        json.decoder.JSONDecodeError,
        bson.errors.InvalidId,
        RecordValidationError
    )

    @staticmethod
    def json_decode(json_object):
        """Helper method for converting HTTP input JSON to python dictionary.

        :param json_object: json to convert.
        :type json_object: str
        :returns: JSON object converted to python dictionary.
        :rtype: dict
        """
        return _json_decode(json_object)

    @staticmethod
    def _wrap_callback(callback=None, not_found_exception=None):
        # Wrap `callback` so it receives JSON instead of BSON. An empty
        # result either raises `not_found_exception` (when given) or is
        # skipped silently. Without a callback the JSON chunk is returned
        # directly from the wrapper.
        async def _callback(result):
            if not result:
                if not_found_exception:
                    raise not_found_exception()
                return
            json_chunk = RecordsCollection.bson_to_json(result)
            if callback:
                await callback(json_chunk)
            else:
                return json_chunk
        return _callback

    async def _get_collection_by_name(self, name):
        # Look up the RecordsCollection configuration for `name`.
        # NOTE(review): raises a generic Exception on unknown name;
        # a more specific type (e.g. ValueError) would be easier for
        # callers to handle — confirm no caller relies on the bare type.
        for coll in RECORD_COLLECTIONS:
            coll_name = coll.collection_name
            if coll_name == name:
                return coll
        raise Exception("No such collection: {}".format(name))

    # QUERIES

    async def query_multiple(self, collection_name, query, callback, **kwargs):
        r"""Query multiple documents with callback.

        Converts resulting BSON to JSON. Calls callback with each
        resulting record JSON.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param query: Database query.
        :type query: dict
        :param callback: Result callback. Called with each document as
                         parameter.
        :type callback: function
        :param \*\*kwargs: additional keyword arguments passed to super
                           method.
        """
        _call = self._wrap_callback(callback)
        await super().query_multiple(collection_name, query, _call, **kwargs)

    async def query_by_oid(self, collection_name, oid, callback, fields=None,
                           not_found_exception=None):
        """Query single record by ObjectID with callback.

        Converts BSON result to JSON. Calls the callback with resulting
        JSON. If parameter for `not_found_exception` is given, will raise
        the exception if query ObjectID points to no known database object.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param oid: ObjectID to query for.
        :type oid: str
        :param callback: function to call with resulting JSON.
        :type callback: function
        :param fields: Fields to select. None selects all.
        :type fields: list or None
        :param not_found_exception: Raised if ObjectID not found.
        :type not_found_exception: Exception class.
        """
        query = {MDB_FIELDNAME_ID: ObjectId(oid)}
        _call = self._wrap_callback(callback, not_found_exception)
        await self.query_single(collection_name, query, fields=fields, callback=_call)

    async def query_distinct(self, collection_name, fieldname, filter_=None):
        """Query for distinct values in collection field.

        If `fieldname` points to a leaf node, returns a list of values,
        if it points to a branch node, returns a list of dictionaries.

        If `fieldname` points to leaf node of isodate representations,
        or to branch node that contains isodates, converts datetimes to
        datestamps which are JSON serializable.

        If 'fieldname' points to a leaf node containing MongoDB ObjectID
        values, cast those values to string.

        :note: Requires changes to logic if collection.object_id_fields
               should contain paths with multiple components, for example
               'some.path.with.id'. In that case distinct queries that
               point to branch nodes with OIDs will fail with Exception
               TypeError: ObjectId('...') is not JSON serializable.

        :note: Distinction will not work as expected on datestamp-fields
               that are stored as signed 64-bit integers with millisecond
               precision. The returned datestamps are not as precise since
               they have second precision.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param fieldname: Field to query for distinct values.
        :type fieldname: str
        :param filter_: Optional filter to use with query.
        :type filter_: dict or None
        :returns: distinct values from database
        :rtype: list
        """
        results = await super().query_distinct(collection_name, fieldname,
                                               filter_=filter_)
        # results may be a list of dictionaries or list of values, or an empty list.
        if results == []:
            # Return empty list.
            return results
        # From this point on, we know that there are results.
        # It also implies that:
        #  * fieldname is a valid path. However, it may not be a path that
        #    leads to a leaf node.
        #  * len(collection.isodate_fields.split('.')) >= len(fieldname.split('.'))
        collection = await self._get_collection_by_name(collection_name)
        # It is assumed that collection.object_id_fields does not contain
        # paths with multiple components. This will fail if it does.
        if fieldname in collection.object_id_fields:
            # Leaf node of ObjectIDs: cast each to str for JSON output.
            return [str(result) if result is not None else result
                    for result in results]
        if fieldname in collection.isodate_fields:
            # List of datetimes: convert each to a datestamp string.
            return [datetime_to_datestamp(result) if result is not None else result
                    for result in results]
        for datefield in collection.isodate_fields:
            if fieldname.split('.')[0] != datefield.split('.')[0]:
                # No common base.
                continue
            # From this point on, we know `results` is a list of dicts.
            # The `results`s keys will be relative to `fieldname`.
            # We need the keys.
            keys = list(filter(None, datefield.replace(fieldname, '', 1).split('.')))
            for result in results:
                _recurse_and_convert(
                    result, keys,
                    lambda value: datetime_to_datestamp(value) if value is not None else value)
        return results

    # WRITES

    async def insert_or_update_record(self, record):
        """Insert or update database document by Document Store record.

        Special method that takes a Document Store record instance as
        parameter and determines whether to insert or update the given
        record. Makes a query to MongoDB to determine if the record is
        already in database. If there is a record, calls the record
        instance's updates_record method to update the instance with
        values that are present in database but not in the submitted
        instance. Afterwards calls :meth:`insert_or_replace` with record
        instances dictionary representation.

        :param record: Document Store record instance.
        :type record: :obj:`kuha_common.document_store.records.Study` or
                      :obj:`kuha_common.document_store.records.Variable` or
                      :obj:`kuha_common.document_store.records.Question` or
                      :obj:`kuha_common.document_store.records.StudyGroup`
        :returns: operation details: {'operation': 'insert'|'update',
                  'id': <ObjectID>, <records-unique-values>}
        :rtype: dict
        """
        rval = {'operation': 'insert'}
        # Build the identifying query from the record's unique fields.
        # NOTE(review): if `record` is none of the four known types,
        # `query` stays unbound and the query below raises NameError —
        # confirm callers only pass the documented types.
        if isinstance(record, Study):
            query = {
                record.study_number.get_name(): record.study_number.get_value()
            }
            rval.update({'study_number': record.study_number.get_value()})
        elif isinstance(record, Variable):
            query = {
                record.study_number.get_name(): record.study_number.get_value(),
                record.variable_name.get_name(): record.variable_name.get_value()
            }
            rval.update({'study_number': record.study_number.get_value(),
                         'variable_name': record.variable_name.get_value()})
        elif isinstance(record, Question):
            query = {
                record.study_number.get_name(): record.study_number.get_value(),
                record.question_identifier.get_name(): record.question_identifier.get_value()
            }
            rval.update({'study_number': record.study_number.get_value(),
                         'question_identifier': record.question_identifier.get_value()})
        elif isinstance(record, StudyGroup):
            query = {
                record.study_group_identifier.get_name(): record.study_group_identifier.get_value()
            }
            rval.update({
                'study_group_identifier':
                record.study_group_identifier.get_value()
            })
        old_record = await self.query_single(record.get_collection(), query)
        if old_record:
            # Merge stored values missing from the submitted instance.
            record.updates_record(old_record)
            rval['operation'] = 'update'
        inserted_record = await self.insert_or_replace(record.get_collection(),
                                                       query,
                                                       record.export_dict())
        rval.update({'id': str(inserted_record.get(RecordBase._id.path))})
        return rval

    async def bulk_insert_or_update_record(self, records):
        """Run bulk insert/update operations for Document Store records.

        Method that takes an iterable parameter yielding Document Store
        records. Then calls :meth:`insert_or_update_record` with each
        record instance.

        :param records: Document Store records.
        :type records: iterable
        :returns: list of insert_or_update_record methods operation details.
        :rtype: list
        """
        ids = []
        for record_instance in records:
            _id = await self.insert_or_update_record(record_instance)
            ids.append(_id)
        return ids

    async def insert_json(self, collection_name, json_object):
        """Insert JSON-encoded document to Database.

        Special method that takes a JSON object that is then inserted
        to database.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param json_object: JSON object representing collection document.
        :type json_object: str
        :returns: Insert result.
        :rtype: :obj:`pymongo.results.InsertOneResult`
        """
        collection = await self._get_collection_by_name(collection_name)
        collection_document = await collection.process_json_for_upsert(json_object)
        return await self.insert(collection_name, collection_document)

    async def replace_json(self, collection_name, oid, json_object, not_found_exception):
        """Replace JSON-encoded document in Database.

        Special method that replaces a document in database with document
        given as parameter `json_object`. The document to be replaced is
        queried by given `oid`. This method also takes a
        `not_found_exception` as mandatory parameter. The exception is
        raised if a document with given `oid` cannot be found.

        :note: if the submitted JSON does not contain metadata for the
               document, the metadata gets calculated by
               :meth:`RecordsCollection.process_json_for_upsert`

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param oid: MongoDB object ID as string.
        :type oid: str
        :param json_object: JSON object representing collection document.
        :type json_object: str
        :param not_found_exception: exception to raise if document is not
                                    found with `oid`
        :type not_found_exception: Exception class.
        :returns: Update result.
        :rtype: :obj:`pymongo.results.UpdateResult`
        """
        query = {MDB_FIELDNAME_ID: ObjectId(oid)}
        # Project only the metadata field; used both as existence check
        # and as the old metadata for the upsert preprocessing.
        old_metadata = await self.query_single(collection_name, query,
                                               fields=[RecordBase._metadata.path])
        if not old_metadata:
            raise not_found_exception(context=oid)
        collection = await self._get_collection_by_name(collection_name)
        collection_document = await collection.process_json_for_upsert(
            json_object, old_metadata=old_metadata)
        return await self.replace(collection_name, oid, collection_document)

    # DELETES

    async def delete_by_oid(self, collection_name, oid):
        """Delete database document with ObjectID.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param oid: MongoDB object ID as string.
        :type oid: str
        :returns: Delete result
        :rtype: :obj:`pymongo.results.DeleteResult`
        """
        query = {MDB_FIELDNAME_ID: ObjectId(oid)}
        return await self.delete_one(collection_name, query)