Module nitric.faas

Expand source code
#
# Copyright (c) 2021 Nitric Technologies Pty Ltd.
#
# This file is part of Nitric Python 3 SDK.
# See https://github.com/nitrictech/python-sdk for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

import asyncio
import functools
import inspect
import json
import traceback
from abc import ABC
from enum import Enum
from typing import Dict, Union, List, TypeVar, Callable, Coroutine, Any, Optional

import betterproto
import grpclib
from betterproto.grpc.util.async_channel import AsyncChannel
from opentelemetry import context, propagate

from nitric.utils import new_default_channel
from nitric.proto.nitric.faas.v1 import (
    FaasServiceStub,
    InitRequest,
    ClientMessage,
    TriggerRequest,
    TriggerResponse,
    HeaderValue,
    HttpResponseContext,
    TopicResponseContext,
    ScheduleWorker,
    ApiWorker,
    SubscriptionWorker,
    ScheduleRate,
    BucketNotificationWorker,
    BucketNotificationConfig,
    BucketNotificationType,
    NotificationResponseContext,
)

# A string-keyed mapping whose values are either a single string or a list of strings.
# Used below for HTTP headers and query parameters.
Record = Dict[str, Union[str, List[str]]]
# The globally configured OpenTelemetry text-map propagator, looked up once at import time.
PROPAGATOR = propagate.get_global_textmap()


class HttpMethod(Enum):
    """Valid HTTP methods."""

    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"
    PATCH = "PATCH"
    OPTIONS = "OPTIONS"

    def __str__(self):
        # Render as the bare method name (e.g. "GET") instead of "HttpMethod.GET".
        return f"{self.value}"


class Request(ABC):
    """Base class for the request half of any trigger."""

    def __init__(self, data: bytes, trace_context: Dict[str, str]):
        """Store the raw request payload and the trace context received with the trigger."""
        self.trace_context = trace_context
        self.data = data


class Response(ABC):
    """Base class for the response half of any trigger."""


class TriggerContext(ABC):
    """
    Base request/response context for any trigger.

    Each accessor returns None here; a concrete context overrides the accessor
    matching its own kind to return itself.
    """

    def http(self) -> Union[HttpContext, None]:
        """Return the context as an HttpContext, or None when it is not an HTTP context."""
        return None

    def event(self) -> Union[EventContext, None]:
        """Return the context as an EventContext, or None when it is not an event context."""
        return None

    def bucket_notification(self) -> Union[BucketNotificationContext, None]:
        """Return the context as a BucketNotificationContext, or None when it is not one."""
        return None


def _ctx_from_grpc_trigger_request(trigger_request: TriggerRequest, options: FaasWorkerOptions = None):
    """
    Build the TriggerContext matching the populated "context" field of a TriggerRequest.

    Notification triggers become a FileNotificationContext when the worker options are
    FileNotificationWorkerOptions, and a BucketNotificationContext otherwise.
    Raises an Exception when the context type is not recognised.
    """
    context_type, _ = betterproto.which_one_of(trigger_request, "context")

    if context_type == "http":
        return HttpContext.from_grpc_trigger_request(trigger_request)

    if context_type == "topic":
        return EventContext.from_grpc_trigger_request(trigger_request)

    if context_type == "notification":
        if isinstance(options, FileNotificationWorkerOptions):
            return FileNotificationContext.from_grpc_trigger_request(trigger_request, options)
        return BucketNotificationContext.from_grpc_trigger_request(trigger_request)

    # Nothing matched: report the unexpected type before failing.
    print(f"Trigger with unknown context received, context type: {context_type}")
    raise Exception(f"Unknown trigger context, type: {context_type}")


def _grpc_response_from_ctx(ctx: TriggerContext) -> TriggerResponse:
    """
    Create a GRPC TriggerResponse from a TriggerContext.

    The ctx is used to determine the appropriate TriggerResponse content,
    the ctx.res is then used to construct the response.

    HTTP header values may be a single string or a list of strings; both are
    normalised to a list before being sent. Raises an Exception when the context
    is none of the known kinds.
    """
    http_ctx = ctx.http()
    if http_ctx:
        headers = {}
        headers_old = {}
        for name, value in http_ctx.res.headers.items():
            # A bare string must be wrapped, otherwise it would be treated as a
            # sequence of single-character values.
            values = [value] if isinstance(value, str) else list(value)
            headers[name] = HeaderValue(value=values)
            if values:
                # The single-value map only carries the first value of each header.
                headers_old[name] = values[0]

        body = http_ctx.res.body
        return TriggerResponse(
            data=body if body else bytes(),
            http=HttpResponseContext(status=http_ctx.res.status, headers=headers, headers_old=headers_old),
        )

    event_ctx = ctx.event()
    if event_ctx:
        return TriggerResponse(
            topic=TopicResponseContext(
                success=event_ctx.res.success,
            ),
        )

    notification_ctx = ctx.bucket_notification()
    if notification_ctx:
        return TriggerResponse(notification=NotificationResponseContext(success=notification_ctx.res.success))

    raise Exception("Unknown Trigger Context type, unable to return valid response")


# ====== HTTP ======


class HttpRequest(Request):
    """Represents a translated Http Request forwarded from the Nitric Membrane."""

    def __init__(
        self,
        data: bytes,
        method: str,
        path: str,
        params: Dict[str, str],
        query: Record,
        headers: Record,
        trace_context: Dict[str, str],
    ):
        """
        Construct a new HttpRequest.

        Stores the HTTP method, path, path parameters, query parameters and headers
        alongside the raw body bytes and trace context kept by the base Request.
        """
        super().__init__(data, trace_context)
        self.method = method
        self.path = path
        self.params = params
        self.query = query
        self.headers = headers

    @property
    def json(self) -> Optional[Any]:
        """Get the body of the request as JSON, returns None if request body is not JSON."""
        try:
            return json.loads(self.body)
        except (ValueError, TypeError):
            # ValueError covers json.JSONDecodeError and also UnicodeDecodeError, which
            # is raised by `body` when the payload is not valid UTF-8 (e.g. binary data).
            return None

    @property
    def body(self):
        """Get the body of the request as text, decoding the raw bytes as UTF-8."""
        return self.data.decode("utf-8")


class HttpResponse(Response):
    """Represents an HTTP Response to be generated by the Nitric Membrane in response to an HTTP Request Trigger."""

    def __init__(self, status: int = 200, headers: Record = None, body: bytes = None):
        """
        Construct a new HttpResponse.

        The status defaults to 200, headers to an empty dict and the body to empty bytes.
        A non-empty body is stored through the `body` setter, so it is encoded the same
        way as a body assigned after construction.
        """
        self.status = status
        # Headers must exist before the body is assigned: the setter may add Content-Type.
        self.headers = headers if headers else {}
        self._body = bytes()
        if body:
            self.body = body

    @property
    def body(self):
        """Return the HTTP response body."""
        return self._body

    @body.setter
    def body(self, value: Union[str, bytes, Any]):
        """
        Set the HTTP response body.

        Strings are encoded as UTF-8 and bytes are stored as-is. Any other value is
        serialised with json.dumps, and the Content-Type header is set to application/json.
        """
        if isinstance(value, str):
            self._body = value.encode("utf-8")
        elif isinstance(value, bytes):
            self._body = value
        else:
            self._body = json.dumps(value).encode("utf-8")
            self.headers["Content-Type"] = ["application/json"]


class HttpContext(TriggerContext):
    """Pairs an HttpRequest with the HttpResponse that will be returned for it."""

    def __init__(self, request: HttpRequest, response: HttpResponse = None):
        """Create the context, using a default HttpResponse when no response is supplied."""
        super().__init__()
        if not response:
            response = HttpResponse()
        self.req = request
        self.res = response

    def http(self) -> HttpContext:
        """Identify this context as an HTTP context by returning itself."""
        return self

    @staticmethod
    def from_grpc_trigger_request(trigger_request: TriggerRequest) -> HttpContext:
        """Build an HttpContext from the http section of a TriggerRequest sent by the Nitric Membrane."""
        http_trigger = trigger_request.http

        # Prefer the multi-value maps; fall back to the *_old maps when they are empty.
        if http_trigger.headers:
            headers = {name: entry.value for name, entry in http_trigger.headers.items()}
        else:
            headers = http_trigger.headers_old

        if http_trigger.query_params:
            query = {name: entry.value for name, entry in http_trigger.query_params.items()}
        else:
            query = http_trigger.query_params_old

        request = HttpRequest(
            data=trigger_request.data,
            method=http_trigger.method,
            query=query,
            path=http_trigger.path,
            params=dict(http_trigger.path_params.items()),
            headers=headers,
            trace_context=trigger_request.trace_context.values,
        )
        return HttpContext(request=request)


