Module nitric.resources
Nitric Python SDK API Documentation. See: https://nitric.io/docs?lang=python for full framework documentation.
Expand source code
#
# Copyright (c) 2021 Nitric Technologies Pty Ltd.
#
# This file is part of Nitric Python 3 SDK.
# See https://github.com/nitrictech/python-sdk for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Nitric Python SDK API Documentation. See: https://nitric.io/docs?lang=python for full framework documentation."""
from nitric.resources.apis import Api, api, MethodOptions, ApiOptions, ApiDetails, JwtSecurityDefinition
from nitric.resources.buckets import Bucket, bucket
from nitric.resources.collections import Collection, collection
from nitric.resources.queues import Queue, queue
from nitric.resources.schedules import Schedule, schedule
from nitric.resources.secrets import Secret, secret
from nitric.resources.topics import Topic, topic
# Public names exported by this package: each resource class together with its lowercase factory function,
# plus the API option/detail types imported from nitric.resources.apis.
__all__ = [
"api",
"Api",
"ApiOptions",
"ApiDetails",
"JwtSecurityDefinition",
"MethodOptions",
"bucket",
"Bucket",
"collection",
"Collection",
"queue",
"Queue",
"Schedule",
"schedule",
"secret",
"Secret",
"topic",
"Topic",
]
Sub-modules
nitric.resources.apis
nitric.resources.base
nitric.resources.buckets
nitric.resources.collections
nitric.resources.queues
nitric.resources.schedules
nitric.resources.secrets
nitric.resources.topics
Functions
def api(name: str, opts: ApiOptions = None) ‑> Api
-
Create a new API resource.
Expand source code
def api(name: str, opts: ApiOptions = None) -> Api:
    """
    Create a new API resource.

    The Api class, the given name and the (optional) options are handed to
    Nitric._create_resource and whatever it produces is returned.
    """
    api_resource = Nitric._create_resource(Api, name, opts=opts)
    return api_resource
def bucket(name: str) ‑> Bucket
-
Create and register a bucket.
If a bucket has already been registered with the same name, the original reference will be reused.
Expand source code
def bucket(name: str) -> Bucket:
    """
    Create and register a bucket.

    Registering the same name more than once reuses the original reference
    instead of creating a second bucket.
    """
    bucket_resource = Nitric._create_resource(Bucket, name)
    return bucket_resource
def collection(name: str) ‑> Collection
-
Create and register a collection.
If a collection has already been registered with the same name, the original reference will be reused.
Expand source code
def collection(name: str) -> Collection:
    """
    Create and register a collection.

    Registering the same name more than once reuses the original reference
    instead of creating a second collection.
    """
    collection_resource = Nitric._create_resource(Collection, name)
    return collection_resource
def queue(name: str) ‑> Queue
-
Create and register a queue.
If a queue has already been registered with the same name, the original reference will be reused.
Expand source code
def queue(name: str) -> Queue:
    """
    Create and register a queue.

    Registering the same name more than once reuses the original reference
    instead of creating a second queue.
    """
    queue_resource = Nitric._create_resource(Queue, name)
    return queue_resource
def schedule(description: str, every: str)
-
Return a schedule decorator.
Expand source code
def schedule(description: str, every: str):
    """
    Return a schedule decorator.

    Each time the returned decorator is applied, a new Schedule is built from
    `description` and the decorated function is registered on it through
    Schedule.every using the `every` rate expression. The decorator evaluates
    to that Schedule, not to the decorated function.
    """

    def decorator(func: EventMiddleware):
        scheduled = Schedule(description)
        scheduled.every(every, func)
        return scheduled

    return decorator
def secret(name: str) ‑> Secret
-
Create and register a secret.
If a secret has already been registered with the same name, the original reference will be reused.
Expand source code
def secret(name: str) -> Secret:
    """
    Create and register a secret.

    Registering the same name more than once reuses the original reference
    instead of creating a second secret.
    """
    secret_resource = Nitric._create_resource(Secret, name)
    return secret_resource
def topic(name: str) ‑> Topic
-
Create and register a topic.
If a topic has already been registered with the same name, the original reference will be reused.
Expand source code
def topic(name: str) -> Topic:
    """
    Create and register a topic.

    Registering the same name more than once reuses the original reference
    instead of creating a second topic.
    """
    topic_resource = Nitric._create_resource(Topic, name)
    return topic_resource
