Module nitric.api.events

Expand source code
#
# Copyright (c) 2021 Nitric Technologies Pty Ltd.
#
# This file is part of Nitric Python 3 SDK.
# See https://github.com/nitrictech/python-sdk for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

from typing import List, Union

from grpclib import GRPCError

from nitric.exception import exception_from_grpc_error
from nitric.utils import new_default_channel, _struct_from_dict
from nitric.proto.nitric.event.v1 import (
    EventServiceStub,
    NitricEvent,
    TopicServiceStub,
    EventPublishRequest,
    TopicListRequest,
)
from dataclasses import dataclass, field


@dataclass(frozen=True, order=True)
class Event(object):
    """
    An event (message) that can be published to a topic.

    Instances are immutable (``frozen=True``); every field is optional and
    has a default, so ``Event()`` describes an empty event.
    """

    # Content of the event; each instance gets its own empty dict by default.
    payload: dict = field(default_factory=dict)
    # Identifier of the event; None until one is supplied by the caller or
    # filled in from the publish response.
    id: Union[str, None] = None
    # Optional type label for the payload; copied unchanged into the wire message.
    payload_type: Union[str, None] = None


def _event_to_wire(event: Event) -> NitricEvent:
    """Build the ``NitricEvent`` wire message that carries the fields of ``event``."""
    event_id = event.id
    # The payload dict is converted with the SDK's struct helper before being attached.
    wire_payload = _struct_from_dict(event.payload)
    return NitricEvent(id=event_id, payload=wire_payload, payload_type=event.payload_type)


@dataclass(frozen=True, order=True)
class TopicRef(object):
    """Handle for a single named topic, through which events are published using the owning Events client."""

    # Client whose event service stub performs the publish call.
    _events: Events
    # Name of the topic this reference points at.
    name: str

    async def publish(
        self,
        event: Union[Event, dict] = None,
    ) -> Event:
        """
        Send an event to this topic so that subscribed services can receive it.

        :param event: an Event, a plain dict (used as the payload of a new Event),
            or None to publish an empty Event
        :return: a new Event with the same fields as the one sent and the id from the publish response
        """
        # Normalise the accepted input shapes into a single Event instance.
        if event is None:
            outgoing = Event()
        elif isinstance(event, dict):
            outgoing = Event(payload=event)
        else:
            outgoing = event

        try:
            publish_rpc = self._events._stub.publish
            request = EventPublishRequest(topic=self.name, event=_event_to_wire(outgoing))
            response = await publish_rpc(event_publish_request=request)
            # Event is frozen, so build a fresh instance carrying the id returned by the service.
            updated_fields = dict(outgoing.__dict__)
            updated_fields["id"] = response.id
            return Event(**updated_fields)
        except GRPCError as grpc_err:
            # Surface gRPC failures as the SDK's own exception types.
            raise exception_from_grpc_error(grpc_err)


class Events(object):
    """
    Nitric generic publish/subscribe event client.

    This client insulates application code from stack specific event operations or SDKs.
    """

    def __init__(self):
        """Construct a Nitric Event Client with event and topic service stubs sharing one channel."""
        self.channel = new_default_channel()
        self._stub = EventServiceStub(channel=self.channel)
        self._topic_stub = TopicServiceStub(channel=self.channel)

    def __del__(self):
        """Close the channel when this client is destroyed."""
        # __init__ may have raised before `channel` was assigned; looking the
        # attribute up defensively avoids a secondary AttributeError during finalization.
        channel = getattr(self, "channel", None)
        if channel is not None:
            channel.close()

    async def topics(self) -> List[TopicRef]:
        """
        Get a list of topics available for publishing or subscription.

        :return: one TopicRef per topic name returned by the topic service
        """
        try:
            response = await self._topic_stub.list(topic_list_request=TopicListRequest())
            return [self.topic(topic.name) for topic in response.topics]
        except GRPCError as grpc_err:
            # Surface gRPC failures as the SDK's own exception types.
            raise exception_from_grpc_error(grpc_err)

    def topic(self, name: str) -> TopicRef:
        """
        Return a reference to a topic.

        No request is made here; the reference simply binds this client to ``name``.

        :param name: the name of the topic
        :return: a TopicRef that publishes through this client
        """
        return TopicRef(_events=self, name=name)

Classes

class Event (payload: dict = <factory>, id: str = None, payload_type: str = None)

An event (message) that can be published to a topic, carrying a payload and an optional id and payload type.

Expand source code
@dataclass(frozen=True, order=True)
class Event(object):
    """
    An event (message) that can be published to a topic.

    Instances are immutable (``frozen=True``); every field is optional and
    has a default, so ``Event()`` describes an empty event.
    """

    # Content of the event; each instance gets its own empty dict by default.
    payload: dict = field(default_factory=dict)
    # Identifier of the event; None until one is supplied by the caller or
    # filled in from the publish response.
    id: Union[str, None] = None
    # Optional type label for the payload; copied unchanged into the wire message.
    payload_type: Union[str, None] = None

Class variables

var id : str
var payload : dict
var payload_type : str
class Events

Nitric generic publish/subscribe event client.