class EventRequest(Request):
    """Represents a translated Event, from a Subscribed Topic, forwarded from the Nitric Membrane."""

    def __init__(self, data: bytes, topic: str, trace_context: Dict[str, str]):
        """Construct a new EventRequest for the given topic, raw payload bytes and trace context."""
        super().__init__(data, trace_context)
        self.topic = topic

    @property
    def payload(self) -> Any:
        """
        Return the payload of this request, decoded as UTF-8 and parsed as JSON.

        The result is whatever json.loads produces for the payload, not raw bytes or text.
        """
        return json.loads(self.data.decode("utf-8"))


class EventResponse(Response):
    """The outcome reported back for an Event trigger received through a Topic subscription."""

    def __init__(self, success: bool = True):
        """Create the response; it is marked successful unless stated otherwise."""
        self.success = success


class EventContext(TriggerContext):
    """Pairs an EventRequest with the EventResponse that will be returned for it."""

    def __init__(self, request: EventRequest, response: EventResponse = None):
        """Create the context, using a default EventResponse when no response is supplied."""
        super().__init__()
        if not response:
            response = EventResponse()
        self.req = request
        self.res = response

    def event(self) -> EventContext:
        """Identify this context as an event context by returning itself."""
        return self

    @staticmethod
    def from_grpc_trigger_request(trigger_request: TriggerRequest):
        """Build an EventContext from the topic section of a TriggerRequest sent by the Nitric Membrane."""
        request = EventRequest(
            data=trigger_request.data,
            topic=trigger_request.topic.topic,
            trace_context=trigger_request.trace_context.values,
        )
        return EventContext(request=request)


class BucketNotificationRequest(Request):
    """Represents a bucket notification trigger forwarded from the Nitric Membrane."""

    def __init__(self, data: bytes, key: str, notification_type: BucketNotificationType, trace_context: Dict[str, str]):
        """Construct a new BucketNotificationRequest with the notification's key and type."""
        super().__init__(data, trace_context)

        self.notification_type = notification_type
        self.key = key


class BucketNotificationResponse(Response):
    """The outcome reported back for a trigger raised by a Bucket notification."""

    def __init__(self, success: bool = True):
        """Create the response; it is marked successful unless stated otherwise."""
        self.success = success


class BucketNotificationContext(TriggerContext):
    """Pairs a BucketNotificationRequest with the BucketNotificationResponse returned for it."""

    def __init__(self, request: BucketNotificationRequest, response: BucketNotificationResponse = None):
        """Create the context, using a default BucketNotificationResponse when none is supplied."""
        super().__init__()
        if not response:
            response = BucketNotificationResponse()
        self.req = request
        self.res = response

    def bucket_notification(self) -> BucketNotificationContext:
        """Identify this context as a bucket notification context by returning itself."""
        return self

    @staticmethod
    def from_grpc_trigger_request(trigger_request: TriggerRequest) -> BucketNotificationContext:
        """Build a BucketNotificationContext from the notification section of a TriggerRequest."""
        bucket = trigger_request.notification.bucket
        request = BucketNotificationRequest(
            data=trigger_request.data,
            key=bucket.key,
            notification_type=bucket.type,
            trace_context=trigger_request.trace_context.values,
        )
        return BucketNotificationContext(request=request)


class FileNotificationRequest(BucketNotificationRequest):
    """A bucket notification request that also carries a file reference for the notified key."""

    def __init__(
        self,
        data: bytes,
        bucket_ref: Any,  # can't import BucketRef due to circular dependency problems
        key: str,
        notification_type: BucketNotificationType,
        trace_context: Dict[str, str],
    ):
        """Construct the request and resolve `file` by calling `bucket_ref.file(key)`."""
        super().__init__(
            data=data,
            key=key,
            notification_type=notification_type,
            trace_context=trace_context,
        )
        file_ref = bucket_ref.file(key)
        self.file = file_ref


class FileNotificationContext(TriggerContext):
    """Pairs a FileNotificationRequest with the BucketNotificationResponse returned for it."""

    def __init__(self, request: FileNotificationRequest, response: BucketNotificationResponse = None):
        """Create the context, using a default BucketNotificationResponse when none is supplied."""
        super().__init__()
        if not response:
            response = BucketNotificationResponse()
        self.req = request
        self.res = response

    def bucket_notification(self) -> FileNotificationContext:
        """Identify this context as a bucket notification context by returning itself."""
        return self

    @staticmethod
    def from_grpc_trigger_request(
        trigger_request: TriggerRequest, options: FileNotificationWorkerOptions
    ) -> FileNotificationContext:
        """
        Build a FileNotificationContext from the notification section of a TriggerRequest.

        The bucket reference held by `options` is passed on to the request.
        """
        bucket = trigger_request.notification.bucket
        request = FileNotificationRequest(
            data=trigger_request.data,
            key=bucket.key,
            bucket_ref=options.bucket_ref,
            notification_type=bucket.type,
            trace_context=trigger_request.trace_context.values,
        )
        return FileNotificationContext(request=request)


class RateWorkerOptions:
    """Settings for a worker that is triggered at a fixed rate."""

    description: str
    rate: int
    frequency: Frequency

    def __init__(self, description: str, rate: int, frequency: Frequency):
        """Store the schedule description together with its rate and frequency unit."""
        self.description, self.rate, self.frequency = description, rate, frequency


class BucketNotificationWorkerOptions:
    """Settings for a worker that handles bucket notifications."""

    def __init__(self, bucket_name: str, notification_type: str, notification_prefix_filter: str):
        """
        Store the bucket name, notification type and prefix filter.

        The notification type is lower-cased and converted to a BucketNotificationType;
        a ValueError is raised when it is neither "write" nor "delete".
        """
        self.bucket_name = bucket_name
        normalised_type = notification_type.lower()
        self.notification_type = BucketNotificationWorkerOptions._to_grpc_event_type(normalised_type)
        self.notification_prefix_filter = notification_prefix_filter

    @staticmethod
    def _to_grpc_event_type(event_type: str) -> BucketNotificationType:
        """Map "write" to Created and "delete" to Deleted; raise ValueError for anything else."""
        if event_type == "write":
            return BucketNotificationType.Created
        if event_type == "delete":
            return BucketNotificationType.Deleted
        raise ValueError(f"Event type {event_type} is unsupported")


class FileNotificationWorkerOptions(BucketNotificationWorkerOptions):
    """Bucket notification worker settings that also keep a reference to the bucket object."""

    def __init__(self, bucket, notification_type: str, notification_prefix_filter: str):
        """Initialise the base options from `bucket.name` and remember `bucket` as `bucket_ref`."""
        bucket_name = bucket.name
        super().__init__(bucket_name, notification_type, notification_prefix_filter)

        self.bucket_ref = bucket


class ApiWorkerOptions:
    """Settings for a worker that handles API routes."""

    def __init__(self, api: str, route: str, methods: List[Union[str, HttpMethod]], opts: MethodOptions):
        """Store the API name, route and method options; every method is converted with str()."""
        self.api = api
        self.route = route
        self.methods = list(map(str, methods))
        self.opts = opts


class MethodOptions:
    """Options that can be supplied when defining a method handler."""

    security: dict[str, List[str]]

    def __init__(self, security: dict[str, List[str]] = None):
        """Store the security mapping; it stays None when not provided."""
        self.security = security


class SubscriptionWorkerOptions:
    """Settings for a worker that handles a topic subscription."""

    def __init__(self, topic: str):
        """Store the name of the subscribed topic."""
        self.topic = topic


class Frequency(Enum):
    """The units in which a schedule rate can be expressed."""

    seconds = "seconds"
    minutes = "minutes"
    hours = "hours"
    days = "days"

    @staticmethod
    def from_str(value: str) -> Frequency:
        """
        Look up a Frequency by name, ignoring surrounding whitespace and letter case.

        Raises ValueError when the value cannot be converted.
        """
        try:
            member_name = value.strip().lower()
            return Frequency[member_name]
        except Exception:
            raise ValueError(f"{value} is not valid frequency")

    @staticmethod
    def as_str_list() -> List[str]:
        """List the string value of every frequency, in definition order."""
        return list(map(lambda frequency: str(frequency.value), Frequency))


class FaasWorkerOptions:
    """Worker options carrying no settings, for generic function handlers."""


# Any of the worker option types that a FunctionServer can be constructed with.
FaasClientOptions = Union[
    ApiWorkerOptions,
    RateWorkerOptions,
    SubscriptionWorkerOptions,
    BucketNotificationWorkerOptions,
    FileNotificationWorkerOptions,
    FaasWorkerOptions,
]


