Module nitric.proto.nitric.resource.v1

Expand source code
#
# Copyright (c) 2021 Nitric Technologies Pty Ltd.
#
# This file is part of Nitric Python 3 SDK.
# See https://github.com/nitrictech/python-sdk for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: proto/resource/v1/resource.proto
# plugin: python-betterproto
from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    Dict,
    List,
    Optional,
)

import betterproto
import grpclib
from betterproto.grpc.grpclib_server import ServiceBase


if TYPE_CHECKING:
    import grpclib.server
    from betterproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


class ResourceType(betterproto.Enum):
    """Integer-valued enumeration of the kinds of resource a ``Resource`` can be.

    Used as the type of ``Resource.type``. The numbers are presumably the enum
    values declared in resource.proto, so they should not be renumbered by hand.
    """

    Api = 0
    Function = 1
    Bucket = 2
    Queue = 3
    Topic = 4
    Schedule = 5
    Subscription = 6
    Collection = 7
    Policy = 8
    Secret = 9
    Notification = 10


class Action(betterproto.Enum):
    """Integer-valued enumeration of the actions listed in ``PolicyResource.actions``.

    Values are grouped by resource kind in blocks of one hundred, as the
    string notes below record: 0XX bucket, 2XX topic, 3XX queue,
    4XX collection and 5XX secret. No 1XX values are defined here.
    """

    BucketFileList = 0
    """Bucket Permissions: 0XX"""

    BucketFileGet = 1
    BucketFilePut = 2
    BucketFileDelete = 3
    TopicList = 200
    """Topic Permissions: 2XX"""

    TopicDetail = 201
    TopicEventPublish = 202
    QueueSend = 300
    """Queue Permissions: 3XX"""

    QueueReceive = 301
    QueueList = 302
    QueueDetail = 303
    CollectionDocumentRead = 400
    """Collection Permissions: 4XX"""

    CollectionDocumentWrite = 401
    CollectionDocumentDelete = 402
    CollectionQuery = 403
    CollectionList = 404
    SecretPut = 500
    """Secret Permissions: 5XX"""

    SecretAccess = 501


@dataclass(eq=False, repr=False)
class PolicyResource(betterproto.Message):
    """Message holding three lists: principals, actions and resources.

    NOTE(review): presumably this grants ``actions`` on ``resources`` to
    ``principals`` - confirm against resource.proto.
    """

    # Field 1: list of Resource messages.
    principals: List["Resource"] = betterproto.message_field(1)
    # Field 2: list of Action enum values.
    actions: List["Action"] = betterproto.enum_field(2)
    # Field 3: list of Resource messages.
    resources: List["Resource"] = betterproto.message_field(3)


@dataclass(eq=False, repr=False)
class Resource(betterproto.Message):
    """Message identifying a resource by a ``ResourceType`` and a string name."""

    # Field 1: the kind of resource, as a ResourceType enum value.
    type: "ResourceType" = betterproto.enum_field(1)
    # Field 2: the resource's name.
    name: str = betterproto.string_field(2)


@dataclass(eq=False, repr=False)
class ResourceDeclareRequest(betterproto.Message):
    """Request message for the Declare call of ResourceService.

    Carries the ``resource`` being declared plus seven alternative
    configuration messages. Fields 10-16 all share the ``config`` group
    (betterproto's representation of a protobuf oneof), so presumably only
    one of them is set on any given request.
    """

    # Field 1: the resource this request refers to.
    resource: "Resource" = betterproto.message_field(1)
    # Fields 10-16: members of the "config" group, one per configuration type.
    policy: "PolicyResource" = betterproto.message_field(10, group="config")
    bucket: "BucketResource" = betterproto.message_field(11, group="config")
    queue: "QueueResource" = betterproto.message_field(12, group="config")
    topic: "TopicResource" = betterproto.message_field(13, group="config")
    collection: "CollectionResource" = betterproto.message_field(14, group="config")
    secret: "SecretResource" = betterproto.message_field(15, group="config")
    api: "ApiResource" = betterproto.message_field(16, group="config")


