Module nitric.api.storage
Expand source code
#
# Copyright (c) 2021 Nitric Technologies Pty Ltd.
#
# This file is part of Nitric Python 3 SDK.
# See https://github.com/nitrictech/python-sdk for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from dataclasses import dataclass
from typing import Union
from grpclib import GRPCError
from nitric.exception import exception_from_grpc_error, InvalidArgumentException
from nitric.application import Nitric
from nitric.faas import FunctionServer, FileNotificationWorkerOptions, FileNotificationMiddleware
from nitric.utils import new_default_channel
from nitric.proto.nitric.storage.v1 import (
StorageServiceStub,
StoragePreSignUrlRequestOperation,
StorageWriteRequest,
StorageReadRequest,
StorageDeleteRequest,
StoragePreSignUrlRequest,
StorageListFilesRequest,
)
from enum import Enum
from warnings import warn
class Storage(object):
"""
Nitric generic blob storage client.
This client insulates application code from stack specific blob store operations or SDKs.
"""
def __init__(self):
"""Construct a Nitric Storage Client."""
self._channel = new_default_channel()
self._storage_stub = StorageServiceStub(channel=self._channel)
def __del__(self):
# close the channel when this client is destroyed
if self._channel is not None:
self._channel.close()
def bucket(self, name: str):
"""Return a reference to a bucket from the connected storage service."""
return BucketRef(_storage=self, name=name, _server=None)
@dataclass(order=True)
class BucketRef(object):
"""A reference to a bucket in a storage service, used to the perform operations on that bucket."""
_storage: Storage
name: str
_server: Union[FunctionServer, None]
def file(self, key: str):
"""Return a reference to a file in this bucket."""
return File(_storage=self._storage, _bucket=self.name, key=key)
async def files(self):
"""Return a list of files in this bucket."""
resp = await self._storage._storage_stub.list_files(
storage_list_files_request=StorageListFilesRequest(bucket_name=self.name)
)
return [self.file(f.key) for f in resp.files]
def on(self, notification_type: str, notification_prefix_filter: str):
"""Create and return a bucket notification decorator for this bucket."""
def decorator(func: FileNotificationMiddleware):
self._server = FunctionServer(
FileNotificationWorkerOptions(
bucket=self,
notification_type=notification_type,
notification_prefix_filter=notification_prefix_filter,
)
)
self._server.bucket_notification(func)
Nitric._register_worker(self._server)
return decorator
class FileMode(Enum):
"""Definition of available operation modes for file signed URLs."""
READ = 0
WRITE = 1
def to_request_operation(self) -> StoragePreSignUrlRequestOperation:
"""Convert FileMode to a StoragePreSignUrlRequestOperation."""
if self == FileMode.READ:
return StoragePreSignUrlRequestOperation.READ
elif self == FileMode.WRITE:
return StoragePreSignUrlRequestOperation.WRITE
else:
raise InvalidArgumentException("Invalid FileMode")
@dataclass(frozen=True, order=True)
class File(object):
"""A reference to a file in a bucket, used to perform operations on that file."""
_storage: Storage
_bucket: str
key: str
async def write(self, body: bytes):
"""
Write the bytes as the content of this file.
Will create the file if it doesn't already exist.
"""
try:
await self._storage._storage_stub.write(
storage_write_request=StorageWriteRequest(bucket_name=self._bucket, key=self.key, body=body)
)
except GRPCError as grpc_err:
raise exception_from_grpc_error(grpc_err)
async def read(self) -> bytes:
"""Read this files contents from the bucket."""
try:
response = await self._storage._storage_stub.read(
storage_read_request=StorageReadRequest(bucket_name=self._bucket, key=self.key)
)
return response.body
except GRPCError as grpc_err:
raise exception_from_grpc_error(grpc_err)
async def delete(self):
"""Delete this file from the bucket."""
try:
await self._storage._storage_stub.delete(
storage_delete_request=StorageDeleteRequest(bucket_name=self._bucket, key=self.key)
)
except GRPCError as grpc_err:
raise exception_from_grpc_error(grpc_err)
async def upload_url(self, expiry: int = 600):
"""Get a temporary writable URL to this file."""
return await self.sign_url(mode=FileMode.WRITE, expiry=expiry)
async def download_url(self, expiry: int = 600):
"""Get a temporary readable URL to this file."""
return await self.sign_url(mode=FileMode.READ, expiry=expiry)
async def sign_url(self, mode: FileMode = FileMode.READ, expiry: int = 3600):
"""Generate a signed URL for reading or writing to a file."""
warn("File.sign_url() is deprecated, use upload_url() or download_url() instead", DeprecationWarning)
try:
response = await self._storage._storage_stub.pre_sign_url(
storage_pre_sign_url_request=StoragePreSignUrlRequest(
bucket_name=self._bucket, key=self.key, operation=mode.to_request_operation(), expiry=expiry
)
)
return response.url
except GRPCError as grpc_err:
raise exception_from_grpc_error(grpc_err)
Classes
class BucketRef (_storage: Storage, name: str, _server: Optional[FunctionServer])
-
A reference to a bucket in a storage service, used to the perform operations on that bucket.
Expand source code
@dataclass(order=True) class BucketRef(object): """A reference to a bucket in a storage service, used to the perform operations on that bucket.""" _storage: Storage name: str _server: Union[FunctionServer, None] def file(self, key: str): """Return a reference to a file in this bucket.""" return File(_storage=self._storage, _bucket=self.name, key=key) async def files(self): """Return a list of files in this bucket.""" resp = await self._storage._storage_stub.list_files( storage_list_files_request=StorageListFilesRequest(bucket_name=self.name) ) return [self.file(f.key) for f in resp.files] def on(self, notification_type: str, notification_prefix_filter: str): """Create and return a bucket notification decorator for this bucket.""" def decorator(func: FileNotificationMiddleware): self._server = FunctionServer( FileNotificationWorkerOptions( bucket=self, notification_type=notification_type, notification_prefix_filter=notification_prefix_filter, ) ) self._server.bucket_notification(func) Nitric._register_worker(self._server) return decorator
Class variables
var name : str
Methods
def file(self, key: str)
-
Return a reference to a file in this bucket.
Expand source code
def file(self, key: str): """Return a reference to a file in this bucket.""" return File(_storage=self._storage, _bucket=self.name, key=key)
async def files(self)
-
Return a list of files in this bucket.
Expand source code
async def files(self): """Return a list of files in this bucket.""" resp = await self._storage._storage_stub.list_files( storage_list_files_request=StorageListFilesRequest(bucket_name=self.name) ) return [self.file(f.key) for f in resp.files]
def on(self, notification_type: str, notification_prefix_filter: str)
-
Create and return a bucket notification decorator for this bucket.
Expand source code
def on(self, notification_type: str, notification_prefix_filter: str): """Create and return a bucket notification decorator for this bucket.""" def decorator(func: FileNotificationMiddleware): self._server = FunctionServer( FileNotificationWorkerOptions( bucket=self, notification_type=notification_type, notification_prefix_filter=notification_prefix_filter, ) ) self._server.bucket_notification(func) Nitric._register_worker(self._server) return decorator
class File (_storage: Storage, _bucket: str, key: str)
-
A reference to a file in a bucket, used to perform operations on that file.
Expand source code
@dataclass(frozen=True, order=True) class File(object): """A reference to a file in a bucket, used to perform operations on that file.""" _storage: Storage _bucket: str key: str async def write(self, body: bytes): """ Write the bytes as the content of this file. Will create the file if it doesn't already exist. """ try: await self._storage._storage_stub.write( storage_write_request=StorageWriteRequest(bucket_name=self._bucket, key=self.key, body=body) ) except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err) async def read(self) -> bytes: """Read this files contents from the bucket.""" try: response = await self._storage._storage_stub.read( storage_read_request=StorageReadRequest(bucket_name=self._bucket, key=self.key) ) return response.body except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err) async def delete(self): """Delete this file from the bucket.""" try: await self._storage._storage_stub.delete( storage_delete_request=StorageDeleteRequest(bucket_name=self._bucket, key=self.key) ) except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err) async def upload_url(self, expiry: int = 600): """Get a temporary writable URL to this file.""" return await self.sign_url(mode=FileMode.WRITE, expiry=expiry) async def download_url(self, expiry: int = 600): """Get a temporary readable URL to this file.""" return await self.sign_url(mode=FileMode.READ, expiry=expiry) async def sign_url(self, mode: FileMode = FileMode.READ, expiry: int = 3600): """Generate a signed URL for reading or writing to a file.""" warn("File.sign_url() is deprecated, use upload_url() or download_url() instead", DeprecationWarning) try: response = await self._storage._storage_stub.pre_sign_url( storage_pre_sign_url_request=StoragePreSignUrlRequest( bucket_name=self._bucket, key=self.key, operation=mode.to_request_operation(), expiry=expiry ) ) return response.url except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err)
Class variables
var key : str
Methods
async def delete(self)
-
Delete this file from the bucket.
Expand source code
async def delete(self): """Delete this file from the bucket.""" try: await self._storage._storage_stub.delete( storage_delete_request=StorageDeleteRequest(bucket_name=self._bucket, key=self.key) ) except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err)
async def download_url(self, expiry: int = 600)
-
Get a temporary readable URL to this file.
Expand source code
async def download_url(self, expiry: int = 600): """Get a temporary readable URL to this file.""" return await self.sign_url(mode=FileMode.READ, expiry=expiry)
async def read(self) ‑> bytes
-
Read this files contents from the bucket.
Expand source code
async def read(self) -> bytes: """Read this files contents from the bucket.""" try: response = await self._storage._storage_stub.read( storage_read_request=StorageReadRequest(bucket_name=self._bucket, key=self.key) ) return response.body except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err)
async def sign_url(self, mode: FileMode = FileMode.READ, expiry: int = 3600)
-
Generate a signed URL for reading or writing to a file.
Expand source code
async def sign_url(self, mode: FileMode = FileMode.READ, expiry: int = 3600): """Generate a signed URL for reading or writing to a file.""" warn("File.sign_url() is deprecated, use upload_url() or download_url() instead", DeprecationWarning) try: response = await self._storage._storage_stub.pre_sign_url( storage_pre_sign_url_request=StoragePreSignUrlRequest( bucket_name=self._bucket, key=self.key, operation=mode.to_request_operation(), expiry=expiry ) ) return response.url except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err)
async def upload_url(self, expiry: int = 600)
-
Get a temporary writable URL to this file.
Expand source code
async def upload_url(self, expiry: int = 600): """Get a temporary writable URL to this file.""" return await self.sign_url(mode=FileMode.WRITE, expiry=expiry)
async def write(self, body: bytes)
-
Write the bytes as the content of this file.
Will create the file if it doesn't already exist.
Expand source code
async def write(self, body: bytes): """ Write the bytes as the content of this file. Will create the file if it doesn't already exist. """ try: await self._storage._storage_stub.write( storage_write_request=StorageWriteRequest(bucket_name=self._bucket, key=self.key, body=body) ) except GRPCError as grpc_err: raise exception_from_grpc_error(grpc_err)
class FileMode (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Definition of available operation modes for file signed URLs.
Expand source code
class FileMode(Enum): """Definition of available operation modes for file signed URLs.""" READ = 0 WRITE = 1 def to_request_operation(self) -> StoragePreSignUrlRequestOperation: """Convert FileMode to a StoragePreSignUrlRequestOperation.""" if self == FileMode.READ: return StoragePreSignUrlRequestOperation.READ elif self == FileMode.WRITE: return StoragePreSignUrlRequestOperation.WRITE else: raise InvalidArgumentException("Invalid FileMode")
Ancestors
- enum.Enum
Class variables
var READ
var WRITE
Methods
def to_request_operation(self) ‑> StoragePreSignUrlRequestOperation
-
Convert FileMode to a StoragePreSignUrlRequestOperation.
Expand source code
def to_request_operation(self) -> StoragePreSignUrlRequestOperation: """Convert FileMode to a StoragePreSignUrlRequestOperation.""" if self == FileMode.READ: return StoragePreSignUrlRequestOperation.READ elif self == FileMode.WRITE: return StoragePreSignUrlRequestOperation.WRITE else: raise InvalidArgumentException("Invalid FileMode")
class Storage
-
Nitric generic blob storage client.
This client insulates application code from stack specific blob store operations or SDKs.
Construct a Nitric Storage Client.
Expand source code
class Storage(object): """ Nitric generic blob storage client. This client insulates application code from stack specific blob store operations or SDKs. """ def __init__(self): """Construct a Nitric Storage Client.""" self._channel = new_default_channel() self._storage_stub = StorageServiceStub(channel=self._channel) def __del__(self): # close the channel when this client is destroyed if self._channel is not None: self._channel.close() def bucket(self, name: str): """Return a reference to a bucket from the connected storage service.""" return BucketRef(_storage=self, name=name, _server=None)
Methods
def bucket(self, name: str)
-
Return a reference to a bucket from the connected storage service.
Expand source code
def bucket(self, name: str): """Return a reference to a bucket from the connected storage service.""" return BucketRef(_storage=self, name=name, _server=None)