This client insulates application code from stack specific event operations or SDKs.

Construct a Nitric Event Client.

Expand source code
class Events(object):
    """
    Nitric generic publish/subscribe event client.

    This client insulates application code from stack specific event operations or SDKs.
    """

    def __init__(self):
        """Construct a Nitric Event Client with event and topic service stubs sharing one channel."""
        self.channel = new_default_channel()
        self._stub = EventServiceStub(channel=self.channel)
        self._topic_stub = TopicServiceStub(channel=self.channel)

    def __del__(self):
        """Close the channel when this client is destroyed."""
        # __init__ may have raised before `channel` was assigned; looking the
        # attribute up defensively avoids a secondary AttributeError during finalization.
        channel = getattr(self, "channel", None)
        if channel is not None:
            channel.close()

    async def topics(self) -> List[TopicRef]:
        """
        Get a list of topics available for publishing or subscription.

        :return: one TopicRef per topic name returned by the topic service
        """
        try:
            response = await self._topic_stub.list(topic_list_request=TopicListRequest())
            return [self.topic(topic.name) for topic in response.topics]
        except GRPCError as grpc_err:
            # Surface gRPC failures as the SDK's own exception types.
            raise exception_from_grpc_error(grpc_err)

    def topic(self, name: str) -> TopicRef:
        """
        Return a reference to a topic.

        No request is made here; the reference simply binds this client to ``name``.

        :param name: the name of the topic
        :return: a TopicRef that publishes through this client
        """
        return TopicRef(_events=self, name=name)

Methods

def topic(self, name: str) ‑> TopicRef

Return a reference to a topic.

Expand source code
def topic(self, name: str) -> TopicRef:
    """Build a TopicRef that binds this client to the topic called ``name``."""
    topic_ref = TopicRef(_events=self, name=name)
    return topic_ref
async def topics(self) ‑> List[TopicRef]

Get a list of topics available for publishing or subscription.

Expand source code
async def topics(self) -> List[TopicRef]:
    """
    List the topics reported by the topic service.

    :return: one TopicRef per topic name in the list response
    """
    try:
        listing = await self._topic_stub.list(topic_list_request=TopicListRequest())
        refs = []
        for entry in listing.topics:
            refs.append(self.topic(entry.name))
        return refs
    except GRPCError as grpc_err:
        # Surface gRPC failures as the SDK's own exception types.
        raise exception_from_grpc_error(grpc_err)
class TopicRef (_events: Events, name: str)

A reference to a topic on an event service, used to perform operations on that topic.

Expand source code
@dataclass(frozen=True, order=True)
class TopicRef(object):
    """Handle for a single named topic, through which events are published using the owning Events client."""

    # Client whose event service stub performs the publish call.
    _events: Events
    # Name of the topic this reference points at.
    name: str

    async def publish(
        self,
        event: Union[Event, dict] = None,
    ) -> Event:
        """
        Send an event to this topic so that subscribed services can receive it.

        :param event: an Event, a plain dict (used as the payload of a new Event),
            or None to publish an empty Event
        :return: a new Event with the same fields as the one sent and the id from the publish response
        """
        # Normalise the accepted input shapes into a single Event instance.
        if event is None:
            outgoing = Event()
        elif isinstance(event, dict):
            outgoing = Event(payload=event)
        else:
            outgoing = event

        try:
            publish_rpc = self._events._stub.publish
            request = EventPublishRequest(topic=self.name, event=_event_to_wire(outgoing))
            response = await publish_rpc(event_publish_request=request)
            # Event is frozen, so build a fresh instance carrying the id returned by the service.
            updated_fields = dict(outgoing.__dict__)
            updated_fields["id"] = response.id
            return Event(**updated_fields)
        except GRPCError as grpc_err:
            # Surface gRPC failures as the SDK's own exception types.
            raise exception_from_grpc_error(grpc_err)

Class variables

var name : str

Methods

async def publish(self, event: Union[Event, dict] = None) ‑> Event

Publish an event/message to a topic, which can be subscribed to by other services.

:param event: the event to publish
:return: the published event, with the id added if one was auto-generated

Expand source code
async def publish(
    self,
    event: Union[Event, dict] = None,
) -> Event:
    """
    Send an event to this topic so that subscribed services can receive it.

    :param event: an Event, a plain dict (used as the payload of a new Event),
        or None to publish an empty Event
    :return: a new Event with the same fields as the one sent and the id from the publish response
    """
    # Normalise the accepted input shapes into a single Event instance.
    if event is None:
        outgoing = Event()
    elif isinstance(event, dict):
        outgoing = Event(payload=event)
    else:
        outgoing = event

    try:
        publish_rpc = self._events._stub.publish
        request = EventPublishRequest(topic=self.name, event=_event_to_wire(outgoing))
        response = await publish_rpc(event_publish_request=request)
        # Event is frozen, so build a fresh instance carrying the id returned by the service.
        updated_fields = dict(outgoing.__dict__)
        updated_fields["id"] = response.id
        return Event(**updated_fields)
    except GRPCError as grpc_err:
        # Surface gRPC failures as the SDK's own exception types.
        raise exception_from_grpc_error(grpc_err)