# Defining Function Handlers and Middleware
# Context type variable, constrained to the generic, HTTP and event trigger contexts.
C = TypeVar("C", TriggerContext, HttpContext, EventContext)
# Temporary binding so the Middleware alias defined further down can refer to itself.
Middleware = Callable
# Handler aliases: coroutines that produce a context (optionally None for the typed variants).
Handler = Coroutine[Any, Any, C]
HttpHandler = Coroutine[Any, Any, Optional[HttpContext]]
EventHandler = Coroutine[Any, Any, Optional[EventContext]]
BucketNotificationHandler = Coroutine[Any, Any, Optional[BucketNotificationContext]]
FileNotificationHandler = Coroutine[Any, Any, Optional[FileNotificationContext]]

# Middleware aliases: callables that take a context and the next handler in the chain.
Middleware = Callable[[C, Middleware], Handler]
HttpMiddleware = Callable[[HttpContext, HttpHandler], HttpHandler]
EventMiddleware = Callable[[EventContext, EventHandler], EventHandler]
BucketNotificationMiddleware = Callable[
    [BucketNotificationContext, BucketNotificationHandler], BucketNotificationHandler
]
FileNotificationMiddleware = Callable[[FileNotificationContext, FileNotificationHandler], FileNotificationHandler]


def _required_positional_count(func) -> int:
    """Return how many positional parameters of ``func`` have no default value."""
    try:
        parameters = inspect.signature(func).parameters.values()
    except (TypeError, ValueError):
        # No introspectable signature: treat the callable as a plain single-argument handler.
        return 1
    positional_kinds = (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    return sum(1 for p in parameters if p.kind in positional_kinds and p.default is inspect.Parameter.empty)


async def _call_and_resolve(func, *args):
    """Call ``func`` with ``args`` and await the result when it is awaitable."""
    result = func(*args)
    if inspect.isawaitable(result):
        result = await result
    return result


def compose_middleware(*middlewares: Union[Middleware, List[Middleware]]) -> Middleware:
    """
    Compose multiple middleware functions into a single middleware function.

    The resulting middleware will effectively be a chain of the provided middleware,
    where each calls the next in the chain when they're successful.

    Each argument is either a callable or a list of callables; a list is composed into
    a single step first. A callable with exactly two required positional parameters is
    treated as middleware and called with ``(ctx, next)``. Any other callable is treated
    as a handler: it is called with ``ctx`` and its result is passed to the next step.
    Both sync and async callables are accepted. A single non-list argument is returned
    unchanged.
    """
    middlewares = list(middlewares)
    if len(middlewares) == 1 and not isinstance(middlewares[0], list):
        return middlewares[0]

    # Unpack nested lists: passing the list itself would repeat this exact call forever.
    middlewares = [compose_middleware(*m) if isinstance(m, list) else m for m in middlewares]

    async def handler(ctx, next_middleware=lambda ctx: ctx):
        def reduce_chain(acc_next, cur):
            async def chained_middleware(mw_ctx):
                # Inspect the signature (not __code__) so bound methods and other
                # callables are classified by the arguments they actually accept.
                if _required_positional_count(cur) == 2:
                    # Call the middleware with next and return the result
                    return await _call_and_resolve(cur, mw_ctx, acc_next)
                # Call the handler with ctx only, then call the remainder of the middleware chain
                result = await _call_and_resolve(cur, mw_ctx)
                return await _call_and_resolve(acc_next, result)

            return chained_middleware

        # Fold from the end so each step receives the already-built remainder as its "next".
        middleware_chain = functools.reduce(reduce_chain, reversed(middlewares + [next_middleware]))
        # With no middleware at all the chain is just next_middleware, which may be synchronous.
        return await _call_and_resolve(middleware_chain, ctx)

    return handler


# ====== Function Server ======


def _create_internal_error_response(req: TriggerRequest) -> TriggerResponse:
    """
    Create a general error response based on the trigger request type.

    HTTP triggers get a 500 status; topic and bucket notification triggers get an
    unsuccessful response. Raises an Exception for any other trigger type.
    """
    context_type, _ = betterproto.which_one_of(req, "context")
    if context_type == "http":
        return TriggerResponse(data=bytes(), http=HttpResponseContext(status=500))
    elif context_type == "topic":
        return TriggerResponse(data=bytes(), topic=TopicResponseContext(success=False))
    elif context_type == "notification":
        # Without this branch a failing bucket notification handler would raise from
        # the error path itself instead of reporting the failure.
        return TriggerResponse(data=bytes(), notification=NotificationResponseContext(success=False))
    else:
        raise Exception(f"Unknown trigger type: {context_type}, unable to generate expected response")


class FunctionServer:
    """A Function as a Service server, which acts as a faas handler for the Nitric Membrane."""

    def __init__(self, opts: FaasClientOptions):
        """
        Construct a new function server.

        `opts` selects the kind of worker registered with the Membrane when the server runs.
        """
        self.__http_handler = None
        self.__event_handler = None
        self.__bucket_notification_handler = None
        self._any_handler = None
        self._opts = opts

    def http(self, *handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
        """
        Register one or more HTTP Trigger Handlers or Middleware.

        When multiple handlers are provided, they will be called in order.
        Returns this server so that calls can be chained.
        """
        self.__http_handler = compose_middleware(*handlers)
        return self

    def event(self, *handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
        """
        Register one or more Event Trigger Handlers or Middleware.

        When multiple handlers are provided, they will be called in order.
        Returns this server so that calls can be chained.
        """
        self.__event_handler = compose_middleware(*handlers)
        return self

    def bucket_notification(self, *handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
        """
        Register one or more Bucket Notification Trigger Handlers or Middleware.

        When multiple handlers are provided, they will be called in order.
        Returns this server so that calls can be chained.
        """
        self.__bucket_notification_handler = compose_middleware(*handlers)
        return self

    async def start(self, *handlers: Union[Middleware, List[Middleware]]):
        """
        Start the function server using the provided trigger handlers.

        Handlers passed here serve any trigger type that has no dedicated handler.
        Raises an Exception when no handler of any kind has been registered.
        """
        self._any_handler = compose_middleware(*handlers) if len(handlers) > 0 else None
        if (
            not self._any_handler
            and not self._http_handler
            and not self._event_handler
            and not self._bucket_notification_handler
        ):
            raise Exception("At least one handler function must be provided.")

        await self._run()

    @property
    def _http_handler(self):
        """Return the HTTP handler, falling back to the generic handler."""
        return self.__http_handler if self.__http_handler else self._any_handler

    @property
    def _event_handler(self):
        """Return the event handler, falling back to the generic handler."""
        return self.__event_handler if self.__event_handler else self._any_handler

    @property
    def _bucket_notification_handler(self):
        """Return the bucket notification handler, falling back to the generic handler."""
        return self.__bucket_notification_handler if self.__bucket_notification_handler else self._any_handler

    def _build_init_request(self) -> InitRequest:
        """Build the InitRequest describing this worker, based on the type of its options."""
        opts = self._opts
        if isinstance(opts, ApiWorkerOptions):
            return InitRequest(api=ApiWorker(api=opts.api, path=opts.route, methods=opts.methods))
        if isinstance(opts, RateWorkerOptions):
            # Frequency defines no __str__, so formatting the member directly would
            # produce "Frequency.days" rather than "days"; use its value instead.
            frequency = opts.frequency.value if isinstance(opts.frequency, Frequency) else opts.frequency
            return InitRequest(
                schedule=ScheduleWorker(key=opts.description, rate=ScheduleRate(rate=f"{opts.rate} {frequency}"))
            )
        if isinstance(opts, SubscriptionWorkerOptions):
            return InitRequest(subscription=SubscriptionWorker(topic=opts.topic))
        if isinstance(opts, BucketNotificationWorkerOptions):
            # Also matches FileNotificationWorkerOptions, which is a subclass.
            config = BucketNotificationConfig(
                notification_type=opts.notification_type,
                notification_prefix_filter=opts.notification_prefix_filter,
            )
            return InitRequest(bucket_notification=BucketNotificationWorker(bucket=opts.bucket_name, config=config))
        # Any other options value registers a worker with an empty init request.
        return InitRequest()

    def _select_handler(self, ctx: TriggerContext):
        """Return the handler registered for the kind of trigger that `ctx` represents."""
        if ctx.http():
            return self._http_handler
        if ctx.event():
            return self._event_handler
        if ctx.bucket_notification():
            return self._bucket_notification_handler
        return self._any_handler

    async def _handle_trigger(self, srv_msg, request_channel: AsyncChannel):
        """Run the matching handler for one trigger request and send its response to the Membrane."""
        ctx = _ctx_from_grpc_trigger_request(srv_msg.trigger_request, self._opts)
        trace_token = None

        try:
            if len(ctx.req.trace_context) > 0:
                # PROPAGATOR is a propagator instance, so extract is called on it directly.
                trace_token = context.attach(PROPAGATOR.extract(ctx.req.trace_context))

            func = self._select_handler(ctx)
            response_ctx = (await func(ctx)) if asyncio.iscoroutinefunction(func) else func(ctx)

            # A handler that returns nothing is taken to have updated ctx in place.
            if response_ctx is None:
                response_ctx = ctx

            # Send function response back to server
            await request_channel.send(
                ClientMessage(
                    id=srv_msg.id,
                    trigger_response=_grpc_response_from_ctx(response_ctx),
                )
            )
        except Exception:
            # Any unhandled exceptions in the above code will end the loop
            # and stop processing future triggers, we catch them here as a last resort.
            print("An unexpected error occurred processing trigger or response")
            traceback.print_exc()
            response = _create_internal_error_response(srv_msg.trigger_request)
            await request_channel.send(ClientMessage(id=srv_msg.id, trigger_response=response))
        finally:
            # Restore the previous trace context so it does not leak into later triggers.
            if trace_token is not None:
                context.detach(trace_token)

    async def _run(self):
        """Register a new FaaS worker with the Membrane, using the provided function as the handler."""
        channel = new_default_channel()
        client = FaasServiceStub(channel)
        request_channel = AsyncChannel(close=True)
        # We can start by sending all the requests we already have
        try:
            # let the membrane server know we're ready to start
            await request_channel.send(ClientMessage(init_request=self._build_init_request()))
            async for srv_msg in client.trigger_stream(request_channel):
                # The response iterator will remain active until the connection is closed
                msg_type, _ = betterproto.which_one_of(srv_msg, "content")

                if msg_type == "init_response":
                    print("function connected to Membrane")
                    # We don't need to reply
                    # proceed to the next available message
                    continue
                if msg_type != "trigger_request":
                    print(f"unhandled message type {msg_type}, skipping")
                    continue

                await self._handle_trigger(srv_msg, request_channel)
                if request_channel.done():
                    break
        except grpclib.exceptions.StreamTerminatedError:
            print("stream from Membrane closed, closing client stream")
        except asyncio.CancelledError:
            # Membrane has closed stream after init
            print("stream from Membrane closed, closing client stream")
        except ConnectionRefusedError as cre:
            traceback.print_exc()
            raise ConnectionRefusedError("Failed to register function with Membrane") from cre
        except Exception as e:
            traceback.print_exc()
            raise Exception("An unexpected error occurred.") from e
        finally:
            # The channel must be closed to complete the gRPC connection
            request_channel.close()
            channel.close()


# Convenience functions to create function servers


def http(*handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
    """
    Build a Function Server with the given HTTP Trigger Handlers or Middleware registered.

    Handlers are called in the order they are provided.
    """
    server = FunctionServer(opts=[])
    return server.http(*handlers)


def event(*handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
    """
    Build a Function Server with the given Event Trigger Handlers or Middleware registered.

    Handlers are called in the order they are provided.
    """
    server = FunctionServer(opts=[])
    return server.event(*handlers)


def bucket_notification(*handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
    """
    Build a Function Server with the given Bucket Notification Handlers or Middleware registered.

    Handlers are called in the order they are provided.
    """
    server = FunctionServer(opts=[])
    return server.bucket_notification(*handlers)


def start(*handlers: Union[Middleware, List[Middleware]]):
    """
    Build a Function Server and start it with the given trigger handlers.

    Raises an Exception when no handler is provided. Returns the result of the
    server's `start` call, which is a coroutine to be awaited by the caller.
    """
    if not handlers:
        raise Exception("At least one handler must be provided.")
    server = FunctionServer(opts=[])
    return server.start(*handlers)

Functions

def bucket_notification(*handlers: Union[Middleware, List[Middleware]]) ‑> FunctionServer

Create a new Function Server and Register one or more Bucket Notification Handlers or Middleware.

When multiple handlers are provided, they will be called in order.

Expand source code
def bucket_notification(*handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
    """
    Build a Function Server with the given Bucket Notification Handlers or Middleware registered.

    Handlers are called in the order they are provided.
    """
    server = FunctionServer(opts=[])
    return server.bucket_notification(*handlers)
def compose_middleware(*middlewares: Union[Middleware, List[Middleware]]) ‑> Callable[[~C, Callable], Coroutine[Any, Any, ~C]]

Compose multiple middleware functions into a single middleware function.

The resulting middleware will effectively be a chain of the provided middleware, where each calls the next in the chain when they're successful.

Expand source code
def _required_positional_count(func) -> int:
    """Return how many positional parameters of ``func`` have no default value."""
    try:
        parameters = inspect.signature(func).parameters.values()
    except (TypeError, ValueError):
        # No introspectable signature: treat the callable as a plain single-argument handler.
        return 1
    positional_kinds = (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    return sum(1 for p in parameters if p.kind in positional_kinds and p.default is inspect.Parameter.empty)


async def _call_and_resolve(func, *args):
    """Call ``func`` with ``args`` and await the result when it is awaitable."""
    result = func(*args)
    if inspect.isawaitable(result):
        result = await result
    return result


def compose_middleware(*middlewares: Union[Middleware, List[Middleware]]) -> Middleware:
    """
    Compose multiple middleware functions into a single middleware function.

    The resulting middleware will effectively be a chain of the provided middleware,
    where each calls the next in the chain when they're successful.

    Each argument is either a callable or a list of callables; a list is composed into
    a single step first. A callable with exactly two required positional parameters is
    treated as middleware and called with ``(ctx, next)``. Any other callable is treated
    as a handler: it is called with ``ctx`` and its result is passed to the next step.
    Both sync and async callables are accepted. A single non-list argument is returned
    unchanged.
    """
    middlewares = list(middlewares)
    if len(middlewares) == 1 and not isinstance(middlewares[0], list):
        return middlewares[0]

    # Unpack nested lists: passing the list itself would repeat this exact call forever.
    middlewares = [compose_middleware(*m) if isinstance(m, list) else m for m in middlewares]

    async def handler(ctx, next_middleware=lambda ctx: ctx):
        def reduce_chain(acc_next, cur):
            async def chained_middleware(mw_ctx):
                # Inspect the signature (not __code__) so bound methods and other
                # callables are classified by the arguments they actually accept.
                if _required_positional_count(cur) == 2:
                    # Call the middleware with next and return the result
                    return await _call_and_resolve(cur, mw_ctx, acc_next)
                # Call the handler with ctx only, then call the remainder of the middleware chain
                result = await _call_and_resolve(cur, mw_ctx)
                return await _call_and_resolve(acc_next, result)

            return chained_middleware

        # Fold from the end so each step receives the already-built remainder as its "next".
        middleware_chain = functools.reduce(reduce_chain, reversed(middlewares + [next_middleware]))
        # With no middleware at all the chain is just next_middleware, which may be synchronous.
        return await _call_and_resolve(middleware_chain, ctx)

    return handler
def event(*handlers: Union[Middleware, List[Middleware]]) ‑> FunctionServer

Create a new Function Server and Register one or more Event Trigger Handlers or Middleware.

When multiple handlers are provided, they will be called in order.

Expand source code
def event(*handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
    """
    Build a new Function Server with the given Event Trigger Handlers or Middleware registered.

    The handlers are forwarded unchanged to FunctionServer.event; when several are given
    they are called in order.
    """
    server = FunctionServer(opts=[])
    return server.event(*handlers)
def http(*handlers: Union[Middleware, List[Middleware]]) ‑> FunctionServer

Create a new Function Server and Register one or more HTTP Trigger Handlers or Middleware.

When multiple handlers are provided, they will be called in order.

Expand source code
def http(*handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
    """
    Build a new Function Server with the given HTTP Trigger Handlers or Middleware registered.

    The handlers are forwarded unchanged to FunctionServer.http; when several are given
    they are called in order.
    """
    server = FunctionServer(opts=[])
    return server.http(*handlers)
def start(*handlers: Union[Middleware, List[Middleware]])

Create a new Function Server and start it using the provided trigger handlers.

Expand source code
def start(*handlers: Union[Middleware, List[Middleware]]):
    """
    Create a new Function Server and start it with the provided trigger handlers.

    Raises an Exception when called without any handler. Otherwise returns whatever
    FunctionServer.start returns for the same handlers.
    """
    if not handlers:
        raise Exception("At least one handler must be provided.")

    server = FunctionServer(opts=[])
    return server.start(*handlers)

Classes

class ApiWorkerOptions (api: str, route: str, methods: List[Union[str, HttpMethod]], opts: MethodOptions)

Options for API workers.

Construct a new options object.

Expand source code
class ApiWorkerOptions:
    """Settings describing the API, route and methods handled by an API worker."""

    def __init__(self, api: str, route: str, methods: List[Union[str, HttpMethod]], opts: MethodOptions):
        """
        Store the API name, route, methods and method options.

        Every entry of methods is converted with str(), so plain strings and
        HttpMethod members can be mixed freely.
        """
        self.api, self.route = api, route
        self.methods = list(map(str, methods))
        self.opts = opts
class BucketNotificationContext (request: BucketNotificationRequest, response: BucketNotificationResponse = None)

Represents the full request/response context for a bucket notification trigger.

Construct a new BucketNotificationContext.

Expand source code
class BucketNotificationContext(TriggerContext):
    """Request/response pair for a trigger raised by a bucket notification."""

    def __init__(self, request: BucketNotificationRequest, response: BucketNotificationResponse = None):
        """Create the context; a default BucketNotificationResponse is used when none is supplied."""
        super().__init__()
        self.req = request
        self.res = response or BucketNotificationResponse()

    def bucket_notification(self) -> BucketNotificationContext:
        """Return this context itself, which identifies the trigger as a bucket notification."""
        return self

    @staticmethod
    def from_grpc_trigger_request(trigger_request: TriggerRequest) -> BucketNotificationContext:
        """Build a BucketNotificationContext from a Bucket Notification trigger sent by the Nitric Membrane."""
        bucket_event = trigger_request.notification.bucket
        request = BucketNotificationRequest(
            data=trigger_request.data,
            key=bucket_event.key,
            notification_type=bucket_event.type,
            trace_context=trigger_request.trace_context.values,
        )
        return BucketNotificationContext(request=request)

Ancestors

Static methods

def from_grpc_trigger_request(trigger_request: TriggerRequest) ‑> BucketNotificationContext

Construct a new BucketNotificationContext from a Bucket Notification trigger from the Nitric Membrane.

Expand source code
@staticmethod
def from_grpc_trigger_request(trigger_request: TriggerRequest) -> BucketNotificationContext:
    """Build a BucketNotificationContext from a Bucket Notification trigger sent by the Nitric Membrane."""
    bucket_event = trigger_request.notification.bucket
    request = BucketNotificationRequest(
        data=trigger_request.data,
        key=bucket_event.key,
        notification_type=bucket_event.type,
        trace_context=trigger_request.trace_context.values,
    )
    return BucketNotificationContext(request=request)

Methods

def bucket_notification(self) ‑> BucketNotificationContext

Return this BucketNotificationContext, used when determining the context type of a trigger.

Expand source code
def bucket_notification(self) -> BucketNotificationContext:
    """Return this context itself, which identifies the trigger as a bucket notification."""
    return self

Inherited members

class BucketNotificationRequest (data: bytes, key: str, notification_type: BucketNotificationType, trace_context: Dict[str, str])

Represents a translated Bucket Notification, forwarded from the Nitric Membrane.

Construct a new BucketNotificationRequest.

Expand source code
class BucketNotificationRequest(Request):
    """Represents a translated Bucket Notification, forwarded from the Nitric Membrane."""

    def __init__(self, data: bytes, key: str, notification_type: BucketNotificationType, trace_context: Dict[str, str]):
        """Construct a new BucketNotificationRequest, recording the key and notification type it was given."""
        super().__init__(data, trace_context)

        self.key, self.notification_type = key, notification_type

Ancestors

Subclasses

class BucketNotificationResponse (success: bool = True)

Represents the response to a trigger from a Bucket.

Construct a new BucketNotificationResponse.

Expand source code
class BucketNotificationResponse(Response):
    """Response to a trigger that originated from a Bucket."""

    def __init__(self, success: bool = True):
        """Create the response; success defaults to True."""
        self.success = success

Ancestors

class BucketNotificationWorkerOptions (bucket_name: str, notification_type: str, notification_prefix_filter: str)

Options for bucket notification workers.

Construct a new options object.

Expand source code
class BucketNotificationWorkerOptions:
    """Settings for a worker that reacts to bucket notifications."""

    def __init__(self, bucket_name: str, notification_type: str, notification_prefix_filter: str):
        """
        Store the bucket name, notification type and prefix filter.

        notification_type is matched case-insensitively and stored as a BucketNotificationType;
        a ValueError is raised for anything other than "write" or "delete".
        """
        self.bucket_name = bucket_name
        lowered = notification_type.lower()
        self.notification_type = BucketNotificationWorkerOptions._to_grpc_event_type(lowered)
        self.notification_prefix_filter = notification_prefix_filter

    @staticmethod
    def _to_grpc_event_type(event_type: str) -> BucketNotificationType:
        """Map "write" to Created and "delete" to Deleted; raise ValueError for any other value."""
        if event_type == "write":
            return BucketNotificationType.Created
        if event_type == "delete":
            return BucketNotificationType.Deleted
        raise ValueError(f"Event type {event_type} is unsupported")

Subclasses

class EventContext (request: EventRequest, response: EventResponse = None)

Represents the full request/response context for an Event based trigger.

Construct a new EventContext.

Expand source code
class EventContext(TriggerContext):
    """Request/response pair for a trigger raised by an Event."""

    def __init__(self, request: EventRequest, response: EventResponse = None):
        """Create the context; a default EventResponse is used when none is supplied."""
        super().__init__()
        self.req = request
        self.res = response or EventResponse()

    def event(self) -> EventContext:
        """Return this context itself, which identifies the trigger as an event."""
        return self

    @staticmethod
    def from_grpc_trigger_request(trigger_request: TriggerRequest):
        """Build an EventContext from an Event trigger sent by the Nitric Membrane."""
        request = EventRequest(
            data=trigger_request.data,
            topic=trigger_request.topic.topic,
            trace_context=trigger_request.trace_context.values,
        )
        return EventContext(request=request)

Ancestors

Static methods

def from_grpc_trigger_request(trigger_request: TriggerRequest)

Construct a new EventContext from an Event trigger from the Nitric Membrane.

Expand source code
@staticmethod
def from_grpc_trigger_request(trigger_request: TriggerRequest):
    """Build an EventContext from an Event trigger sent by the Nitric Membrane."""
    request = EventRequest(
        data=trigger_request.data,
        topic=trigger_request.topic.topic,
        trace_context=trigger_request.trace_context.values,
    )
    return EventContext(request=request)

Methods

def event(self) ‑> EventContext

Return this EventContext, used when determining the context type of a trigger.

Expand source code
def event(self) -> EventContext:
    """Return this context itself, which identifies the trigger as an event."""
    return self

Inherited members

class EventRequest (data: bytes, topic: str, trace_context: Dict[str, str])

Represents a translated Event, from a Subscribed Topic, forwarded from the Nitric Membrane.

Construct a new EventRequest.

Expand source code
class EventRequest(Request):
    """Represents a translated Event, from a Subscribed Topic, forwarded from the Nitric Membrane."""

    def __init__(self, data: bytes, topic: str, trace_context: Dict[str, str]):
        """Construct a new EventRequest for the given topic."""
        super().__init__(data, trace_context)
        self.topic = topic

    @property
    def payload(self) -> Any:
        """
        Return the payload of this request, decoded as UTF-8 and parsed as JSON.

        The result is whatever json.loads produces for the body, not raw bytes or text.
        Raises UnicodeDecodeError if the data is not valid UTF-8 and json.JSONDecodeError
        if it is not valid JSON.
        """
        return json.loads(self.data.decode("utf-8"))

Ancestors

Instance variables

var payload : bytes

Return the payload of this request, decoded as UTF-8 and parsed as JSON.

Expand source code
@property
def payload(self) -> Any:
    """
    Return the payload of this request, decoded as UTF-8 and parsed as JSON.

    The result is whatever json.loads produces for the body, not raw bytes or text.
    Raises UnicodeDecodeError if the data is not valid UTF-8 and json.JSONDecodeError
    if it is not valid JSON.
    """
    return json.loads(self.data.decode("utf-8"))
class EventResponse (success: bool = True)

Represents the response to a trigger from an Event as a result of a Topic subscription.

Construct a new EventResponse.

Expand source code
class EventResponse(Response):
    """Response to a trigger raised by an Event delivered through a Topic subscription."""

    def __init__(self, success: bool = True):
        """Create the response; success defaults to True."""
        self.success = success

Ancestors

class FaasWorkerOptions

Empty worker options for generic function handlers.

Expand source code
class FaasWorkerOptions:
    """Placeholder worker options for generic function handlers; carries no settings."""
class FileNotificationContext (request: FileNotificationRequest, response: BucketNotificationResponse = None)

Represents the full request/response context for a bucket notification trigger that carries a file reference.

Construct a new FileNotificationContext.

Expand source code
class FileNotificationContext(TriggerContext):
    """Request/response pair for a bucket notification trigger whose request carries a file reference."""

    def __init__(self, request: FileNotificationRequest, response: BucketNotificationResponse = None):
        """Create the context; a default BucketNotificationResponse is used when none is supplied."""
        super().__init__()
        self.req = request
        self.res = response or BucketNotificationResponse()

    def bucket_notification(self) -> FileNotificationContext:
        """Return this context itself, which identifies the trigger as a bucket notification."""
        return self

    @staticmethod
    def from_grpc_trigger_request(
        trigger_request: TriggerRequest, options: FileNotificationWorkerOptions
    ) -> FileNotificationContext:
        """
        Build a FileNotificationContext from a Bucket Notification trigger sent by the Nitric Membrane.

        The bucket reference is taken from options.bucket_ref; everything else comes from the trigger.
        """
        bucket_event = trigger_request.notification.bucket
        request = FileNotificationRequest(
            data=trigger_request.data,
            key=bucket_event.key,
            bucket_ref=options.bucket_ref,
            notification_type=bucket_event.type,
            trace_context=trigger_request.trace_context.values,
        )
        return FileNotificationContext(request=request)

Ancestors

Static methods

def from_grpc_trigger_request(trigger_request: TriggerRequest, options: FileNotificationWorkerOptions) ‑> FileNotificationContext

Construct a new FileNotificationTrigger from a Bucket Notification trigger from the Nitric Membrane.

Expand source code
@staticmethod
def from_grpc_trigger_request(
    trigger_request: TriggerRequest, options: FileNotificationWorkerOptions
) -> FileNotificationContext:
    """
    Build a FileNotificationContext from a Bucket Notification trigger sent by the Nitric Membrane.

    The bucket reference is taken from options.bucket_ref; everything else comes from the trigger.
    """
    bucket_event = trigger_request.notification.bucket
    request = FileNotificationRequest(
        data=trigger_request.data,
        key=bucket_event.key,
        bucket_ref=options.bucket_ref,
        notification_type=bucket_event.type,
        trace_context=trigger_request.trace_context.values,
    )
    return FileNotificationContext(request=request)

Methods

def bucket_notification(self) ‑> FileNotificationContext

Return this FileNotificationContext, used when determining the context type of a trigger.

Expand source code
def bucket_notification(self) -> FileNotificationContext:
    """Return this context itself, which identifies the trigger as a bucket notification."""
    return self

Inherited members

class FileNotificationRequest (data: bytes, bucket_ref: Any, key: str, notification_type: BucketNotificationType, trace_context: Dict[str, str])

Represents a translated Bucket Notification, forwarded from the Nitric Membrane, with a reference to the affected file.

Construct a new FileNotificationRequest.

Expand source code
class FileNotificationRequest(BucketNotificationRequest):
    """A bucket notification request that also exposes the affected file as self.file."""

    def __init__(
        self,
        data: bytes,
        bucket_ref: Any,  # can't import BucketRef due to circular dependency problems
        key: str,
        notification_type: BucketNotificationType,
        trace_context: Dict[str, str],
    ):
        """Construct the request and resolve self.file by calling bucket_ref.file(key)."""
        super().__init__(data, key, notification_type, trace_context)
        file_ref = bucket_ref.file(key)
        self.file = file_ref

Ancestors

class FileNotificationWorkerOptions (bucket, notification_type: str, notification_prefix_filter: str)

Options for bucket notification workers with file references.

Construct a new options object.

Expand source code
class FileNotificationWorkerOptions(BucketNotificationWorkerOptions):
    """Bucket notification worker options that also keep the bucket reference itself."""

    def __init__(self, bucket, notification_type: str, notification_prefix_filter: str):
        """Initialise the base options from bucket.name and retain bucket as bucket_ref."""
        bucket_name = bucket.name
        super().__init__(bucket_name, notification_type, notification_prefix_filter)

        self.bucket_ref = bucket

Ancestors

class Frequency (value, names=None, *, module=None, qualname=None, type=None, start=1)

Valid schedule frequencies.

Expand source code
class Frequency(Enum):
    """The units in which a schedule's rate may be expressed."""

    seconds = "seconds"
    minutes = "minutes"
    hours = "hours"
    days = "days"

    @staticmethod
    def from_str(value: str) -> Frequency:
        """
        Look up the Frequency named by value.

        Surrounding whitespace and letter case are ignored. Any failure during the
        lookup is reported as a ValueError.
        """
        try:
            member_name = value.strip().lower()
            return Frequency[member_name]
        except Exception:
            raise ValueError(f"{value} is not valid frequency")

    @staticmethod
    def as_str_list() -> List[str]:
        """List the value of every Frequency member as a string, in definition order."""
        return [str(member.value) for member in Frequency]

Ancestors

  • enum.Enum

Class variables

var days
var hours
var minutes
var seconds

Static methods

def as_str_list() ‑> List[str]

Return all frequency values as a list of strings.

Expand source code
@staticmethod
def as_str_list() -> List[str]:
    """List the value of every Frequency member as a string, in definition order."""
    return [str(member.value) for member in Frequency]
def from_str(value: str) ‑> Frequency

Convert a string frequency value to a Frequency.

Expand source code
@staticmethod
def from_str(value: str) -> Frequency:
    """
    Look up the Frequency named by value.

    Surrounding whitespace and letter case are ignored. Any failure during the
    lookup is reported as a ValueError.
    """
    try:
        member_name = value.strip().lower()
        return Frequency[member_name]
    except Exception:
        raise ValueError(f"{value} is not valid frequency")
class FunctionServer (opts: FaasClientOptions)

A Function as a Service server, which acts as a faas handler for the Nitric Membrane.

Construct a new function server.

Expand source code
class FunctionServer:
    """
    A Function as a Service server, which acts as a faas handler for the Nitric Membrane.

    Handlers are registered per trigger type with http(), event() and bucket_notification().
    Handlers passed to start() are used for any trigger type without a dedicated handler.
    """

    def __init__(self, opts: FaasClientOptions):
        """Construct a new function server; opts determines the init request sent to the Membrane."""
        self.__http_handler = None
        self.__event_handler = None
        self.__bucket_notification_handler = None
        self._any_handler = None
        self._opts = opts

    def http(self, *handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
        """
        Register one or more HTTP Trigger Handlers or Middleware.

        When multiple handlers are provided, they will be called in order.
        """
        self.__http_handler = compose_middleware(*handlers)
        return self

    def event(self, *handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
        """
        Register one or more Event Trigger Handlers or Middleware.

        When multiple handlers are provided, they will be called in order.
        """
        self.__event_handler = compose_middleware(*handlers)
        return self

    def bucket_notification(self, *handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
        """
        Register one or more Bucket Notification Trigger Handlers or Middleware.

        When multiple handlers are provided, they will be called in order.
        """
        self.__bucket_notification_handler = compose_middleware(*handlers)
        return self

    async def start(self, *handlers: Union[Middleware, List[Middleware]]):
        """
        Start the function server using the provided trigger handlers.

        The given handlers become the fallback for every trigger type. Raises an Exception
        when neither a fallback nor any type-specific handler has been registered.
        """
        self._any_handler = compose_middleware(*handlers) if len(handlers) > 0 else None
        if (
            not self._any_handler
            and not self._http_handler
            and not self._event_handler
            and not self._bucket_notification_handler
        ):
            raise Exception("At least one handler function must be provided.")

        await self._run()

    @property
    def _http_handler(self):
        """Return the HTTP handler, falling back to the generic handler when none was registered."""
        return self.__http_handler if self.__http_handler else self._any_handler

    @property
    def _event_handler(self):
        """Return the event handler, falling back to the generic handler when none was registered."""
        return self.__event_handler if self.__event_handler else self._any_handler

    @property
    def _bucket_notification_handler(self):
        """Return the bucket notification handler, falling back to the generic handler when none was registered."""
        return self.__bucket_notification_handler if self.__bucket_notification_handler else self._any_handler

    async def _run(self):
        """
        Register a new FaaS worker with the Membrane, using the provided function as the handler.

        Sends an init request built from the worker options, then answers each trigger request
        received on the stream until the stream ends. Both channels are closed on exit.
        """
        channel = new_default_channel()
        client = FaasServiceStub(channel)
        request_channel = AsyncChannel(close=True)
        # We can start by sending all the requests we already have
        try:
            init_request = InitRequest()
            # Construct init request based on API worker options
            if isinstance(self._opts, ApiWorkerOptions):
                init_request = InitRequest(
                    api=ApiWorker(api=self._opts.api, path=self._opts.route, methods=self._opts.methods)
                )
            elif isinstance(self._opts, RateWorkerOptions):
                # TODO: Populate rate
                init_request = InitRequest(
                    schedule=ScheduleWorker(
                        key=self._opts.description, rate=ScheduleRate(rate=f"{self._opts.rate} {self._opts.frequency}")
                    )
                )
            elif isinstance(self._opts, SubscriptionWorkerOptions):
                init_request = InitRequest(subscription=SubscriptionWorker(topic=self._opts.topic))
            elif isinstance(self._opts, BucketNotificationWorkerOptions):
                # FileNotificationWorkerOptions subclasses BucketNotificationWorkerOptions, so it is covered here too.
                config = BucketNotificationConfig(
                    notification_type=self._opts.notification_type,
                    notification_prefix_filter=self._opts.notification_prefix_filter,
                )
                init_request = InitRequest(
                    bucket_notification=BucketNotificationWorker(bucket=self._opts.bucket_name, config=config)
                )

            # let the membrane server know we're ready to start
            await request_channel.send(ClientMessage(init_request=init_request))
            async for srv_msg in client.trigger_stream(request_channel):
                # The response iterator will remain active until the connection is closed
                msg_type, val = betterproto.which_one_of(srv_msg, "content")

                if msg_type == "init_response":
                    print("function connected to Membrane")
                    # We don't need to reply
                    # proceed to the next available message
                    continue
                if msg_type == "trigger_request":
                    ctx = _ctx_from_grpc_trigger_request(srv_msg.trigger_request, self._opts)

                    try:
                        if len(ctx.req.trace_context) > 0:
                            # PROPAGATOR is already the propagator instance returned by get_global_textmap();
                            # it must be used directly rather than called.
                            context.attach(PROPAGATOR.extract(ctx.req.trace_context))

                        # Pick the handler matching the trigger type of this context.
                        if ctx.http():
                            func = self._http_handler
                        elif ctx.event():
                            func = self._event_handler
                        elif ctx.bucket_notification():
                            func = self._bucket_notification_handler
                        else:
                            func = self._any_handler

                        response_ctx = (await func(ctx)) if asyncio.iscoroutinefunction(func) else func(ctx)

                        # A handler that returns nothing is taken to have updated ctx in place.
                        if response_ctx is None:
                            response_ctx = ctx

                        # Send function response back to server
                        await request_channel.send(
                            ClientMessage(
                                id=srv_msg.id,
                                trigger_response=_grpc_response_from_ctx(response_ctx),
                            )
                        )
                    except Exception:
                        # Any unhandled exceptions in the above code will end the loop
                        # and stop processing future triggers, we catch them here as a last resort.
                        print("An unexpected error occurred processing trigger or response")
                        traceback.print_exc()
                        response = _create_internal_error_response(srv_msg.trigger_request)
                        await request_channel.send(ClientMessage(id=srv_msg.id, trigger_response=response))
                else:
                    print(f"unhandled message type {msg_type}, skipping")
                    continue
                if request_channel.done():
                    break
        except grpclib.exceptions.StreamTerminatedError:
            print("stream from Membrane closed, closing client stream")
        except asyncio.CancelledError:
            # Membrane has closed stream after init
            print("stream from Membrane closed, closing client stream")
        except ConnectionRefusedError as cre:
            traceback.print_exc()
            raise ConnectionRefusedError("Failed to register function with Membrane") from cre
        except Exception as e:
            traceback.print_exc()
            raise Exception("An unexpected error occurred.") from e
        finally:
            # The channel must be closed to complete the gRPC connection
            request_channel.close()
            channel.close()

Methods

def bucket_notification(self, *handlers: Union[Middleware, List[Middleware]]) ‑> FunctionServer

Register one or more Bucket Notification Trigger Handlers or Middleware.

When multiple handlers are provided, they will be called in order.

Expand source code
def bucket_notification(self, *handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
    """
    Set the Bucket Notification Trigger handler from one or more Handlers or Middleware.

    The handlers are composed into a single function and are called in the order given.
    Returns this server so calls can be chained.
    """
    composed = compose_middleware(*handlers)
    self.__bucket_notification_handler = composed
    return self
def event(self, *handlers: Union[Middleware, List[Middleware]]) ‑> FunctionServer

Register one or more Event Trigger Handlers or Middleware.

When multiple handlers are provided, they will be called in order.

Expand source code
def event(self, *handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
    """
    Set the Event Trigger handler from one or more Handlers or Middleware.

    The handlers are composed into a single function and are called in the order given.
    Returns this server so calls can be chained.
    """
    composed = compose_middleware(*handlers)
    self.__event_handler = composed
    return self
def http(self, *handlers: Union[Middleware, List[Middleware]]) ‑> FunctionServer

Register one or more HTTP Trigger Handlers or Middleware.

When multiple handlers are provided, they will be called in order.

Expand source code
def http(self, *handlers: Union[Middleware, List[Middleware]]) -> FunctionServer:
    """
    Set the HTTP Trigger handler from one or more Handlers or Middleware.

    The handlers are composed into a single function and are called in the order given.
    Returns this server so calls can be chained.
    """
    composed = compose_middleware(*handlers)
    self.__http_handler = composed
    return self
async def start(self, *handlers: Union[Middleware, List[Middleware]])

Start the function server using the provided trigger handlers.

Expand source code
async def start(self, *handlers: Union[Middleware, List[Middleware]]):
    """
    Start the function server using the provided trigger handlers.

    The given handlers become the fallback handler. Raises an Exception when neither a
    fallback nor any type-specific handler is available.
    """
    if len(handlers) > 0:
        self._any_handler = compose_middleware(*handlers)
    else:
        self._any_handler = None

    has_handler = (
        self._any_handler
        or self._http_handler
        or self._event_handler
        or self.__bucket_notification_handler
    )
    if not has_handler:
        raise Exception("At least one handler function must be provided.")

    await self._run()
class HttpContext (request: HttpRequest, response: HttpResponse = None)

Represents the full request/response context for an Http based trigger.

Construct a new HttpContext.

Expand source code
class HttpContext(TriggerContext):
    """Request/response pair for a trigger raised by an HTTP request."""

    def __init__(self, request: HttpRequest, response: HttpResponse = None):
        """Create the context; a default HttpResponse is used when none is supplied."""
        super().__init__()
        self.req = request
        self.res = response or HttpResponse()

    def http(self) -> HttpContext:
        """Return this context itself, which identifies the trigger as HTTP."""
        return self

    @staticmethod
    def from_grpc_trigger_request(trigger_request: TriggerRequest) -> HttpContext:
        """
        Build an HttpContext from an Http trigger sent by the Nitric Membrane.

        Headers and query parameters are read from the `headers` / `query_params` maps when
        those are non-empty, otherwise from the `headers_old` / `query_params_old` fields.
        """
        http_trigger = trigger_request.http

        headers = (
            {name: entry.value for (name, entry) in http_trigger.headers.items()}
            if len(http_trigger.headers.keys()) > 0
            else http_trigger.headers_old
        )
        query = (
            {name: entry.value for (name, entry) in http_trigger.query_params.items()}
            if len(http_trigger.query_params.keys()) > 0
            else http_trigger.query_params_old
        )

        request = HttpRequest(
            data=trigger_request.data,
            method=http_trigger.method,
            query=query,
            path=http_trigger.path,
            params=dict(http_trigger.path_params.items()),
            headers=headers,
            trace_context=trigger_request.trace_context.values,
        )
        return HttpContext(request=request)

Ancestors

Static methods

def from_grpc_trigger_request(trigger_request: TriggerRequest) ‑> HttpContext

Construct a new HttpContext from an Http trigger from the Nitric Membrane.

Expand source code
@staticmethod
def from_grpc_trigger_request(trigger_request: TriggerRequest) -> HttpContext:
    """
    Build an HttpContext from an Http trigger sent by the Nitric Membrane.

    Headers and query parameters are read from the `headers` / `query_params` maps when
    those are non-empty, otherwise from the `headers_old` / `query_params_old` fields.
    """
    http_trigger = trigger_request.http

    headers = (
        {name: entry.value for (name, entry) in http_trigger.headers.items()}
        if len(http_trigger.headers.keys()) > 0
        else http_trigger.headers_old
    )
    query = (
        {name: entry.value for (name, entry) in http_trigger.query_params.items()}
        if len(http_trigger.query_params.keys()) > 0
        else http_trigger.query_params_old
    )

    request = HttpRequest(
        data=trigger_request.data,
        method=http_trigger.method,
        query=query,
        path=http_trigger.path,
        params=dict(http_trigger.path_params.items()),
        headers=headers,
        trace_context=trigger_request.trace_context.values,
    )
    return HttpContext(request=request)

Methods

def http(self) ‑> HttpContext

Return this HttpContext, used when determining the context type of a trigger.

Expand source code
def http(self) -> HttpContext:
    """Return this context itself, which identifies the trigger as HTTP."""
    return self

Inherited members

class HttpMethod (value, names=None, *, module=None, qualname=None, type=None, start=1)

Valid HTTP methods.

Expand source code
class HttpMethod(Enum):
    """Valid HTTP methods."""

    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"
    PATCH = "PATCH"
    OPTIONS = "OPTIONS"

    def __str__(self):
        """Render the member as its value, e.g. "GET", rather than "HttpMethod.GET"."""
        return f"{self.value}"

Ancestors

  • enum.Enum

Class variables

var DELETE
var GET
var OPTIONS
var PATCH
var POST
var PUT
class HttpRequest (data: bytes, method: str, path: str, params: Dict[str, str], query: Record, headers: Record, trace_context: Dict[str, str])

Represents a translated Http Request forwarded from the Nitric Membrane.

Construct a new HttpRequest.

Expand source code
class HttpRequest(Request):
    """Represents a translated Http Request forwarded from the Nitric Membrane."""

    def __init__(
        self,
        data: bytes,
        method: str,
        path: str,
        params: Dict[str, str],
        query: Record,
        headers: Record,
        trace_context: Dict[str, str],
    ):
        """Construct a new HttpRequest from the request body, method, path, parameters and headers."""
        super().__init__(data, trace_context)
        self.method = method
        self.path = path
        self.params = params
        self.query = query
        self.headers = headers

    @property
    def json(self) -> Optional[Any]:
        """
        Get the body of the request as JSON, returns None if request body is not JSON.

        A body that is not valid UTF-8 cannot be JSON text, so it also yields None
        instead of raising UnicodeDecodeError.
        """
        try:
            return json.loads(self.body)
        except (UnicodeDecodeError, json.JSONDecodeError, TypeError):
            return None

    @property
    def body(self):
        """Get the body of the request as text, decoded as UTF-8."""
        return self.data.decode("utf-8")

Ancestors

Instance variables

var body

Get the body of the request as text.

Expand source code
@property
def body(self):
    """Return the request data decoded as UTF-8 text."""
    text = self.data.decode("utf-8")
    return text
var json : Optional[Any]

Get the body of the request as JSON, returns None if request body is not JSON.

Expand source code
@property
def json(self) -> Optional[Any]:
    """
    Get the body of the request as JSON, returns None if request body is not JSON.

    A body that is not valid UTF-8 cannot be JSON text, so it also yields None
    instead of raising UnicodeDecodeError.
    """
    try:
        return json.loads(self.body)
    except (UnicodeDecodeError, json.JSONDecodeError, TypeError):
        return None
class HttpResponse (status: int = 200, headers: Record = None, body: bytes = None)

Represents an HTTP Response to be generated by the Nitric Membrane in response to an HTTP Request Trigger.

Construct a new HttpResponse.

Expand source code
class HttpResponse(Response):
    """Represents an HTTP Response to be generated by the Nitric Membrane in response to an HTTP Request Trigger."""

    def __init__(self, status: int = 200, headers: Optional[Record] = None, body: Optional[bytes] = None):
        """Construct a new HttpResponse.

        Args:
            status: HTTP status code, 200 when omitted.
            headers: response headers; a falsy value (None or empty) is replaced by a new empty dict.
            body: response payload; a falsy value (None or empty) is replaced by empty bytes.
        """
        self.status = status
        self.headers = headers if headers else {}
        self._body = body if body else bytes()

    @property
    def body(self) -> bytes:
        """Return the HTTP response body."""
        return self._body

    @body.setter
    def body(self, value: Union[str, bytes, bytearray, memoryview, Any]):
        """Set the HTTP response body.

        Text is encoded as UTF-8 and binary buffers are stored as bytes. Any
        other value is serialized with ``json.dumps`` and the ``Content-Type``
        header is set to ``application/json``.
        """
        if isinstance(value, str):
            self._body = value.encode("utf-8")
        elif isinstance(value, (bytes, bytearray, memoryview)):
            # Binary buffers are not JSON serializable; keep them as raw bytes.
            self._body = bytes(value)
        else:
            # Serialize first so a failing dump leaves the headers untouched.
            self._body = json.dumps(value).encode("utf-8")
            self.headers["Content-Type"] = ["application/json"]

Ancestors

  • Response
  • abc.ABC

Instance variables

var body

Return the HTTP response body.

Expand source code
@property
def body(self) -> bytes:
    """Return the HTTP response body.

    This is the value currently stored in the private ``_body`` attribute.
    """
    return self._body
class MethodOptions (security: dict[str, List[str]] = None)

Represents options when defining a method handler.

Construct a new HTTP method options object.

Expand source code
class MethodOptions:
    """Represents options when defining a method handler."""

    # Mapping of str to a list of str, or None when no security was supplied.
    # NOTE(review): presumably security definition name -> required scopes; confirm against callers.
    security: Optional[dict[str, List[str]]]

    def __init__(self, security: Optional[dict[str, List[str]]] = None):
        """Construct a new HTTP method options object.

        Args:
            security: optional security mapping, stored as given; None when omitted.
        """
        self.security = security

Class variables

var security : dict[str, typing.List[str]]
class RateWorkerOptions (description: str, rate: int, frequency: Frequency)

Options for rate workers.

Construct a new options object.

Expand source code
class RateWorkerOptions:
    """Options for rate workers."""

    description: str
    rate: int
    frequency: Frequency

    def __init__(self, description: str, rate: int, frequency: Frequency):
        """Construct a new options object holding the given description, rate and frequency."""
        # All three values are stored exactly as supplied.
        self.description, self.rate, self.frequency = description, rate, frequency

Class variables

var description : str
var frequency : Frequency
var rate : int
class Request (data: bytes, trace_context: Dict[str, str])

Represents an abstract trigger request.

Construct a new Request.

Expand source code
class Request(ABC):
    """Base class shared by all trigger requests."""

    def __init__(self, data: bytes, trace_context: Dict[str, str]):
        """Create a request from its raw payload and trace context.

        Both arguments are stored on the instance exactly as given.
        """
        self.trace_context = trace_context
        self.data = data

Ancestors

  • abc.ABC

Subclasses

class Response

Represents an abstract trigger response.

Expand source code
class Response(ABC):
    """Base class shared by all trigger responses; it defines no members of its own."""

Ancestors

  • abc.ABC

Subclasses

class SubscriptionWorkerOptions (topic: str)

Options for subscription workers.

Construct a new options object.

Expand source code
class SubscriptionWorkerOptions:
    """Options for subscription workers."""

    # Declared at class level like the attributes of the other options classes.
    topic: str

    def __init__(self, topic: str):
        """Construct a new options object.

        Args:
            topic: the topic value for the subscription, stored as given.
        """
        self.topic = topic
class TriggerContext

Represents an abstract request/response context for any trigger.

Expand source code
class TriggerContext(ABC):
    """Represents an abstract request/response context for any trigger.

    Each accessor below returns None in this base class.
    NOTE(review): concrete context types presumably override the accessor
    matching their own kind to return themselves - confirm in the subclasses.
    """

    def http(self) -> Union[HttpContext, None]:
        """Return this context as an HttpContext if it is one, otherwise returns None."""
        # Base implementation: not an HTTP context.
        return None

    def event(self) -> Union[EventContext, None]:
        """Return this context as an EventContext if it is one, otherwise returns None."""
        # Base implementation: not an event context.
        return None

    def bucket_notification(self) -> Union[BucketNotificationContext, None]:
        """Return this context as a BucketNotificationContext if it is one, otherwise returns None."""
        # Base implementation: not a bucket notification context.
        return None

Ancestors

  • abc.ABC

Subclasses

Methods

def bucket_notification(self) ‑> Optional[BucketNotificationContext]

Return this context as a BucketNotificationContext if it is one, otherwise returns None.

Expand source code
def bucket_notification(self) -> Union[BucketNotificationContext, None]:
    """Return this context as a BucketNotificationContext if it is one, otherwise returns None.

    This implementation always returns None.
    """
    return None
def event(self) ‑> Optional[EventContext]

Return this context as an EventContext if it is one, otherwise returns None.

Expand source code
def event(self) -> Union[EventContext, None]:
    """Return this context as an EventContext if it is one, otherwise returns None.

    This implementation always returns None.
    """
    return None
def http(self) ‑> Optional[HttpContext]

Return this context as an HttpContext if it is one, otherwise returns None.

Expand source code
def http(self) -> Union[HttpContext, None]:
    """Return this context as an HttpContext if it is one, otherwise returns None.

    This implementation always returns None.
    """
    return None