Module nitric.api.queues

Expand source code
#
# Copyright (c) 2021 Nitric Technologies Pty Ltd.
#
# This file is part of Nitric Python 3 SDK.
# See https://github.com/nitrictech/python-sdk for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

from typing import List, Union

from grpclib import GRPCError

from nitric.exception import FailedPreconditionException, exception_from_grpc_error, InvalidArgumentException
from nitric.utils import new_default_channel, _struct_from_dict, _dict_from_struct
from nitric.proto.nitric.queue.v1 import (
    QueueServiceStub,
    NitricTask,
    FailedTask as WireFailedTask,
    QueueCompleteRequest,
    QueueSendRequest,
    QueueSendBatchRequest,
    QueueReceiveRequest,
)
from dataclasses import dataclass, field


@dataclass(frozen=True, order=True)
class Task(object):
    """A task to be sent to a Queue."""

    id: str = field(default=None)
    payload_type: str = field(default=None)
    payload: dict = field(default_factory=dict)


@dataclass(frozen=True, order=True)
class ReceivedTask(object):
    """A reference to a task received from a Queue, with a lease."""

    id: str = field(default=None)
    payload_type: str = field(default=None)
    payload: dict = field(default_factory=dict)
    lease_id: str = field(default=None)
    _queueing: Queues = field(default=None)
    _queue: QueueRef = field(default=None)

    async def complete(self):
        """
        Mark this task as complete and remove it from the queue.

        Only callable for tasks that have been received from a Queue.
        """
        if self._queueing is None or self._queue is None or self.lease_id is None:
            raise FailedPreconditionException(
                "Task is missing internal client or lease id, was it returned from " "queue.receive?"
            )
        try:
            await self._queueing._queue_stub.complete(
                queue_complete_request=QueueCompleteRequest(queue=self._queue.name, lease_id=self.lease_id)
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)


@dataclass(frozen=True, order=True)
class FailedTask(Task):
    """Represents a failed queue publish."""

    message: str = field(default="")


def _task_to_wire(task: Task) -> NitricTask:
    """
    Convert a Nitric Task to a Nitric Queue Task.

    :param task: to convert
    :return: converted task
    """
    return NitricTask(
        id=task.id,
        payload_type=task.payload_type,
        payload=_struct_from_dict(task.payload),
    )


def _wire_to_received_task(task: NitricTask, queueing: Queues = None, queue: QueueRef = None) -> ReceivedTask:
    """
    Convert a Nitric Queue Task (protobuf) to a Nitric Task (python SDK).

    :param task: to convert
    :return: converted task
    """
    return ReceivedTask(
        id=task.id,
        payload_type=task.payload_type,
        payload=_dict_from_struct(task.payload),
        lease_id=task.lease_id,
        _queueing=queueing,
        _queue=queue,
    )


def _wire_to_failed_task(failed_task: WireFailedTask) -> FailedTask:
    """
    Convert a queue task that failed to push into a Failed Task object.

    :param failed_task: the failed task
    :return: the Failed Task with failure message
    """
    task = _wire_to_received_task(failed_task.task)

    return FailedTask(
        id=task.id,
        payload_type=task.payload_type,
        payload=task.payload,
        message=failed_task.message,
    )


