Module nitric.api

Nitric API SDK.

Expand source code
#
# Copyright (c) 2021 Nitric Technologies Pty Ltd.
#
# This file is part of Nitric Python 3 SDK.
# See https://github.com/nitrictech/python-sdk for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Nitric API SDK."""
from nitric.api.events import Events, Event, TopicRef
from nitric.api.queues import Queues, Task, FailedTask
from nitric.api.storage import Storage
from nitric.api.documents import Documents
from nitric.api.secrets import Secrets

__all__ = [
    "Events",
    "Queues",
    "Documents",
    "Storage",
    "Event",
    "Task",
    "FailedTask",
    "TopicRef",
    "Secrets",
]

Sub-modules

nitric.api.const
nitric.api.documents
nitric.api.events
nitric.api.queues
nitric.api.secrets
nitric.api.storage

Classes

class Documents

Nitric client for interacting with document collections.

This client insulates application code from stack specific event operations or SDKs.

Construct a Nitric Document Client.

Expand source code
class Documents(object):
    """
    Nitric client for interacting with document collections.

    This client insulates application code from stack specific event operations or SDKs.
    """

    _stub: DocumentServiceStub

    def __init__(self):
        """Construct a Nitric Document Client."""
        self._channel = new_default_channel()
        self._stub = DocumentServiceStub(channel=self._channel)

    def __del__(self):
        # close the channel when this client is destroyed
        if self._channel is not None:
            self._channel.close()

    def collection(self, name: str) -> CollectionRef:
        """Return a reference to a document collection."""
        return CollectionRef(_documents=self, name=name)

Methods

def collection(self, name: str) ‑> CollectionRef

Return a reference to a document collection.

Expand source code
def collection(self, name: str) -> CollectionRef:
    """Return a reference to a document collection."""
    return CollectionRef(_documents=self, name=name)
class Event (payload: dict = <factory>, id: str = None, payload_type: str = None)

Eventing client, providing access to Topic and Event references and operations on those entities.

Expand source code
@dataclass(frozen=True, order=True)
class Event(object):
    """Eventing client, providing access to Topic and Event references and operations on those entities."""

    payload: dict = field(default_factory=dict)
    id: str = field(default=None)
    payload_type: str = field(default=None)

Class variables

var id : str
var payload : dict
var payload_type : str
class Events

Nitric generic publish/subscribe event client.

This client insulates application code from stack specific event operations or SDKs.

Construct a Nitric Event Client.

Expand source code
class Events(object):
    """
    Nitric generic publish/subscribe event client.

    This client insulates application code from stack specific event operations or SDKs.
    """

    def __init__(self):
        """Construct a Nitric Event Client."""
        self.channel = new_default_channel()
        self._stub = EventServiceStub(channel=self.channel)
        self._topic_stub = TopicServiceStub(channel=self.channel)

    def __del__(self):
        # close the channel when this client is destroyed
        if self.channel is not None:
            self.channel.close()

    async def topics(self) -> List[TopicRef]:
        """Get a list of topics available for publishing or subscription."""
        try:
            response = await self._topic_stub.list(topic_list_request=TopicListRequest())
            return [self.topic(topic.name) for topic in response.topics]
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    def topic(self, name: str) -> TopicRef:
        """Return a reference to a topic."""
        return TopicRef(_events=self, name=name)

Methods

def topic(self, name: str) ‑> TopicRef

Return a reference to a topic.

Expand source code
def topic(self, name: str) -> TopicRef:
    """Return a reference to a topic."""
    return TopicRef(_events=self, name=name)
async def topics(self) ‑> List[TopicRef]

Get a list of topics available for publishing or subscription.

Expand source code
async def topics(self) -> List[TopicRef]:
    """Get a list of topics available for publishing or subscription."""
    try:
        response = await self._topic_stub.list(topic_list_request=TopicListRequest())
        return [self.topic(topic.name) for topic in response.topics]
    except GRPCError as grpc_err:
        raise exception_from_grpc_error(grpc_err)
class FailedTask (id: str = None, payload_type: str = None, payload: dict = <factory>, message: str = '')

Represents a failed queue publish.

Expand source code
@dataclass(frozen=True, order=True)
class FailedTask(Task):
    """Represents a failed queue publish."""

    message: str = field(default="")

Ancestors

Class variables

var message : str
class Queues

Queueing client, providing access to Queue and Task references and operations on those entities.

Construct a Nitric Queue Client.

