Dissecting storage adapter

This guide walks you through the implementation of the filesystem adapter. We'll look at every part of it, starting from the outer layer and moving inwards, and explain what each unit means and why it exists.

Register an adapter

Adapters must be registered to make them available via make_storage.

Define an entry point in the file_keeper_ext group of the distribution. This entry point specifies the Python module that contains implementations of the pluggy hooks that extend file-keeper.

[project.entry-points.file_keeper_ext]
file_keeper = "file_keeper.default"

file-keeper expects to find a function named register_adapters and decorated with @file_keeper.hookimpl inside the module. This function registers all custom adapters of the module by calling the register() method of the adapter Registry. The call accepts the name of the adapter as the first argument and the adapter class as the second.

@ext.hookimpl
def register_adapters(registry: Registry[type[Storage]]):
    """Built-in storage adapters."""
    registry.register("file_keeper:fs", adapters.FsStorage)

Adapter names have no restrictions on length or allowed symbols, but the <package>:<type> structure is recommended, as in file_keeper:fs. If you provide multiple similar adapters, consider adding a third segment, e.g. file_keeper:fs:v1 and file_keeper:fs:v2.
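
Conceptually, registration is a mapping from adapter names to adapter classes. The sketch below illustrates the naming convention with a hypothetical SimpleRegistry stand-in. It is not file-keeper's real Registry, and the my_package prefix and the FsV1/FsV2 classes are invented for the example.

```python
from __future__ import annotations


class SimpleRegistry:
    """Hypothetical stand-in for a name-to-class registry."""

    def __init__(self) -> None:
        self._items: dict[str, type] = {}

    def register(self, name: str, adapter: type) -> None:
        # Name first, class second, mirroring the call shown above.
        self._items[name] = adapter

    def get(self, name: str) -> type | None:
        return self._items.get(name)


class FsV1:
    """Placeholder adapter class."""


class FsV2:
    """Placeholder adapter class."""


registry = SimpleRegistry()
# The third segment distinguishes two similar adapters of one package.
registry.register("my_package:fs:v1", FsV1)
registry.register("my_package:fs:v2", FsV2)
```

With names structured this way, two adapters from the same package never collide, and the package prefix keeps them apart from adapters of other distributions.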

Create an adapter

The adapter itself is simple and usually contains just a few lines of code.

It must extend Storage and, optionally, it can override SettingsFactory, UploaderFactory, ManagerFactory, and ReaderFactory.

class FsStorage(fk.Storage):
    """Filesystem storage adapter.

    Stores files on the local filesystem. The `path` setting must be
    configured to point to the base directory where files are stored.

    Example configuration:

    ```py
    import file_keeper as fk

    settings = {
        "type": "file_keeper:fs",
        "path": "/path/to/storage",
        "initialize": True,
        "override_existing": False,
    }
    storage = fk.make_storage("fs", settings)
    ```

    Note:
    * The `path` must be an absolute path.
    * The `path` directory must be writable by the application.
    * The `location` used in file operations is relative to the `path`.
    * The `location` is not sanitized and can lead outside the configured
      `path`. Consider using combination of `storage.prepare_location` with
      `settings.location_transformers` that sanitizes the path, like
      `safe_relative_path`.
    * If `initialize` is `True`, the storage will attempt to create the
      directory if it does not exist.
    * If `override_existing` is `False`, operations that would overwrite an
      existing file will raise an `ExistingFileError`.
    """

    settings: Settings

    SettingsFactory = Settings
    UploaderFactory = Uploader
    ReaderFactory = Reader
    ManagerFactory = Manager

The filesystem adapter overrides all these attributes because:

  • it contains custom settings (SettingsFactory)
  • it defines how a file is uploaded (UploaderFactory)
  • it defines how a file is managed, i.e. removed, copied, analyzed (ManagerFactory)
  • it defines how a file is read (ReaderFactory)

Additionally, it annotates the settings attribute as settings: Settings, i.e. the custom Settings class defined in the same module. This simplifies typing and does not affect the behavior of the adapter. Without this line, the type checker assumes that the storage uses the base Settings and complains when custom options are accessed.

Define storage settings

Create a Settings dataclass to hold configuration options specific to your storage. This class should inherit from the base fk.Settings.

@dataclasses.dataclass()
class Settings(fk.Settings):
    """Settings for FS storage."""

Filesystem settings do not introduce new options, so there are no attributes here. A different provider might define the following:

@dataclasses.dataclass()
class Settings(fk.Settings):
    bucket: str = ""
    username: str = ""
    password: str = ""
    params: dict[str, Any] = dataclasses.field(default_factory=dict)

These options must have default values because of a dataclass restriction: a field without a default cannot follow fields with defaults, including inherited ones. This holds even if the storage cannot work with these defaults. E.g., an empty password and username usually won't work, but you still have to specify them.
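
The restriction can be reproduced with the standard library alone. The following sketch assumes a base class whose fields all have defaults. BaseSettings and CloudSettings are hypothetical stand-ins, not file-keeper classes.

```python
import dataclasses


@dataclasses.dataclass()
class BaseSettings:
    """Stand-in for a base settings class whose fields have defaults."""

    name: str = "unknown"
    path: str = ""


def define_without_default() -> str:
    """Try to add a required field after the inherited defaults."""
    try:

        @dataclasses.dataclass()
        class Broken(BaseSettings):
            bucket: str  # no default: rejected when the class is created

    except TypeError as err:
        return str(err)
    return "ok"


@dataclasses.dataclass()
class CloudSettings(BaseSettings):
    bucket: str = ""  # a placeholder default keeps the definition valid


error_message = define_without_default()
```

The error is raised at class definition time, so a missing default is caught as soon as the module is imported.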

Validation comes next. It's performed in the __post_init__ method of the dataclass.

    def __post_init__(self, **kwargs: Any):
        super().__post_init__(**kwargs)

        if not os.path.exists(self.path):
            if not self.initialize:
                raise fk.exc.InvalidStorageConfigurationError(
                    self.name,
                    f"path `{self.path}` does not exist",
                )

            try:
                os.makedirs(self.path)
            except PermissionError as err:
                raise fk.exc.InvalidStorageConfigurationError(
                    self.name,
                    f"path `{self.path}` is not writable",
                ) from err

The filesystem adapter relies on two generic options. The first is initialize. When this flag is enabled, the storage checks whether the directory for files exists and, if it's missing, tries to create it. If this fails, InvalidStorageConfigurationError is raised. That's how the storage reacts to configuration problems.
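
Both branches of this check can be tried in isolation. The sketch below is a simplified stand-in, not the real Settings class: DirSettings and InvalidConfigError are hypothetical names, and a temporary directory plays the role of the storage location.

```python
import dataclasses
import os
import tempfile


class InvalidConfigError(Exception):
    """Hypothetical stand-in for InvalidStorageConfigurationError."""


@dataclasses.dataclass()
class DirSettings:
    path: str = ""
    initialize: bool = False

    def __post_init__(self) -> None:
        if os.path.exists(self.path):
            return
        if not self.initialize:
            raise InvalidConfigError(f"path `{self.path}` does not exist")
        try:
            os.makedirs(self.path)
        except PermissionError as err:
            raise InvalidConfigError(f"path `{self.path}` is not writable") from err


base = tempfile.mkdtemp()
missing = os.path.join(base, "files")

# Without `initialize`, a missing directory is a configuration error.
try:
    DirSettings(path=missing)
    rejected = False
except InvalidConfigError:
    rejected = True

# With `initialize`, the directory is created during validation.
created = DirSettings(path=missing, initialize=True)
```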

The second option is path. It is usually absolute and defines the location in the filesystem where files are stored. Other storages may treat it differently: cloud storages use path as a prefix of the file name because they do not support a directory hierarchy; the SQLAlchemy storage ignores path, as it has no meaning in a DB context.

Apart from this, storages often initialize connections to external services and store them as storage attributes. For example, file_keeper:azure_blob has a container_name string option that holds the name of the cloud container where files are stored. Inside __post_init__, it connects to the container and stores the container object itself as the container property, so that the storage doesn't need to reconnect to the container every time.

Create the uploader service

The next target is Uploader. It's a service responsible for file creation, and it must extend fk.Uploader.

class Uploader(fk.Uploader):
    """Filesystem uploader."""

Any service consists of two parts: methods and capabilities. Methods describe the logic but are initially hidden from the storage. I.e., if you only define methods and ignore capabilities, the storage will behave as if the service cannot perform the task.

    @override
    def upload(self, location: fk.types.Location, upload: fk.Upload, extras: dict[str, Any]) -> fk.FileData:
        ...

Capabilities, in turn, tell the storage which operations the service supports. Because of this separation, you can make the storage behave as if it cannot perform an operation even when the method is implemented. In this way, you can turn a filesystem storage into a read-only storage without code changes and guarantee that files won't be accidentally removed from it.

    capabilities: fk.Capability = fk.Capability.CREATE | fk.Capability.RESUMABLE

As you can see, the FS storage supports CREATE and RESUMABLE. Capabilities are implemented as bit masks and can be combined using the | operator.
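
The bit-mask behavior can be illustrated with the standard enum.Flag. This is a minimal sketch: the Capability enum below is a hypothetical stand-in with only four members, and supports() is a helper invented for the example, so neither reflects file-keeper's actual implementation.

```python
import enum


class Capability(enum.Flag):
    """Hypothetical stand-in, not file-keeper's real Capability."""

    CREATE = enum.auto()
    STREAM = enum.auto()
    REMOVE = enum.auto()
    RESUMABLE = enum.auto()


def supports(available: Capability, wanted: Capability) -> bool:
    """Return True only when every wanted bit is present in available."""
    return (available & wanted) == wanted


# Combine capabilities with `|`, as the uploader does.
uploader_caps = Capability.CREATE | Capability.RESUMABLE
full_caps = uploader_caps | Capability.STREAM | Capability.REMOVE

# Masking out the write operations leaves a read-only set.
write_caps = Capability.CREATE | Capability.REMOVE | Capability.RESUMABLE
read_only_caps = full_caps & ~write_caps
```

Because the methods stay in place and only the mask changes, hiding an operation never requires touching the service code.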

Let's look closer at the upload() method.

It starts by computing the full path to the file location. full_path() is a generic method of the storage that naively combines the path option of the storage with the location of the file. Every storage has the path option, and every storage can use full_path() to attach path as a prefix to the location, if that makes sense.

        dest = self.storage.full_path(location)
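
Because the combination is naive, a location that contains `..` can resolve to a path outside the base directory. The sketch below demonstrates this with plain os.path calls. naive_full_path() and is_inside() are hypothetical helpers written for this illustration and do not reproduce full_path() or safe_relative_path. The check is purely lexical and does not resolve symlinks.

```python
import os


def naive_full_path(base: str, location: str) -> str:
    """Join base and location without any sanitization."""
    return os.path.join(base, location)


def is_inside(base: str, candidate: str) -> bool:
    """Check that the normalized candidate stays under base."""
    base = os.path.abspath(base)
    candidate = os.path.abspath(candidate)
    return os.path.commonpath([base, candidate]) == base


base = os.path.abspath("storage")
safe = naive_full_path(base, "reports/2024.csv")
escaped = naive_full_path(base, "../secrets.txt")
```

Here is_inside(base, safe) holds, while is_inside(base, escaped) does not, which is why sanitizing user-supplied locations before they reach the storage is worth the effort.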

Next, the storage uses another generic option, override_existing. If it's disabled and the given location is already taken by another file, the uploader raises ExistingFileError. That's the recommended reaction in this situation, and you'll notice that other storages follow the same process.

        if not self.storage.settings.override_existing and os.path.exists(dest):
            raise fk.exc.ExistingFileError(self.storage, location)

Then the storage ensures that all intermediate folders in the file's final location are present. If you expect files to be uploaded directly into path, this may seem redundant. But the FS storage does not impose such a restriction, and there may be nested directories under path, so verifying that all folders exist is the safest option.

        os.makedirs(os.path.dirname(dest), exist_ok=True)

Then the file is actually written to the FS. You can read the file content using upload.stream.read(), but here we create a HashingReader using the hashing_reader() method of the Upload. This object also has a read() method, but in addition it computes the content hash of the file while it's consumed. As a result, we get the hash at the end almost for free.

        reader = upload.hashing_reader()
        with open(dest, "wb") as fd:
            for chunk in reader:
                fd.write(chunk)

Other storages, like AWS S3 or Azure Blob Storage, do not need this step, because the content hash is computed by the cloud provider and returned with the metadata of the uploaded object. But if you don't have a cheap way to obtain the hash, using HashingReader is the recommended option.
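
To show why the hash comes "almost for free", here is a minimal sketch of a reader that updates a hash while its chunks are consumed. SimpleHashingReader is a hypothetical stand-in, not file-keeper's HashingReader. The chunk size and the sha256 algorithm are arbitrary choices for the example.

```python
import hashlib
import io
from typing import IO, Iterator


class SimpleHashingReader:
    """Hypothetical stand-in that hashes chunks as they are read."""

    def __init__(self, stream: IO[bytes], chunk_size: int = 16384, algorithm: str = "sha256"):
        self.stream = stream
        self.chunk_size = chunk_size
        self.hashsum = hashlib.new(algorithm)
        self.position = 0

    def __iter__(self) -> Iterator[bytes]:
        return self

    def __next__(self) -> bytes:
        chunk = self.stream.read(self.chunk_size)
        if not chunk:
            raise StopIteration
        # Hash and count each chunk on its way to the consumer.
        self.hashsum.update(chunk)
        self.position += len(chunk)
        return chunk

    def get_hash(self) -> str:
        return self.hashsum.hexdigest()


content = b"hello world" * 1000
reader = SimpleHashingReader(io.BytesIO(content), chunk_size=1024)
copied = b"".join(reader)
```

The content is read only once: the same pass that writes the chunks to the destination also feeds the hash, so no second read of the file is needed.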

In the end, the upload() method of the service builds a FileData with details of the uploaded file and returns it to the caller.

        return fk.FileData(
            location,
            os.path.getsize(dest),
            upload.content_type,
            reader.get_hash(),
        )

Create the reader service

The Reader service is much simpler than the Uploader. It exposes the STREAM capability to tell the storage that files can be read from it, and it implements the stream() method, which defines how exactly the bytes of content are obtained.

class Reader(fk.Reader):
    """Filesystem reader."""

    storage: FsStorage
    capabilities: fk.Capability = fk.Capability.STREAM

    @override
    def stream(self, data: fk.FileData, extras: dict[str, Any]) -> IO[bytes]:
        filepath = self.storage.full_path(data.location)
        if not os.path.exists(filepath):
            raise fk.exc.MissingFileError(self.storage, data.location)

        return open(filepath, "rb")  # noqa: SIM115

Note how it computes the path to the file using full_path(), just as the Uploader did. Basically, every method that accesses the file should use full_path().

Also, pay attention to the MissingFileError raised if the file does not exist. That's the recommended way to handle missing files.

Finally, look at the return value. The stream() method must return Iterable[bytes], not plain bytes; e.g., return b"hello" is not valid output.

Anything that can be used in a for-loop and produces bytes is a valid output of the stream() method. A few examples:

...
return [b"hello", b" ", b"world"]
...
yield b"hello"
yield b" "
yield b"world"
...
return BytesIO(b"hello world")
...
return open(path, "rb")
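
The variants above can be checked side by side. In this sketch, the from_* functions and consume() are hypothetical helpers that only mimic the return statements listed above. The last line shows why plain bytes do not qualify: iterating over bytes yields integers, not byte chunks.

```python
import io
from typing import Iterable, Iterator


def from_list() -> Iterable[bytes]:
    return [b"hello", b" ", b"world"]


def from_generator() -> Iterator[bytes]:
    yield b"hello"
    yield b" "
    yield b"world"


def from_buffer() -> Iterable[bytes]:
    return io.BytesIO(b"hello world")


def consume(chunks: Iterable[bytes]) -> bytes:
    """Join chunks the way a caller iterating over the result could."""
    return b"".join(chunks)


results = [consume(from_list()), consume(from_generator()), consume(from_buffer())]

# Iterating over plain bytes produces integers instead of chunks.
ints = list(b"hi")
```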

Create the manager service

The Manager service contains many methods and capabilities, but all of them are fairly straightforward.

Capabilities tell the storage "what" the service can do, while method implementations explain "how" it's done. Usually, a capability and its method come as a pair, unless you are certain that you need to separate them.

    capabilities: fk.Capability = (
        fk.Capability.REMOVE
        | fk.Capability.SCAN
        | fk.Capability.EXISTS
        | fk.Capability.ANALYZE
        | fk.Capability.COPY
        | fk.Capability.MOVE
        | fk.Capability.COMPOSE
        | fk.Capability.APPEND
    )

The remove() method removes the object. If it's removed, the result is True. If it's not removed (because it does not exist), the result is False.

    @override
    def remove(self, data: fk.FileData, extras: dict[str, Any]) -> bool:
        filepath = self.storage.full_path(data.location)
        if not os.path.exists(filepath):
            return False

        os.remove(filepath)
        return True

scan() returns an iterable of strings with the names of all files available in the storage. Note that this method yields paths relative to the value of the path option of the storage. This way, when you iterate through scan() results and pass filenames back to the storage, it can wrap them in storage.full_path() and build correct locations.

If os.path.relpath() were not called, path would be included twice when you use items from scan(), producing incorrect locations.

    @override
    def scan(self, extras: dict[str, Any]) -> Iterable[str]:
        path = self.storage.settings.path
        search_path = os.path.join(path, "**")

        for entry in glob.glob(search_path, recursive=True):
            if not os.path.isfile(entry):
                continue
            yield os.path.relpath(entry, path)
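
The round trip between scan() results and full paths can be verified on a temporary directory. The sketch below is a standalone function modelled on the method above. The directory layout and file names are invented for the example.

```python
import glob
import os
import tempfile
from typing import Iterator


def scan(base: str) -> Iterator[str]:
    """Yield files under base as paths relative to base."""
    for entry in glob.glob(os.path.join(base, "**"), recursive=True):
        if os.path.isfile(entry):
            yield os.path.relpath(entry, base)


base = tempfile.mkdtemp()
os.makedirs(os.path.join(base, "nested"))
for name in ("a.txt", os.path.join("nested", "b.txt")):
    with open(os.path.join(base, name), "wb") as fd:
        fd.write(b"data")

found = sorted(scan(base))

# Joining a relative result with base points at a real file again.
round_trip = [os.path.isfile(os.path.join(base, name)) for name in found]
```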

exists() returns True if the file exists and False if it is missing.

    @override
    def exists(self, data: fk.FileData, extras: dict[str, Any]) -> bool:
        filepath = self.storage.full_path(data.location)
        return os.path.exists(filepath)

analyze() returns the same FileData as the one produced during upload().

    @override
    def analyze(self, location: fk.types.Location, extras: dict[str, Any]) -> fk.FileData:
        filepath = self.storage.full_path(location)
        if not os.path.exists(filepath):
            raise fk.exc.MissingFileError(self.storage, location)

        with open(filepath, "rb") as src:
            reader = fk.HashingReader(src)
            content_type = magic.from_buffer(next(reader, b""), True)
            reader.exhaust()

        return fk.FileData(
            location,
            size=reader.position,
            content_type=content_type,
            hash=reader.get_hash(),
        )

copy() creates a copy of the file, raising MissingFileError if the source file is missing and ExistingFileError if the destination file already exists and override_existing is disabled.

    @override
    def copy(self, location: fk.types.Location, data: fk.FileData, extras: dict[str, Any]) -> fk.FileData:
        src = self.storage.full_path(data.location)
        dest = self.storage.full_path(location)

        if not os.path.exists(src):
            raise fk.exc.MissingFileError(self.storage, data.location)

        if os.path.exists(dest) and not self.storage.settings.override_existing:
            raise fk.exc.ExistingFileError(self.storage, location)

        shutil.copy(src, dest)
        return fk.FileData.from_object(data, location=location)

move() behaves like copy(), except that the original file is no longer available after the move. Basically, move() does a "rename" if possible, and "copy" + "remove" if not.

    @override
    def move(self, location: fk.types.Location, data: fk.FileData, extras: dict[str, Any]) -> fk.FileData:
        src = self.storage.full_path(data.location)
        dest = self.storage.full_path(location)

        if not os.path.exists(src):
            raise fk.exc.MissingFileError(self.storage, data.location)

        if os.path.exists(dest):
            if self.storage.settings.override_existing:
                os.remove(dest)
            else:
                raise fk.exc.ExistingFileError(self.storage, location)

        shutil.move(src, dest)
        return fk.FileData.from_object(data, location=location)
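
The "rename if possible, copy and remove if not" strategy is what shutil.move() provides. A simplified sketch of the same idea follows. move_file() is a hypothetical helper for illustration, and the adapter itself keeps using shutil.move().

```python
import os
import shutil
import tempfile


def move_file(src: str, dest: str) -> None:
    """Rename when possible, otherwise copy and remove."""
    try:
        os.rename(src, dest)
    except OSError:
        # E.g. src and dest are located on different filesystems.
        shutil.copy2(src, dest)
        os.remove(src)


base = tempfile.mkdtemp()
src = os.path.join(base, "old.txt")
dest = os.path.join(base, "new.txt")
with open(src, "wb") as fd:
    fd.write(b"payload")

move_file(src, dest)

with open(dest, "rb") as fd:
    moved_content = fd.read()
```

A rename within one filesystem does not copy any data, so it stays cheap even for large files. The fallback costs a full copy.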

compose() is the most challenging method of the Manager. It takes a few existing files and combines them into a new one, similar to the cat utility.

    @override
    def compose(self, location: fk.types.Location, datas: Iterable[fk.FileData], extras: dict[str, Any]) -> fk.FileData:
        dest = self.storage.full_path(location)

        if os.path.exists(dest) and not self.storage.settings.override_existing:
            raise fk.exc.ExistingFileError(self.storage, location)

        sources: list[str] = []
        for data in datas:
            src = self.storage.full_path(data.location)

            if not os.path.exists(src):
                raise fk.exc.MissingFileError(self.storage, data.location)

            sources.append(src)

        with open(dest, "wb") as to_fd:
            for src in sources:
                with open(src, "rb") as from_fd:
                    shutil.copyfileobj(from_fd, to_fd)

        return self.analyze(location, extras)

append() takes the content of an upload and adds it to the end of an existing file.

    @override
    def append(self, data: fk.FileData, upload: fk.Upload, extras: dict[str, Any]) -> fk.FileData:
        dest = self.storage.full_path(data.location)
        if not os.path.exists(dest):
            raise fk.exc.MissingFileError(self.storage, data.location)

        with open(dest, "ab") as fd:
            fd.write(upload.stream.read())

        return self.analyze(data.location, extras)
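
The append() implementation above reads the whole upload into memory with upload.stream.read(). For large uploads, a chunked copy may be preferable. Here is a minimal sketch, assuming the upload exposes a binary file-like stream. append_stream() is a hypothetical helper, not part of file-keeper.

```python
import io
import os
import shutil
import tempfile
from typing import IO


def append_stream(dest: str, stream: IO[bytes], chunk_size: int = 64 * 1024) -> int:
    """Append stream to dest in chunks and return the new file size."""
    with open(dest, "ab") as fd:
        # copyfileobj reads and writes at most chunk_size bytes at a time.
        shutil.copyfileobj(stream, fd, chunk_size)
    return os.path.getsize(dest)


base = tempfile.mkdtemp()
dest = os.path.join(base, "log.txt")
with open(dest, "wb") as fd:
    fd.write(b"hello")

new_size = append_stream(dest, io.BytesIO(b" world"))

with open(dest, "rb") as fd:
    final_content = fd.read()
```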