
Shared

All public utilities are collected inside the ckanext.files.shared module. Avoid using anything that is not listed there, and do not import from modules other than shared.

get_storage(name=None)

Return existing storage instance.

Storages are initialized when the plugin is loaded. As a result, this function always returns the same storage object for the given name.

If no name is specified, the default storage is returned.

PARAMETER DESCRIPTION
name

name of the configured storage

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Storage

storage instance

RAISES DESCRIPTION
UnknownStorageError

storage with the given name is not configured

Example
default_storage = get_storage()
storage = get_storage("storage name")
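
The "same object for the given name" behaviour can be pictured as a lookup in a registry that is filled once at load time. The sketch below is a self-contained stand-in, not the extension's code: `_registry`, `DummyStorage`, `get_storage_sketch` and the `"default"` fallback name are all hypothetical.

```python
class UnknownStorageError(Exception):
    """Stand-in for the extension's exception of the same name."""


class DummyStorage:
    """Hypothetical placeholder for a real storage object."""

    def __init__(self, name):
        self.name = name


# Filled once, the way storages are initialized when the plugin is loaded.
_registry = {name: DummyStorage(name) for name in ("default", "memo")}


def get_storage_sketch(name=None):
    """Return the registered instance; None falls back to an assumed default name."""
    key = "default" if name is None else name
    try:
        return _registry[key]
    except KeyError:
        raise UnknownStorageError(key) from None


# Repeated lookups hand back the very same object.
same = get_storage_sketch("memo") is get_storage_sketch("memo")
```

Because the instance is shared, any state kept on a storage object is visible to every caller that asks for it by name.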

make_storage(name, settings, prepare_settings=False)

Initialize storage instance with specified settings.

The storage adapter is defined by the type key of the settings. All other settings depend on the specific adapter.

It's recommended to enable the prepare_settings flag. When it's enabled, all standard parameters (max_size, supported_types) are added to settings if they are missing. By default this flag is disabled, because storages are usually initialized from the CKAN configuration, which is already validated by config declarations.

PARAMETER DESCRIPTION
name

name of the new storage

TYPE: str

settings

configuration for the storage

TYPE: dict[str, Any]

prepare_settings

add default values for missing options

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Storage

storage instance

RAISES DESCRIPTION
UnknownAdapterError

storage adapter is not registered

Example
storage = make_storage("memo", {"type": "files:redis"}, True)

make_upload(value)

Convert value into Upload object.

Use this function for simple and reliable initialization of an Upload object. Avoid creating Upload manually unless you are 100% sure you can provide the correct MIMEtype, size and stream.

PARAMETER DESCRIPTION
value

content of the file

TYPE: Uploadable | Upload

RAISES DESCRIPTION
ValueError

incorrectly initialized cgi.FieldStorage passed as value

TypeError

content has unsupported type

RETURNS DESCRIPTION
Upload

upload object with specified content

Example
storage.upload("file.txt", make_upload(b"hello world"))

Upload dataclass

Standard upload details.

PARAMETER DESCRIPTION
stream

iterable of bytes or file-like object

TYPE: PUploadStream

filename

name of the file

TYPE: str

size

size of the file in bytes

TYPE: int

content_type

MIMEtype of the file

TYPE: str

Example
Upload(
    BytesIO(b"hello world"),
    "file.txt",
    11,
    "text/plain",
)

seekable_stream: types.PSeekableStream | None property

Return a stream that can be rewound after reading.

If the internal stream does not support file-like seek, nothing is returned from this property.

Use this property if you want to read the file ahead, for example to get CSV column names, the list of files inside a ZIP, or EXIF metadata. If you get None from it, the stream does not support seeking and you won't be able to return the cursor to the beginning of the file after reading something.

Example
upload = make_upload(...)
if fd := upload.seekable_stream:
    # read a fragment of the file
    chunk = fd.read(1024)
    # move the cursor to the end of the stream
    fd.seek(0, 2)
    # the cursor position now equals the number of bytes in the stream
    size = fd.tell()
    # move the cursor back, so that no bytes from the beginning of the
    # stream are lost when the uploader reads from it
    fd.seek(0)

RETURNS DESCRIPTION
PSeekableStream | None

file-like stream or nothing
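
Reading ahead and rewinding can be tried out with a plain `io.BytesIO`, which supports seeking. This sketch uses only the standard library; `peek_csv_header` is a hypothetical helper and not part of ckanext-files.

```python
import csv
import io


def peek_csv_header(stream):
    """Read the first CSV row, then put the cursor back where it was."""
    start = stream.tell()
    try:
        first_line = stream.readline().decode("utf-8")
    finally:
        # Rewind so that a later reader still sees the whole content.
        stream.seek(start)
    return next(csv.reader([first_line]), [])


stream = io.BytesIO(b"id,name,size\n1,file.txt,11\n")
header = peek_csv_header(stream)
```

The `finally` clause restores the position even if decoding fails, so the stream is never left half-consumed.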

HashingReader

IO stream wrapper that computes content hash while stream is consumed.

PARAMETER DESCRIPTION
stream

iterable of bytes or file-like object

TYPE: PUploadStream

chunk_size

max number of bytes read at once

TYPE: int DEFAULT: CHUNK_SIZE

algorithm

hashing algorithm

TYPE: str DEFAULT: 'md5'

Example
reader = HashingReader(readable_stream)
for chunk in reader:
    ...
print(f"Hash: {reader.get_hash()}")

exhaust()

Exhaust internal stream to compute final version of content hash.

get_hash()

Get content hash as a string.

read()

Return content of the file as a single bytes object.
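
The idea of hashing while the stream is consumed can be sketched with `hashlib`. This is a simplified stand-in rather than the extension's implementation: `HashingReaderSketch` is a hypothetical name, it accepts only file-like objects, and the default chunk size of 8192 is an assumption.

```python
import hashlib
import io


class HashingReaderSketch:
    """Wrap a file-like object and update a hash with every chunk read."""

    def __init__(self, stream, chunk_size=8192, algorithm="md5"):
        self.stream = stream
        self.chunk_size = chunk_size
        self.hashsum = hashlib.new(algorithm)

    def __iter__(self):
        return self

    def __next__(self):
        chunk = self.stream.read(self.chunk_size)
        if not chunk:
            raise StopIteration
        self.hashsum.update(chunk)
        return chunk

    def read(self):
        """Return the remaining content as a single bytes object."""
        return b"".join(self)

    def exhaust(self):
        """Consume whatever is left so the hash covers the whole stream."""
        for _ in self:
            pass

    def get_hash(self):
        return self.hashsum.hexdigest()


reader = HashingReaderSketch(io.BytesIO(b"hello world"), chunk_size=4)
chunks = list(reader)
digest = reader.get_hash()
```

The hash is only final once the stream has been read to the end, which is why a method like exhaust() is useful when the caller stops iterating early.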

Capability

Bases: Flag

Enumeration of operations supported by the storage.

Example
read_and_write = Capability.STREAM | Capability.CREATE
if storage.supports(read_and_write):
    ...

exclude(*capabilities)

Remove capabilities from the cluster.

PARAMETER DESCRIPTION
capabilities

removed capabilities

TYPE: Capability

Example
cluster = cluster.exclude(Capability.REMOVE)
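
Since Capability is a Flag, clusters combine with `|` and can be narrowed with `&` and `~`. The sketch below reproduces that with the standard `enum` module. `Cap` is a hypothetical stand-in that contains only the three members named in the examples above, and `supports` is an illustrative helper, not the storage method.

```python
import enum


class Cap(enum.Flag):
    """Hypothetical stand-in for Capability."""

    CREATE = enum.auto()
    STREAM = enum.auto()
    REMOVE = enum.auto()

    def exclude(self, *capabilities):
        """Return a cluster without the given capabilities."""
        result = self
        for capability in capabilities:
            result = result & ~capability
        return result


def supports(cluster, operation):
    """True when every flag of `operation` is present in `cluster`."""
    return (cluster & operation) == operation


cluster = Cap.CREATE | Cap.STREAM | Cap.REMOVE
read_and_write = Cap.STREAM | Cap.CREATE
reduced = cluster.exclude(Cap.REMOVE, Cap.CREATE)
```

Note that exclude returns a new cluster instead of modifying the original, which is why the documented example reassigns the result.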

File

Bases: Base

Model with file details.

PARAMETER DESCRIPTION
name

name shown to users

TYPE: str

location

location of the file inside storage

TYPE: str

content_type

MIMEtype

TYPE: str

size

size in bytes

TYPE: int

hash

checksum

TYPE: str

storage

storage that contains the file

TYPE: str

ctime

date of creation

TYPE: datetime

mtime

date of the last update

TYPE: datetime | None

atime

date of last access (unstable)

TYPE: datetime | None

storage_data

additional data set by storage

TYPE: dict[str, Any]

plugin_data

additional data set by plugins

TYPE: dict[str, Any]

Example
file = File(
    name="file.txt",
    location="relative/path/safe-name.txt",
    content_type="text/plain",
    size=100,
    hash="abc123",
    storage="default",
)

Multipart

Bases: Base

Model with details of incomplete upload.

PARAMETER DESCRIPTION
name

name shown to users

TYPE: str

location

location of the file inside storage

TYPE: str

content_type

expected MIMEtype

TYPE: str

size

expected size in bytes

TYPE: int

hash

expected checksum

TYPE: str

storage

storage that contains the file

TYPE: str

ctime

date of creation

TYPE: datetime

storage_data

additional data set by storage

TYPE: dict[str, Any]

plugin_data

additional data set by plugins

TYPE: dict[str, Any]

Example
upload = Multipart(
    name="file.txt",
    location="relative/path/safe-name.txt",
    content_type="text/plain",
    size=100,
    hash="abc123",
    storage="default",
)

Owner

Bases: Base

Model with details about current owner of an item.

PARAMETER DESCRIPTION
item_id

ID of the owned object

TYPE: str

item_type

type of the owned object

TYPE: str

owner_id

ID of the owner

TYPE: str

owner_type

type of the owner

TYPE: str

pinned

is ownership protected from transfer

TYPE: bool

Example
owner = Owner(
    item_id=file.id,
    item_type="file",
    owner_id=user.id,
    owner_type="user",
)

TransferHistory

Bases: Base

Model for tracking ownership history of the file.

PARAMETER DESCRIPTION
item_id

ID of the owned object

TYPE: str

item_type

type of the owned object

TYPE: str

owner_id

ID of the owner

TYPE: str

owner_type

type of the owner

TYPE: str

leave_date

date of ownership transfer to a different owner

TYPE: datetime

actor

user who initiated ownership transfer

TYPE: str | None

Example
record = TransferHistory(
    item_id=file.id,
    item_type="file",
    owner_id=prev_owner.owner_id,
    owner_type=prev_owner.owner_type,
)

FileData dataclass

Bases: BaseData[File]

Information required by storage to operate the file.

PARAMETER DESCRIPTION
location

filepath, filename or any other type of unique identifier

TYPE: str

size

size of the file in bytes

TYPE: int DEFAULT: 0

content_type

MIMEtype of the file

TYPE: str DEFAULT: 'application/octet-stream'

hash

checksum of the file

TYPE: str DEFAULT: ''

storage_data

additional details set by storage adapter

TYPE: dict[str, Any] DEFAULT: dict()

Example
FileData(
    "local/path.txt",
    123,
    "text/plain",
    md5_of_content,
)

MultipartData dataclass

Bases: BaseData[Multipart]

Information required by storage to operate the incomplete upload.

PARAMETER DESCRIPTION
location

filepath, filename or any other type of unique identifier

TYPE: str DEFAULT: ''

size

expected size of the file in bytes

TYPE: int DEFAULT: 0

content_type

expected MIMEtype of the file

TYPE: str DEFAULT: ''

hash

expected checksum of the file

TYPE: str DEFAULT: ''

storage_data

additional details set by storage adapter

TYPE: dict[str, Any] DEFAULT: dict()

Example
MultipartData(
    "local/path.txt",
    expected_size,
    expected_content_type,
    expected_hash,
)

IFiles

Bases: Interface

Extension point for ckanext-files.

This interface is not stabilized. Implement it with inherit=True.

Example
class MyPlugin(p.SingletonPlugin):
    p.implements(interfaces.IFiles, inherit=True)

files_file_allows(context, file, operation)

Decide if user is allowed to perform specified operation on the file.

Return True/False if the user is allowed/not allowed. Return None to rely on other plugins.

The default implementation relies on the cascade_access config option. If the owner of the file is included in cascade access, the user can perform the operation on the file if they can perform the same operation on the file's owner.

If the current owner is not affected by cascade access, the user can perform the operation on the file only if the user owns the file.

PARAMETER DESCRIPTION
context

API context

TYPE: Context

file

accessed file object

TYPE: File | Multipart

operation

performed operation

TYPE: FileOperation

RETURNS DESCRIPTION
bool | None

decision whether operation is allowed for the file

Example
def files_file_allows(
        self, context,
        file: shared.File | shared.Multipart,
        operation: shared.types.FileOperation
) -> bool | None:
    if file.owner_info and file.owner_info.owner_type == "resource":
        return is_authorized_boolean(
            f"resource_{operation}",
            context,
            {"id": file.owner_info.owner_id}
        )

    return None
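
How True, False and None answers from several plugins might be combined can be sketched as "the first plugin with an opinion wins". This ordering rule is an assumption made for illustration, and `first_decision`, `abstain` and `deny_delete` are hypothetical names; the source only states that None defers to other plugins.

```python
from typing import Callable, Iterable, Optional

Check = Callable[[str], Optional[bool]]


def first_decision(checks: Iterable[Check], operation: str, default: bool) -> bool:
    """Return the first non-None answer, or `default` when every check abstains."""
    for check in checks:
        decision = check(operation)
        if decision is not None:
            return decision
    return default


def abstain(operation: str) -> Optional[bool]:
    """A plugin with no opinion about any operation."""
    return None


def deny_delete(operation: str) -> Optional[bool]:
    """A plugin that only cares about the delete operation."""
    return False if operation == "delete" else None


verdict = first_decision([abstain, deny_delete], "delete", default=True)
```

Returning None rather than False matters here: False ends the discussion, while None lets another implementation or the default logic decide.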

files_get_storage_adapters()

Return mapping of storage type to adapter class.

RETURNS DESCRIPTION
dict[str, Any]

adapters provided by the implementation

Example
def files_get_storage_adapters(self):
    return {
        "my_ext:dropbox": DropboxStorage,
    }

files_owner_allows(context, owner_type, owner_id, operation)

Decide if user is allowed to perform specified operation on the owner.

Return True/False if the user is allowed/not allowed. Return None to rely on other plugins.

PARAMETER DESCRIPTION
context

API context

TYPE: Context

owner_type

type of the tested owner

TYPE: str

owner_id

ID of the tested owner

TYPE: str

operation

performed operation

TYPE: OwnerOperation

RETURNS DESCRIPTION
bool | None

decision whether operation is allowed for the owner

Example
def files_owner_allows(
        self, context,
        owner_type: str, owner_id: str,
        operation: shared.types.OwnerOperation
) -> bool | None:
    if owner_type == "resource" and operation == "file_transfer":
        return is_authorized_boolean(
            "resource_update",
            context,
            {"id": owner_id}
        )

    return None

files_register_owner_getters()

Return mapping with lookup functions for owner types.

Name of the getter is the name used as Owner.owner_type. The getter itself is a function that accepts owner ID and returns optional owner entity.

RETURNS DESCRIPTION
dict[str, Callable[[str], Any]]

getters for specific owner types

Example
def files_register_owner_getters(self):
    return {"resource": model.Resource.get}

Storage

Bases: OptionChecker, ABC

Base class for storage implementation.

PARAMETER DESCRIPTION
settings

storage configuration

TYPE: dict[str, Any]

Example
class MyStorage(Storage):
    def make_uploader(self):
        return MyUploader(self)

    def make_reader(self):
        return MyReader(self)

    def make_manager(self):
        return MyManager(self)

max_size: int property

Max allowed upload size.

Max size set to 0 removes all limitations.

supported_types: list[str] property

List of supported MIMEtypes or their parts.

prepare_settings(settings) classmethod

Add all required items to settings.

This is usually done by config declarations, but when a storage is initialized manually via make_storage, settings are not validated.

Use this method to transform an arbitrary dictionary into the expected form of settings. Don't do too much work; adding missing parameters should be enough.

Passing sane values with valid types is still responsibility of the developer.

PARAMETER DESCRIPTION
settings

configuration that requires preparation

TYPE: dict[str, Any]

Example
settings = Storage.prepare_settings({})
storage = Storage(settings)
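
What "adding missing parameters" amounts to can be sketched with `dict.setdefault`. The function name is hypothetical and the default values (0 for max_size, an empty list for supported_types) are assumptions made for illustration; a real adapter defines its own defaults.

```python
def prepare_settings_sketch(settings):
    """Return a copy of `settings` with assumed defaults for missing keys."""
    prepared = dict(settings)
    # Assumed default; a max size of 0 removes all limitations.
    prepared.setdefault("max_size", 0)
    # Assumed default for the list of supported types.
    prepared.setdefault("supported_types", [])
    return prepared


raw = {"type": "files:redis", "max_size": 1024}
prepared = prepare_settings_sketch(raw)
```

Existing values are left alone, so the caller remains responsible for passing sane values with valid types.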

range(data, start=0, end=None, /, **kwargs)

Return byte-stream of the file content.

stream_as_upload(data, **kwargs)

Make an Upload with file content.

Uploader

Bases: StorageService

Service responsible for writing data into a storage.

Storage internally calls methods of this service. For example, Storage.upload(location, upload, **kwargs) results in Uploader.upload(location, upload, kwargs).

Example
class MyUploader(Uploader):
    def upload(
        self, location: str, upload: Upload, extras: dict[str, Any]
    ) -> FileData:
        reader = upload.hashing_reader()

        with open(location, "wb") as dest:
            dest.write(reader.read())

        return FileData(
            location, upload.size,
            upload.content_type,
            reader.get_hash()
        )

multipart_complete(data, extras)

Verify file integrity and finalize incomplete upload.

multipart_refresh(data, extras)

Show details of the incomplete upload.

multipart_start(location, data, extras)

Prepare everything for multipart (resumable) upload.

multipart_update(data, extras)

Add data to the incomplete upload.

upload(location, upload, extras)

Upload file using single stream.

Reader

Bases: StorageService

Service responsible for reading data from the storage.

Storage internally calls methods of this service. For example, Storage.stream(data, **kwargs) results in Reader.stream(data, kwargs).

Example
class MyReader(Reader):
    def stream(
        self, data: FileData, extras: dict[str, Any]
    ) -> Iterable[bytes]:
        return open(data.location, "rb")

content(data, extras)

Return file content as a single bytes object.

one_time_link(data, extras)

Return one-time download link.

permanent_link(data, extras)

Return permanent download link.

public_link(data, extras)

Return public link.

range(data, start, end, extras)

Return byte-stream of the file content.

stream(data, extras)

Return byte-stream of the file content.

temporal_link(data, extras)

Return temporal download link.

extras["ttl"] controls the lifetime of the link (30 seconds by default).

Manager

Bases: StorageService

Service responsible for file maintenance operations.

Storage internally calls methods of this service. For example, Storage.remove(data, **kwargs) results in Manager.remove(data, kwargs).

Example
class MyManager(Manager):
    def remove(
        self, data: FileData|MultipartData, extras: dict[str, Any]
    ) -> bool:
        os.remove(data.location)
        return True

analyze(location, extras)

Return all details about filename.

append(data, upload, extras)

Append content to existing file.

compose(datas, location, extras)

Combine multiple files inside the storage into a new one.

copy(data, location, extras)

Copy file inside the storage.

exists(data, extras)

Check if file exists in the storage.

move(data, location, extras)

Move file to a different location inside the storage.

remove(data, extras)

Remove file from the storage.

scan(extras)

List all locations (filenames) in the storage.

add_task(task)

Add task to the current task queue.

This function can be called only inside a task queue context. Such a context is initialized automatically inside functions decorated with with_task_queue:

Example
@with_task_queue
def task_producer():
    add_task(...)

task_producer()

A task queue context can also be initialized manually using TaskQueue and a with statement:

Example
queue = TaskQueue()
with queue:
    add_task(...)

queue.process(execution_data)

with_task_queue(func, name=None)

Decorator for functions that schedule tasks.

The decorated function automatically initializes a separate task queue, which is processed when the function finishes. All tasks receive the function's result as execution data (the first argument of Task.run).

Without this decorator, you have to manually create task queue context before queuing tasks.

Example
@with_task_queue
def my_action(context, data_dict):
    ...
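
The mechanics of such a decorator can be sketched with `contextvars`: open a queue, run the function, then hand the function's result to every queued task. This is a simplified stand-in with hypothetical names. Tasks here are plain callables that receive only the result and their index, and calling `add_task_sketch` outside a queue raises `LookupError` rather than the extension's own exception.

```python
import contextvars
import functools

# Holds the queue that is active in the current context, if any.
_current = contextvars.ContextVar("current_queue")


def add_task_sketch(task):
    """Append to the active queue; raises LookupError when no queue is open."""
    _current.get().append(task)


def with_task_queue_sketch(func):
    """Run `func` inside a fresh queue, then run the tasks with its result."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        queue = []
        token = _current.set(queue)
        try:
            result = func(*args, **kwargs)
        finally:
            # Close the queue context before any task is executed.
            _current.reset(token)
        for idx, task in enumerate(queue):
            task(result, idx)
        return result

    return wrapper


seen = []


@with_task_queue_sketch
def my_action(value):
    add_task_sketch(lambda result, idx: seen.append((result, idx)))
    return value * 2


returned = my_action(21)
```

The task is queued before the function returns, yet it still receives the final return value, because the queue is processed only after the function has finished.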

Task

Bases: ABC

Base task for TaskQueue.

The only requirement for subclasses is implementing Task.run.

extract(source, path) staticmethod

Extract value from dictionary using FlattenKey from validators.

PARAMETER DESCRIPTION
source

dictionary with data

TYPE: dict[str, Any]

path

path to the extracted member from source

TYPE: FlattenKey

Example
data = {"a": {"b": {"c": 42}}}
assert Task.extract(data, ("a", "b", "c")) == 42
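
Path-based extraction of this kind can be approximated by indexing one level per path element. The helper below is a hypothetical sketch built on `functools.reduce`; it assumes that every element of the path is a valid key or index at its level and does not reproduce any special cases the real method may handle.

```python
from functools import reduce


def extract_sketch(source, path):
    """Follow `path` (a tuple of keys or indexes) through nested containers."""
    return reduce(lambda node, key: node[key], path, source)


data = {"a": {"b": {"c": 42}}, "items": [{"id": "x"}]}
answer = extract_sketch(data, ("a", "b", "c"))
```

A tuple path can mix dictionary keys and list indexes, for example `("items", 0, "id")`.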

run(result, idx, prev) abstractmethod

Execute task.

result is arbitrary data passed into every task in the queue. idx reflects the current task's position in the queue. prev contains the result of the previous task, or Task.NO_PREVIOUS_TASK if the current task is the first in the queue.

TaskQueue

Thread-safe context for managing tasks.

Example
queue = TaskQueue()
with queue:
    function_that_adds_tasks_to_queue()
data_passed_into_tasks = ...
queue.process(data_passed_into_tasks)

process(data)

Execute queued tasks in FIFO order.

data is passed to every task as a first argument. In addition, task receives its position in execution queue and results of the previous task. The first task receives Task.NO_PREVIOUS_TASK as third argument, because there are no results from previous task yet.
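
The argument flow described above (shared data, position, previous result) can be sketched with a `deque`. `QueueSketch`, `remember` and the module-level sentinel are hypothetical stand-ins; the real TaskQueue is also a thread-safe context manager, which this sketch leaves out.

```python
from collections import deque

NO_PREVIOUS_TASK = object()  # stand-in for Task.NO_PREVIOUS_TASK


class QueueSketch:
    """Hypothetical FIFO queue of callables taking (data, idx, prev)."""

    def __init__(self):
        self.tasks = deque()

    def add(self, task):
        self.tasks.append(task)

    def process(self, data):
        prev = NO_PREVIOUS_TASK
        idx = 0
        while self.tasks:
            task = self.tasks.popleft()  # oldest task first
            prev = task(data, idx, prev)  # the result feeds the next task
            idx += 1


log = []


def remember(data, idx, prev):
    """Record the received arguments and return a value for the next task."""
    log.append((data, idx, prev))
    return f"task-{idx}"


queue = QueueSketch()
queue.add(remember)
queue.add(remember)
queue.process("payload")
```

A dedicated sentinel object distinguishes "there was no previous task" from a previous task that legitimately returned None.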

types

Types for the extension.


config

Configuration readers of the extension.

This module contains functions that simplify access to configuration options from the CKAN config file.

It's recommended to use these functions instead of accessing config options by name if you want your code to be more compatible with different versions of the extension.


exc

Exception definitions for the extension.

Hierarchy:

  • Exception
    • FilesError
      • QueueError
        • OutOfQueueError
      • StorageError
        • UnknownAdapterError
        • UnknownStorageError
        • UnsupportedOperationError
        • PermissionError
        • MissingFileError
        • ExistingFileError
        • ExtrasError
          • MissingExtrasError
        • InvalidStorageConfigurationError
          • MissingStorageConfigurationError
        • UploadError
          • WrongUploadTypeError
          • NameStrategyError
          • ContentError
          • LargeUploadError
            • UploadOutOfBoundError
          • UploadMismatchError
            • UploadTypeMismatchError
            • UploadHashMismatchError
            • UploadSizeMismatchError
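
A practical consequence of this hierarchy is that callers can choose how broadly to catch. The sketch below re-declares four of the classes locally so that it runs without the extension installed. These are stand-ins that mirror only the inheritance shown above, and `classify` is a hypothetical helper; the real classes may require constructor arguments.

```python
class FilesError(Exception):
    """Stand-in for the root exception of the extension."""


class StorageError(FilesError):
    """Stand-in for storage-related failures."""


class UploadError(StorageError):
    """Stand-in for upload-related failures."""


class LargeUploadError(UploadError):
    """Stand-in for uploads that exceed the allowed size."""


def classify(exc):
    """Report which handler catches `exc`, checking the most specific first."""
    try:
        raise exc
    except UploadError:
        return "upload"
    except StorageError:
        return "storage"
    except FilesError:
        return "files"


label = classify(LargeUploadError())
```

Handlers must be ordered from most specific to most general; an `except FilesError` placed first would catch every exception in the tree.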
