Module nitric.resources.queues
Expand source code
#
# Copyright (c) 2021 Nitric Technologies Pty Ltd.
#
# This file is part of Nitric Python 3 SDK.
# See https://github.com/nitrictech/python-sdk for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
from nitric.exception import exception_from_grpc_error
from typing import List, Union
from enum import Enum
from grpclib import GRPCError
from nitric.api.queues import QueueRef, Queues
from nitric.application import Nitric
from nitric.proto.nitric.resource.v1 import (
Resource,
ResourceType,
Action,
ResourceDeclareRequest,
)
from nitric.resources.base import SecureResource
class QueuePermission(Enum):
"""Valid query expression operators."""
sending = "sending"
receiving = "receiving"
class Queue(SecureResource):
"""A queue resource."""
name: str
actions: List[Action]
def __init__(self, name: str):
"""Construct a new queue resource."""
super().__init__()
self.name = name
def _to_resource(self) -> Resource:
return Resource(name=self.name, type=ResourceType.Queue)
def _perms_to_actions(self, *args: Union[QueuePermission, str]) -> List[Action]:
permission_actions_map = {
QueuePermission.sending: [Action.QueueSend, Action.QueueList, Action.QueueDetail],
QueuePermission.receiving: [Action.QueueReceive, Action.QueueList, Action.QueueDetail],
}
# convert strings to the enum value where needed
perms = [
permission if isinstance(permission, QueuePermission) else QueuePermission[permission.lower()]
for permission in args
]
return [action for perm in perms for action in permission_actions_map[perm]]
async def _register(self):
try:
await self._resources_stub.declare(
resource_declare_request=ResourceDeclareRequest(resource=self._to_resource())
)
except GRPCError as grpc_err:
raise exception_from_grpc_error(grpc_err)
def allow(self, *args: Union[QueuePermission, str]) -> QueueRef:
"""Request the required permissions for this queue."""
# Ensure registration of the resource is complete before requesting permissions.
self._register_policy(*args)
return Queues().queue(self.name)
def queue(name: str) -> Queue:
"""
Create and register a queue.
If a queue has already been registered with the same name, the original reference will be reused.
"""
return Nitric._create_resource(Queue, name)
Functions
def queue(name: str) ‑> Queue
-
Create and register a queue.
If a queue has already been registered with the same name, the original reference will be reused.
Expand source code
def queue(name: str) -> Queue: """ Create and register a queue. If a queue has already been registered with the same name, the original reference will be reused. """ return Nitric._create_resource(Queue, name)
Classes
class Queue (name: str)
-
A queue resource.
Construct a new queue resource.
Expand source code
class Queue(SecureResource): """A queue resource.""" name: str actions: List[Action] def __init__(self, name: str): """Construct a new queue resource.""" super().__init__() self.name = name def _to_resource(self) -> Resource: return Resource(name=self.name, type=ResourceType.Queue) def _perms_to_actions(self, *args: Union[QueuePermission, str]) -> List[Action]: permission_actions_map = { QueuePermission.sending: [Action.QueueSend, Action.QueueList, Action.QueueDetail], QueuePermission.receiving: [Action.QueueReceive, Action.QueueList, Action.QueueDetail], } # convert strings to the enum value where needed perms = [ permission if isinstance(permission, QueuePermission) else QueuePermission[permission.lower()] for permission in args ] return [action for perm in perms for action in permission_actions_map[perm]] async def _register(self): try: await self._resources_stub.declare( resource_declare_request=ResourceDeclareRequest(resource=self._to_resource()) ) except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err) def allow(self, *args: Union[QueuePermission, str]) -> QueueRef: """Request the required permissions for this queue.""" # Ensure registration of the resource is complete before requesting permissions. self._register_policy(*args) return Queues().queue(self.name)
Ancestors
- SecureResource
- BaseResource
- abc.ABC
Class variables
var actions : List[Action]
var name : str
Methods
def allow(self, *args: Union[QueuePermission, str]) ‑> QueueRef
-
Request the required permissions for this queue.
Expand source code
def allow(self, *args: Union[QueuePermission, str]) -> QueueRef: """Request the required permissions for this queue.""" # Ensure registration of the resource is complete before requesting permissions. self._register_policy(*args) return Queues().queue(self.name)
Inherited members
class QueuePermission (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Valid query expression operators.
Expand source code
class QueuePermission(Enum): """Valid query expression operators.""" sending = "sending" receiving = "receiving"
Ancestors
- enum.Enum
Class variables
var receiving
var sending