Module nitric.api.documents

Expand source code
#
# Copyright (c) 2021 Nitric Technologies Pty Ltd.
#
# This file is part of Nitric Python 3 SDK.
# See https://github.com/nitrictech/python-sdk for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import List, AsyncIterator, Union, Any, Tuple

from grpclib import GRPCError

from nitric.api.const import MAX_SUB_COLLECTION_DEPTH
from nitric.exception import exception_from_grpc_error
from nitric.proto.nitric.document.v1 import (
    DocumentServiceStub,
    Collection as CollectionMessage,
    Key as KeyMessage,
    Expression as ExpressionMessage,
    ExpressionValue,
    Document as DocumentMessage,
    DocumentSetRequest,
    DocumentGetRequest,
    DocumentDeleteRequest,
    DocumentQueryStreamRequest,
    DocumentQueryRequest,
)

from nitric.utils import new_default_channel, _dict_from_struct, _struct_from_dict

NIL_DOC_ID = ""


class CollectionDepthException(Exception):
    """The max depth of document sub-collections has been exceeded."""

    pass


@dataclass(frozen=True, order=True)
class DocumentRef:
    """A reference to a document in a collection."""

    _documents: Documents
    parent: CollectionRef
    id: str

    def collection(self, name: str) -> CollectionRef:
        """
        Return a reference to a sub-collection of this document.

        This is currently only supported to one level of depth.
        e.g. Documents().collection('a').doc('b').collection('c').doc('d') is valid,
        Documents().collection('a').doc('b').collection('c').doc('d').collection('e') is invalid (1 level too deep).
        """
        current_depth = self.parent.sub_collection_depth()
        if current_depth >= MAX_SUB_COLLECTION_DEPTH:
            # Collection nesting is only supported to a maximum depth.
            raise CollectionDepthException(
                f"sub-collections supported to a depth of {MAX_SUB_COLLECTION_DEPTH}, "
                f"attempted to create new collection with depth {current_depth + 1}"
            )
        return CollectionRef(_documents=self._documents, name=name, parent=self)

    async def get(self) -> Document:
        """Retrieve the contents of this document, if it exists."""
        try:
            response = await self._documents._stub.get(
                document_get_request=DocumentGetRequest(key=_doc_ref_to_wire(self))
            )
            return _document_from_wire(documents=self._documents, message=response.document)
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    async def set(self, content: dict):
        """
        Set the contents of this document.

        If the document exists it will be updated, otherwise a new document will be created.
        """
        try:
            await self._documents._stub.set(
                document_set_request=DocumentSetRequest(
                    key=_doc_ref_to_wire(self),
                    content=_struct_from_dict(content),
                )
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    async def delete(self):
        """Delete this document, if it exists."""
        try:
            await self._documents._stub.delete(
                document_delete_request=DocumentDeleteRequest(
                    key=_doc_ref_to_wire(self),
                )
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)


def _document_from_wire(documents: Documents, message: DocumentMessage) -> Document:
    ref = _doc_ref_from_wire(documents=documents, message=message.key)

    return Document(
        _ref=ref,
        content=_dict_from_struct(message.content),
    )


def _doc_ref_to_wire(ref: DocumentRef) -> KeyMessage:
    return KeyMessage(id=ref.id, collection=_collection_to_wire(ref.parent))


def _doc_ref_from_wire(documents: Documents, message: KeyMessage) -> DocumentRef:
    return DocumentRef(
        _documents=documents,
        id=message.id,
        parent=_collection_from_wire(documents=documents, message=message.collection),
    )


def _collection_to_wire(ref: CollectionRef) -> CollectionMessage:
    if ref.is_sub_collection():
        return CollectionMessage(name=ref.name, parent=_doc_ref_to_wire(ref.parent) if ref.parent else None)
    return CollectionMessage(name=ref.name)


def _collection_from_wire(documents: Documents, message: CollectionMessage) -> CollectionRef:
    return CollectionRef(
        _documents=documents,
        name=message.name,
        parent=_doc_ref_from_wire(documents=documents, message=message.parent) if message.parent else None,
    )


@dataclass(frozen=True, order=True)
class CollectionRef:
    """A reference to a collection of documents."""

    _documents: Documents
    name: str
    parent: Union[DocumentRef, None] = field(default_factory=lambda: None)

    def doc(self, doc_id: str) -> DocumentRef:
        """Return a reference to a document in the collection."""
        return DocumentRef(_documents=self._documents, parent=self, id=doc_id)

    def collection(self, name: str) -> CollectionGroupRef:
        """
        Return a reference to a sub-collection of this document.

        This is currently only supported to one level of depth.
        e.g. Documents().collection('a').collection('b').doc('c') is valid,
        Documents().collection('a').doc('b').collection('c').collection('d') is invalid (1 level too deep).
        """
        current_depth = self.sub_collection_depth()
        if current_depth >= MAX_SUB_COLLECTION_DEPTH:
            # Collection nesting is only supported to a maximum depth.
            raise CollectionDepthException(
                f"sub-collections supported to a depth of {MAX_SUB_COLLECTION_DEPTH}, "
                f"attempted to create new collection with depth {current_depth + 1}"
            )
        return CollectionGroupRef(_documents=self._documents, name=name, parent=self)

    def query(
        self,
        paging_token: Any = None,
        limit: int = 0,
        expressions: Union[Expression, List[Expression]] = None,
    ) -> QueryBuilder:
        """Return a query builder scoped to this collection."""
        return QueryBuilder(
            documents=self._documents,
            collection=self,
            paging_token=paging_token,
            limit=limit,
            expressions=[expressions] if isinstance(expressions, Expression) else expressions,
        )

    def sub_collection_depth(self) -> int:
        """Return the depth of this collection, which is a count of the parents above this collection."""
        if not self.is_sub_collection():
            return 0
        else:
            return self.parent.parent.sub_collection_depth() + 1

    def is_sub_collection(self):
        """Return True if this collection is a sub-collection of a document in another collection."""
        return self.parent is not None


@dataclass(frozen=True, order=True)
class CollectionGroupRef:
    """A reference to a collection group."""

    _documents: Documents
    name: str
    parent: Union[CollectionRef, None] = field(default_factory=lambda: None)

    def query(
        self,
        paging_token: Any = None,
        limit: int = 0,
        expressions: Union[Expression, List[Expression]] = None,
    ) -> QueryBuilder:
        """Return a query builder scoped to this collection."""
        return QueryBuilder(
            documents=self._documents,
            collection=self.to_collection_ref(),
            paging_token=paging_token,
            limit=limit,
            expressions=[expressions] if isinstance(expressions, Expression) else expressions,
        )

    def sub_collection_depth(self) -> int:
        """Return the depth of this collection group, which is a count of the parents above this collection."""
        if not self.is_sub_collection():
            return 0
        else:
            return self.parent.sub_collection_depth() + 1

    def is_sub_collection(self):
        """Return True if this collection is a sub-collection of a document in another collection."""
        return self.parent is not None

    def to_collection_ref(self):
        """Return this collection group as a collection ref."""
        return CollectionRef(
            self._documents,
            self.name,
            DocumentRef(
                self._documents,
                self.parent,
                NIL_DOC_ID,
            ),
        )

    @staticmethod
    def from_collection_ref(collectionRef: CollectionRef, documents: Documents) -> CollectionGroupRef:
        """Return a collection ref as a collection group."""
        if collectionRef.parent is not None:
            return CollectionGroupRef(
                documents,
                collectionRef.name,
                CollectionGroupRef.from_collection_ref(
                    collectionRef.parent,
                    documents,
                ),
            )


class Operator(Enum):
    """Valid query expression operators."""

    less_than = "<"
    greater_than = ">"
    less_than_or_equal = "<="
    greater_than_or_equal = ">="
    equals = "=="
    starts_with = "startsWith"


class _ExpressionBuilder:
    """Builder for creating query expressions using magic methods."""

    def __init__(self, operand):
        self._operand = operand

    def __eq__(self, other) -> Expression:
        return Expression(self._operand, Operator.equals, other)

    def __lt__(self, other) -> Expression:
        return Expression(self._operand, Operator.less_than, other)

    def __le__(self, other) -> Expression:
        return Expression(self._operand, Operator.less_than_or_equal, other)

    def __gt__(self, other) -> Expression:
        return Expression(self._operand, Operator.greater_than, other)

    def __ge__(self, other) -> Expression:
        return Expression(self._operand, Operator.greater_than_or_equal, other)

    def eq(self, other) -> Expression:
        return self == other

    def lt(self, other) -> Expression:
        return self < other

    def le(self, other) -> Expression:
        return self <= other

    def gt(self, other) -> Expression:
        return self > other

    def ge(self, other) -> Expression:
        return self >= other

    def starts_with(self, match) -> Expression:
        return Expression(self._operand, Operator.starts_with, match)


def condition(name: str) -> _ExpressionBuilder:
    """
    Construct a query expressions builder, for convenience.

    Expression builders in turn provides magic methods for constructing expressions.

    e.g. prop('first_name') == 'john' is equivalent to Expression('first_name, '=', 'john')

    Supported operations are ==, <, >, <=, >=, .starts_with()
    """
    return _ExpressionBuilder(operand=name)


@dataclass(order=True)
class Expression:
    """Query expressions, representing a boolean operation used for query filters."""

    operand: str
    operator: Union[Operator, str]
    value: Union[str, int, float, bool]

    def __post_init__(self):
        if isinstance(self.operator, str):
            # Convert string operators to their enum values
            self.operator = Operator(self.operator)

    def _value_to_expression_value(self):
        """Return an ExpressionValue message representation of the value of this expression."""
        if isinstance(self.value, str):
            return ExpressionValue(string_value=self.value)
        # Check bool before numbers, because booleans are numbers.
        if isinstance(self.value, bool):
            return ExpressionValue(bool_value=self.value)
        if isinstance(self.value, int):
            return ExpressionValue(int_value=self.value)
        if isinstance(self.value, float):
            return ExpressionValue(double_value=self.value)

    def _to_wire(self):
        """Return the Expression protobuf message representation of this expression."""
        return ExpressionMessage(
            operand=self.operand,
            operator=self.operator.value,
            value=self._value_to_expression_value(),
        )

    def __str__(self):
        return "{0} {1} {2}".format(self.operand, self.operator.name, self.value)


@dataclass(frozen=True, order=True)
class Document:
    """Represents a document and any associated metadata."""

    _ref: DocumentRef
    content: dict

    @property
    def id(self):
        """Return the document's unique id."""
        return self._ref.id

    @property
    def collection(self) -> CollectionRef:
        """Return the CollectionRef for the collection that contains this document."""
        return self._ref.parent

    @property
    def ref(self):
        """Return the DocumentRef for this document."""
        return self._ref


@dataclass(frozen=True, order=True)
class QueryResultsPage:
    """Represents a page of results from a query."""

    paging_token: any = field(default_factory=lambda: None)
    documents: List[Document] = field(default_factory=lambda: [])

    def has_more_pages(self) -> bool:
        """Return false if the page token is None or empty (both represent no more pages)."""
        return bool(self.paging_token)


class QueryBuilder:
    """Document query builder for retrieving documents from a collection based on filters."""

    _documents: Documents
    _collection: CollectionRef
    _paging_token: Any
    _limit: int
    _expressions: List[Expression]

    def __init__(
        self,
        documents: Documents,
        collection: CollectionRef,
        paging_token: Any = None,
        limit: int = 0,
        expressions: List[Expression] = None,
    ):
        """Construct a new QueryBuilder."""
        self._documents = documents
        self._collection = collection
        self._paging_token = paging_token
        self._limit = limit  # default to unlimited.
        if expressions is None:
            self._expressions = []
        else:
            self._expressions = expressions

    def _flat_expressions(self, expressions) -> List[Expression]:
        """Process possible inputs for .where() into a flattened list of expressions."""
        if isinstance(expressions, tuple) and len(expressions) == 3 and isinstance(expressions[0], str):
            # handle the special case where an expression was passed in as its component arguments.
            # e.g. .where('age', '<', 30) instead of .where(condition('age') > 30)
            return [Expression(*expressions)]
        if isinstance(expressions, Expression):
            # when a single expression is received, wrap in a list and return it
            return [expressions]
        else:
            # flatten lists of lists into single dimension list of expressions
            exps = []
            for exp in expressions:
                exps = exps + self._flat_expressions(exp)
            return exps

    def where(
        self,
        *expressions: Union[
            Expression, List[Expression], Union[str, Operator, int, bool, Tuple[str, Union[str, Operator], Any]]
        ],
    ) -> QueryBuilder:
        """
        Add a filter expression to the query.

        :param expressions: a single expression or a set of expression args or a variadic/tuple/list of expressions.

        Examples
        --------
            .where('age', '>', 20)
            .where(condition('age') > 20)
            .where(condition('age').gt(20))
            .where(
                condition('age') > 20,
                condition('age') < 50,
            )
            .where(
                [
                    condition('age') > 20,
                    condition('age') < 50,
                ]
            )
            .where(
                ('age', '>', 20),
                ('age', '<', 50),
            )

        """
        for expression in self._flat_expressions(expressions):
            self._expressions.append(expression)
        return self

    def page_from(self, token) -> QueryBuilder:
        """
        Set the paging token for the query.

        Used when requesting subsequent pages from a query.
        """
        self._paging_token = token
        return self

    def limit(self, limit: int) -> QueryBuilder:
        """Set the maximum number of results returned by this query."""
        if limit is None or not isinstance(limit, int) or limit < 0:
            raise ValueError("limit must be a positive integer or 0 for unlimited.")
        self._limit = limit
        return self

    def _expressions_to_wire(self) -> List[ExpressionMessage]:
        """Return this queries' expressions as a list of their protobuf message representation."""
        return [expressions._to_wire() for expressions in self._expressions]

    async def stream(self) -> AsyncIterator[Document]:
        """Return all query results as a stream."""
        # TODO: add limit, expressions and paging token to query.
        if self._paging_token is not None:
            raise ValueError("page_from() should not be used with streamed queries.")

        try:
            async for result in self._documents._stub.query_stream(
                document_query_stream_request=DocumentQueryStreamRequest(
                    collection=_collection_to_wire(self._collection),
                    expressions=self._expressions_to_wire(),
                    limit=self._limit,
                )
            ):
                yield _document_from_wire(documents=self._documents, message=result.document)
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    async def fetch(self) -> QueryResultsPage:
        """
        Fetch a single page of results.

        If a page has been fetched previously, a token can be provided via paging_from(), to fetch the subsequent pages.
        """
        try:
            results = await self._documents._stub.query(
                document_query_request=DocumentQueryRequest(
                    collection=_collection_to_wire(self._collection),
                    expressions=self._expressions_to_wire(),
                    limit=self._limit,
                    paging_token=self._paging_token,
                )
            )

            return QueryResultsPage(
                paging_token=results.paging_token if results.paging_token else None,
                documents=[
                    _document_from_wire(documents=self._documents, message=result) for result in results.documents
                ],
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    def __eq__(self, other):
        return self.__repr__() == other.__repr__()

    def __str__(self):
        repr_str = "from {0}".format(str(self._collection))
        if self._paging_token:
            repr_str += ", paging token {0}".format(str(self._paging_token))
        if len(self._expressions):
            repr_str += ", where " + " and ".join([str(exp) for exp in self._expressions])
        if self._limit != 1:
            repr_str += ", limit to {0} results".format(self._limit)

        return "Query({0})".format(repr_str)

    def __repr__(self):
        repr_str = "Documents.collection({0}).query()".format(self._collection)
        if self._paging_token:
            repr_str += ".page_from({0})".format(self._paging_token)
        if len(self._expressions):
            repr_str += "".join([".where({0})".format(str(exp)) for exp in self._expressions])
        if self._limit != 1:
            repr_str += ".limit({0})".format(self._limit)

        return repr_str


class Documents(object):
    """
    Nitric client for interacting with document collections.

    This client insulates application code from stack specific event operations or SDKs.
    """

    _stub: DocumentServiceStub

    def __init__(self):
        """Construct a Nitric Document Client."""
        self._channel = new_default_channel()
        self._stub = DocumentServiceStub(channel=self._channel)

    def __del__(self):
        # close the channel when this client is destroyed
        if self._channel is not None:
            self._channel.close()

    def collection(self, name: str) -> CollectionRef:
        """Return a reference to a document collection."""
        return CollectionRef(_documents=self, name=name)

Functions

def condition(name: str) ‑> nitric.api.documents._ExpressionBuilder

Construct a query expressions builder, for convenience.

Expression builders in turn provides magic methods for constructing expressions.

e.g. prop('first_name') == 'john' is equivalent to Expression('first_name, '=', 'john')

Supported operations are ==, <, >, <=, >=, .starts_with()

Expand source code
def condition(name: str) -> _ExpressionBuilder:
    """
    Construct a query expressions builder, for convenience.

    Expression builders in turn provides magic methods for constructing expressions.

    e.g. prop('first_name') == 'john' is equivalent to Expression('first_name, '=', 'john')

    Supported operations are ==, <, >, <=, >=, .starts_with()
    """
    return _ExpressionBuilder(operand=name)

Classes

class CollectionDepthException (*args, **kwargs)

The max depth of document sub-collections has been exceeded.

Expand source code
class CollectionDepthException(Exception):
    """The max depth of document sub-collections has been exceeded."""

    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException
class CollectionGroupRef (_documents: Documents, name: str, parent: Union[CollectionRef, None] = <factory>)

A reference to a collection group.

Expand source code
@dataclass(frozen=True, order=True)
class CollectionGroupRef:
    """A reference to a collection group."""

    _documents: Documents
    name: str
    parent: Union[CollectionRef, None] = field(default_factory=lambda: None)

    def query(
        self,
        paging_token: Any = None,
        limit: int = 0,
        expressions: Union[Expression, List[Expression]] = None,
    ) -> QueryBuilder:
        """Return a query builder scoped to this collection."""
        return QueryBuilder(
            documents=self._documents,
            collection=self.to_collection_ref(),
            paging_token=paging_token,
            limit=limit,
            expressions=[expressions] if isinstance(expressions, Expression) else expressions,
        )

    def sub_collection_depth(self) -> int:
        """Return the depth of this collection group, which is a count of the parents above this collection."""
        if not self.is_sub_collection():
            return 0
        else:
            return self.parent.sub_collection_depth() + 1

    def is_sub_collection(self):
        """Return True if this collection is a sub-collection of a document in another collection."""
        return self.parent is not None

    def to_collection_ref(self):
        """Return this collection group as a collection ref."""
        return CollectionRef(
            self._documents,
            self.name,
            DocumentRef(
                self._documents,
                self.parent,
                NIL_DOC_ID,
            ),
        )

    @staticmethod
    def from_collection_ref(collectionRef: CollectionRef, documents: Documents) -> CollectionGroupRef:
        """Return a collection ref as a collection group."""
        if collectionRef.parent is not None:
            return CollectionGroupRef(
                documents,
                collectionRef.name,
                CollectionGroupRef.from_collection_ref(
                    collectionRef.parent,
                    documents,
                ),
            )

Class variables

var name : str
var parent : Optional[CollectionRef]

Static methods

def from_collection_ref(collectionRef: CollectionRef, documents: Documents) ‑> CollectionGroupRef

Return a collection ref as a collection group.

Expand source code
@staticmethod
def from_collection_ref(collectionRef: CollectionRef, documents: Documents) -> CollectionGroupRef:
    """Return a collection ref as a collection group."""
    if collectionRef.parent is not None:
        return CollectionGroupRef(
            documents,
            collectionRef.name,
            CollectionGroupRef.from_collection_ref(
                collectionRef.parent,
                documents,
            ),
        )

Methods

def is_sub_collection(self)

Return True if this collection is a sub-collection of a document in another collection.

Expand source code
def is_sub_collection(self):
    """Return True if this collection is a sub-collection of a document in another collection."""
    return self.parent is not None
def query(self, paging_token: Any = None, limit: int = 0, expressions: Union[Expression, List[Expression]] = None) ‑> QueryBuilder

Return a query builder scoped to this collection.

Expand source code
def query(
    self,
    paging_token: Any = None,
    limit: int = 0,
    expressions: Union[Expression, List[Expression]] = None,
) -> QueryBuilder:
    """Return a query builder scoped to this collection."""
    return QueryBuilder(
        documents=self._documents,
        collection=self.to_collection_ref(),
        paging_token=paging_token,
        limit=limit,
        expressions=[expressions] if isinstance(expressions, Expression) else expressions,
    )
def sub_collection_depth(self) ‑> int

Return the depth of this collection group, which is a count of the parents above this collection.

Expand source code
def sub_collection_depth(self) -> int:
    """Return the depth of this collection group, which is a count of the parents above this collection."""
    if not self.is_sub_collection():
        return 0
    else:
        return self.parent.sub_collection_depth() + 1
def to_collection_ref(self)

Return this collection group as a collection ref.

Expand source code
def to_collection_ref(self):
    """Return this collection group as a collection ref."""
    return CollectionRef(
        self._documents,
        self.name,
        DocumentRef(
            self._documents,
            self.parent,
            NIL_DOC_ID,
        ),
    )
class CollectionRef (_documents: Documents, name: str, parent: Union[DocumentRef, None] = <factory>)

A reference to a collection of documents.

Expand source code
@dataclass(frozen=True, order=True)
class CollectionRef:
    """A reference to a collection of documents."""

    _documents: Documents
    name: str
    parent: Union[DocumentRef, None] = field(default_factory=lambda: None)

    def doc(self, doc_id: str) -> DocumentRef:
        """Return a reference to a document in the collection."""
        return DocumentRef(_documents=self._documents, parent=self, id=doc_id)

    def collection(self, name: str) -> CollectionGroupRef:
        """
        Return a reference to a sub-collection of this document.

        This is currently only supported to one level of depth.
        e.g. Documents().collection('a').collection('b').doc('c') is valid,
        Documents().collection('a').doc('b').collection('c').collection('d') is invalid (1 level too deep).
        """
        current_depth = self.sub_collection_depth()
        if current_depth >= MAX_SUB_COLLECTION_DEPTH:
            # Collection nesting is only supported to a maximum depth.
            raise CollectionDepthException(
                f"sub-collections supported to a depth of {MAX_SUB_COLLECTION_DEPTH}, "
                f"attempted to create new collection with depth {current_depth + 1}"
            )
        return CollectionGroupRef(_documents=self._documents, name=name, parent=self)

    def query(
        self,
        paging_token: Any = None,
        limit: int = 0,
        expressions: Union[Expression, List[Expression]] = None,
    ) -> QueryBuilder:
        """Return a query builder scoped to this collection."""
        return QueryBuilder(
            documents=self._documents,
            collection=self,
            paging_token=paging_token,
            limit=limit,
            expressions=[expressions] if isinstance(expressions, Expression) else expressions,
        )

    def sub_collection_depth(self) -> int:
        """Return the depth of this collection, which is a count of the parents above this collection."""
        if not self.is_sub_collection():
            return 0
        else:
            return self.parent.parent.sub_collection_depth() + 1

    def is_sub_collection(self):
        """Return True if this collection is a sub-collection of a document in another collection."""
        return self.parent is not None

Class variables

var name : str
var parent : Optional[DocumentRef]

Methods

def collection(self, name: str) ‑> CollectionGroupRef

Return a reference to a sub-collection of this document.

This is currently only supported to one level of depth. e.g. Documents().collection('a').collection('b').doc('c') is valid, Documents().collection('a').doc('b').collection('c').collection('d') is invalid (1 level too deep).

Expand source code
def collection(self, name: str) -> CollectionGroupRef:
    """
    Return a reference to a sub-collection of this document.

    This is currently only supported to one level of depth.
    e.g. Documents().collection('a').collection('b').doc('c') is valid,
    Documents().collection('a').doc('b').collection('c').collection('d') is invalid (1 level too deep).
    """
    current_depth = self.sub_collection_depth()
    if current_depth >= MAX_SUB_COLLECTION_DEPTH:
        # Collection nesting is only supported to a maximum depth.
        raise CollectionDepthException(
            f"sub-collections supported to a depth of {MAX_SUB_COLLECTION_DEPTH}, "
            f"attempted to create new collection with depth {current_depth + 1}"
        )
    return CollectionGroupRef(_documents=self._documents, name=name, parent=self)
def doc(self, doc_id: str) ‑> DocumentRef

Return a reference to a document in the collection.

Expand source code
def doc(self, doc_id: str) -> DocumentRef:
    """Return a reference to a document in the collection."""
    return DocumentRef(_documents=self._documents, parent=self, id=doc_id)
def is_sub_collection(self)

Return True if this collection is a sub-collection of a document in another collection.

Expand source code
def is_sub_collection(self):
    """Return True if this collection is a sub-collection of a document in another collection."""
    return self.parent is not None
def query(self, paging_token: Any = None, limit: int = 0, expressions: Union[Expression, List[Expression]] = None) ‑> QueryBuilder

Return a query builder scoped to this collection.

Expand source code
def query(
    self,
    paging_token: Any = None,
    limit: int = 0,
    expressions: Union[Expression, List[Expression]] = None,
) -> QueryBuilder:
    """Return a query builder scoped to this collection."""
    return QueryBuilder(
        documents=self._documents,
        collection=self,
        paging_token=paging_token,
        limit=limit,
        expressions=[expressions] if isinstance(expressions, Expression) else expressions,
    )
def sub_collection_depth(self) ‑> int

Return the depth of this collection, which is a count of the parents above this collection.

Expand source code
def sub_collection_depth(self) -> int:
    """Return the depth of this collection, which is a count of the parents above this collection."""
    if not self.is_sub_collection():
        return 0
    else:
        return self.parent.parent.sub_collection_depth() + 1
class Document (_ref: DocumentRef, content: dict)

Represents a document and any associated metadata.

Expand source code
@dataclass(frozen=True, order=True)
class Document:
    """Represents a document and any associated metadata."""

    _ref: DocumentRef
    content: dict

    @property
    def id(self):
        """Return the document's unique id."""
        return self._ref.id

    @property
    def collection(self) -> CollectionRef:
        """Return the CollectionRef for the collection that contains this document."""
        return self._ref.parent

    @property
    def ref(self):
        """Return the DocumentRef for this document."""
        return self._ref

Class variables

var content : dict

Instance variables

var collectionCollectionRef

Return the CollectionRef for the collection that contains this document.

Expand source code
@property
def collection(self) -> CollectionRef:
    """Return the CollectionRef for the collection that contains this document."""
    return self._ref.parent
var id

Return the document's unique id.

Expand source code
@property
def id(self):
    """Return the document's unique id."""
    return self._ref.id
var ref

Return the DocumentRef for this document.

Expand source code
@property
def ref(self):
    """Return the DocumentRef for this document."""
    return self._ref
class DocumentRef (_documents: Documents, parent: CollectionRef, id: str)

A reference to a document in a collection.

Expand source code
@dataclass(frozen=True, order=True)
class DocumentRef:
    """A reference to a document in a collection."""

    _documents: Documents
    parent: CollectionRef
    id: str

    def collection(self, name: str) -> CollectionRef:
        """
        Return a reference to a sub-collection of this document.

        This is currently only supported to one level of depth.
        e.g. Documents().collection('a').doc('b').collection('c').doc('d') is valid,
        Documents().collection('a').doc('b').collection('c').doc('d').collection('e') is invalid (1 level too deep).
        """
        current_depth = self.parent.sub_collection_depth()
        if current_depth >= MAX_SUB_COLLECTION_DEPTH:
            # Collection nesting is only supported to a maximum depth.
            raise CollectionDepthException(
                f"sub-collections supported to a depth of {MAX_SUB_COLLECTION_DEPTH}, "
                f"attempted to create new collection with depth {current_depth + 1}"
            )
        return CollectionRef(_documents=self._documents, name=name, parent=self)

    async def get(self) -> Document:
        """Retrieve the contents of this document, if it exists."""
        try:
            response = await self._documents._stub.get(
                document_get_request=DocumentGetRequest(key=_doc_ref_to_wire(self))
            )
            return _document_from_wire(documents=self._documents, message=response.document)
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    async def set(self, content: dict):
        """
        Set the contents of this document.

        If the document exists it will be updated, otherwise a new document will be created.
        """
        try:
            await self._documents._stub.set(
                document_set_request=DocumentSetRequest(
                    key=_doc_ref_to_wire(self),
                    content=_struct_from_dict(content),
                )
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    async def delete(self):
        """Delete this document, if it exists."""
        try:
            await self._documents._stub.delete(
                document_delete_request=DocumentDeleteRequest(
                    key=_doc_ref_to_wire(self),
                )
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

Class variables

var id : str
var parentCollectionRef

Methods

def collection(self, name: str) ‑> CollectionRef

Return a reference to a sub-collection of this document.

This is currently only supported to one level of depth. e.g. Documents().collection('a').doc('b').collection('c').doc('d') is valid, Documents().collection('a').doc('b').collection('c').doc('d').collection('e') is invalid (1 level too deep).

Expand source code
def collection(self, name: str) -> CollectionRef:
    """
    Return a reference to a sub-collection of this document.

    This is currently only supported to one level of depth.
    e.g. Documents().collection('a').doc('b').collection('c').doc('d') is valid,
    Documents().collection('a').doc('b').collection('c').doc('d').collection('e') is invalid (1 level too deep).
    """
    current_depth = self.parent.sub_collection_depth()
    if current_depth >= MAX_SUB_COLLECTION_DEPTH:
        # Collection nesting is only supported to a maximum depth.
        raise CollectionDepthException(
            f"sub-collections supported to a depth of {MAX_SUB_COLLECTION_DEPTH}, "
            f"attempted to create new collection with depth {current_depth + 1}"
        )
    return CollectionRef(_documents=self._documents, name=name, parent=self)
async def delete(self)

Delete this document, if it exists.

Expand source code
async def delete(self):
    """Delete this document, if it exists."""
    try:
        await self._documents._stub.delete(
            document_delete_request=DocumentDeleteRequest(
                key=_doc_ref_to_wire(self),
            )
        )
    except GRPCError as grpc_err:
        raise exception_from_grpc_error(grpc_err)
async def get(self) ‑> Document

Retrieve the contents of this document, if it exists.

Expand source code
async def get(self) -> Document:
    """Retrieve the contents of this document, if it exists."""
    try:
        response = await self._documents._stub.get(
            document_get_request=DocumentGetRequest(key=_doc_ref_to_wire(self))
        )
        return _document_from_wire(documents=self._documents, message=response.document)
    except GRPCError as grpc_err:
        raise exception_from_grpc_error(grpc_err)
async def set(self, content: dict)

Set the contents of this document.

If the document exists it will be updated, otherwise a new document will be created.

Expand source code
async def set(self, content: dict):
    """
    Set the contents of this document.

    If the document exists it will be updated, otherwise a new document will be created.
    """
    try:
        await self._documents._stub.set(
            document_set_request=DocumentSetRequest(
                key=_doc_ref_to_wire(self),
                content=_struct_from_dict(content),
            )
        )
    except GRPCError as grpc_err:
        raise exception_from_grpc_error(grpc_err)
class Documents

Nitric client for interacting with document collections.

This client insulates application code from stack specific event operations or SDKs.

Construct a Nitric Document Client.

Expand source code
class Documents(object):
    """
    Nitric client for interacting with document collections.

    This client insulates application code from stack specific event operations or SDKs.
    """

    _stub: DocumentServiceStub

    def __init__(self):
        """Construct a Nitric Document Client."""
        self._channel = new_default_channel()
        self._stub = DocumentServiceStub(channel=self._channel)

    def __del__(self):
        # close the channel when this client is destroyed
        if self._channel is not None:
            self._channel.close()

    def collection(self, name: str) -> CollectionRef:
        """Return a reference to a document collection."""
        return CollectionRef(_documents=self, name=name)

Methods

def collection(self, name: str) ‑> CollectionRef

Return a reference to a document collection.

Expand source code
def collection(self, name: str) -> CollectionRef:
    """Return a reference to a document collection."""
    return CollectionRef(_documents=self, name=name)
class Expression (operand: str, operator: Union[Operator, str], value: Union[str, int, float, bool])

Query expressions, representing a boolean operation used for query filters.

Expand source code
@dataclass(order=True)
class Expression:
    """Query expressions, representing a boolean operation used for query filters."""

    operand: str
    operator: Union[Operator, str]
    value: Union[str, int, float, bool]

    def __post_init__(self):
        if isinstance(self.operator, str):
            # Convert string operators to their enum values
            self.operator = Operator(self.operator)

    def _value_to_expression_value(self):
        """Return an ExpressionValue message representation of the value of this expression."""
        if isinstance(self.value, str):
            return ExpressionValue(string_value=self.value)
        # Check bool before numbers, because booleans are numbers.
        if isinstance(self.value, bool):
            return ExpressionValue(bool_value=self.value)
        if isinstance(self.value, int):
            return ExpressionValue(int_value=self.value)
        if isinstance(self.value, float):
            return ExpressionValue(double_value=self.value)

    def _to_wire(self):
        """Return the Expression protobuf message representation of this expression."""
        return ExpressionMessage(
            operand=self.operand,
            operator=self.operator.value,
            value=self._value_to_expression_value(),
        )

    def __str__(self):
        return "{0} {1} {2}".format(self.operand, self.operator.name, self.value)

Class variables

var operand : str
var operator : Union[Operator, str]
var value : Union[str, int, float, bool]
class Operator (value, names=None, *, module=None, qualname=None, type=None, start=1)

Valid query expression operators.

Expand source code
class Operator(Enum):
    """Valid query expression operators."""

    less_than = "<"
    greater_than = ">"
    less_than_or_equal = "<="
    greater_than_or_equal = ">="
    equals = "=="
    starts_with = "startsWith"

Ancestors

  • enum.Enum

Class variables

var equals
var greater_than
var greater_than_or_equal
var less_than
var less_than_or_equal
var starts_with
class QueryBuilder (documents: Documents, collection: CollectionRef, paging_token: Any = None, limit: int = 0, expressions: List[Expression] = None)

Document query builder for retrieving documents from a collection based on filters.

Construct a new QueryBuilder.

Expand source code
class QueryBuilder:
    """Document query builder for retrieving documents from a collection based on filters."""

    _documents: Documents
    _collection: CollectionRef
    _paging_token: Any
    _limit: int
    _expressions: List[Expression]

    def __init__(
        self,
        documents: Documents,
        collection: CollectionRef,
        paging_token: Any = None,
        limit: int = 0,
        expressions: List[Expression] = None,
    ):
        """Construct a new QueryBuilder."""
        self._documents = documents
        self._collection = collection
        self._paging_token = paging_token
        self._limit = limit  # default to unlimited.
        if expressions is None:
            self._expressions = []
        else:
            self._expressions = expressions

    def _flat_expressions(self, expressions) -> List[Expression]:
        """Process possible inputs for .where() into a flattened list of expressions."""
        if isinstance(expressions, tuple) and len(expressions) == 3 and isinstance(expressions[0], str):
            # handle the special case where an expression was passed in as its component arguments.
            # e.g. .where('age', '<', 30) instead of .where(condition('age') > 30)
            return [Expression(*expressions)]
        if isinstance(expressions, Expression):
            # when a single expression is received, wrap in a list and return it
            return [expressions]
        else:
            # flatten lists of lists into single dimension list of expressions
            exps = []
            for exp in expressions:
                exps = exps + self._flat_expressions(exp)
            return exps

    def where(
        self,
        *expressions: Union[
            Expression, List[Expression], Union[str, Operator, int, bool, Tuple[str, Union[str, Operator], Any]]
        ],
    ) -> QueryBuilder:
        """
        Add a filter expression to the query.

        :param expressions: a single expression or a set of expression args or a variadic/tuple/list of expressions.

        Examples
        --------
            .where('age', '>', 20)
            .where(condition('age') > 20)
            .where(condition('age').gt(20))
            .where(
                condition('age') > 20,
                condition('age') < 50,
            )
            .where(
                [
                    condition('age') > 20,
                    condition('age') < 50,
                ]
            )
            .where(
                ('age', '>', 20),
                ('age', '<', 50),
            )

        """
        for expression in self._flat_expressions(expressions):
            self._expressions.append(expression)
        return self

    def page_from(self, token) -> QueryBuilder:
        """
        Set the paging token for the query.

        Used when requesting subsequent pages from a query.
        """
        self._paging_token = token
        return self

    def limit(self, limit: int) -> QueryBuilder:
        """Set the maximum number of results returned by this query."""
        if limit is None or not isinstance(limit, int) or limit < 0:
            raise ValueError("limit must be a positive integer or 0 for unlimited.")
        self._limit = limit
        return self

    def _expressions_to_wire(self) -> List[ExpressionMessage]:
        """Return this queries' expressions as a list of their protobuf message representation."""
        return [expressions._to_wire() for expressions in self._expressions]

    async def stream(self) -> AsyncIterator[Document]:
        """Return all query results as a stream."""
        # TODO: add limit, expressions and paging token to query.
        if self._paging_token is not None:
            raise ValueError("page_from() should not be used with streamed queries.")

        try:
            async for result in self._documents._stub.query_stream(
                document_query_stream_request=DocumentQueryStreamRequest(
                    collection=_collection_to_wire(self._collection),
                    expressions=self._expressions_to_wire(),
                    limit=self._limit,
                )
            ):
                yield _document_from_wire(documents=self._documents, message=result.document)
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    async def fetch(self) -> QueryResultsPage:
        """
        Fetch a single page of results.

        If a page has been fetched previously, a token can be provided via paging_from(), to fetch the subsequent pages.
        """
        try:
            results = await self._documents._stub.query(
                document_query_request=DocumentQueryRequest(
                    collection=_collection_to_wire(self._collection),
                    expressions=self._expressions_to_wire(),
                    limit=self._limit,
                    paging_token=self._paging_token,
                )
            )

            return QueryResultsPage(
                paging_token=results.paging_token if results.paging_token else None,
                documents=[
                    _document_from_wire(documents=self._documents, message=result) for result in results.documents
                ],
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    def __eq__(self, other):
        return self.__repr__() == other.__repr__()

    def __str__(self):
        repr_str = "from {0}".format(str(self._collection))
        if self._paging_token:
            repr_str += ", paging token {0}".format(str(self._paging_token))
        if len(self._expressions):
            repr_str += ", where " + " and ".join([str(exp) for exp in self._expressions])
        if self._limit != 1:
            repr_str += ", limit to {0} results".format(self._limit)

        return "Query({0})".format(repr_str)

    def __repr__(self):
        repr_str = "Documents.collection({0}).query()".format(self._collection)
        if self._paging_token:
            repr_str += ".page_from({0})".format(self._paging_token)
        if len(self._expressions):
            repr_str += "".join([".where({0})".format(str(exp)) for exp in self._expressions])
        if self._limit != 1:
            repr_str += ".limit({0})".format(self._limit)

        return repr_str

Methods

async def fetch(self) ‑> QueryResultsPage

Fetch a single page of results.

If a page has been fetched previously, a token can be provided via paging_from(), to fetch the subsequent pages.

Expand source code
async def fetch(self) -> QueryResultsPage:
    """
    Fetch a single page of results.

    If a page has been fetched previously, a token can be provided via paging_from(), to fetch the subsequent pages.
    """
    try:
        results = await self._documents._stub.query(
            document_query_request=DocumentQueryRequest(
                collection=_collection_to_wire(self._collection),
                expressions=self._expressions_to_wire(),
                limit=self._limit,
                paging_token=self._paging_token,
            )
        )

        return QueryResultsPage(
            paging_token=results.paging_token if results.paging_token else None,
            documents=[
                _document_from_wire(documents=self._documents, message=result) for result in results.documents
            ],
        )
    except GRPCError as grpc_err:
        raise exception_from_grpc_error(grpc_err)
def limit(self, limit: int) ‑> QueryBuilder

Set the maximum number of results returned by this query.

Expand source code
def limit(self, limit: int) -> QueryBuilder:
    """Set the maximum number of results returned by this query."""
    if limit is None or not isinstance(limit, int) or limit < 0:
        raise ValueError("limit must be a positive integer or 0 for unlimited.")
    self._limit = limit
    return self
def page_from(self, token) ‑> QueryBuilder

Set the paging token for the query.

Used when requesting subsequent pages from a query.

Expand source code
def page_from(self, token) -> QueryBuilder:
    """
    Set the paging token for the query.

    Used when requesting subsequent pages from a query.
    """
    self._paging_token = token
    return self
async def stream(self) ‑> AsyncIterator[Document]

Return all query results as a stream.

Expand source code
async def stream(self) -> AsyncIterator[Document]:
    """Return all query results as a stream."""
    # TODO: add limit, expressions and paging token to query.
    if self._paging_token is not None:
        raise ValueError("page_from() should not be used with streamed queries.")

    try:
        async for result in self._documents._stub.query_stream(
            document_query_stream_request=DocumentQueryStreamRequest(
                collection=_collection_to_wire(self._collection),
                expressions=self._expressions_to_wire(),
                limit=self._limit,
            )
        ):
            yield _document_from_wire(documents=self._documents, message=result.document)
    except GRPCError as grpc_err:
        raise exception_from_grpc_error(grpc_err)
def where(self, *expressions: Union[Expression, List[Expression], Union[str, Operator, int, bool, Tuple[str, Union[str, Operator], Any]]]) ‑> QueryBuilder

Add a filter expression to the query.

:param expressions: a single expression or a set of expression args or a variadic/tuple/list of expressions.

Examples

.where('age', '>', 20)
.where(condition('age') > 20)
.where(condition('age').gt(20))
.where(
    condition('age') > 20,
    condition('age') < 50,
)
.where(
    [
        condition('age') > 20,
        condition('age') < 50,
    ]
)
.where(
    ('age', '>', 20),
    ('age', '<', 50),
)
Expand source code
def where(
    self,
    *expressions: Union[
        Expression, List[Expression], Union[str, Operator, int, bool, Tuple[str, Union[str, Operator], Any]]
    ],
) -> QueryBuilder:
    """
    Add a filter expression to the query.

    :param expressions: a single expression or a set of expression args or a variadic/tuple/list of expressions.

    Examples
    --------
        .where('age', '>', 20)
        .where(condition('age') > 20)
        .where(condition('age').gt(20))
        .where(
            condition('age') > 20,
            condition('age') < 50,
        )
        .where(
            [
                condition('age') > 20,
                condition('age') < 50,
            ]
        )
        .where(
            ('age', '>', 20),
            ('age', '<', 50),
        )

    """
    for expression in self._flat_expressions(expressions):
        self._expressions.append(expression)
    return self
class QueryResultsPage (paging_token: any = <factory>, documents: List[Document] = <factory>)

Represents a page of results from a query.

Expand source code
@dataclass(frozen=True, order=True)
class QueryResultsPage:
    """Represents a page of results from a query."""

    paging_token: any = field(default_factory=lambda: None)
    documents: List[Document] = field(default_factory=lambda: [])

    def has_more_pages(self) -> bool:
        """Return false if the page token is None or empty (both represent no more pages)."""
        return bool(self.paging_token)

Class variables

var documents : List[Document]
var paging_token

Methods

def has_more_pages(self) ‑> bool

Return false if the page token is None or empty (both represent no more pages).

Expand source code
def has_more_pages(self) -> bool:
    """Return false if the page token is None or empty (both represent no more pages)."""
    return bool(self.paging_token)