Expand source code
class Queues(object):
    """Queueing client, providing access to Queue and Task references and operations on those entities."""

    def __init__(self):
        """Construct a Nitric Queue Client."""
        self.channel = new_default_channel()
        self._queue_stub = QueueServiceStub(channel=self.channel)

    def __del__(self):
        # close the channel when this client is destroyed
        if self.channel is not None:
            self.channel.close()

    def queue(self, name: str):
        """Return a reference to a queue from the connected queue service."""
        return QueueRef(_queueing=self, name=name)

Methods

def queue(self, name: str)

Return a reference to a queue from the connected queue service.

Expand source code
def queue(self, name: str):
    """Return a reference to a queue from the connected queue service."""
    return QueueRef(_queueing=self, name=name)
class Secrets

Nitric secrets management client.

This client insulates application code from stack specific secrets managements services.

Construct a Nitric Storage Client.

Expand source code
class Secrets(object):
    """
    Nitric secrets management client.

    This client insulates application code from stack specific secrets managements services.
    """

    def __init__(self):
        """Construct a Nitric Storage Client."""
        self._channel = new_default_channel()
        self._secrets_stub = SecretServiceStub(channel=self._channel)

    def __del__(self):
        # close the channel when this client is destroyed
        if self._channel is not None:
            self._channel.close()

    def secret(self, name: str):
        """Return a reference to a secret container from the connected secrets management service."""
        return SecretContainerRef(_secrets=self, name=name)

Methods

def secret(self, name: str)

Return a reference to a secret container from the connected secrets management service.

Expand source code
def secret(self, name: str):
    """Return a reference to a secret container from the connected secrets management service."""
    return SecretContainerRef(_secrets=self, name=name)
class Storage

Nitric generic blob storage client.

This client insulates application code from stack specific blob store operations or SDKs.

Construct a Nitric Storage Client.

Expand source code
class Storage(object):
    """
    Nitric generic blob storage client.

    This client insulates application code from stack specific blob store operations or SDKs.
    """

    def __init__(self):
        """Construct a Nitric Storage Client."""
        self._channel = new_default_channel()
        self._storage_stub = StorageServiceStub(channel=self._channel)

    def __del__(self):
        # close the channel when this client is destroyed
        if self._channel is not None:
            self._channel.close()

    def bucket(self, name: str):
        """Return a reference to a bucket from the connected storage service."""
        return BucketRef(_storage=self, name=name, _server=None)

Methods

def bucket(self, name: str)

Return a reference to a bucket from the connected storage service.

Expand source code
def bucket(self, name: str):
    """Return a reference to a bucket from the connected storage service."""
    return BucketRef(_storage=self, name=name, _server=None)
class Task (id: str = None, payload_type: str = None, payload: dict = <factory>)

A task to be sent to a Queue.

Expand source code
@dataclass(frozen=True, order=True)
class Task(object):
    """A task to be sent to a Queue."""

    id: str = field(default=None)
    payload_type: str = field(default=None)
    payload: dict = field(default_factory=dict)

Subclasses

Class variables

var id : str
var payload : dict
var payload_type : str
class TopicRef (_events: Events, name: str)

A reference to a topic on an event service, used to perform operations on that topic.

Expand source code
@dataclass(frozen=True, order=True)
class TopicRef(object):
    """A reference to a topic on an event service, used to perform operations on that topic."""

    _events: Events
    name: str

    async def publish(
        self,
        event: Union[Event, dict] = None,
    ) -> Event:
        """
        Publish an event/message to a topic, which can be subscribed to by other services.

        :param event: the event to publish
        :return: the published event, with the id added if one was auto-generated
        """
        if event is None:
            event = Event()

        if isinstance(event, dict):
            event = Event(payload=event)

        try:
            response = await self._events._stub.publish(
                event_publish_request=EventPublishRequest(topic=self.name, event=_event_to_wire(event))
            )
            return Event(**{**event.__dict__.copy(), **{"id": response.id}})
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

Class variables

var name : str

Methods

async def publish(self, event: Union[Event, dict] = None) ‑> Event

Publish an event/message to a topic, which can be subscribed to by other services.

:param event: the event to publish :return: the published event, with the id added if one was auto-generated

Expand source code
async def publish(
    self,
    event: Union[Event, dict] = None,
) -> Event:
    """
    Publish an event/message to a topic, which can be subscribed to by other services.

    :param event: the event to publish
    :return: the published event, with the id added if one was auto-generated
    """
    if event is None:
        event = Event()

    if isinstance(event, dict):
        event = Event(payload=event)

    try:
        response = await self._events._stub.publish(
            event_publish_request=EventPublishRequest(topic=self.name, event=_event_to_wire(event))
        )
        return Event(**{**event.__dict__.copy(), **{"id": response.id}})
    except GRPCError as grpc_err:
        raise exception_from_grpc_error(grpc_err)