@dataclass(frozen=True, order=True)
class QueueRef(object):
    """A reference to a queue from a queue service, used to perform operations on that queue."""

    _queueing: Queues
    name: str

    async def send(
        self, tasks: Union[Task, dict, List[Union[Task, dict]]] = None
    ) -> Union[Task, List[Union[Task, FailedTask]]]:
        """
        Send one or more tasks to this queue.

        If a list of tasks is provided this function will return a list containing any tasks that failed to be sent to
        the queue.

        :param tasks: A task or list of tasks to send to the queue.
        """
        if isinstance(tasks, list):
            return await self._send_batch(tasks)

        task = tasks
        if task is None:
            task = Task()

        if isinstance(task, dict):
            # Handle if its just a payload
            if task.get("payload") is None:
                task = {"payload": task}
            task = Task(**task)

        try:
            await self._queueing._queue_stub.send(
                queue_send_request=QueueSendRequest(queue=self.name, task=_task_to_wire(task))
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    async def _send_batch(self, tasks: List[Union[Task, dict]], raise_on_failure: bool = True) -> List[FailedTask]:
        """
        Push a collection of tasks to a queue, which can be retrieved by other services.

        :param tasks: The tasks to push to the queue
        :param raise_on_failure: Whether to raise an exception when one or more tasks fails to send
        :return: PushResponse containing a list containing details of any messages that failed to publish.
        """
        if tasks is None or len(tasks) < 1:
            raise InvalidArgumentException("No tasks provided, nothing to send.")

        wire_tasks = [_task_to_wire(Task(**task) if isinstance(task, dict) else task) for task in tasks]

        try:
            response = await self._queueing._queue_stub.send_batch(
                queue_send_batch_request=QueueSendBatchRequest(queue=self.name, tasks=wire_tasks)
            )
            return [_wire_to_failed_task(failed_task) for failed_task in response.failed_tasks]
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    async def receive(self, limit: int = None) -> List[Task]:
        """
        Pop 1 or more items from the specified queue up to the depth limit.

        Queue items are Nitric Tasks that are leased for a limited period of time, where they may be worked on.
        Once complete or failed they must be acknowledged using the request specific leaseId.

        If the lease on a queue item expires before it is acknowledged or the lease is extended the task will be
        returned to the queue for reprocessing.

        :param limit: The maximum number of queue items to return. Default: 1, Min: 1.
        :return: Queue items popped from the specified queue.
        """
        # Set the default and minimum depth to 1.
        if limit is None or limit < 1:
            limit = 1

        try:
            response = await self._queueing._queue_stub.receive(
                queue_receive_request=QueueReceiveRequest(queue=self.name, depth=limit)
            )
            # Map the response protobuf response items to Python SDK Nitric Tasks
            return [_wire_to_received_task(task=task, queueing=self._queueing, queue=self) for task in response.tasks]
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)


class Queues(object):
    """Queueing client, providing access to Queue and Task references and operations on those entities."""

    def __init__(self):
        """Construct a Nitric Queue Client."""
        self.channel = new_default_channel()
        self._queue_stub = QueueServiceStub(channel=self.channel)

    def __del__(self):
        # close the channel when this client is destroyed
        if self.channel is not None:
            self.channel.close()

    def queue(self, name: str):
        """Return a reference to a queue from the connected queue service."""
        return QueueRef(_queueing=self, name=name)

Classes

class FailedTask (id: str = None, payload_type: str = None, payload: dict = <factory>, message: str = '')

Represents a failed queue publish.

Expand source code
@dataclass(frozen=True, order=True)
class FailedTask(Task):
    """Represents a failed queue publish."""

    message: str = field(default="")

Ancestors

Class variables

var message : str
class QueueRef (_queueing: Queues, name: str)

A reference to a queue from a queue service, used to perform operations on that queue.

