Source code for pdmongo.core

from typing import Any
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Union

import pymongo.errors
from pandas import DataFrame
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.results import InsertManyResult
from pymongo.uri_parser import parse_uri


def _get_db_instance(db: Union[str, Database]) -> MongoClient:
    """
    Retrieve the pymongo.database.Database instance.

    Parameters
    ----------
    db: str or pymongo.database.Database
        - if str an instance of pymongo.database.Database will be instantiated and returned
        - if pymongo.database.Database the db instance is returned

    Returns
    -------
    pymongo.database.Database
    """
    if isinstance(db, str):
        db_name = parse_uri(db).get('database')
        if db_name is None:
            # TODO: Improve validation message
            raise ValueError("Invalid db: Could not extract database from uri: %s", db)
        db = MongoClient(db)[db_name]
    return db


def _collection_exists(db: Database, col_name: str) -> bool:
    try:
        db.validate_collection(col_name)
        return True
    except pymongo.errors.OperationFailure:
        return False


def _handle_exists_collection(name: str, exists: Optional[str], db: Database) -> None:
    """
    Handles the `if_exists` argument of `to_mongo`.

    Parameters
    ----------
    if_exists: str
        Can be one of 'fail', 'replace', 'append'
            - fail: A ValueError is raised
            - replace: Collection is deleted before inserting new documents
            - append: Documents are appended to existing collection
    """

    if exists == "fail":
        if _collection_exists(db, name):
            raise ValueError(f"Collection '{name}' already exists.")
        return

    if exists == "replace":
        if _collection_exists(db, name):
            db[name].drop()
        return

    if exists == "append":
        return

    raise ValueError(f"'{exists}' is not valid for if_exists")


def _split_in_chunks(lst: Sequence[Any], chunksize: int) -> Iterator[Sequence[Any]]:
    """
    Splits a list in chunks based on provided chunk size.

    Parameters
    ----------
    lst: list
        The list to split in chunks

    Returns
    -------
    result: generator
    A generator with the chunks
    """
    for i in range(0, len(lst), chunksize):
        yield lst[i:i + chunksize]


def _validate_chunksize(chunksize: int) -> None:
    """
    Raises the proper exception if chunksize is not valid.

    Parameters
    ----------
    chunksize: int
    The chunksize to validate.
    """
    if not isinstance(chunksize, int):
        raise TypeError("Invalid chunksize: Must be an int")
    if not chunksize > 0:
        raise ValueError("Invalid chunksize: Must be > 0")


[docs]def read_mongo( collection: str, query: List[Dict[str, Any]], db: Union[str, Database], index_col: Optional[Union[str, List[str]]] = None, extra: Optional[Dict[str, Any]] = None, chunksize: Optional[int] = None ) -> DataFrame: """ Read MongoDB query into a DataFrame. Returns a DataFrame corresponding to the result set of the query. Optionally provide an `index_col` parameter to use one of the columns as the index, otherwise default integer index will be used. Parameters ---------- collection : str Mongo collection to select for querying query : list Must be an aggregate query. The input will be passed to pymongo `.aggregate` db : pymongo.database.Database or database string URI The database to use index_col : str or list of str, optional, default: None Column(s) to set as index(MultiIndex). extra : dict, optional, default: None List of parameters to pass to aggregate method. chunksize : int, default None If specified, return an iterator where `chunksize` is the number of docs to include in each chunk. Returns ------- Dataframe """ params = {} if chunksize is not None: _validate_chunksize(chunksize) params['batchSize'] = chunksize db = _get_db_instance(db) if extra is None: extra = {} if extra.get('batchSize') is not None: if chunksize is not None: raise ValueError("Either chunksize or batchSize must be provided, not both") return DataFrame.from_records( db[collection].aggregate(query, **{**params, **extra}), index=index_col)
[docs]def to_mongo( frame: DataFrame, name: str, db: Union[str, Database], if_exists: Optional[str] = "fail", index: Optional[bool] = True, index_label: Optional[Union[str, Sequence[str]]] = None, chunksize: Optional[int] = None, ) -> Union[List[InsertManyResult], InsertManyResult]: """ Write records stored in a DataFrame to a MongoDB collection. Parameters ---------- frame : DataFrame, Series name : str Name of collection. db : pymongo.database.Database or database string URI The database to write to if_exists : {'fail', 'replace', 'append'}, default 'fail' - fail: If table exists, do nothing. - replace: If table exists, drop it, recreate it, and insert data. - append: If table exists, insert data. Create if does not exist. index : boolean, default True Write DataFrame index as a column. index_label : str or sequence, optional Column label for index column(s). If None is given (default) and `index` is True, then the index names are used. A sequence should be given if the DataFrame uses MultiIndex. chunksize : int, optional Specify the number of rows in each batch to be written at a time. By default, all rows will be written at once. """ db = _get_db_instance(db) _handle_exists_collection(name, if_exists, db) records = frame.to_dict('records') if index is True: idx = frame.index idx_name = idx.name idx_data = idx.tolist() for i, record in enumerate(records): if index_label is None and idx_name is not None: record[idx_name] = idx_data[i] if chunksize is not None: _validate_chunksize(chunksize) result_insert_many = [] for chunk in _split_in_chunks(records, chunksize): result_insert_many.append(db[name].insert_many(chunk)) return result_insert_many return db[name].insert_many(records)