Module nitric.api
Nitric API SDK.
Expand source code
#
# Copyright (c) 2021 Nitric Technologies Pty Ltd.
#
# This file is part of Nitric Python 3 SDK.
# See https://github.com/nitrictech/python-sdk for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Nitric API SDK."""
from nitric.api.events import Events, Event, TopicRef
from nitric.api.queues import Queues, Task, FailedTask
from nitric.api.storage import Storage
from nitric.api.documents import Documents
from nitric.api.secrets import Secrets
__all__ = [
"Events",
"Queues",
"Documents",
"Storage",
"Event",
"Task",
"FailedTask",
"TopicRef",
"Secrets",
]
Sub-modules
nitric.api.const
nitric.api.documents
nitric.api.events
nitric.api.queues
nitric.api.secrets
nitric.api.storage
Classes
class Documents
-
Nitric client for interacting with document collections.
This client insulates application code from stack specific event operations or SDKs.
Construct a Nitric Document Client.
Expand source code
class Documents(object): """ Nitric client for interacting with document collections. This client insulates application code from stack specific event operations or SDKs. """ _stub: DocumentServiceStub def __init__(self): """Construct a Nitric Document Client.""" self._channel = new_default_channel() self._stub = DocumentServiceStub(channel=self._channel) def __del__(self): # close the channel when this client is destroyed if self._channel is not None: self._channel.close() def collection(self, name: str) -> CollectionRef: """Return a reference to a document collection.""" return CollectionRef(_documents=self, name=name)
Methods
def collection(self, name: str) ‑> CollectionRef
-
Return a reference to a document collection.
Expand source code
def collection(self, name: str) -> CollectionRef: """Return a reference to a document collection.""" return CollectionRef(_documents=self, name=name)
class Event (payload: dict = <factory>, id: str = None, payload_type: str = None)
-
Eventing client, providing access to Topic and Event references and operations on those entities.
Expand source code
@dataclass(frozen=True, order=True) class Event(object): """Eventing client, providing access to Topic and Event references and operations on those entities.""" payload: dict = field(default_factory=dict) id: str = field(default=None) payload_type: str = field(default=None)
Class variables
var id : str
var payload : dict
var payload_type : str
class Events
-
Nitric generic publish/subscribe event client.
This client insulates application code from stack specific event operations or SDKs.
Construct a Nitric Event Client.
Expand source code
class Events(object): """ Nitric generic publish/subscribe event client. This client insulates application code from stack specific event operations or SDKs. """ def __init__(self): """Construct a Nitric Event Client.""" self.channel = new_default_channel() self._stub = EventServiceStub(channel=self.channel) self._topic_stub = TopicServiceStub(channel=self.channel) def __del__(self): # close the channel when this client is destroyed if self.channel is not None: self.channel.close() async def topics(self) -> List[TopicRef]: """Get a list of topics available for publishing or subscription.""" try: response = await self._topic_stub.list(topic_list_request=TopicListRequest()) return [self.topic(topic.name) for topic in response.topics] except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err) def topic(self, name: str) -> TopicRef: """Return a reference to a topic.""" return TopicRef(_events=self, name=name)
Methods
def topic(self, name: str) ‑> TopicRef
-
Return a reference to a topic.
Expand source code
def topic(self, name: str) -> TopicRef: """Return a reference to a topic.""" return TopicRef(_events=self, name=name)
async def topics(self) ‑> List[TopicRef]
-
Get a list of topics available for publishing or subscription.
Expand source code
async def topics(self) -> List[TopicRef]: """Get a list of topics available for publishing or subscription.""" try: response = await self._topic_stub.list(topic_list_request=TopicListRequest()) return [self.topic(topic.name) for topic in response.topics] except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err)
class FailedTask (id: str = None, payload_type: str = None, payload: dict = <factory>, message: str = '')
-
Represents a failed queue publish.
Expand source code
@dataclass(frozen=True, order=True) class FailedTask(Task): """Represents a failed queue publish.""" message: str = field(default="")
Ancestors
Class variables
var message : str
class Queues
-
Queueing client, providing access to Queue and Task references and operations on those entities.
Construct a Nitric Queue Client.
Expand source code
class Queues(object): """Queueing client, providing access to Queue and Task references and operations on those entities.""" def __init__(self): """Construct a Nitric Queue Client.""" self.channel = new_default_channel() self._queue_stub = QueueServiceStub(channel=self.channel) def __del__(self): # close the channel when this client is destroyed if self.channel is not None: self.channel.close() def queue(self, name: str): """Return a reference to a queue from the connected queue service.""" return QueueRef(_queueing=self, name=name)
Methods
def queue(self, name: str)
-
Return a reference to a queue from the connected queue service.
Expand source code
def queue(self, name: str): """Return a reference to a queue from the connected queue service.""" return QueueRef(_queueing=self, name=name)
class Secrets
-
Nitric secrets management client.
This client insulates application code from stack specific secrets managements services.
Construct a Nitric Storage Client.
Expand source code
class Secrets(object): """ Nitric secrets management client. This client insulates application code from stack specific secrets managements services. """ def __init__(self): """Construct a Nitric Storage Client.""" self._channel = new_default_channel() self._secrets_stub = SecretServiceStub(channel=self._channel) def __del__(self): # close the channel when this client is destroyed if self._channel is not None: self._channel.close() def secret(self, name: str): """Return a reference to a secret container from the connected secrets management service.""" return SecretContainerRef(_secrets=self, name=name)
Methods
def secret(self, name: str)
-
Return a reference to a secret container from the connected secrets management service.
Expand source code
def secret(self, name: str): """Return a reference to a secret container from the connected secrets management service.""" return SecretContainerRef(_secrets=self, name=name)
class Storage
-
Nitric generic blob storage client.
This client insulates application code from stack specific blob store operations or SDKs.
Construct a Nitric Storage Client.
Expand source code
class Storage(object): """ Nitric generic blob storage client. This client insulates application code from stack specific blob store operations or SDKs. """ def __init__(self): """Construct a Nitric Storage Client.""" self._channel = new_default_channel() self._storage_stub = StorageServiceStub(channel=self._channel) def __del__(self): # close the channel when this client is destroyed if self._channel is not None: self._channel.close() def bucket(self, name: str): """Return a reference to a bucket from the connected storage service.""" return BucketRef(_storage=self, name=name, _server=None)
Methods
def bucket(self, name: str)
-
Return a reference to a bucket from the connected storage service.
Expand source code
def bucket(self, name: str): """Return a reference to a bucket from the connected storage service.""" return BucketRef(_storage=self, name=name, _server=None)
class Task (id: str = None, payload_type: str = None, payload: dict = <factory>)
-
A task to be sent to a Queue.
Expand source code
@dataclass(frozen=True, order=True) class Task(object): """A task to be sent to a Queue.""" id: str = field(default=None) payload_type: str = field(default=None) payload: dict = field(default_factory=dict)
Subclasses
Class variables
var id : str
var payload : dict
var payload_type : str
class TopicRef (_events: Events, name: str)
-
A reference to a topic on an event service, used to perform operations on that topic.
Expand source code
@dataclass(frozen=True, order=True) class TopicRef(object): """A reference to a topic on an event service, used to perform operations on that topic.""" _events: Events name: str async def publish( self, event: Union[Event, dict] = None, ) -> Event: """ Publish an event/message to a topic, which can be subscribed to by other services. :param event: the event to publish :return: the published event, with the id added if one was auto-generated """ if event is None: event = Event() if isinstance(event, dict): event = Event(payload=event) try: response = await self._events._stub.publish( event_publish_request=EventPublishRequest(topic=self.name, event=_event_to_wire(event)) ) return Event(**{**event.__dict__.copy(), **{"id": response.id}}) except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err)
Class variables
var name : str
Methods
async def publish(self, event: Union[Event, dict] = None) ‑> Event
-
Publish an event/message to a topic, which can be subscribed to by other services.
:param event: the event to publish :return: the published event, with the id added if one was auto-generated
Expand source code
async def publish( self, event: Union[Event, dict] = None, ) -> Event: """ Publish an event/message to a topic, which can be subscribed to by other services. :param event: the event to publish :return: the published event, with the id added if one was auto-generated """ if event is None: event = Event() if isinstance(event, dict): event = Event(payload=event) try: response = await self._events._stub.publish( event_publish_request=EventPublishRequest(topic=self.name, event=_event_to_wire(event)) ) return Event(**{**event.__dict__.copy(), **{"id": response.id}}) except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err)