Expand source code
@dataclass(frozen=True, order=True)
class QueueRef(object):
    """A reference to a queue from a queue service, used to perform operations on that queue."""

    _queueing: Queues
    name: str

    async def send(
        self, tasks: Union[Task, dict, List[Union[Task, dict]]] = None
    ) -> Union[Task, List[Union[Task, FailedTask]]]:
        """
        Send one or more tasks to this queue.

        If a list of tasks is provided this function will return a list containing any tasks that failed to be sent to
        the queue.

        :param tasks: A task or list of tasks to send to the queue.
        """
        if isinstance(tasks, list):
            return await self._send_batch(tasks)

        task = tasks
        if task is None:
            task = Task()

        if isinstance(task, dict):
            # Handle if its just a payload
            if task.get("payload") is None:
                task = {"payload": task}
            task = Task(**task)

        try:
            await self._queueing._queue_stub.send(
                queue_send_request=QueueSendRequest(queue=self.name, task=_task_to_wire(task))
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    async def _send_batch(self, tasks: List[Union[Task, dict]], raise_on_failure: bool = True) -> List[FailedTask]:
        """
        Push a collection of tasks to a queue, which can be retrieved by other services.

        :param tasks: The tasks to push to the queue
        :param raise_on_failure: Whether to raise an exception when one or more tasks fails to send
        :return: PushResponse containing a list containing details of any messages that failed to publish.
        """
        if tasks is None or len(tasks) < 1:
            raise InvalidArgumentException("No tasks provided, nothing to send.")

        wire_tasks = [_task_to_wire(Task(**task) if isinstance(task, dict) else task) for task in tasks]

        try:
            response = await self._queueing._queue_stub.send_batch(
                queue_send_batch_request=QueueSendBatchRequest(queue=self.name, tasks=wire_tasks)
            )
            return [_wire_to_failed_task(failed_task) for failed_task in response.failed_tasks]
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    async def receive(self, limit: int = None) -> List[Task]:
        """
        Pop 1 or more items from the specified queue up to the depth limit.

        Queue items are Nitric Tasks that are leased for a limited period of time, where they may be worked on.
        Once complete or failed they must be acknowledged using the request specific leaseId.

        If the lease on a queue item expires before it is acknowledged or the lease is extended the task will be
        returned to the queue for reprocessing.

        :param limit: The maximum number of queue items to return. Default: 1, Min: 1.
        :return: Queue items popped from the specified queue.
        """
        # Set the default and minimum depth to 1.
        if limit is None or limit < 1:
            limit = 1

        try:
            response = await self._queueing._queue_stub.receive(
                queue_receive_request=QueueReceiveRequest(queue=self.name, depth=limit)
            )
            # Map the response protobuf response items to Python SDK Nitric Tasks
            return [_wire_to_received_task(task=task, queueing=self._queueing, queue=self) for task in response.tasks]
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

Class variables

var name : str

Methods

async def receive(self, limit: int = None) ‑> List[Task]

Pop 1 or more items from the specified queue up to the depth limit.

Queue items are Nitric Tasks that are leased for a limited period of time, where they may be worked on. Once complete or failed they must be acknowledged using the request specific leaseId.

If the lease on a queue item expires before it is acknowledged or the lease is extended the task will be returned to the queue for reprocessing.

:param limit: The maximum number of queue items to return. Default: 1, Min: 1. :return: Queue items popped from the specified queue.

Expand source code
async def receive(self, limit: int = None) -> List[Task]:
    """
    Pop 1 or more items from the specified queue up to the depth limit.

    Queue items are Nitric Tasks that are leased for a limited period of time, where they may be worked on.
    Once complete or failed they must be acknowledged using the request specific leaseId.

    If the lease on a queue item expires before it is acknowledged or the lease is extended the task will be
    returned to the queue for reprocessing.

    :param limit: The maximum number of queue items to return. Default: 1, Min: 1.
    :return: Queue items popped from the specified queue.
    """
    # Set the default and minimum depth to 1.
    if limit is None or limit < 1:
        limit = 1

    try:
        response = await self._queueing._queue_stub.receive(
            queue_receive_request=QueueReceiveRequest(queue=self.name, depth=limit)
        )
        # Map the response protobuf response items to Python SDK Nitric Tasks
        return [_wire_to_received_task(task=task, queueing=self._queueing, queue=self) for task in response.tasks]
    except GRPCError as grpc_err:
        raise exception_from_grpc_error(grpc_err)
async def send(self, tasks: Union[Task, dict, List[Union[Task, dict]]] = None) ‑> Union[Task, List[Union[TaskFailedTask]]]

Send one or more tasks to this queue.

If a list of tasks is provided this function will return a list containing any tasks that failed to be sent to the queue.

:param tasks: A task or list of tasks to send to the queue.

Expand source code
async def send(
    self, tasks: Union[Task, dict, List[Union[Task, dict]]] = None
) -> Union[Task, List[Union[Task, FailedTask]]]:
    """
    Send one or more tasks to this queue.

    If a list of tasks is provided this function will return a list containing any tasks that failed to be sent to
    the queue.

    :param tasks: A task or list of tasks to send to the queue.
    """
    if isinstance(tasks, list):
        return await self._send_batch(tasks)

    task = tasks
    if task is None:
        task = Task()

    if isinstance(task, dict):
        # Handle if its just a payload
        if task.get("payload") is None:
            task = {"payload": task}
        task = Task(**task)

    try:
        await self._queueing._queue_stub.send(
            queue_send_request=QueueSendRequest(queue=self.name, task=_task_to_wire(task))
        )
    except GRPCError as grpc_err:
        raise exception_from_grpc_error(grpc_err)
class Queues

Queueing client, providing access to Queue and Task references and operations on those entities.

Construct a Nitric Queue Client.

Expand source code
class Queues(object):
    """Queueing client, providing access to Queue and Task references and operations on those entities."""

    def __init__(self):
        """Construct a Nitric Queue Client."""
        self.channel = new_default_channel()
        self._queue_stub = QueueServiceStub(channel=self.channel)

    def __del__(self):
        # close the channel when this client is destroyed
        if self.channel is not None:
            self.channel.close()

    def queue(self, name: str):
        """Return a reference to a queue from the connected queue service."""
        return QueueRef(_queueing=self, name=name)

Methods

def queue(self, name: str)

Return a reference to a queue from the connected queue service.

Expand source code
def queue(self, name: str):
    """Return a reference to a queue from the connected queue service."""
    return QueueRef(_queueing=self, name=name)
class ReceivedTask (id: str = None, payload_type: str = None, payload: dict = <factory>, lease_id: str = None)

A reference to a task received from a Queue, with a lease.

Expand source code
@dataclass(frozen=True, order=True)
class ReceivedTask(object):
    """A reference to a task received from a Queue, with a lease."""

    id: str = field(default=None)
    payload_type: str = field(default=None)
    payload: dict = field(default_factory=dict)
    lease_id: str = field(default=None)
    _queueing: Queues = field(default=None)
    _queue: QueueRef = field(default=None)

    async def complete(self):
        """
        Mark this task as complete and remove it from the queue.

        Only callable for tasks that have been received from a Queue.
        """
        if self._queueing is None or self._queue is None or self.lease_id is None:
            raise FailedPreconditionException(
                "Task is missing internal client or lease id, was it returned from " "queue.receive?"
            )
        try:
            await self._queueing._queue_stub.complete(
                queue_complete_request=QueueCompleteRequest(queue=self._queue.name, lease_id=self.lease_id)
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

Class variables

var id : str
var lease_id : str
var payload : dict
var payload_type : str

Methods

async def complete(self)

Mark this task as complete and remove it from the queue.

Only callable for tasks that have been received from a Queue.

Expand source code
async def complete(self):
    """
    Mark this task as complete and remove it from the queue.

    Only callable for tasks that have been received from a Queue.
    """
    if self._queueing is None or self._queue is None or self.lease_id is None:
        raise FailedPreconditionException(
            "Task is missing internal client or lease id, was it returned from " "queue.receive?"
        )
    try:
        await self._queueing._queue_stub.complete(
            queue_complete_request=QueueCompleteRequest(queue=self._queue.name, lease_id=self.lease_id)
        )
    except GRPCError as grpc_err:
        raise exception_from_grpc_error(grpc_err)
class Task (id: str = None, payload_type: str = None, payload: dict = <factory>)

A task to be sent to a Queue.

Expand source code
@dataclass(frozen=True, order=True)
class Task(object):
    """A task to be sent to a Queue."""

    id: str = field(default=None)
    payload_type: str = field(default=None)
    payload: dict = field(default_factory=dict)

Subclasses

Class variables

var id : str
var payload : dict
var payload_type : str