# Dissecting the storage adapter
This guide walks you through the implementation of the filesystem adapter. We'll examine every part of it, starting from the outer layer and moving inwards, analyzing what each unit means and why it exists.
## Register an adapter
Adapters must be registered to become available via `make_storage`. Define an entry point named `file_keeper_ext` in the distribution. This entry point specifies the Python module that contains the implementation of the pluggy hooks that extend file-keeper.
```toml
[project.entry-points.file_keeper_ext]
file_keeper = "file_keeper.default"
```
file-keeper expects to find a function named `register_adapters` and decorated with `@file_keeper.hookimpl` inside that module. This function registers all custom adapters of the module by calling the `register()` method of the adapter `Registry`. The call accepts the name of the adapter as the first argument and the adapter class as the second argument.
```py
@ext.hookimpl
def register_adapters(registry: Registry[type[Storage]]):
    """Built-in storage adapters."""
    registry.register("file_keeper:fs", adapters.FsStorage)
```
Adapter names have no restrictions regarding length or allowed symbols, but it's recommended to use the `<package>:<type>` structure, as in `file_keeper:fs`. If you provide multiple similar adapters, consider adding a third segment, e.g. `file_keeper:fs:v1`, `file_keeper:fs:v2`.
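For illustration, a registration hook for versioned adapters might look like the sketch below (the `my_pkg:cloud:*` names and the `CloudStorageV1`/`CloudStorageV2` classes are invented for this example):

```py
import file_keeper as fk


class CloudStorageV1(fk.Storage):
    """Hypothetical first revision of the adapter."""


class CloudStorageV2(fk.Storage):
    """Hypothetical second revision with incompatible behavior."""


@fk.hookimpl
def register_adapters(registry: fk.Registry[type[fk.Storage]]):
    # Keep both versions available so existing configurations don't break.
    registry.register("my_pkg:cloud:v1", CloudStorageV1)
    registry.register("my_pkg:cloud:v2", CloudStorageV2)
```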
## Create an adapter
The adapter itself is simple and usually contains just a few lines of code. It must extend `Storage` and, optionally, it can override `SettingsFactory`, `UploaderFactory`, `ManagerFactory`, and `ReaderFactory`.
````py
class FsStorage(fk.Storage):
    """Filesystem storage adapter.

    Stores files on the local filesystem. The `path` setting must be
    configured to point to the base directory where files are stored.

    Example configuration:

    ```py
    import file_keeper as fk

    settings = {
        "type": "file_keeper:fs",
        "path": "/path/to/storage",
        "initialize": True,
        "override_existing": False,
    }
    storage = fk.make_storage("fs", settings)
    ```

    Note:
        * The `path` must be an absolute path.
        * The `path` directory must be writable by the application.
        * The `location` used in file operations is relative to the `path`.
        * The `location` is not sanitized and can lead outside the configured
          `path`. Consider using combination of `storage.prepare_location` with
          `settings.location_transformers` that sanitizes the path, like
          `safe_relative_path`.
        * If `initialize` is `True`, the storage will attempt to create the
          directory if it does not exist.
        * If `override_existing` is `False`, operations that would overwrite an
          existing file will raise an `ExistingFileError`.
    """

    settings: Settings

    SettingsFactory = Settings
    UploaderFactory = Uploader
    ReaderFactory = Reader
    ManagerFactory = Manager
````
The filesystem adapter overrides all these attributes because:

- it contains custom settings (`SettingsFactory`)
- it defines how the file is uploaded (`UploaderFactory`)
- it defines how the file is managed, i.e. removed, copied, analyzed (`ManagerFactory`)
- it defines how the file is read (`ReaderFactory`)
Additionally, it annotates the type of the `settings` attribute as `settings: Settings`, i.e. the custom `Settings` class defined in the same module. This is done to simplify typing and does not affect the behavior of the adapter. Without this line the typechecker assumes that the storage uses the base `Settings` and complains when custom options are accessed.
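As a minimal sketch of why this matters (class names here are invented):

```py
import dataclasses

import file_keeper as fk


@dataclasses.dataclass()
class MySettings(fk.Settings):
    bucket: str = ""


class MyStorage(fk.Storage):
    # Narrow the attribute type so the checker knows about `bucket`.
    settings: MySettings

    SettingsFactory = MySettings


# With the annotation, `storage.settings.bucket` typechecks.
# Without it, the checker sees only the base `fk.Settings` and
# reports `bucket` as an unknown attribute.
```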
## Define storage settings
Create a dataclass `Settings` to hold configuration options specific to your storage. This class should inherit from `fk.Settings`.
```py
@dataclasses.dataclass()
class Settings(fk.Settings):
    """Settings for FS storage."""
```
Filesystem settings do not introduce new options, so there are no attributes here. But another provider might do the following:
```py
@dataclasses.dataclass()
class Settings(fk.Settings):
    bucket: str = ""
    username: str = ""
    password: str = ""
    params: dict[str, Any] = dataclasses.field(default_factory=dict)
```
These options must have default values due to dataclass restrictions, even if the storage will not work with those defaults. E.g., an empty username and password usually won't work, but you still have to specify them.
Now comes validation. It's implemented in the `__post_init__` method of the dataclass.
```py
def __post_init__(self, **kwargs: Any):
    super().__post_init__(**kwargs)

    if not os.path.exists(self.path):
        if not self.initialize:
            raise fk.exc.InvalidStorageConfigurationError(
                self.name,
                f"path `{self.path}` does not exist",
            )

        try:
            os.makedirs(self.path)
        except PermissionError as err:
            raise fk.exc.InvalidStorageConfigurationError(
                self.name,
                f"path `{self.path}` is not writable",
            ) from err
```
The filesystem adapter relies on two generic options. The first is `initialize`. When this flag is enabled, the storage checks whether the directory for files exists and, if it's missing, tries to create it. If that task fails, `InvalidStorageConfigurationError` is raised. That's how a storage reacts to problems with its configuration.
The second option is `path`. `path`, usually absolute, defines the location in the filesystem where files are stored. Other storages may treat it differently: cloud storages use `path` as a prefix of the file name, because cloud storages do not support directory hierarchies; the SQLAlchemy storage ignores `path`, as it has no meaning in a DB context.
Apart from this, storages often initialize and store connections to external services as storage attributes. For example, `file_keeper:azure_blob` has a `container_name` string option that holds the name of the cloud container where files are stored. Inside `__post_init__` it connects to the container and stores the container object itself as the `container` property, so that the storage doesn't need to constantly re-connect to the container.
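As a hedged sketch of this pattern (the `connect_to_container` helper below is a placeholder for the real SDK call, not part of file-keeper):

```py
import dataclasses
from typing import Any

import file_keeper as fk


def connect_to_container(name: str):
    """Placeholder for the real cloud SDK connection call."""
    raise NotImplementedError


@dataclasses.dataclass()
class CloudSettings(fk.Settings):
    container_name: str = ""

    def __post_init__(self, **kwargs: Any):
        super().__post_init__(**kwargs)

        # Connect once during initialization and reuse the client afterwards,
        # instead of re-connecting on every file operation.
        self.container = connect_to_container(self.container_name)
```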
## Create the uploader service
The next target is `Uploader`. It's the service responsible for file creation, and it must extend `Uploader`.
```py
class Uploader(fk.Uploader):
    """Filesystem uploader."""
```
Any service consists of two parts: methods and capabilities. Methods describe the logic but are initially hidden from the storage. I.e., if you only define methods and ignore capabilities, the storage will pretend that the service cannot perform the task.
```py
@override
def upload(
    self, location: fk.types.Location, upload: fk.Upload, extras: dict[str, Any]
) -> fk.FileData:
    ...
```
Capabilities, in turn, tell the storage which operations the service supports. Because of this separation, you can pretend that the storage cannot perform an operation even when it's supported. In this way you can turn a filesystem storage into a read-only one without code changes and guarantee that files won't be accidentally removed from it.
```py
capabilities: fk.Capability = fk.Capability.CREATE | fk.Capability.RESUMABLE
```
As you can see, the FS storage supports `CREATE` and `RESUMABLE`. Capabilities are implemented as bit masks and can be combined using the `|` operator.
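Because capabilities are bit masks, you can also test them with the `&` operator. A quick sketch, not taken from the adapter itself:

```py
import file_keeper as fk

combined = fk.Capability.CREATE | fk.Capability.RESUMABLE

# Flag membership is checked with a bitwise AND; an empty result is falsy.
assert combined & fk.Capability.CREATE
assert not (combined & fk.Capability.REMOVE)
```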
Let's look closer at the `upload()` method.
At the beginning it computes the full path to the file location. `full_path()` is a generic method of the storage that naively combines the `path` option of the storage with the `location` of the file. Every storage has the `path` option and every storage can use `full_path()` to attach `path` as a prefix to the location, if that makes any sense for it.
```py
dest = self.storage.full_path(location)
```
Next, the storage uses another generic option, `override_existing`. If it's disabled and the given location is already taken by another file, the uploader raises `ExistingFileError`. That's the recommended reaction in this situation, and you'll notice that other storages follow the same process.
```py
if not self.storage.settings.override_existing and os.path.exists(dest):
    raise fk.exc.ExistingFileError(self.storage, location)
```
Then the storage ensures that all intermediate folders of the final file location are present. If you expect files to be uploaded directly into `path`, this may seem redundant. But the FS storage does not impose such a restriction and there may be nested directories under `path`, so making sure that all folders are created is the safest option.
```py
os.makedirs(os.path.dirname(dest), exist_ok=True)
```
Then the file is actually written to the FS. You could read the file content using `upload.stream.read()`, but here we create a `HashingReader` using the `hashing_reader()` method of the `Upload`. This object also has a `read()` method, but in addition it computes the content hash of the file while it's consumed. As a result, we get the hash at the end almost for free.
```py
reader = upload.hashing_reader()

with open(dest, "wb") as fd:
    for chunk in reader:
        fd.write(chunk)
```
Other storages, like AWS S3 or Azure Blob Storage, do not need this step, because the content hash is computed by the cloud provider and returned with the metadata of the uploaded object. But if you don't have a cheap way to obtain the hash, using `HashingReader` is the recommended option.
In the end, the `upload()` method of the service builds a `FileData` with the details of the uploaded file and returns it to the caller.
```py
return fk.FileData(
    location,
    os.path.getsize(dest),
    upload.content_type,
    reader.get_hash(),
)
```
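To see the pieces together, here is a hedged usage sketch. It assumes that `fk.make_upload` wraps raw bytes into an `Upload` and that the storage-level `upload()` makes `extras` optional; check the actual API before relying on these shapes:

```py
import file_keeper as fk

storage = fk.make_storage(
    "sandbox",
    {"type": "file_keeper:fs", "path": "/tmp/file-keeper-demo", "initialize": True},
)

data = storage.upload(
    fk.types.Location("hello.txt"),
    fk.make_upload(b"hello world"),
)
print(data.location, data.size, data.hash)
```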
## Create the reader service
The `Reader` service is much simpler than the `Uploader`. It exposes the `STREAM` capability to notify the storage that files can be read from it. And it implements the `stream()` method, which defines how exactly the bytes of the content are obtained.
```py
class Reader(fk.Reader):
    """Filesystem reader."""

    storage: FsStorage
    capabilities: fk.Capability = fk.Capability.STREAM

    @override
    def stream(self, data: fk.FileData, extras: dict[str, Any]) -> IO[bytes]:
        filepath = self.storage.full_path(data.location)
        if not os.path.exists(filepath):
            raise fk.exc.MissingFileError(self.storage, data.location)

        return open(filepath, "rb")  # noqa: SIM115
```
Note how it computes the path to the file using `full_path()`, just as the `Uploader` did. Basically, every method that accesses a file should use `full_path()`.
Also, pay attention to the `MissingFileError` raised when the file does not exist. That's the recommended way to handle missing files.
Finally, look at the return value. The `stream()` method must return `Iterable[bytes]`, not plain bytes; e.g. `return b"hello"` is not a valid output.
Anything that can be used in a for-loop and produces `bytes` is a valid output of the `stream()` method. A few examples:
```py
...
return [b"hello", b" ", b"world"]

...
yield b"hello"
yield b" "
yield b"world"

...
return BytesIO(b"hello world")

...
return open(path, "rb")
```
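Whatever form the service returns, callers consume it the same way. Continuing the hedged usage sketch from the uploader section (and assuming the storage proxies `stream()`):

```py
# Iterate over chunks without loading the whole file into memory.
total = 0
for chunk in storage.stream(data):
    total += len(chunk)

assert total == data.size
```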
## Create the manager service
The `Manager` service contains a lot of methods and capabilities, but all of them are pretty straightforward.
Capabilities tell the storage *what* the service can do, while method implementations define *how* it's done. Usually a capability and a method come in a pair, unless you are certain that you need to separate them.
```py
capabilities: fk.Capability = (
    fk.Capability.REMOVE
    | fk.Capability.SCAN
    | fk.Capability.EXISTS
    | fk.Capability.ANALYZE
    | fk.Capability.COPY
    | fk.Capability.MOVE
    | fk.Capability.COMPOSE
    | fk.Capability.APPEND
)
```
The `remove()` method removes the object. If it's removed, the result is `True`. If it's not removed (because it does not exist), the result is `False`.
```py
@override
def remove(self, data: fk.FileData, extras: dict[str, Any]) -> bool:
    filepath = self.storage.full_path(data.location)
    if not os.path.exists(filepath):
        return False

    os.remove(filepath)
    return True
```
`scan()` returns an iterable of strings with the names of all files available in the storage. Note that this method yields paths relative to the value of the `path` option of the storage. In this way, when you iterate through the `scan()` results and pass filenames back to the storage, it can wrap them into `storage.full_path()` and build correct locations.
If `os.path.relpath()` were not called, then when you use items from `scan()`, `path` would be included twice, producing incorrect locations.
```py
@override
def scan(self, extras: dict[str, Any]) -> Iterable[str]:
    path = self.storage.settings.path
    search_path = os.path.join(path, "**")

    for entry in glob.glob(search_path, recursive=True):
        if not os.path.isfile(entry):
            continue
        yield os.path.relpath(entry, path)
```
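This relative form makes round trips possible. Continuing the same hedged sketch (assuming the storage proxies `scan()`):

```py
# Every name yielded by scan() can be turned back into an absolute path.
for name in storage.scan():
    print(name, "->", storage.full_path(name))
```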
`exists()` returns `True` if the file exists, and `False` if the file is missing.
```py
@override
def exists(self, data: fk.FileData, extras: dict[str, Any]) -> bool:
    filepath = self.storage.full_path(data.location)
    return os.path.exists(filepath)
```
`analyze()` returns the same kind of `FileData` as the one produced during `upload()`.
```py
@override
def analyze(
    self, location: fk.types.Location, extras: dict[str, Any]
) -> fk.FileData:
    filepath = self.storage.full_path(location)
    if not os.path.exists(filepath):
        raise fk.exc.MissingFileError(self.storage, location)

    with open(filepath, "rb") as src:
        reader = fk.HashingReader(src)
        content_type = magic.from_buffer(next(reader, b""), True)
        reader.exhaust()

    return fk.FileData(
        location,
        size=reader.position,
        content_type=content_type,
        hash=reader.get_hash(),
    )
```
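This makes `analyze()` handy for describing files that appeared in the storage directory outside of file-keeper. A hedged sketch, with an illustrative location:

```py
# Build FileData for a file that was placed into the directory manually.
data = storage.analyze(fk.types.Location("reports/2024.csv"))
print(data.content_type, data.size, data.hash)
```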
`copy()` creates a copy of the file, raising `MissingFileError` if the source file is missing and `ExistingFileError` if the destination file already exists and `override_existing` is disabled.
```py
@override
def copy(
    self, location: fk.types.Location, data: fk.FileData, extras: dict[str, Any]
) -> fk.FileData:
    src = self.storage.full_path(data.location)
    dest = self.storage.full_path(location)

    if not os.path.exists(src):
        raise fk.exc.MissingFileError(self.storage, data.location)

    if os.path.exists(dest) and not self.storage.settings.override_existing:
        raise fk.exc.ExistingFileError(self.storage, location)

    shutil.copy(src, dest)
    return fk.FileData.from_object(data, location=location)
```
`move()` behaves exactly like `copy()`, except that the original file is no longer available after the move. Basically, `move()` does a "rename" if possible, and a "copy" + "remove" if not.
```py
@override
def move(
    self, location: fk.types.Location, data: fk.FileData, extras: dict[str, Any]
) -> fk.FileData:
    src = self.storage.full_path(data.location)
    dest = self.storage.full_path(location)

    if not os.path.exists(src):
        raise fk.exc.MissingFileError(self.storage, data.location)

    if os.path.exists(dest):
        if self.storage.settings.override_existing:
            os.remove(dest)
        else:
            raise fk.exc.ExistingFileError(self.storage, location)

    shutil.move(src, dest)
    return fk.FileData.from_object(data, location=location)
```
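The FS implementation gets the "rename" behavior from `shutil.move`. For a storage without a native rename, the same result can be composed from other capabilities. A minimal sketch of the idea, not the file-keeper internals:

```py
from typing import Any

import file_keeper as fk


def move_via_copy(
    manager: fk.Manager,
    location: fk.types.Location,
    data: fk.FileData,
    extras: dict[str, Any],
) -> fk.FileData:
    """Emulate MOVE with COPY + REMOVE when a native rename is unavailable."""
    new_data = manager.copy(location, data, extras)
    manager.remove(data, extras)
    return new_data
```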
`compose()` is the most challenging method of the `Manager`. It takes a few existing files and combines them into a new one, similar to the `cat` utility.
```py
@override
def compose(
    self,
    location: fk.types.Location,
    datas: Iterable[fk.FileData],
    extras: dict[str, Any],
) -> fk.FileData:
    dest = self.storage.full_path(location)
    if os.path.exists(dest) and not self.storage.settings.override_existing:
        raise fk.exc.ExistingFileError(self.storage, location)

    sources: list[str] = []
    for data in datas:
        src = self.storage.full_path(data.location)
        if not os.path.exists(src):
            raise fk.exc.MissingFileError(self.storage, data.location)
        sources.append(src)

    with open(dest, "wb") as to_fd:
        for src in sources:
            with open(src, "rb") as from_fd:
                shutil.copyfileobj(from_fd, to_fd)

    return self.analyze(location, extras)
```
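As a usage sketch, chunks uploaded earlier can be merged into one file. This continues the earlier hedged sketch and assumes that the storage-level `compose()` accepts an iterable of `FileData`; the exact signature may differ:

```py
parts = [
    storage.upload(fk.types.Location(f"part-{i}.bin"), fk.make_upload(chunk))
    for i, chunk in enumerate((b"alpha", b"beta", b"gamma"))
]

# Combine the parts into a single file, `cat`-style.
merged = storage.compose(fk.types.Location("merged.bin"), parts)
```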
`append()` takes the content of an upload and adds it to the end of an existing file.
```py
@override
def append(
    self, data: fk.FileData, upload: fk.Upload, extras: dict[str, Any]
) -> fk.FileData:
    dest = self.storage.full_path(data.location)
    if not os.path.exists(dest):
        raise fk.exc.MissingFileError(self.storage, data.location)

    with open(dest, "ab") as fd:
        fd.write(upload.stream.read())

    return self.analyze(data.location, extras)
```
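A closing sketch ties `append()` to a chunked-upload scenario. As before, it assumes `fk.make_upload` and a storage-level `append()` with these shapes:

```py
# Start with an empty file, then add chunks as they arrive.
data = storage.upload(fk.types.Location("log.txt"), fk.make_upload(b""))

for chunk in (b"first line\n", b"second line\n"):
    data = storage.append(data, fk.make_upload(chunk))

print(data.size)  # reflects all appended content
```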