Classes
class Api (name: str, opts: ApiOptions = None)
-
An HTTP API.
Construct a new HTTP API.
Expand source code
class Api(BaseResource):
    """
    An HTTP API.

    Handlers are attached with the decorator factories below (`get`, `post`, `all`, ...). Every application of
    one of those decorators creates a new Route for the given path and records it in `routes`.
    """

    app: Nitric
    name: str
    path: str
    middleware: List[HttpMiddleware]
    routes: List[Route]
    security_definitions: dict[str, SecurityDefinition]
    security: dict[str, List[str]]

    def __init__(self, name: str, opts: ApiOptions = None):
        """
        Construct a new HTTP API.

        Args:
            name: the name of the API.
            opts: API-wide options; a default ApiOptions() is used when omitted.
        """
        super().__init__()
        if opts is None:
            opts = ApiOptions()

        self.name = name
        # Tolerate options whose middleware was explicitly set to None.
        self.middleware = opts.middleware if opts.middleware is not None else []
        self.path = opts.path
        self.routes = []
        self.security_definitions = opts.security_definitions
        self.security = opts.security

    async def _register(self):
        """Declare this API, with its security settings, through the resources stub.

        Raises:
            The exception produced by exception_from_grpc_error when the declare call fails with a GRPCError.
        """
        try:
            await self._resources_stub.declare(
                resource_declare_request=ResourceDeclareRequest(
                    resource=_to_resource(self),
                    api=ApiResource(
                        security_definitions=_security_definition_to_grpc_declaration(self.security_definitions),
                        security=_security_to_grpc_declaration(self.security),
                    ),
                )
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    def _route(self, match: str, opts: RouteOptions = None) -> Route:
        """Define an HTTP route to be handled by this API."""
        if opts is None:
            opts = RouteOptions()

        r = Route(self, match, opts)
        self.routes.append(r)
        return r

    def all(self, match: str, opts: MethodOptions = None):
        """Define an HTTP route which will respond to GET, POST, PATCH, PUT, DELETE and OPTIONS requests."""
        if opts is None:
            opts = MethodOptions()

        def decorator(function: HttpMiddleware):
            r = self._route(match)
            r.method(
                [
                    HttpMethod.GET,
                    HttpMethod.POST,
                    HttpMethod.PATCH,
                    HttpMethod.PUT,
                    HttpMethod.DELETE,
                    HttpMethod.OPTIONS,
                ],
                function,
                opts=opts,
            )

        return decorator

    def methods(self, methods: List[HttpMethod], match: str, opts: MethodOptions = None):
        """Define an HTTP route which will respond to specific HTTP requests defined by a list of verbs."""
        if opts is None:
            opts = MethodOptions()

        def decorator(function: HttpMiddleware):
            r = self._route(match)
            r.method(methods, function, opts=opts)

        return decorator

    def get(self, match: str, opts: MethodOptions = None):
        """Define an HTTP route which will respond to HTTP GET requests."""
        if opts is None:
            opts = MethodOptions()

        def decorator(function: HttpMiddleware):
            r = self._route(match)
            r.get(function, opts=opts)

        return decorator

    def post(self, match: str, opts: MethodOptions = None):
        """Define an HTTP route which will respond to HTTP POST requests."""
        if opts is None:
            opts = MethodOptions()

        def decorator(function: HttpMiddleware):
            r = self._route(match)
            r.post(function, opts=opts)

        return decorator

    def delete(self, match: str, opts: MethodOptions = None):
        """Define an HTTP route which will respond to HTTP DELETE requests."""
        if opts is None:
            opts = MethodOptions()

        def decorator(function: HttpMiddleware):
            r = self._route(match)
            r.delete(function, opts=opts)

        return decorator

    def options(self, match: str, opts: MethodOptions = None):
        """Define an HTTP route which will respond to HTTP OPTIONS requests."""
        if opts is None:
            opts = MethodOptions()

        def decorator(function: HttpMiddleware):
            r = self._route(match)
            r.options(function, opts=opts)

        return decorator

    def patch(self, match: str, opts: MethodOptions = None):
        """Define an HTTP route which will respond to HTTP PATCH requests."""
        if opts is None:
            opts = MethodOptions()

        def decorator(function: HttpMiddleware):
            r = self._route(match)
            r.patch(function, opts=opts)

        return decorator

    def put(self, match: str, opts: MethodOptions = None):
        """Define an HTTP route which will respond to HTTP PUT requests."""
        if opts is None:
            opts = MethodOptions()

        def decorator(function: HttpMiddleware):
            r = self._route(match)
            r.put(function, opts=opts)

        return decorator

    async def _details(self) -> ApiDetails:
        """Get the API deployment details.

        Raises:
            The exception produced by exception_from_grpc_error when the details call fails with a GRPCError.
        """
        try:
            res = await self._resources_stub.details(
                resource_details_request=ResourceDetailsRequest(
                    resource=_to_resource(self),
                )
            )
            return ApiDetails(res.id, res.provider, res.service, res.api.url)
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    async def url(self) -> str:
        """Get the API's live URL."""
        details = await self._details()
        return details.url
Ancestors
- BaseResource
- abc.ABC
Class variables
var app : Nitric
var middleware : List[Callable[[HttpContext, Coroutine[Any, Any, Optional[HttpContext]]], Coroutine[Any, Any, Optional[HttpContext]]]]
var name : str
var path : str
var routes : List[Route]
var security : dict[str, typing.List[str]]
var security_definitions : dict[str, JwtSecurityDefinition]
Methods
def all(self, match: str, opts: MethodOptions = None)
-
Define an HTTP route which will respond to all HTTP methods (GET, POST, PATCH, PUT, DELETE and OPTIONS).
Expand source code
def all(self, match: str, opts: MethodOptions = None):
    """
    Define an HTTP route which will respond to GET, POST, PATCH, PUT, DELETE and OPTIONS requests.

    Args:
        match: the path the route is created for.
        opts: method options passed to the route; a default MethodOptions() is used when omitted.

    Returns:
        A decorator that creates a new route for `match` and registers the decorated function on it for all
        six methods. The decorator itself returns None.
    """
    if opts is None:
        opts = MethodOptions()

    def decorator(function: HttpMiddleware):
        r = self._route(match)
        r.method(
            [
                HttpMethod.GET,
                HttpMethod.POST,
                HttpMethod.PATCH,
                HttpMethod.PUT,
                HttpMethod.DELETE,
                HttpMethod.OPTIONS,
            ],
            function,
            opts=opts,
        )

    return decorator
def delete(self, match: str, opts: MethodOptions = None)
-
Define an HTTP route which will respond to HTTP DELETE requests.
Expand source code
def delete(self, match: str, opts: MethodOptions = None):
    """
    Define an HTTP route which will respond to HTTP DELETE requests.

    Returns a decorator; applying it creates a new route for `match` and registers the decorated function as
    that route's DELETE handler, using `opts` (or a default MethodOptions() when none was given).
    """
    method_opts = MethodOptions() if opts is None else opts

    def decorator(function: HttpMiddleware):
        self._route(match).delete(function, opts=method_opts)

    return decorator
def get(self, match: str, opts: MethodOptions = None)
-
Define an HTTP route which will respond to HTTP GET requests.
Expand source code
def get(self, match: str, opts: MethodOptions = None):
    """
    Define an HTTP route which will respond to HTTP GET requests.

    Returns a decorator; applying it creates a new route for `match` and registers the decorated function as
    that route's GET handler, using `opts` (or a default MethodOptions() when none was given).
    """
    method_opts = MethodOptions() if opts is None else opts

    def decorator(function: HttpMiddleware):
        self._route(match).get(function, opts=method_opts)

    return decorator
def methods(self, methods: List[HttpMethod], match: str, opts: MethodOptions = None)
-
Define an HTTP route which will respond to specific HTTP requests defined by a list of verbs.
Expand source code
def methods(self, methods: List[HttpMethod], match: str, opts: MethodOptions = None):
    """
    Define an HTTP route which will respond to specific HTTP requests defined by a list of verbs.

    Returns a decorator; applying it creates a new route for `match` and registers the decorated function on
    that route for every verb in `methods`, using `opts` (or a default MethodOptions() when none was given).
    """
    method_opts = MethodOptions() if opts is None else opts

    def decorator(function: HttpMiddleware):
        self._route(match).method(methods, function, opts=method_opts)

    return decorator
def options(self, match: str, opts: MethodOptions = None)
-
Define an HTTP route which will respond to HTTP OPTIONS requests.
Expand source code
def options(self, match: str, opts: MethodOptions = None):
    """
    Define an HTTP route which will respond to HTTP OPTIONS requests.

    Returns a decorator; applying it creates a new route for `match` and registers the decorated function as
    that route's OPTIONS handler, using `opts` (or a default MethodOptions() when none was given).
    """
    method_opts = MethodOptions() if opts is None else opts

    def decorator(function: HttpMiddleware):
        self._route(match).options(function, opts=method_opts)

    return decorator
def patch(self, match: str, opts: MethodOptions = None)
-
Define an HTTP route which will respond to HTTP PATCH requests.
Expand source code
def patch(self, match: str, opts: MethodOptions = None):
    """
    Define an HTTP route which will respond to HTTP PATCH requests.

    Returns a decorator; applying it creates a new route for `match` and registers the decorated function as
    that route's PATCH handler, using `opts` (or a default MethodOptions() when none was given).
    """
    method_opts = MethodOptions() if opts is None else opts

    def decorator(function: HttpMiddleware):
        self._route(match).patch(function, opts=method_opts)

    return decorator
def post(self, match: str, opts: MethodOptions = None)
-
Define an HTTP route which will respond to HTTP POST requests.
Expand source code
def post(self, match: str, opts: MethodOptions = None):
    """
    Define an HTTP route which will respond to HTTP POST requests.

    Returns a decorator; applying it creates a new route for `match` and registers the decorated function as
    that route's POST handler, using `opts` (or a default MethodOptions() when none was given).
    """
    method_opts = MethodOptions() if opts is None else opts

    def decorator(function: HttpMiddleware):
        self._route(match).post(function, opts=method_opts)

    return decorator
def put(self, match: str, opts: MethodOptions = None)
-
Define an HTTP route which will respond to HTTP PUT requests.
Expand source code
def put(self, match: str, opts: MethodOptions = None):
    """
    Define an HTTP route which will respond to HTTP PUT requests.

    Returns a decorator; applying it creates a new route for `match` and registers the decorated function as
    that route's PUT handler, using `opts` (or a default MethodOptions() when none was given).
    """
    method_opts = MethodOptions() if opts is None else opts

    def decorator(function: HttpMiddleware):
        self._route(match).put(function, opts=method_opts)

    return decorator
async def url(self) ‑> str
-
Get the API's live URL.
Expand source code
async def url(self) -> str:
    """
    Get the API's live URL.

    Awaits the deployment details from self._details() and returns their `url` field.
    """
    return (await self._details()).url
Inherited members
class ApiDetails (id: str, provider: str, service: str, url: str)
-
Represents the API's deployment details.
Expand source code
@dataclass
class ApiDetails:
    """Deployment details of an API: its identifier, provider, hosting service and URL."""

    # Identifier of the resource.
    id: str
    # Provider the resource is deployed with (e.g. aws).
    provider: str
    # Service the resource is deployed on (e.g. ApiGateway).
    service: str
    # URL of the API.
    url: str
Class variables
var id : str
var provider : str
var service : str
var url : str
class ApiOptions (path: str = '', middleware: List[Middleware] = [], security_definitions: dict[str, SecurityDefinition] = {}, security: dict[str, List[str]] = {})
-
Represents options when creating an API, such as middleware to be applied to all HTTP requests to the API.
Construct a new API options object.
Expand source code
class ApiOptions:
    """Represents options when creating an API, such as middleware to be applied to all HTTP requests to the API."""

    path: str
    middleware: Union[HttpMiddleware, List[HttpMiddleware]]
    security_definitions: dict[str, SecurityDefinition]
    security: dict[str, List[str]]

    def __init__(
        self,
        path: str = "",
        middleware: List[Middleware] = None,
        security_definitions: dict[str, SecurityDefinition] = None,
        security: dict[str, List[str]] = None,
    ):
        """
        Construct a new API options object.

        Args:
            path: stored as-is on the options object; defaults to the empty string.
            middleware: middleware for the API; a new empty list is used when omitted.
            security_definitions: security definitions keyed by name; a new empty dict is used when omitted.
            security: security requirements keyed by definition name; a new empty dict is used when omitted.
        """
        # Default containers are created per instance. Literal `[]` / `{}` defaults in the signature would be
        # evaluated once and shared by every ApiOptions object, so mutating one would leak into all others.
        self.middleware = [] if middleware is None else middleware
        self.security_definitions = {} if security_definitions is None else security_definitions
        self.security = {} if security is None else security
        self.path = path
Class variables
var middleware : Union[Callable[[HttpContext, Coroutine[Any, Any, Optional[HttpContext]]], Coroutine[Any, Any, Optional[HttpContext]]], List[Callable[[HttpContext, Coroutine[Any, Any, Optional[HttpContext]]], Coroutine[Any, Any, Optional[HttpContext]]]]]
var path : str
var security : dict[str, typing.List[str]]
var security_definitions : dict[str, JwtSecurityDefinition]
class Bucket (name: str)
-
A bucket resource, used for storage and retrieval of blob/binary data.
Create a bucket with the name provided, or reference it if it already exists.
Expand source code
class Bucket(SecureResource):
    """A bucket resource, used for storage and retrieval of blob/binary data."""

    name: str
    actions: List[Action]
    _server: FunctionServer

    def __init__(self, name: str):
        """Create a bucket with the name provided or reference it if it already exists."""
        super().__init__()

        self.name = name

    async def _register(self):
        """Declare this bucket through the resources stub.

        Raises:
            The exception produced by exception_from_grpc_error when the declare call fails with a GRPCError.
        """
        try:
            await self._resources_stub.declare(
                resource_declare_request=ResourceDeclareRequest(resource=self._to_resource())
            )
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    def _perms_to_actions(self, *args: Union[BucketPermission, str]) -> List[Action]:
        """Translate permissions (enum members or their names, case-insensitive) into the actions they grant.

        Raises:
            KeyError: if a string does not name a BucketPermission member.
        """
        permission_actions_map = {
            BucketPermission.reading: [Action.BucketFileGet, Action.BucketFileList],
            BucketPermission.writing: [Action.BucketFilePut],
            BucketPermission.deleting: [Action.BucketFileDelete],
        }
        # convert strings to the enum value where needed
        perms = [
            permission if isinstance(permission, BucketPermission) else BucketPermission[permission.lower()]
            for permission in args
        ]
        return [action for perm in perms for action in permission_actions_map[perm]]

    def _to_resource(self) -> Resource:
        """Build the Resource message (name + Bucket type) that identifies this bucket."""
        return Resource(name=self.name, type=ResourceType.Bucket)

    def allow(self, *args: Union[BucketPermission, str]) -> BucketRef:
        """Request the required permissions for this resource."""
        self._register_policy(*args)

        return Storage().bucket(self.name)

    def on(self, notification_type: str, notification_prefix_filter: str):
        """
        Create and return a bucket notification decorator for this bucket.

        Applying the returned decorator creates a FunctionServer configured with this bucket's name, the
        notification type and the prefix filter, registers the decorated function on it as the bucket
        notification handler, and returns the result of Nitric._register_worker for that server.
        """

        def decorator(func: BucketNotificationMiddleware):
            self._server = FunctionServer(
                BucketNotificationWorkerOptions(
                    bucket_name=self.name,
                    notification_type=notification_type,
                    notification_prefix_filter=notification_prefix_filter,
                )
            )
            self._server.bucket_notification(func)
            return Nitric._register_worker(self._server)

        return decorator
Ancestors
- SecureResource
- BaseResource
- abc.ABC
Class variables
var actions : List[Action]
var name : str
Methods
def allow(self, *args: Union[BucketPermission, str]) ‑> BucketRef
-
Request the required permissions for this resource.
Expand source code
def allow(self, *args: Union[BucketPermission, str]) -> BucketRef:
    """
    Request the required permissions for this resource.

    The permissions are forwarded to self._register_policy; afterwards a bucket reference for this bucket's
    name is obtained from a new Storage() client and returned.
    """
    self._register_policy(*args)
    storage_client = Storage()
    return storage_client.bucket(self.name)
def on(self, notification_type: str, notification_prefix_filter: str)
-
Create and return a bucket notification decorator for this bucket.
Expand source code
def on(self, notification_type: str, notification_prefix_filter: str):
    """
    Create and return a bucket notification decorator for this bucket.

    Args:
        notification_type: passed through to BucketNotificationWorkerOptions as `notification_type`.
        notification_prefix_filter: passed through to BucketNotificationWorkerOptions as
            `notification_prefix_filter`.

    Returns:
        A decorator. Applying it creates a FunctionServer for this bucket's notifications, registers the
        decorated function on it as the bucket notification handler, and returns the result of
        Nitric._register_worker for that server.
    """

    def decorator(func: BucketNotificationMiddleware):
        self._server = FunctionServer(
            BucketNotificationWorkerOptions(
                bucket_name=self.name,
                notification_type=notification_type,
                notification_prefix_filter=notification_prefix_filter,
            )
        )
        self._server.bucket_notification(func)
        return Nitric._register_worker(self._server)

    return decorator
Inherited members
class Collection (name: str)
-
A document collection resource.
Construct a new document collection.
Expand source code
class Collection(SecureResource):
    """A document collection resource."""

    def __init__(self, name: str):
        """Construct a new document collection."""
        super().__init__()
        self.name = name

    async def _register(self):
        """Declare this collection through the resources stub, translating GRPCError failures."""
        try:
            declaration = ResourceDeclareRequest(resource=self._to_resource())
            await self._resources_stub.declare(resource_declare_request=declaration)
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    def _to_resource(self) -> Resource:
        """Build the Resource message (name + Collection type) that identifies this collection."""
        return Resource(name=self.name, type=ResourceType.Collection)

    def _perms_to_actions(self, *args: Union[CollectionPermission, str]) -> List[Action]:
        """Translate permissions (enum members or their names, case-insensitive) into the actions they grant."""
        actions_by_permission = {
            CollectionPermission.reading: [
                Action.CollectionDocumentRead,
                Action.CollectionQuery,
                Action.CollectionList,
            ],
            CollectionPermission.writing: [Action.CollectionDocumentWrite, Action.CollectionList],
            CollectionPermission.deleting: [Action.CollectionDocumentDelete, Action.CollectionList],
        }

        # Resolve every argument to an enum member before any action lookup happens.
        resolved = []
        for candidate in args:
            if isinstance(candidate, CollectionPermission):
                resolved.append(candidate)
            else:
                resolved.append(CollectionPermission[candidate.lower()])

        granted = []
        for permission in resolved:
            granted.extend(actions_by_permission[permission])
        return granted

    def allow(self, *args: Union[CollectionPermission, str]) -> CollectionRef:
        """Request the required permissions for this collection."""
        self._register_policy(*args)
        documents_client = Documents()
        return documents_client.collection(self.name)
Ancestors
- SecureResource
- BaseResource
- abc.ABC
Methods
def allow(self, *args: Union[CollectionPermission, str]) ‑> CollectionRef
-
Request the required permissions for this collection.
Expand source code
def allow(self, *args: Union[CollectionPermission, str]) -> CollectionRef:
    """
    Request the required permissions for this collection.

    The permissions are forwarded to self._register_policy; afterwards a collection reference for this
    collection's name is obtained from a new Documents() client and returned.
    """
    self._register_policy(*args)
    documents_client = Documents()
    return documents_client.collection(self.name)
Inherited members
class JwtSecurityDefinition (issuer: str, audiences: List[str])
-
Represents the JWT security definition for an API.
issuer (str): the JWT issuer; audiences (List[str]): a list of the allowed audiences for the API
Expand source code
@dataclass
class JwtSecurityDefinition:
    """
    Represents the JWT security definition for an API.

    Attributes:
        issuer (str): the JWT issuer
        audiences (List[str]): a list of the allowed audiences for the API
    """

    issuer: str
    audiences: List[str]
Class variables
var audiences : List[str]
var issuer : str
class MethodOptions (security: dict[str, List[str]] = None)
-
Represents options when defining a method handler.
Construct a new HTTP method options object.
Expand source code
class MethodOptions:
    """Options that apply to a single method handler; currently only its security requirements."""

    security: dict[str, List[str]]

    def __init__(self, security: dict[str, List[str]] = None):
        """
        Construct a new HTTP method options object.

        The `security` mapping is stored unchanged; it stays None when not supplied.
        """
        self.security = security
Class variables
var security : dict[str, typing.List[str]]
class Queue (name: str)
-
A queue resource.
Construct a new queue resource.
Expand source code
class Queue(SecureResource):
    """A queue resource."""

    name: str
    actions: List[Action]

    def __init__(self, name: str):
        """Construct a new queue resource."""
        super().__init__()
        self.name = name

    def _to_resource(self) -> Resource:
        """Build the Resource message (name + Queue type) that identifies this queue."""
        return Resource(name=self.name, type=ResourceType.Queue)

    def _perms_to_actions(self, *args: Union[QueuePermission, str]) -> List[Action]:
        """Translate permissions (enum members or their names, case-insensitive) into the actions they grant."""
        actions_by_permission = {
            QueuePermission.sending: [Action.QueueSend, Action.QueueList, Action.QueueDetail],
            QueuePermission.receiving: [Action.QueueReceive, Action.QueueList, Action.QueueDetail],
        }

        # Resolve every argument to an enum member before any action lookup happens.
        resolved = []
        for candidate in args:
            if isinstance(candidate, QueuePermission):
                resolved.append(candidate)
            else:
                resolved.append(QueuePermission[candidate.lower()])

        granted = []
        for permission in resolved:
            granted.extend(actions_by_permission[permission])
        return granted

    async def _register(self):
        """Declare this queue through the resources stub, translating GRPCError failures."""
        try:
            declaration = ResourceDeclareRequest(resource=self._to_resource())
            await self._resources_stub.declare(resource_declare_request=declaration)
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    def allow(self, *args: Union[QueuePermission, str]) -> QueueRef:
        """Request the required permissions for this queue."""
        self._register_policy(*args)
        queues_client = Queues()
        return queues_client.queue(self.name)
Ancestors
- SecureResource
- BaseResource
- abc.ABC
Class variables
var actions : List[Action]
var name : str
Methods
def allow(self, *args: Union[QueuePermission, str]) ‑> QueueRef
-
Request the required permissions for this queue.
Expand source code
def allow(self, *args: Union[QueuePermission, str]) -> QueueRef:
    """
    Request the required permissions for this queue.

    The permissions are forwarded to self._register_policy; afterwards a queue reference for this queue's
    name is obtained from a new Queues() client and returned.
    """
    self._register_policy(*args)
    queues_client = Queues()
    return queues_client.queue(self.name)
Inherited members
class Schedule (description: str)
-
A schedule for running functions on a cadence.
Construct a new schedule.
Expand source code
class Schedule:
    """A schedule for running functions on a cadence."""

    description: str
    # Only assigned by every(); start() requires every() to have been called first.
    server: FunctionServer

    def start(self):
        """Start the function server that executes the scheduled middleware."""
        return self.server.start()

    def __init__(self, description: str):
        """Construct a new schedule."""
        self.description = description

    def every(self, rate_description: str, *middleware: EventMiddleware):
        """
        Register middleware to be run at the specified rate.

        E.g. every("3 hours")

        Args:
            rate_description: "<positive integer> <frequency>", or a bare singular frequency such as "day",
                which is treated as "1 days". Matching is case-insensitive.
            middleware: the middleware to register on the schedule's function server.

        Returns:
            The result of Nitric._register_worker for the newly created function server.

        Raises:
            Exception: if the expression cannot be split into a rate and a known frequency, or if the rate is
                not a positive integer.
        """
        rate_description = rate_description.lower()

        if not any(frequency in rate_description for frequency in Frequency.as_str_list()):
            # handle singular frequencies. e.g. every('day')
            rate_description = f"1 {rate_description}s"  # 'day' becomes '1 days'

        try:
            rate, freq_str = rate_description.split(" ")
            freq = Frequency.from_str(freq_str)
        except Exception as err:
            # Chain the original error so the underlying parse failure stays visible in the traceback.
            raise Exception(
                f"invalid rate expression, frequency must be one of {Frequency.as_str_list()}"
            ) from err

        # isdigit() alone accepts "0", which is not a positive rate.
        if not rate.isdigit() or int(rate) == 0:
            raise Exception("invalid rate expression, expression must begin with a positive integer")

        opts = RateWorkerOptions(self.description, int(rate), freq)

        self.server = FunctionServer(opts)
        self.server.event(*middleware)
        return Nitric._register_worker(self.server)
Class variables
var description : str
var server : FunctionServer
Methods
def every(self, rate_description: str, *middleware: EventMiddleware)
-
Register middleware to be run at the specified rate.
E.g. every("3 hours")
Expand source code
def every(self, rate_description: str, *middleware: EventMiddleware):
    """
    Register middleware to be run at the specified rate.

    E.g. every("3 hours")

    Args:
        rate_description: "<positive integer> <frequency>", or a bare singular frequency such as "day",
            which is treated as "1 days". Matching is case-insensitive.
        middleware: the middleware to register on the schedule's function server.

    Returns:
        The result of Nitric._register_worker for the newly created function server.

    Raises:
        Exception: if the expression cannot be split into a rate and a known frequency, or if the rate is
            not a positive integer.
    """
    rate_description = rate_description.lower()

    if not any(frequency in rate_description for frequency in Frequency.as_str_list()):
        # handle singular frequencies. e.g. every('day')
        rate_description = f"1 {rate_description}s"  # 'day' becomes '1 days'

    try:
        rate, freq_str = rate_description.split(" ")
        freq = Frequency.from_str(freq_str)
    except Exception as err:
        # Chain the original error so the underlying parse failure stays visible in the traceback.
        raise Exception(
            f"invalid rate expression, frequency must be one of {Frequency.as_str_list()}"
        ) from err

    # isdigit() alone accepts "0", which is not a positive rate.
    if not rate.isdigit() or int(rate) == 0:
        raise Exception("invalid rate expression, expression must begin with a positive integer")

    opts = RateWorkerOptions(self.description, int(rate), freq)

    self.server = FunctionServer(opts)
    self.server.event(*middleware)
    return Nitric._register_worker(self.server)
def start(self)
-
Start the function server that executes the scheduled middleware.
Expand source code
def start(self):
    """
    Start the function server that executes the scheduled middleware.

    Returns whatever the server's own start() returns.
    """
    function_server = self.server
    return function_server.start()
class Secret (name: str)
-
A secret resource, used for storing and retrieving secret versions and values.
Construct a new secret resource reference.
Expand source code
class Secret(SecureResource):
    """A secret resource, used for storing and retrieving secret versions and values."""

    name: str
    actions: List[Action]

    def __init__(self, name: str):
        """Construct a new secret resource reference."""
        super().__init__()
        self.name = name

    def _to_resource(self) -> Resource:
        """Build the Resource message (name + Secret type) that identifies this secret."""
        return Resource(name=self.name, type=ResourceType.Secret)

    async def _register(self):
        """Declare this secret through the resources stub, translating GRPCError failures."""
        try:
            declaration = ResourceDeclareRequest(resource=self._to_resource())
            await self._resources_stub.declare(resource_declare_request=declaration)
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    def _perms_to_actions(self, *args: Union[SecretPermission, str]) -> List[Action]:
        """Translate permissions (enum members or their names, case-insensitive) into the actions they grant."""
        actions_by_permission = {
            SecretPermission.accessing: [Action.SecretAccess],
            SecretPermission.putting: [Action.SecretPut],
        }

        # Resolve every argument to an enum member before any action lookup happens.
        resolved = []
        for candidate in args:
            if isinstance(candidate, SecretPermission):
                resolved.append(candidate)
            else:
                resolved.append(SecretPermission[candidate.lower()])

        granted = []
        for permission in resolved:
            granted.extend(actions_by_permission[permission])
        return granted

    def allow(self, *args: Union[SecretPermission, str]) -> SecretContainerRef:
        """Request the specified permissions to this resource."""
        self._register_policy(*args)
        secrets_client = Secrets()
        return secrets_client.secret(self.name)
Ancestors
- SecureResource
- BaseResource
- abc.ABC
Class variables
var actions : List[Action]
var name : str
Methods
def allow(self, *args: Union[SecretPermission, str]) ‑> SecretContainerRef
-
Request the specified permissions to this resource.
Expand source code
def allow(self, *args: Union[SecretPermission, str]) -> SecretContainerRef:
    """
    Request the specified permissions to this resource.

    The permissions are forwarded to self._register_policy; afterwards a secret reference for this secret's
    name is obtained from a new Secrets() client and returned.
    """
    self._register_policy(*args)
    secrets_client = Secrets()
    return secrets_client.secret(self.name)
Inherited members
class Topic (name: str)
-
A topic resource, used for asynchronous messaging between functions.
Construct a new topic.
Expand source code
class Topic(SecureResource):
    """A topic resource, used for asynchronous messaging between functions."""

    name: str
    actions: List[Action]

    def __init__(self, name: str):
        """Construct a new topic."""
        super().__init__()
        self.name = name

    async def _register(self):
        """Declare this topic through the resources stub, translating GRPCError failures."""
        try:
            declaration = ResourceDeclareRequest(resource=self._to_resource())
            await self._resources_stub.declare(resource_declare_request=declaration)
        except GRPCError as grpc_err:
            raise exception_from_grpc_error(grpc_err)

    def _to_resource(self) -> Resource:
        """Build the Resource message (name + Topic type) that identifies this topic."""
        return Resource(name=self.name, type=ResourceType.Topic)

    def _perms_to_actions(self, *args: Union[TopicPermission, str]) -> List[Action]:
        """Translate permissions (enum members or their names, case-insensitive) into the actions they grant."""
        actions_by_permission = {TopicPermission.publishing: [Action.TopicEventPublish]}

        # Resolve every argument to an enum member before any action lookup happens.
        resolved = []
        for candidate in args:
            if isinstance(candidate, TopicPermission):
                resolved.append(candidate)
            else:
                resolved.append(TopicPermission[candidate.lower()])

        granted = []
        for permission in resolved:
            granted.extend(actions_by_permission[permission])
        return granted

    def allow(self, *args: Union[TopicPermission, str]) -> TopicRef:
        """Request the specified permissions to this resource."""
        self._register_policy(*args)
        events_client = Events()
        return events_client.topic(self.name)

    def subscribe(self):
        """Create and return a subscription decorator for this topic."""

        def decorator(func: EventMiddleware):
            # The server is kept on the instance; the decorator itself returns None.
            worker_options = SubscriptionWorkerOptions(topic=self.name)
            self.server = FunctionServer(worker_options)
            self.server.event(func)
            Nitric._register_worker(self.server)

        return decorator
Ancestors
- SecureResource
- BaseResource
- abc.ABC
Class variables
var actions : List[Action]
var name : str
Methods
def allow(self, *args: Union[TopicPermission, str]) ‑> TopicRef
-
Request the specified permissions to this resource.
Expand source code
def allow(self, *args: Union[TopicPermission, str]) -> TopicRef:
    """
    Request the specified permissions to this resource.

    The permissions are forwarded to self._register_policy; afterwards a topic reference for this topic's
    name is obtained from a new Events() client and returned.
    """
    self._register_policy(*args)
    events_client = Events()
    return events_client.topic(self.name)
def subscribe(self)
-
Create and return a subscription decorator for this topic.
Expand source code
def subscribe(self):
    """
    Create and return a subscription decorator for this topic.

    Applying the returned decorator creates a FunctionServer subscribed to this topic's name, stores it on
    `self.server`, registers the decorated function as its event handler and passes the server to
    Nitric._register_worker. The decorator itself returns None.
    """

    def decorator(func: EventMiddleware):
        worker_options = SubscriptionWorkerOptions(topic=self.name)
        self.server = FunctionServer(worker_options)
        self.server.event(func)
        Nitric._register_worker(self.server)

    return decorator
Inherited members