@dataclass(eq=False, repr=False)
class BucketResource(betterproto.Message):
    """Field-less message; the ``bucket`` option of ``ResourceDeclareRequest``."""

    pass


@dataclass(eq=False, repr=False)
class QueueResource(betterproto.Message):
    """Field-less message; the ``queue`` option of ``ResourceDeclareRequest``."""

    pass


@dataclass(eq=False, repr=False)
class TopicResource(betterproto.Message):
    """Field-less message; the ``topic`` option of ``ResourceDeclareRequest``."""

    pass


@dataclass(eq=False, repr=False)
class CollectionResource(betterproto.Message):
    """Field-less message; the ``collection`` option of ``ResourceDeclareRequest``."""

    pass


@dataclass(eq=False, repr=False)
class SecretResource(betterproto.Message):
    """Field-less message; the ``secret`` option of ``ResourceDeclareRequest``."""

    pass


@dataclass(eq=False, repr=False)
class ApiSecurityDefinitionJwt(betterproto.Message):
    """protect your API with JWT authentication"""

    # Field 1: issuer string (presumably the expected token issuer - confirm).
    issuer: str = betterproto.string_field(1)
    # Field 2: list of audience strings.
    audiences: List[str] = betterproto.string_field(2)


@dataclass(eq=False, repr=False)
class ApiSecurityDefinition(betterproto.Message):
    """Message wrapping one security definition; used as the value type of
    ``ApiResource.security_definitions``.

    ``jwt`` is currently the only member of the ``definition`` group.
    """

    # Field 1: JWT-based definition, member of the "definition" group.
    jwt: "ApiSecurityDefinitionJwt" = betterproto.message_field(1, group="definition")


@dataclass(eq=False, repr=False)
class ApiScopes(betterproto.Message):
    """Message holding a list of scope strings; used as the value type of
    ``ApiResource.security``.
    """

    # Field 1: list of scope strings.
    scopes: List[str] = betterproto.string_field(1)


@dataclass(eq=False, repr=False)
class ApiResource(betterproto.Message):
    """Message holding two string-keyed maps that configure an API's security.

    It is the ``api`` option of ``ResourceDeclareRequest``.
    """

    # Field 1: map from string key to ApiSecurityDefinition message.
    security_definitions: Dict[str, "ApiSecurityDefinition"] = betterproto.map_field(
        1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
    )
    """
    Security definitions for the api These may be used by registered routes and
    operations on the API
    """

    # Field 2: map from string key to ApiScopes message.
    security: Dict[str, "ApiScopes"] = betterproto.map_field(
        2, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
    )
    """root level security for this api"""


@dataclass(eq=False, repr=False)
class ResourceDeclareResponse(betterproto.Message):
    """Field-less response message for the Declare call of ResourceService."""

    pass


@dataclass(eq=False, repr=False)
class ApiResourceDetails(betterproto.Message):
    """Message with a single string field; the ``api`` member of
    ``ResourceDetailsResponse``.
    """

    # Field 1: URL string (presumably the API's address - confirm).
    url: str = betterproto.string_field(1)


@dataclass(eq=False, repr=False)
class ResourceDetailsRequest(betterproto.Message):
    """Request message for the Details call of ResourceService."""

    # Field 1: the resource this request refers to.
    resource: "Resource" = betterproto.message_field(1)


@dataclass(eq=False, repr=False)
class ResourceDetailsResponse(betterproto.Message):
    """Response message for the Details call of ResourceService.

    Holds three string fields plus ``api``, which is currently the only
    member of the ``details`` group.
    """

    id: str = betterproto.string_field(1)
    """The identifier of the resource"""

    provider: str = betterproto.string_field(2)
    """The provider this resource is deployed with (e.g. aws)"""

    service: str = betterproto.string_field(3)
    """The service this resource is deployed on (e.g. ApiGateway)"""

    # Field 10: API-specific details, member of the "details" group.
    api: "ApiResourceDetails" = betterproto.message_field(10, group="details")


