Source code for kuha_document_store.database

#!/usr/bin/env python3
# Author(s): Toni Sissala
# Copyright 2020 Finnish Social Science Data Archive FSD / University of Tampere
# Licensed under the EUPL. See LICENSE.txt for full license.
"""Database module provides access to MongoDB database.

The MongoDB database is accessed through this module.
The module also provides convenience methods for easy access
and manipulation via Document Store records defined in
:mod:`kuha_common.document_store.records`

The database can be used directly, via records, or with
JSON representations of records.

:note: This module has a strict dependency on
       :mod:`kuha_common.document_store.records`
"""
# stdlib
import logging
import json
from urllib.parse import quote_plus
from collections import namedtuple
# 3rd party libs
from motor.motor_tornado import MotorClient
# Motor uses pymongo under the hood, and raises pymongo errors
import pymongo.errors
from pymongo import (
    ReturnDocument,
    DESCENDING,
    ASCENDING
)
import bson.json_util
from bson.objectid import ObjectId
import bson.errors
import dateutil.parser
# kuha_common
from kuha_common.document_store.records import (
    Study,
    Variable,
    Question,
    StudyGroup,
    RecordBase,
    datetime_to_datestamp,
    datestamp_to_datetime,
    datetime_now,
    dig_and_set,
    path_split,
    record_by_collection,
    SEPARATOR_PATH,
    REC_STATUS_DELETED,
    REC_STATUS_CREATED
)
from kuha_common.document_store.constants import (
    MDB_ISODATE,
    MDB_TYPE_DATE,
    MDB_TYPE_NULL,
    MDB_TYPE_STRING,
    MDB_NOT_EQUAL,
    MDB_SET,
    MDB_FIELDNAME_ID
)
# self
from kuha_document_store import validation


_logger = logging.getLogger(__name__)

# COLLECTION properties
_COMMON_ISODATE_FIELDS = [
    RecordBase._metadata.attr_updated.path,
    RecordBase._metadata.attr_deleted.path,
    RecordBase._metadata.attr_created.path
]
_COMMON_INDEXES = [[(RecordBase._metadata.attr_updated.path, DESCENDING)]]
_COMMON_OBJECTID_FIELDS = [RecordBase._id.path]

Collection = namedtuple('Collection', ['name', 'validators', 'indexes_unique',
                                       'indexes', 'isodate_fields', 'object_id_fields'])


def _collection_validator(collection_name, record_class, required=None):
    required = required or []
    required.extend([attr.path for attr in [
        record_class._metadata.attr_created,
        record_class._metadata.attr_updated,
        record_class._metadata.attr_deleted,
        record_class._metadata.attr_cmm_type,
        record_class._metadata.attr_schema_version,
        record_class._metadata.attr_status
    ]])
    properties = {
        record_class._metadata.attr_created.path: {
            'bsonType': MDB_TYPE_DATE,
            'description': 'Must be date and is required'
        },
        record_class._metadata.attr_updated.path: {
            'bsonType': MDB_TYPE_DATE,
            'description': 'Must be date and is required'
        },
        record_class._metadata.attr_deleted.path: {
            'bsonType': [MDB_TYPE_DATE, MDB_TYPE_NULL],
            'description': 'Must be date or null and is required'
        },
        # schema version should be a string since it needs to allow
        # values such as '1.0', '1.1.1', etc.
        # It should be checked on application-level to allow schema
        # migrations without the need to rebuild the DB schema.
        record_class._metadata.attr_schema_version.path: {
            'bsonType': MDB_TYPE_STRING,
            'description': 'Must be string and is required'
        },
        record_class._metadata.attr_status.path: {
            'bsonType': MDB_TYPE_STRING,
            'description': 'Must be string and is required'
        },
        record_class._metadata.attr_cmm_type.path: {
            'bsonType': MDB_TYPE_STRING,
            'pattern': "^{cmm_type}$".format(cmm_type=record_class.cmm_type),
            'description': 'Fixed string %s and is required' % (record_class.cmm_type,)
        }
    }
    return {
        '$jsonSchema': {
            'bsonType': 'object',
            'required': required,
            'properties': properties
        }
    }
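
# The returned validator has roughly the following shape (an illustrative
# sketch for the studies collection; the actual metadata field paths come from
# the record classes at runtime, so '_metadata.created' below is an assumption):
#
# {'$jsonSchema': {
#     'bsonType': 'object',
#     'required': ['study_number', '_metadata.created', ...],
#     'properties': {
#         '_metadata.created': {'bsonType': 'date',
#                               'description': 'Must be date and is required'},
#         ...}}}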


def _init_collection(name, validators, indexes_unique):
    return Collection(name=name, validators=validators, indexes_unique=indexes_unique,
                      indexes=list(_COMMON_INDEXES), isodate_fields=list(_COMMON_ISODATE_FIELDS),
                      object_id_fields=list(_COMMON_OBJECTID_FIELDS))


def studies_collection():
    coll = Study.get_collection()
    validators = _collection_validator(coll, Study, required=[Study.study_number.path])
    indexes_unique = [[(Study.study_number.path, DESCENDING)]]
    return _init_collection(coll, validators, indexes_unique)


def variables_collection():
    coll = Variable.get_collection()
    validators = _collection_validator(coll, Variable, required=[Variable.study_number.path,
                                                                 Variable.variable_name.path])
    indexes_unique = [[(Variable.study_number.path, ASCENDING),
                       (Variable.variable_name.path, DESCENDING)]]
    return _init_collection(coll, validators, indexes_unique)


def questions_collection():
    coll = Question.get_collection()
    validators = _collection_validator(coll, Question, required=[Question.study_number.path,
                                                                 Question.question_identifier.path])
    indexes_unique = [[(Question.study_number.path, ASCENDING),
                       (Question.question_identifier.path, ASCENDING)]]
    return _init_collection(coll, validators, indexes_unique)


def study_groups_collection():
    coll = StudyGroup.get_collection()
    validators = _collection_validator(coll, StudyGroup, required=[StudyGroup.study_group_identifier.path])
    indexes_unique = [[(StudyGroup.study_group_identifier.path, DESCENDING)]]
    return _init_collection(coll, validators, indexes_unique)


def iter_collections():
    for coll in (studies_collection(), variables_collection(), questions_collection(),
                 study_groups_collection()):
        yield coll
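
# Illustrative sketch (not part of the module): the Collection tuples yielded
# above carry everything needed to initialize the database. Assuming an
# already-connected MotorClient `client` and that the named collections do
# not yet exist, setup could look roughly like this:
#
#     db = client['kuha_document_store']
#     for coll in iter_collections():
#         await db.create_collection(coll.name, validator=coll.validators)
#         for index_spec in coll.indexes_unique:
#             await db[coll.name].create_index(index_spec, unique=True)
#         for index_spec in coll.indexes:
#             await db[coll.name].create_index(index_spec)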


def mongodburi(host_port, *hosts_ports, database=None, credentials=None, options=None):
    """Create and return a MongoDB connection string in the form of a MongoURI.

    The standard URI connection scheme has the form:
    mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[database][?options]]

    - https://docs.mongodb.com/manual/reference/connection-string/

    :param str host_port: Host and port of a mongod instance. Additional
                          hosts may be given as extra positional arguments.
    :param str database: Optional database.
    :param tuple credentials: Optional credentials (user, pwd).
    :param list options: Optional options as a list of tuples
                         [(opt_key1, opt_val1), (opt_key2, opt_val2)]
    :returns: MongoURI connection string.
    :rtype: str
    """
    template = 'mongodb://{credentials}{replicas}{database}{options}'
    database = '/{}'.format(database or '') if database or options else ''
    credentials = '%s:%s@' % (credentials[0], quote_plus(credentials[1])) if credentials else ''
    options = '?' + '&'.join(['%s=%s' % x for x in options]) if options else ''
    replicas = ','.join((host_port,) + hosts_ports)
    return template.format(credentials=credentials, replicas=replicas,
                           database=database, options=options)
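
# Example of the resulting URI (an illustrative doctest-style sketch; note
# that the password is percent-encoded with urllib's quote_plus):
#
#     >>> mongodburi('localhost:27017', 'localhost:27018',
#     ...            database='kuha_document_store',
#     ...            credentials=('reader', 'pass w0rd'),
#     ...            options=[('replicaSet', 'rs_kuha')])
#     'mongodb://reader:pass+w0rd@localhost:27017,localhost:27018/kuha_document_store?replicaSet=rs_kuha'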


def _isodate_hook(dict_):
    # The MongoDB representation supports since-epoch-style dates.
    # We wish to use isodates.
    if MDB_ISODATE in dict_:
        return dateutil.parser.parse(dict_[MDB_ISODATE])
    return bson.json_util.object_hook(dict_)


def _json_decode(json_object, pre_decode='utf-8'):
    if pre_decode:
        json_object = json_object.decode(pre_decode)
    return json.loads(json_object, object_hook=_isodate_hook)


def _encode_isodates(isodate_fields, _dict):
    """In-place conversion of isodate fields.

    Convert Python datetime objects to datestamps. Allow None values.

    :param list isodate_fields: Fieldpaths containing isodates.
    :param dict _dict: Document to convert.
    :returns: None
    """
    for path in isodate_fields:
        dig_and_set(_dict, path, datetime_to_datestamp, allow_none=True)


def _decode_isodates(isodate_fields, _dict):
    """In-place conversion of isodate fields.

    Convert datestamps to Python datetime objects. Allow None values.

    :param list isodate_fields: Fieldpaths containing isodates.
    :param dict _dict: Document to convert.
    :returns: None
    """
    for path in isodate_fields:
        dig_and_set(_dict, path, datestamp_to_datetime, allow_none=True)


def bson_to_json(collection, _dict):
    """Encode a BSON dictionary to JSON.

    Encodes the special type of dictionary that comes from MongoDB
    queries into a JSON representation. Also converts datetimes to
    strings.

    :param collection: Collection properties used to locate isodate fields.
    :type collection: :obj:`Collection`
    :param _dict: Source object containing BSON.
    :type _dict: dict
    :returns: Source object converted to JSON.
    :rtype: str
    """
    _encode_isodates(collection.isodate_fields, _dict)
    return bson.json_util.dumps(_dict)


class Database:
    """MongoDB database.

    Provides access to low-level database operations. For fine-grained
    access control uses two sets of database credentials: one for
    read-only operations and one for write operations. Chooses the
    correct credentials to authenticate with based on the operation to
    be performed.

    :note: Does not authenticate or connect to the database before
           actually performing operations that need a connection.
           Therefore connection/authentication issues will surface when
           performing operations, not when instantiating the database.

    :param str name: Name of the database.
    :param str reader_uri: MongoURI used for read-only operations.
    :param str editor_uri: MongoURI used for write operations.
    """

    def __init__(self, name, reader_uri, editor_uri):
        self._name = name
        self.__reader_uri = reader_uri
        self.__editor_uri = editor_uri
        self._reader_client = None
        self._editor_client = None

    async def _for_editing(self, collection):
        if self._editor_client is None:
            self._editor_client = MotorClient(self.__editor_uri)
            self.__editor_uri = None
        return self._editor_client[self._name][collection]

    async def _for_reading(self, collection):
        if self._reader_client is None:
            self._reader_client = MotorClient(self.__reader_uri)
            self.__reader_uri = None
        return self._reader_client[self._name][collection]

    def close(self):
        """Close open sockets to the database."""
        if self._reader_client:
            self._reader_client.close()
            self._reader_client = None
        if self._editor_client:
            self._editor_client.close()
            self._editor_client = None

    @staticmethod
    async def oid_filter(oid):
        return {MDB_FIELDNAME_ID: ObjectId(oid)}

    # READ OPERATIONS

    async def query_single(self, collection_name, query, fields=None, callback=None):
        """Query for a single database document.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param query: Database query.
        :type query: dict
        :param fields: Fields to select. None selects all.
        :type fields: list or None
        :param callback: Result callback. Called with the result as
                         parameter. If None this method will return the
                         result.
        :type callback: function or None
        :returns: A single document, or None if no matching document is
                  found or if a callback is given.
        :rtype: dict or None
        """
        collection = await self._for_reading(collection_name)
        result = await collection.find_one(query, projection=fields)
        if callback:
            await callback(result)
        else:
            return result

    async def query_multiple(self, collection_name, query, callback, fields=None,
                             skip=0, sort_by=None, sort_order=1, limit=0):
        """Query for multiple database documents.

        :note: Has a mandatory callback parameter.

        :param str collection_name: Name of database collection.
        :param dict query: Database query filter.
        :param callable callback: Result callback. Called with each
                                  document as parameter.
        :param list or None fields: Fields to select. None selects all.
        :param int skip: Skip documents from the beginning of the query.
        :param str sort_by: Sort by field.
        :param int sort_order: Sort in ascending or descending order.
                               MongoDB uses 1 to sort ascending and
                               -1 to sort descending.
        :param int limit: Limit the number of returned documents.
                          0 returns all documents.
        """
        _logger.debug("Executing select-query with params: collection='%s', filter='%s', "
                      "fields='%s', skip='%s', sort_by='%s', sort_order='%s', limit='%s'",
                      collection_name, query, fields, skip, sort_by, sort_order, limit)
        collection = await self._for_reading(collection_name)
        cursor = collection.find(query, projection=fields, skip=skip, limit=limit)
        if sort_by:
            cursor = cursor.sort(sort_by, sort_order)
        async for result in cursor:
            await callback(result)

    async def query_distinct(self, collection_name, fieldname, filter_=None):
        """Query for distinct values in a collection field.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param fieldname: Field to query for distinct values.
        :type fieldname: str
        :param filter_: Optional filter to use with the query.
        :type filter_: dict or None
        :returns: Distinct values.
        :rtype: list
        """
        collection = await self._for_reading(collection_name)
        return await collection.distinct(fieldname, filter=filter_)

    async def count(self, collection_name, filter_=None):
        """Query for document count.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param filter_: Optional filter to use for the query.
        :type filter_: dict or None
        :returns: Count of documents.
        :rtype: int
        """
        filter_ = filter_ or {}
        collection = await self._for_reading(collection_name)
        return await collection.count_documents(filter=filter_)

    # WRITE OPERATIONS

    async def insert(self, collection_name, document):
        """Insert a single document to the database.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param document: Document to insert.
        :type document: dict
        :returns: Insert result.
        :rtype: :obj:`pymongo.results.InsertOneResult`
        """
        collection = await self._for_editing(collection_name)
        _logger.debug("Insert '%s' to collection '%s'", document, collection_name)
        return await collection.insert_one(document)

    async def replace(self, collection_name, oid, document):
        """Replace a single document in the database.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param oid: MongoDB object ID as string.
        :type oid: str
        :param document: Document to store.
        :type document: dict
        :returns: Update result.
        :rtype: :obj:`pymongo.results.UpdateResult`
        """
        collection = await self._for_editing(collection_name)
        return await collection.replace_one(await self.oid_filter(oid), document)

    async def insert_or_replace(self, collection_name, query, document):
        """Insert or replace a single document in the database.

        Uses MongoDB's find-one-and-replace with upsert: an existing
        document is replaced if one is found via the query, otherwise a
        new document is inserted.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param query: Database query.
        :type query: dict
        :param document: Document to store.
        :type document: dict
        :returns: The document that was stored.
        :rtype: dict
        """
        collection = await self._for_editing(collection_name)
        result = await collection.find_one_and_replace(
            query, document, upsert=True,
            return_document=ReturnDocument.AFTER
        )
        return result

    async def update(self, collection_name, filter_, update_operations):
        """Update documents in a collection matching the filter.

        :param str collection_name: Name of database collection.
        :param dict filter_: Filter to match documents against.
        :param dict update_operations: Update operations to apply to the
                                       matching documents.
        :returns: Number of modified documents.
        :rtype: int
        """
        collection = await self._for_editing(collection_name)
        _logger.debug("Update '%s' with filter: '%s'", collection_name, filter_)
        result = await collection.update_many(filter_, update_operations)
        return result.modified_count

    # DELETE OPERATIONS

    async def delete(self, collection_name, filter_):
        """Delete documents matching the filter.

        :param str collection_name: Name of database collection.
        :param dict filter_: Filter to match documents against.
        :returns: Deleted count.
        :rtype: int
        """
        collection = await self._for_editing(collection_name)
        _logger.debug("Delete %s with filter: %s", collection_name, filter_)
        result = await collection.delete_many(filter_)
        return result.deleted_count
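
# A minimal usage sketch of the low-level Database (illustrative only: the
# URIs, credentials and the 'studies' collection name are assumptions made
# for this example, not values shipped with the module):
#
#     import asyncio
#
#     async def main():
#         db = Database(name='kuha_document_store',
#                       reader_uri=mongodburi('localhost:27017',
#                                             credentials=('reader', 'reader')),
#                       editor_uri=mongodburi('localhost:27017',
#                                             credentials=('editor', 'editor')))
#         total = await db.count('studies')
#         study = await db.query_single('studies', {'study_number': 'study_1'})
#         db.close()
#
#     asyncio.get_event_loop().run_until_complete(main())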


class DocumentStoreDatabase(Database):
    """Subclass of :class:`Database`.

    Provides specialized methods extending the functionality of
    :class:`Database`. Combines database operations with the properties
    of each :obj:`Collection`. Defines exceptions that, when raised,
    allow the HTTP-response operation to continue.

    :param list collections: :obj:`Collection` properties of the
                             supported collections.
    """

    #: These are exceptions that may be raised in normal database
    #: operation, so they are not exceptions that should terminate the
    #: HTTP-response process. As such, the caller may want to catch
    #: these errors.
    recoverable_errors = (
        pymongo.errors.WriteError,
        json.decoder.JSONDecodeError,
        bson.errors.InvalidId,
        validation.RecordValidationError
    )

    def __init__(self, collections, **kwargs):
        self._validation_schemas = {}
        self._collections = collections
        for coll in collections:
            self._validation_schemas.update({coll.name: None})
        super().__init__(**kwargs)

    @staticmethod
    def json_decode(json_object):
        """Helper method for converting HTTP input JSON to a Python dictionary.

        :param json_object: JSON to convert.
        :type json_object: str
        :returns: JSON object converted to a Python dictionary.
        :rtype: dict
        """
        return _json_decode(json_object)

    @staticmethod
    def _wrap_callback(collection, callback=None, not_found_exception=None):
        async def _callback(result):
            if not result:
                if not_found_exception:
                    raise not_found_exception()
                return
            json_chunk = bson_to_json(collection, result)
            if callback:
                await callback(json_chunk)
            else:
                return json_chunk
        return _callback

    async def _get_collection_by_name(self, name):
        for coll in self._collections:
            if coll.name == name:
                return coll
        raise Exception("No such collection: {}".format(name))

    @staticmethod
    async def _prepare_validation_schema(rec_class):
        collection_name = rec_class.get_collection()
        md_schema = {
            **validation.default_schema_item(rec_class._metadata.attr_created.name),
            **validation.default_schema_item(rec_class._metadata.attr_updated.name),
            **validation.default_schema_item(rec_class._metadata.attr_deleted.name, nullable=True),
            **validation.str_enum_item(rec_class._metadata.attr_status.name,
                                       [REC_STATUS_DELETED, REC_STATUS_CREATED]),
            **validation.str_enum_item(rec_class._metadata.attr_schema_version.name,
                                       [rec_class.schema_version]),
            **validation.str_enum_item(rec_class._metadata.attr_cmm_type.name,
                                       [rec_class.cmm_type])}
        base_schema = {**validation.dict_schema_item(rec_class._metadata.path, md_schema)}
        if collection_name == Study.get_collection():
            schema = validation.RecordValidationSchema(
                Study, base_schema,
                validation.identifier_schema_item(Study.study_number.path),
                validation.uniquelist_schema_item(Study.persistent_identifiers.path),
                validation.bool_schema_item(Study.universes.attr_included.path)
            )
        elif collection_name == Variable.get_collection():
            schema = validation.RecordValidationSchema(
                Variable, base_schema,
                validation.identifier_schema_item(Variable.variable_name.path),
                validation.identifier_schema_item(Variable.study_number.path),
                validation.bool_schema_item(Variable.codelist_codes.attr_missing.path),
                validation.uniquelist_schema_item(Variable.question_identifiers.path)
            )
        elif collection_name == Question.get_collection():
            schema = validation.RecordValidationSchema(
                Question, base_schema,
                validation.identifier_schema_item(Question.study_number.path),
                validation.identifier_schema_item(Question.question_identifier.path),
                validation.default_schema_item(Question.variable_name.path, nullable=True)
            )
        elif collection_name == StudyGroup.get_collection():
            schema = validation.RecordValidationSchema(
                StudyGroup, base_schema,
                validation.identifier_schema_item(StudyGroup.study_group_identifier.path),
                validation.uniquelist_study_numbers_schema_item(StudyGroup.study_numbers.path)
            )
        return schema

    async def _get_validation_schema(self, rec_class):
        # If this raises KeyError, the database does not support such a collection.
        collection_name = rec_class.get_collection()
        if self._validation_schemas[collection_name] is None:
            self._validation_schemas[collection_name] = await self._prepare_validation_schema(rec_class)
        return self._validation_schemas[collection_name]

    @staticmethod
    def _get_record_by_collection_name(name):
        return record_by_collection(name)

    async def _process_json_for_upsert(self, collection, json_document, old_metadata=None):
        update = bool(old_metadata)
        doc = _json_decode(json_document)
        rec_class = self._get_record_by_collection_name(collection.name)
        schema = await self._get_validation_schema(rec_class)
        validation.validate(schema, doc, update=update)
        _decode_isodates(collection.isodate_fields, doc)
        if rec_class._metadata.path not in doc:
            # Only compute metadata if it is not submitted directly.
            if update:
                old_metadata[rec_class._metadata.path][rec_class._metadata.attr_updated.name] = datetime_now()
                doc.update({
                    rec_class._metadata.path: old_metadata[rec_class._metadata.path]
                })
            else:
                record = rec_class()
                doc.update(record.export_metadata_dict(as_datestamps=False))
        return doc

    # QUERIES

    async def query_multiple(self, collection_name, query, callback, **kwargs):
        r"""Query multiple documents with callback.

        Converts resulting BSON to JSON. Calls the callback with each
        resulting record JSON.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param query: Database query.
        :type query: dict
        :param callback: Result callback. Called with each document as
                         parameter.
        :type callback: function
        :param \*\*kwargs: Additional keyword arguments passed to the
                           super method.
        """
        coll = await self._get_collection_by_name(collection_name)
        _call = self._wrap_callback(coll, callback)
        await super().query_multiple(collection_name, query, _call, **kwargs)

    async def query_by_oid(self, collection_name, oid, callback, fields=None,
                           not_found_exception=None):
        """Query a single record by ObjectID with callback.

        Converts the BSON result to JSON. Calls the callback with the
        resulting JSON. If a `not_found_exception` is given, raises the
        exception if the queried ObjectID points to no known database
        object.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param oid: ObjectID to query for.
        :type oid: str
        :param callback: Function to call with the resulting JSON.
        :type callback: function
        :param fields: Fields to select. None selects all.
        :type fields: list or None
        :param not_found_exception: Raised if the ObjectID is not found.
        :type not_found_exception: Exception class.
        """
        query = await self.oid_filter(oid)
        coll = await self._get_collection_by_name(collection_name)
        _call = self._wrap_callback(coll, callback, not_found_exception)
        await self.query_single(collection_name, query, fields=fields, callback=_call)

    async def query_distinct(self, collection_name, fieldname, filter_=None):
        """Query for distinct values in a collection field.

        If `fieldname` points to a leaf node, returns a list of values;
        if it points to a branch node, returns a list of dictionaries.
        If `fieldname` points to a leaf node of isodate representations,
        or to a branch node that contains isodates, converts datetimes
        to datestamps, which are JSON serializable. If `fieldname`
        points to a leaf node containing MongoDB ObjectID values, casts
        those values to strings.

        :note: Requires changes to logic if collection.object_id_fields
               should contain paths with multiple components, for
               example 'some.path.with.id'. In that case distinct
               queries that point to branch nodes with OIDs will fail
               with TypeError: ObjectId('...') is not JSON serializable.

        :note: Distinction will not work as expected on datestamp fields
               that are stored as signed 64-bit integers with
               millisecond precision. The returned datestamps are not as
               precise since they have second precision.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param fieldname: Field to query for distinct values.
        :type fieldname: str
        :param filter_: Optional filter to use with query.
        :type filter_: dict or None
        :returns: Distinct values from database.
        :rtype: list
        """
        results = await super().query_distinct(collection_name, fieldname, filter_=filter_)
        # results may be a list of dictionaries, a list of values, or an empty list.
        if results == []:
            # Return empty list.
            return results
        # From this point on, we know that there are results.
        # This also implies that:
        #  * fieldname is a valid path. However, it may not be a path
        #    that leads to a leaf node.
        #  * len(collection.isodate_fields.split('.')) >= len(fieldname.split('.'))
        collection = await self._get_collection_by_name(collection_name)
        # It is assumed that collection.object_id_fields does not contain
        # paths with multiple components. This will fail if it does.
        if fieldname in collection.object_id_fields:
            return [str(result) if result is not None else result
                    for result in results]
        if fieldname in collection.isodate_fields:
            # List of datetimes.
            return [datetime_to_datestamp(result) if result is not None else result
                    for result in results]
        for datefield in collection.isodate_fields:
            if path_split(fieldname)[0] != path_split(datefield)[0]:
                # No common base.
                continue
            # From this point on, we know `results` is a list of dicts.
            # The keys of each result will be relative to `fieldname`,
            # so we need the relative path.
            path = datefield.replace(fieldname + SEPARATOR_PATH, '', 1)
            dig_and_set(results, path, datetime_to_datestamp, allow_none=True)
        return results

    # WRITES

    async def insert_json(self, collection_name, json_object):
        """Insert a JSON-encoded document to the database.

        Special method that takes a JSON object, which is then validated
        and inserted to the database.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param json_object: JSON object representing a collection document.
        :type json_object: str
        :returns: Insert result.
        :rtype: :obj:`pymongo.results.InsertOneResult`
        """
        collection = await self._get_collection_by_name(collection_name)
        document = await self._process_json_for_upsert(collection, json_object)
        return await self.insert(collection_name, document)

    async def replace_json(self, collection_name, oid, json_object, not_found_exception):
        """Replace a JSON-encoded document in the database.

        Special method that replaces a document in the database with the
        document given as parameter `json_object`. The document to be
        replaced is queried by the given `oid`. This method also takes a
        `not_found_exception` as a mandatory parameter. The exception is
        raised if a document with the given `oid` cannot be found.

        :note: If the submitted JSON does not contain metadata for the
               document, the metadata gets calculated by
               :meth:`_process_json_for_upsert`.

        :param collection_name: Name of database collection.
        :type collection_name: str
        :param oid: MongoDB object ID as string.
        :type oid: str
        :param json_object: JSON object representing a collection document.
        :type json_object: str
        :param not_found_exception: Exception to raise if a document is
                                    not found with `oid`.
        :type not_found_exception: Exception class.
        :returns: Update result.
        :rtype: :obj:`pymongo.results.UpdateResult`
        """
        query = await self.oid_filter(oid)
        old_metadata = await self.query_single(collection_name, query,
                                               fields=[RecordBase._metadata.path])
        if not old_metadata:
            raise not_found_exception(context=oid)
        collection = await self._get_collection_by_name(collection_name)
        collection_document = await self._process_json_for_upsert(collection, json_object,
                                                                  old_metadata=old_metadata)
        return await self.replace(collection_name, oid, collection_document)

    # DELETES

    async def delete_records(self, collection_name, oid=None, hard_delete=False):
        """Delete database documents.

        :param str collection_name: Name of database collection.
        :param str oid: MongoDB object ID as string.
        :param bool hard_delete: True to physically delete the document.
                                 False to logically mark the document as
                                 deleted.
        :returns: Affected records' count.
        :rtype: int
        """
        rec = self._get_record_by_collection_name(collection_name)
        filter_ = {rec._id.path: ObjectId(oid)} if oid is not None else {}
        if hard_delete:
            return await self.delete(collection_name, filter_)
        timestamp = datetime_now()
        filter_.update({rec._metadata.attr_status.path: {MDB_NOT_EQUAL: REC_STATUS_DELETED}})
        update_ops = {MDB_SET: {rec._metadata.attr_status.path: REC_STATUS_DELETED,
                                rec._metadata.attr_updated.path: timestamp,
                                rec._metadata.attr_deleted.path: timestamp}}
        return await self.update(collection_name, filter_, update_ops)
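
# An illustrative sketch of the JSON-level API of DocumentStoreDatabase (the
# 'studies' collection name and the payload are assumptions for the example):
#
#     async def store_and_list(db):
#         # Validates the JSON against the record schema, fills in the
#         # metadata and inserts the document.
#         await db.insert_json('studies', b'{"study_number": "study_1"}')
#
#         async def on_record(json_chunk):
#             print(json_chunk)
#
#         # Streams every stored study to the callback as JSON.
#         await db.query_multiple('studies', {}, on_record)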


def db_from_settings(settings):
    """Instantiate a DocumentStoreDatabase from loaded settings.

    :param settings: Loaded settings.
    :type settings: :obj:`argparse.Namespace`
    :returns: Instance of DocumentStoreDatabase.
    :rtype: :obj:`DocumentStoreDatabase`
    """
    options = None if settings.replicaset == '' else [('replicaSet', settings.replicaset)]
    reader_uri = mongodburi(*settings.replica, database=settings.database_name,
                            credentials=(settings.database_user_reader,
                                         settings.database_pass_reader),
                            options=options)
    editor_uri = mongodburi(*settings.replica, database=settings.database_name,
                            credentials=(settings.database_user_editor,
                                         settings.database_pass_editor),
                            options=options)
    return DocumentStoreDatabase(collections=list(iter_collections()),
                                 name=settings.database_name,
                                 reader_uri=reader_uri,
                                 editor_uri=editor_uri)


def add_cli_args(parser):
    """Add database configuration values to be parsed.

    :param parser: Configuration parser.
    """
    parser.add('--replica',
               help='MongoDB replica host + port. Repeat for multiple replicas. '
                    'For example: localhost:27017',
               env_var='DS_DBREPLICAS',
               action='append',
               required=True,
               type=str)
    parser.add('--replicaset',
               help='MongoDB replica set name',
               env_var='DS_DBREPLICASET',
               default='rs_kuha',
               type=str)
    parser.add('--database-name',
               help='Name of the Kuha document store database',
               default='kuha_document_store',
               env_var='DS_DBNAME',
               type=str)
    parser.add('--database-user-reader',
               help='Username for reading from the database',
               default='reader',
               env_var='DS_DBUSER_READER',
               type=str)
    parser.add('--database-pass-reader',
               help='Password for database-user-reader',
               default='reader',
               env_var='DS_DBPASS_READER',
               type=str)
    parser.add('--database-user-editor',
               help='Username for editing the database',
               default='editor',
               env_var='DS_DBUSER_EDITOR',
               type=str)
    parser.add('--database-pass-editor',
               help='Password for database-user-editor',
               default='editor',
               env_var='DS_DBPASS_EDITOR',
               type=str)
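
# A sketch of how the parser and settings fit together. `parser.add` with an
# `env_var` keyword is not stdlib argparse, so a configargparse-style parser
# is assumed here:
#
#     import configargparse
#
#     parser = configargparse.ArgParser()
#     add_cli_args(parser)
#     settings = parser.parse_args(['--replica', 'localhost:27017'])
#     database = db_from_settings(settings)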