class ResourceServiceStub(betterproto.ServiceStub):
    """Async client stub exposing the two unary calls of ResourceService."""

    async def declare(
        self,
        resource_declare_request: "ResourceDeclareRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None
    ) -> "ResourceDeclareResponse":
        """Send ``resource_declare_request`` to the Declare route.

        The keyword-only ``timeout``, ``deadline`` and ``metadata`` values are
        handed unchanged to ``_unary_unary``, whose awaited result is returned.
        """
        route = "/nitric.resource.v1.ResourceService/Declare"
        call_options = dict(timeout=timeout, deadline=deadline, metadata=metadata)
        reply = await self._unary_unary(
            route, resource_declare_request, ResourceDeclareResponse, **call_options
        )
        return reply

    async def details(
        self,
        resource_details_request: "ResourceDetailsRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None
    ) -> "ResourceDetailsResponse":
        """Send ``resource_details_request`` to the Details route.

        The keyword-only ``timeout``, ``deadline`` and ``metadata`` values are
        handed unchanged to ``_unary_unary``, whose awaited result is returned.
        """
        route = "/nitric.resource.v1.ResourceService/Details"
        call_options = dict(timeout=timeout, deadline=deadline, metadata=metadata)
        reply = await self._unary_unary(
            route, resource_details_request, ResourceDetailsResponse, **call_options
        )
        return reply


class ResourceServiceBase(ServiceBase):
    """Server-side base for ResourceService.

    The public handlers defined here always raise UNIMPLEMENTED; presumably
    concrete services override them.
    """

    async def declare(
        self, resource_declare_request: "ResourceDeclareRequest"
    ) -> "ResourceDeclareResponse":
        """Default Declare handler: raise a GRPCError with status UNIMPLEMENTED."""
        not_implemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(not_implemented)

    async def details(
        self, resource_details_request: "ResourceDetailsRequest"
    ) -> "ResourceDetailsResponse":
        """Default Details handler: raise a GRPCError with status UNIMPLEMENTED."""
        not_implemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(not_implemented)

    async def __rpc_declare(
        self,
        stream: "grpclib.server.Stream[ResourceDeclareRequest, ResourceDeclareResponse]",
    ) -> None:
        """Read one message from ``stream``, pass it to ``declare``, send the result."""
        incoming = await stream.recv_message()
        await stream.send_message(await self.declare(incoming))

    async def __rpc_details(
        self,
        stream: "grpclib.server.Stream[ResourceDetailsRequest, ResourceDetailsResponse]",
    ) -> None:
        """Read one message from ``stream``, pass it to ``details``, send the result."""
        incoming = await stream.recv_message()
        await stream.send_message(await self.details(incoming))

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the table mapping each route path to its unary-unary handler."""
        make_handler = grpclib.const.Handler
        unary_unary = grpclib.const.Cardinality.UNARY_UNARY

        routes = {}
        routes["/nitric.resource.v1.ResourceService/Declare"] = make_handler(
            self.__rpc_declare,
            unary_unary,
            ResourceDeclareRequest,
            ResourceDeclareResponse,
        )
        routes["/nitric.resource.v1.ResourceService/Details"] = make_handler(
            self.__rpc_details,
            unary_unary,
            ResourceDetailsRequest,
            ResourceDetailsResponse,
        )
        return routes

Classes

class Action (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class Action(betterproto.Enum):
    """Integer-valued enumeration of the actions listed in ``PolicyResource.actions``.

    Values are grouped by resource kind in blocks of one hundred, as the
    string notes below record: 0XX bucket, 2XX topic, 3XX queue,
    4XX collection and 5XX secret. No 1XX values are defined here.
    """

    BucketFileList = 0
    """Bucket Permissions: 0XX"""

    BucketFileGet = 1
    BucketFilePut = 2
    BucketFileDelete = 3
    TopicList = 200
    """Topic Permissions: 2XX"""

    TopicDetail = 201
    TopicEventPublish = 202
    QueueSend = 300
    """Queue Permissions: 3XX"""

    QueueReceive = 301
    QueueList = 302
    QueueDetail = 303
    CollectionDocumentRead = 400
    """Collection Permissions: 4XX"""

    CollectionDocumentWrite = 401
    CollectionDocumentDelete = 402
    CollectionQuery = 403
    CollectionList = 404
    SecretPut = 500
    """Secret Permissions: 5XX"""

    SecretAccess = 501

Ancestors

  • betterproto.Enum
  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var BucketFileDelete
var BucketFileGet
var BucketFileList

Bucket Permissions: 0XX

var BucketFilePut
var CollectionDocumentDelete
var CollectionDocumentRead

Collection Permissions: 4XX

var CollectionDocumentWrite
var CollectionList
var CollectionQuery
var QueueDetail
var QueueList
var QueueReceive
var QueueSend

Queue Permissions: 3XX

var SecretAccess
var SecretPut

Secret Permissions: 5XX

var TopicDetail
var TopicEventPublish
var TopicList

Topic Permissions: 2XX

class ApiResource (security_definitions: Dict[str, ForwardRef('ApiSecurityDefinition')] = <object object>, security: Dict[str, ForwardRef('ApiScopes')] = <object object>)

ApiResource(security_definitions: Dict[str, ForwardRef('ApiSecurityDefinition')] = <object object>, security: Dict[str, ForwardRef('ApiScopes')] = <object object>)

Expand source code
@dataclass(eq=False, repr=False)
class ApiResource(betterproto.Message):
    """Message holding two string-keyed maps that configure an API's security.

    It is the ``api`` option of ``ResourceDeclareRequest``.
    """

    # Field 1: map from string key to ApiSecurityDefinition message.
    security_definitions: Dict[str, "ApiSecurityDefinition"] = betterproto.map_field(
        1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
    )
    """
    Security definitions for the api These may be used by registered routes and
    operations on the API
    """

    # Field 2: map from string key to ApiScopes message.
    security: Dict[str, "ApiScopes"] = betterproto.map_field(
        2, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
    )
    """root level security for this api"""

Ancestors

  • betterproto.Message
  • abc.ABC

Class variables

var security : Dict[str, ApiScopes]

root level security for this api

var security_definitions : Dict[str, ApiSecurityDefinition]

Security definitions for the api These may be used by registered routes and operations on the API

class ApiResourceDetails (url: str = <object object>)

ApiResourceDetails(url: str = <object object>)

Expand source code
@dataclass(eq=False, repr=False)
class ApiResourceDetails(betterproto.Message):
    """Message with a single string field; the ``api`` member of
    ``ResourceDetailsResponse``.
    """

    # Field 1: URL string (presumably the API's address - confirm).
    url: str = betterproto.string_field(1)

Ancestors

  • betterproto.Message
  • abc.ABC

Class variables

var url : str
class ApiScopes (scopes: List[str] = <object object>)

ApiScopes(scopes: List[str] = <object object>)

Expand source code
@dataclass(eq=False, repr=False)
class ApiScopes(betterproto.Message):
    """Message holding a list of scope strings; used as the value type of
    ``ApiResource.security``.
    """

    # Field 1: list of scope strings.
    scopes: List[str] = betterproto.string_field(1)

Ancestors

  • betterproto.Message
  • abc.ABC

Class variables

var scopes : List[str]
class ApiSecurityDefinition (jwt: ApiSecurityDefinitionJwt = <object object>)

ApiSecurityDefinition(jwt: 'ApiSecurityDefinitionJwt' = <object object>)

Expand source code
@dataclass(eq=False, repr=False)
class ApiSecurityDefinition(betterproto.Message):
    """Message wrapping one security definition; used as the value type of
    ``ApiResource.security_definitions``.

    ``jwt`` is currently the only member of the ``definition`` group.
    """

    # Field 1: JWT-based definition, member of the "definition" group.
    jwt: "ApiSecurityDefinitionJwt" = betterproto.message_field(1, group="definition")

Ancestors

  • betterproto.Message
  • abc.ABC

Class variables

var jwt : ApiSecurityDefinitionJwt
class ApiSecurityDefinitionJwt (issuer: str = <object object>, audiences: List[str] = <object object>)

protect your API with JWT authentication

Expand source code
@dataclass(eq=False, repr=False)
class ApiSecurityDefinitionJwt(betterproto.Message):
    """protect your API with JWT authentication"""

    # Field 1: issuer string (presumably the expected token issuer - confirm).
    issuer: str = betterproto.string_field(1)
    # Field 2: list of audience strings.
    audiences: List[str] = betterproto.string_field(2)

Ancestors

  • betterproto.Message
  • abc.ABC

Class variables

var audiences : List[str]
var issuer : str
class BucketResource

BucketResource()

Expand source code
@dataclass(eq=False, repr=False)
class BucketResource(betterproto.Message):
    """Field-less message; the ``bucket`` option of ``ResourceDeclareRequest``."""

    pass

Ancestors

  • betterproto.Message
  • abc.ABC
class CollectionResource

CollectionResource()

Expand source code
@dataclass(eq=False, repr=False)
class CollectionResource(betterproto.Message):
    """Field-less message; the ``collection`` option of ``ResourceDeclareRequest``."""

    pass

Ancestors

  • betterproto.Message
  • abc.ABC
class PolicyResource (principals: List[ForwardRef('Resource')] = <object object>, actions: List[ForwardRef('Action')] = <object object>, resources: List[ForwardRef('Resource')] = <object object>)

PolicyResource(principals: List[ForwardRef('Resource')] = <object object>, actions: List[ForwardRef('Action')] = <object object>, resources: List[ForwardRef('Resource')] = <object object>)

Expand source code
@dataclass(eq=False, repr=False)
class PolicyResource(betterproto.Message):
    """Message holding three lists: principals, actions and resources.

    NOTE(review): presumably this grants ``actions`` on ``resources`` to
    ``principals`` - confirm against resource.proto.
    """

    # Field 1: list of Resource messages.
    principals: List["Resource"] = betterproto.message_field(1)
    # Field 2: list of Action enum values.
    actions: List["Action"] = betterproto.enum_field(2)
    # Field 3: list of Resource messages.
    resources: List["Resource"] = betterproto.message_field(3)

Ancestors

  • betterproto.Message
  • abc.ABC

Class variables

var actions : List[Action]
var principals : List[Resource]
var resources : List[Resource]
class QueueResource

QueueResource()

Expand source code
@dataclass(eq=False, repr=False)
class QueueResource(betterproto.Message):
    """Field-less message; the ``queue`` option of ``ResourceDeclareRequest``."""

    pass

Ancestors

  • betterproto.Message
  • abc.ABC
class Resource (type: ResourceType = <object object>, name: str = <object object>)

Resource(type: 'ResourceType' = <object object>, name: str = <object object>)

Expand source code
@dataclass(eq=False, repr=False)
class Resource(betterproto.Message):
    """Message identifying a resource by a ``ResourceType`` and a string name."""

    # Field 1: the kind of resource, as a ResourceType enum value.
    type: "ResourceType" = betterproto.enum_field(1)
    # Field 2: the resource's name.
    name: str = betterproto.string_field(2)

Ancestors

  • betterproto.Message
  • abc.ABC

Class variables

var name : str
var type : ResourceType
class ResourceDeclareRequest (resource: Resource = <object object>, policy: PolicyResource = <object object>, bucket: BucketResource = <object object>, queue: QueueResource = <object object>, topic: TopicResource = <object object>, collection: CollectionResource = <object object>, secret: SecretResource = <object object>, api: ApiResource = <object object>)

ResourceDeclareRequest(resource: 'Resource' = <object object>, policy: 'PolicyResource' = <object object>, bucket: 'BucketResource' = <object object>, queue: 'QueueResource' = <object object>, topic: 'TopicResource' = <object object>, collection: 'CollectionResource' = <object object>, secret: 'SecretResource' = <object object>, api: 'ApiResource' = <object object>)

Expand source code
@dataclass(eq=False, repr=False)
class ResourceDeclareRequest(betterproto.Message):
    """Request message for the Declare call of ResourceService.

    Carries the ``resource`` being declared plus seven alternative
    configuration messages. Fields 10-16 all share the ``config`` group
    (betterproto's representation of a protobuf oneof), so presumably only
    one of them is set on any given request.
    """

    # Field 1: the resource this request refers to.
    resource: "Resource" = betterproto.message_field(1)
    # Fields 10-16: members of the "config" group, one per configuration type.
    policy: "PolicyResource" = betterproto.message_field(10, group="config")
    bucket: "BucketResource" = betterproto.message_field(11, group="config")
    queue: "QueueResource" = betterproto.message_field(12, group="config")
    topic: "TopicResource" = betterproto.message_field(13, group="config")
    collection: "CollectionResource" = betterproto.message_field(14, group="config")
    secret: "SecretResource" = betterproto.message_field(15, group="config")
    api: "ApiResource" = betterproto.message_field(16, group="config")

Ancestors

  • betterproto.Message
  • abc.ABC

Class variables

var api : ApiResource
var bucket : BucketResource
var collection : CollectionResource
var policy : PolicyResource
var queue : QueueResource
var resource : Resource
var secret : SecretResource
var topic : TopicResource
class ResourceDeclareResponse

ResourceDeclareResponse()

Expand source code
@dataclass(eq=False, repr=False)
class ResourceDeclareResponse(betterproto.Message):
    """Field-less response message for the Declare call of ResourceService."""

    pass

Ancestors

  • betterproto.Message
  • abc.ABC
class ResourceDetailsRequest (resource: Resource = <object object>)

ResourceDetailsRequest(resource: 'Resource' = <object object>)

Expand source code
@dataclass(eq=False, repr=False)
class ResourceDetailsRequest(betterproto.Message):
    """Request message for the Details call of ResourceService."""

    # Field 1: the resource this request refers to.
    resource: "Resource" = betterproto.message_field(1)

Ancestors

  • betterproto.Message
  • abc.ABC

Class variables

var resource : Resource
class ResourceDetailsResponse (id: str = <object object>, provider: str = <object object>, service: str = <object object>, api: ApiResourceDetails = <object object>)

ResourceDetailsResponse(id: str = <object object>, provider: str = <object object>, service: str = <object object>, api: 'ApiResourceDetails' = <object object>)

Expand source code
@dataclass(eq=False, repr=False)
class ResourceDetailsResponse(betterproto.Message):
    """Response message for the Details call of ResourceService.

    Holds three string fields plus ``api``, which is currently the only
    member of the ``details`` group.
    """

    id: str = betterproto.string_field(1)
    """The identifier of the resource"""

    provider: str = betterproto.string_field(2)
    """The provider this resource is deployed with (e.g. aws)"""

    service: str = betterproto.string_field(3)
    """The service this resource is deployed on (e.g. ApiGateway)"""

    # Field 10: API-specific details, member of the "details" group.
    api: "ApiResourceDetails" = betterproto.message_field(10, group="details")

Ancestors

  • betterproto.Message
  • abc.ABC

Class variables

var api : ApiResourceDetails
var id : str

The identifier of the resource

var provider : str

The provider this resource is deployed with (e.g. aws)

var service : str

The service this resource is deployed on (e.g. ApiGateway)

class ResourceServiceBase

Base class for async gRPC servers.

Expand source code
class ResourceServiceBase(ServiceBase):
    """Server-side base for ResourceService.

    The public handlers defined here always raise UNIMPLEMENTED; presumably
    concrete services override them.
    """

    async def declare(
        self, resource_declare_request: "ResourceDeclareRequest"
    ) -> "ResourceDeclareResponse":
        """Default Declare handler: raise a GRPCError with status UNIMPLEMENTED."""
        not_implemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(not_implemented)

    async def details(
        self, resource_details_request: "ResourceDetailsRequest"
    ) -> "ResourceDetailsResponse":
        """Default Details handler: raise a GRPCError with status UNIMPLEMENTED."""
        not_implemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(not_implemented)

    async def __rpc_declare(
        self,
        stream: "grpclib.server.Stream[ResourceDeclareRequest, ResourceDeclareResponse]",
    ) -> None:
        """Read one message from ``stream``, pass it to ``declare``, send the result."""
        incoming = await stream.recv_message()
        await stream.send_message(await self.declare(incoming))

    async def __rpc_details(
        self,
        stream: "grpclib.server.Stream[ResourceDetailsRequest, ResourceDetailsResponse]",
    ) -> None:
        """Read one message from ``stream``, pass it to ``details``, send the result."""
        incoming = await stream.recv_message()
        await stream.send_message(await self.details(incoming))

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the table mapping each route path to its unary-unary handler."""
        make_handler = grpclib.const.Handler
        unary_unary = grpclib.const.Cardinality.UNARY_UNARY

        routes = {}
        routes["/nitric.resource.v1.ResourceService/Declare"] = make_handler(
            self.__rpc_declare,
            unary_unary,
            ResourceDeclareRequest,
            ResourceDeclareResponse,
        )
        routes["/nitric.resource.v1.ResourceService/Details"] = make_handler(
            self.__rpc_details,
            unary_unary,
            ResourceDetailsRequest,
            ResourceDetailsResponse,
        )
        return routes

Ancestors

  • betterproto.grpc.grpclib_server.ServiceBase
  • abc.ABC

Methods

async def declare(self, resource_declare_request: ResourceDeclareRequest) ‑> ResourceDeclareResponse
Expand source code
async def declare(
    self, resource_declare_request: "ResourceDeclareRequest"
) -> "ResourceDeclareResponse":
    """Default Declare handler: raise a GRPCError with status UNIMPLEMENTED."""
    not_implemented = grpclib.const.Status.UNIMPLEMENTED
    raise grpclib.GRPCError(not_implemented)
async def details(self, resource_details_request: ResourceDetailsRequest) ‑> ResourceDetailsResponse
Expand source code
async def details(
    self, resource_details_request: "ResourceDetailsRequest"
) -> "ResourceDetailsResponse":
    """Default Details handler: raise a GRPCError with status UNIMPLEMENTED."""
    not_implemented = grpclib.const.Status.UNIMPLEMENTED
    raise grpclib.GRPCError(not_implemented)
class ResourceServiceStub (channel: Channel, *, timeout: Optional[float] = None, deadline: Optional[ForwardRef('Deadline')] = None, metadata: Union[Mapping[str, Union[str, bytes]], Collection[Tuple[str, Union[str, bytes]]], NoneType] = None)

Base class for async gRPC clients.

Expand source code
class ResourceServiceStub(betterproto.ServiceStub):
    """Async client stub exposing the two unary calls of ResourceService."""

    async def declare(
        self,
        resource_declare_request: "ResourceDeclareRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None
    ) -> "ResourceDeclareResponse":
        """Send ``resource_declare_request`` to the Declare route.

        The keyword-only ``timeout``, ``deadline`` and ``metadata`` values are
        handed unchanged to ``_unary_unary``, whose awaited result is returned.
        """
        route = "/nitric.resource.v1.ResourceService/Declare"
        call_options = dict(timeout=timeout, deadline=deadline, metadata=metadata)
        reply = await self._unary_unary(
            route, resource_declare_request, ResourceDeclareResponse, **call_options
        )
        return reply

    async def details(
        self,
        resource_details_request: "ResourceDetailsRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None
    ) -> "ResourceDetailsResponse":
        """Send ``resource_details_request`` to the Details route.

        The keyword-only ``timeout``, ``deadline`` and ``metadata`` values are
        handed unchanged to ``_unary_unary``, whose awaited result is returned.
        """
        route = "/nitric.resource.v1.ResourceService/Details"
        call_options = dict(timeout=timeout, deadline=deadline, metadata=metadata)
        reply = await self._unary_unary(
            route, resource_details_request, ResourceDetailsResponse, **call_options
        )
        return reply

Ancestors

  • betterproto.grpc.grpclib_client.ServiceStub
  • abc.ABC

Methods

async def declare(self, resource_declare_request: ResourceDeclareRequest, *, timeout: Optional[float] = None, deadline: Optional[ForwardRef('Deadline')] = None, metadata: Optional[ForwardRef('MetadataLike')] = None) ‑> ResourceDeclareResponse
Expand source code
async def declare(
    self,
    resource_declare_request: "ResourceDeclareRequest",
    *,
    timeout: Optional[float] = None,
    deadline: Optional["Deadline"] = None,
    metadata: Optional["MetadataLike"] = None
) -> "ResourceDeclareResponse":
    """Send ``resource_declare_request`` to the Declare route.

    The keyword-only ``timeout``, ``deadline`` and ``metadata`` values are
    handed unchanged to ``_unary_unary``, whose awaited result is returned.
    """
    route = "/nitric.resource.v1.ResourceService/Declare"
    call_options = dict(timeout=timeout, deadline=deadline, metadata=metadata)
    reply = await self._unary_unary(
        route, resource_declare_request, ResourceDeclareResponse, **call_options
    )
    return reply
async def details(self, resource_details_request: ResourceDetailsRequest, *, timeout: Optional[float] = None, deadline: Optional[ForwardRef('Deadline')] = None, metadata: Optional[ForwardRef('MetadataLike')] = None) ‑> ResourceDetailsResponse
Expand source code
async def details(
    self,
    resource_details_request: "ResourceDetailsRequest",
    *,
    timeout: Optional[float] = None,
    deadline: Optional["Deadline"] = None,
    metadata: Optional["MetadataLike"] = None
) -> "ResourceDetailsResponse":
    """Send ``resource_details_request`` to the Details route.

    The keyword-only ``timeout``, ``deadline`` and ``metadata`` values are
    handed unchanged to ``_unary_unary``, whose awaited result is returned.
    """
    route = "/nitric.resource.v1.ResourceService/Details"
    call_options = dict(timeout=timeout, deadline=deadline, metadata=metadata)
    reply = await self._unary_unary(
        route, resource_details_request, ResourceDetailsResponse, **call_options
    )
    return reply
class ResourceType (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class ResourceType(betterproto.Enum):
    """Integer-valued enumeration of the kinds of resource a ``Resource`` can be.

    Used as the type of ``Resource.type``. The numbers are presumably the enum
    values declared in resource.proto, so they should not be renumbered by hand.
    """

    Api = 0
    Function = 1
    Bucket = 2
    Queue = 3
    Topic = 4
    Schedule = 5
    Subscription = 6
    Collection = 7
    Policy = 8
    Secret = 9
    Notification = 10

Ancestors

  • betterproto.Enum
  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var Api
var Bucket
var Collection
var Function
var Notification
var Policy
var Queue
var Schedule
var Secret
var Subscription
var Topic
class SecretResource

SecretResource()

Expand source code
@dataclass(eq=False, repr=False)
class SecretResource(betterproto.Message):
    """Field-less message; the ``secret`` option of ``ResourceDeclareRequest``."""

    pass

Ancestors

  • betterproto.Message
  • abc.ABC
class TopicResource

TopicResource()

Expand source code
@dataclass(eq=False, repr=False)
class TopicResource(betterproto.Message):
    """Field-less message; the ``topic`` option of ``ResourceDeclareRequest``."""

    pass

Ancestors

  • betterproto.Message
  • abc.ABC