Shared
All public utilities are collected inside the ckanext.files.shared module. Avoid using anything that is not listed there. Do not import anything from modules other than shared.
get_storage(name=None)
Return existing storage instance.
Storages are initialized when the plugin is loaded. As a result, this function always returns the same storage object for the given name.
If no name is specified, the default storage is returned.
PARAMETER | DESCRIPTION |
---|---|
name | name of the configured storage |
RETURNS | DESCRIPTION |
---|---|
Storage | storage instance |
RAISES | DESCRIPTION |
---|---|
UnknownStorageError | storage with the given name is not configured |
Example
default_storage = get_storage()
storage = get_storage("storage name")
make_storage(name, settings, prepare_settings=False)
Initialize storage instance with specified settings.
The storage adapter is defined by the type key of the settings. All other settings depend on the specific adapter.
It's recommended to enable the prepare_settings flag. When it's enabled, all standard parameters (max_size, supported_types) are added to settings if they are missing. By default this flag is disabled, because storages are usually initialized from the CKAN configuration, which is already validated by config declarations.
PARAMETER | DESCRIPTION |
---|---|
name | name of the new storage |
settings | configuration for the storage |
prepare_settings | add default values for missing options |
RETURNS | DESCRIPTION |
---|---|
Storage | storage instance |
RAISES | DESCRIPTION |
---|---|
UnknownAdapterError | storage adapter is not registered |
Example
storage = make_storage("memo", {"type": "files:redis"}, True)
make_upload(value)
Convert value into Upload object.
Use this function for simple and reliable initialization of an Upload object. Avoid creating Upload manually, unless you are 100% sure you can provide the correct MIME type, size and stream.
PARAMETER | DESCRIPTION |
---|---|
value | content of the file |
RAISES | DESCRIPTION |
---|---|
ValueError | incorrectly initialized cgi.FieldStorage passed as value |
TypeError | content has unsupported type |
RETURNS | DESCRIPTION |
---|---|
Upload | upload object with specified content |
Example
storage.upload("file.txt", make_upload(b"hello world"))
Upload
dataclass
Standard upload details.
PARAMETER | DESCRIPTION |
---|---|
stream | iterable of bytes or file-like object |
filename | name of the file |
size | size of the file in bytes |
content_type | MIME type of the file |
Example
Upload(
    BytesIO(b"hello world"),
    "file.txt",
    11,
    "text/plain",
)
seekable_stream: types.PSeekableStream | None
property
Return stream that can be rewound after reading.
If the internal stream does not support file-like seek, nothing is returned from this property.
Use this property if you want to read the file ahead, for example to get CSV column names, the list of files inside a ZIP archive, or EXIF metadata. If you get None from it, the stream does not support seeking and you won't be able to return the cursor to the beginning of the file after reading something.
Example
upload = make_upload(...)
# seekable_stream is a property, so it is accessed without parentheses
if fd := upload.seekable_stream:
    # read fragment of the file
    chunk = fd.read(1024)
    # move cursor to the end of the stream
    fd.seek(0, 2)
    # position of the cursor is the same as number of bytes in stream
    size = fd.tell()
    # move cursor back, because you don't want to accidentally lose
    # any bytes from the beginning of stream when uploader reads from it
    fd.seek(0)
RETURNS | DESCRIPTION |
---|---|
PSeekableStream | None | file-like stream or nothing |
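The read-ahead pattern described for this property can be wrapped in a small helper. The sketch below uses only the standard library and a BytesIO object in place of a real upload stream; `peek` is a hypothetical name that is not part of ckanext.files.shared, and it assumes the stream exposes the usual `read`, `seek` and `tell` methods.

```python
from io import BytesIO
from typing import BinaryIO


def peek(stream: BinaryIO, size: int = 1024) -> bytes:
    """Read up to `size` bytes and restore the original cursor position."""
    position = stream.tell()
    try:
        return stream.read(size)
    finally:
        # Rewind so that a later reader still sees the whole content.
        stream.seek(position)


# BytesIO stands in for the value of upload.seekable_stream.
stream = BytesIO(b"id,name\n1,Alice\n")
header = peek(stream, 7)
```

Because the cursor is restored in a `finally` clause, the stream is left untouched even when reading fails halfway.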
HashingReader
IO stream wrapper that computes content hash while stream is consumed.
PARAMETER | DESCRIPTION |
---|---|
stream | iterable of bytes or file-like object |
chunk_size | max number of bytes read at once |
algorithm | hashing algorithm |
Example
reader = HashingReader(readable_stream)
for chunk in reader:
    ...
print(f"Hash: {reader.get_hash()}")
exhaust()
Exhaust internal stream to compute final version of content hash.
get_hash()
Get content hash as a string.
read()
Return content of the file as a single bytes object.
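To illustrate how a wrapper can compute a hash while the stream is consumed, here is a simplified stand-in built on hashlib. It is not the extension's implementation: the class name `SketchHashingReader`, the default chunk size and the sha256 default are assumptions made for this sketch, and only file-like objects (not arbitrary iterables) are supported.

```python
import hashlib
from io import BytesIO
from typing import BinaryIO, Iterator


class SketchHashingReader:
    """Simplified stand-in: hashes every chunk that passes through it."""

    def __init__(
        self, stream: BinaryIO, chunk_size: int = 16384, algorithm: str = "sha256"
    ):
        self.stream = stream
        self.chunk_size = chunk_size
        self.hashsum = hashlib.new(algorithm)

    def __iter__(self) -> Iterator[bytes]:
        return self

    def __next__(self) -> bytes:
        chunk = self.stream.read(self.chunk_size)
        if not chunk:
            raise StopIteration
        # The hash is updated as a side effect of reading.
        self.hashsum.update(chunk)
        return chunk

    def exhaust(self) -> None:
        """Consume the rest of the stream so the hash covers all content."""
        for _ in self:
            pass

    def read(self) -> bytes:
        """Return the remaining content as a single bytes object."""
        return b"".join(self)

    def get_hash(self) -> str:
        return self.hashsum.hexdigest()


reader = SketchHashingReader(BytesIO(b"hello world"), chunk_size=4)
content = reader.read()
digest = reader.get_hash()
```

The hash is only final once the stream is fully consumed, which is why a method like `exhaust` is useful when the caller does not need the content itself.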
Capability
Bases: Flag
Enumeration of operations supported by the storage.
Example
read_and_write = Capability.STREAM | Capability.CREATE
if storage.supports(read_and_write):
    ...
exclude(*capabilities)
Remove capabilities from the cluster.
PARAMETER | DESCRIPTION |
---|---|
capabilities | removed capabilities |
Example
cluster = cluster.exclude(Capability.REMOVE)
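Combining and excluding capabilities follows the usual semantics of enum.Flag. The sketch below shows this with a hypothetical `Cap` enumeration that has only three members; the member list and the `can` helper are assumptions for illustration, not the real Capability definition.

```python
import enum


class Cap(enum.Flag):
    # Hypothetical subset of members; the real enumeration defines more.
    CREATE = enum.auto()
    STREAM = enum.auto()
    REMOVE = enum.auto()

    def exclude(self, *capabilities: "Cap") -> "Cap":
        """Return a copy of the cluster without the given capabilities."""
        result = self
        for capability in capabilities:
            result &= ~capability
        return result

    def can(self, operation: "Cap") -> bool:
        """Check that every requested capability is present in the cluster."""
        return (self & operation) == operation


cluster = Cap.CREATE | Cap.STREAM | Cap.REMOVE
read_and_write = Cap.STREAM | Cap.CREATE
reduced = cluster.exclude(Cap.REMOVE)
```

Flag members are immutable, so `exclude` returns a new value and the result has to be assigned, as in the example above.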
File
Bases: Base
Model with file details.
PARAMETER | DESCRIPTION |
---|---|
name | name shown to users |
location | location of the file inside storage |
content_type | MIME type |
size | size in bytes |
hash | checksum |
storage | storage that contains the file |
ctime | date of creation |
mtime | date of the last update |
atime | date of last access (unstable) |
storage_data | additional data set by storage |
plugin_data | additional data set by plugins |
Example
file = File(
    name="file.txt",
    location="relative/path/safe-name.txt",
    content_type="text/plain",
    size=100,
    hash="abc123",
    storage="default",
)
Multipart
Bases: Base
Model with details of incomplete upload.
PARAMETER | DESCRIPTION |
---|---|
name | name shown to users |
location | location of the file inside storage |
content_type | expected MIME type |
size | expected size in bytes |
hash | expected checksum |
storage | storage that contains the file |
ctime | date of creation |
storage_data | additional data set by storage |
plugin_data | additional data set by plugins |
Example
upload = Multipart(
    name="file.txt",
    location="relative/path/safe-name.txt",
    content_type="text/plain",
    size=100,
    hash="abc123",
    storage="default",
)
Owner
Bases: Base
Model with details about current owner of an item.
PARAMETER | DESCRIPTION |
---|---|
item_id | ID of the owned object |
item_type | type of the owned object |
owner_id | ID of the owner |
owner_type | type of the owner |
pinned | is ownership protected from transfer |
Example
owner = Owner(
    item_id=file.id,
    item_type="file",
    owner_id=user.id,
    owner_type="user",
)
TransferHistory
Bases: Base
Model for tracking ownership history of the file.
PARAMETER | DESCRIPTION |
---|---|
item_id | ID of the owned object |
item_type | type of the owned object |
owner_id | ID of the owner |
owner_type | type of the owner |
leave_date | date of ownership transfer to a different owner |
actor | user who initiated ownership transfer |
Example
record = TransferHistory(
    item_id=file.id,
    item_type="file",
    owner_id=prev_owner.owner_id,
    owner_type=prev_owner.owner_type,
)
FileData
dataclass
Bases: BaseData[File]
Information required by storage to operate the file.
PARAMETER | DESCRIPTION |
---|---|
location | filepath, filename or any other type of unique identifier |
size | size of the file in bytes |
content_type | MIME type of the file |
hash | checksum of the file |
storage_data | additional details set by storage adapter |
Example
FileData(
    "local/path.txt",
    123,
    "text/plain",
    md5_of_content,
)
MultipartData
dataclass
Bases: BaseData[Multipart]
Information required by storage to operate the incomplete upload.
PARAMETER | DESCRIPTION |
---|---|
location | filepath, filename or any other type of unique identifier |
size | expected size of the file in bytes |
content_type | expected MIME type of the file |
hash | expected checksum of the file |
storage_data | additional details set by storage adapter |
Example
MultipartData(
    "local/path.txt",
    expected_size,
    expected_content_type,
    expected_hash,
)
IFiles
Bases: Interface
Extension point for ckanext-files.
This interface is not stabilized. Implement it with inherit=True.
Example
class MyPlugin(p.SingletonPlugin):
    p.implements(interfaces.IFiles, inherit=True)
files_file_allows(context, file, operation)
Decide if user is allowed to perform specified operation on the file.
Return True if the user is allowed to perform the operation and False if not. Return None to rely on other plugins.
The default implementation relies on the cascade_access config option. If the owner of the file is included in cascade access, the user can perform an operation on the file if they can perform the same operation on the file's owner.
If the current owner is not affected by cascade access, the user can perform an operation on the file only if the user owns the file.
PARAMETER | DESCRIPTION |
---|---|
context | API context |
file | accessed file object |
operation | performed operation |
RETURNS | DESCRIPTION |
---|---|
bool | None | decision whether operation is allowed for the file |
Example
def files_file_allows(
    self, context,
    file: shared.File | shared.Multipart,
    operation: shared.types.FileOperation
) -> bool | None:
    if file.owner_info and file.owner_info.owner_type == "resource":
        return is_authorized_boolean(
            f"resource_{operation}",
            context,
            {"id": file.owner_info.id}
        )
    return None
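The default rule described above (cascade access first, direct ownership otherwise) can be sketched as a plain function. This is an illustration under stated assumptions, not the extension's code: the function name, its parameters, the idea that cascade_access is a collection of owner types, and the `toy_owner_allows` callback are all hypothetical.

```python
from typing import Callable, Collection


def default_file_allows(
    user_id: str,
    owner_type: str,
    owner_id: str,
    operation: str,
    cascade_access: Collection[str],
    owner_allows: Callable[[str, str, str], bool],
) -> bool:
    """Sketch of the default decision for a file with a known owner."""
    if owner_type in cascade_access:
        # Access to the file mirrors access to its owner.
        return owner_allows(owner_type, owner_id, operation)
    # Otherwise only the user who owns the file may operate on it.
    return owner_type == "user" and owner_id == user_id


def toy_owner_allows(owner_type: str, owner_id: str, operation: str) -> bool:
    # Toy rule for the demo: only one resource is accessible.
    return owner_id == "res-open"


cascade = ["resource"]
via_cascade = default_file_allows(
    "alice", "resource", "res-open", "show", cascade, toy_owner_allows
)
denied_cascade = default_file_allows(
    "alice", "resource", "res-closed", "show", cascade, toy_owner_allows
)
own_file = default_file_allows("alice", "user", "alice", "show", cascade, toy_owner_allows)
foreign_file = default_file_allows("alice", "user", "bob", "show", cascade, toy_owner_allows)
```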
files_get_storage_adapters()
Return mapping of storage type to adapter class.
RETURNS | DESCRIPTION |
---|---|
dict[str, Any] | adapters provided by the implementation |
Example
def files_get_storage_adapters(self):
    return {
        "my_ext:dropbox": DropboxStorage,
    }
files_owner_allows(context, owner_type, owner_id, operation)
Decide if user is allowed to perform specified operation on the owner.
Return True if the user is allowed to perform the operation and False if not. Return None to rely on other plugins.
PARAMETER | DESCRIPTION |
---|---|
context | API context |
owner_type | type of the tested owner |
owner_id | ID of the tested owner |
operation | performed operation |
RETURNS | DESCRIPTION |
---|---|
bool | None | decision whether operation is allowed for the owner |
Example
def files_owner_allows(
    self, context,
    owner_type: str, owner_id: str,
    operation: shared.types.OwnerOperation
) -> bool | None:
    if owner_type == "resource" and operation == "file_transfer":
        return is_authorized_boolean(
            "resource_update",
            context,
            {"id": owner_id}
        )
    return None
files_register_owner_getters()
Return mapping with lookup functions for owner types.
The name of the getter is the name used as Owner.owner_type. The getter itself is a function that accepts an owner ID and returns an optional owner entity.
RETURNS | DESCRIPTION |
---|---|
dict[str, Callable[[str], Any]] | getters for specific owner types |
Example
def files_register_owner_getters(self):
    return {"resource": model.Resource.get}
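A mapping of this shape can be consumed by a simple lookup. The sketch below uses in-memory dictionaries in place of CKAN models; `RESOURCES`, `USERS`, `owner_getters` and `get_owner` are hypothetical names, and how the extension itself resolves owners is not shown here.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical in-memory "tables" standing in for CKAN models.
RESOURCES = {"res-1": {"id": "res-1", "name": "Budget 2024"}}
USERS = {"user-1": {"id": "user-1", "name": "alice"}}

# Same shape as the mapping returned by files_register_owner_getters:
# owner type -> function that accepts an ID and returns an entity or None.
owner_getters: Dict[str, Callable[[str], Any]] = {
    "resource": RESOURCES.get,
    "user": USERS.get,
}


def get_owner(owner_type: str, owner_id: str) -> Optional[Any]:
    """Resolve an owner entity, or None for unknown types and missing IDs."""
    getter = owner_getters.get(owner_type)
    return getter(owner_id) if getter else None


resource = get_owner("resource", "res-1")
```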
Storage
Bases: OptionChecker, ABC
Base class for storage implementation.
PARAMETER | DESCRIPTION |
---|---|
settings | storage configuration |
Example
class MyStorage(Storage):
    def make_uploader(self):
        return MyUploader(self)
    def make_reader(self):
        return MyReader(self)
    def make_manager(self):
        return MyManager(self)
max_size: int
property
Max allowed upload size.
Max size set to 0 removes all limitations.
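The special meaning of 0 can be expressed as a short check. This is a minimal sketch: `is_within_limit` is a hypothetical helper, and treating an upload that is exactly `max_size` bytes as acceptable is an assumption.

```python
def is_within_limit(size: int, max_size: int) -> bool:
    """Return True when an upload of `size` bytes passes the size check."""
    if max_size == 0:
        # 0 means "no limit".
        return True
    # Assumption: the limit itself is still an acceptable size.
    return size <= max_size


unlimited = is_within_limit(10**12, 0)
too_large = not is_within_limit(2048, 1024)
```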
supported_types: list[str]
property
List of supported MIME types or their parts.
prepare_settings(settings)
classmethod
Add all required items to settings.
This is usually done by config declarations. But when a storage is initialized manually, via make_storage, settings are not validated.
Use this method to transform an arbitrary dictionary into the expected form of settings. Don't do too much work: adding missing parameters should be enough.
Passing sane values with valid types is still the responsibility of the developer.
PARAMETER | DESCRIPTION |
---|---|
settings | configuration that requires preparation |
Example
settings = Storage.prepare_settings({})
storage = Storage(settings)
range(data, start=0, end=None, /, **kwargs)
Return byte-stream of the file content.
stream_as_upload(data, **kwargs)
Make an Upload with file content.
Uploader
Bases: StorageService
Service responsible for writing data into a storage.
Storage internally calls methods of this service. For example, Storage.upload(location, upload, **kwargs) results in Uploader.upload(location, upload, kwargs).
Example
class MyUploader(Uploader):
    def upload(
        self, location: str, upload: Upload, extras: dict[str, Any]
    ) -> FileData:
        reader = upload.hashing_reader()
        with open(location, "wb") as dest:
            dest.write(reader.read())
        return FileData(
            location, upload.size,
            upload.content_type,
            reader.get_hash()
        )
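The delegation described above, where keyword arguments given to the storage arrive in the service as a single extras dictionary, can be sketched with two small classes. `SketchStorage` and `SketchUploader` are stand-ins invented for this example; they return plain dictionaries instead of FileData and do not write anything.

```python
from typing import Any, Dict


class SketchUploader:
    def upload(
        self, location: str, upload: bytes, extras: Dict[str, Any]
    ) -> Dict[str, Any]:
        # extras arrives as a plain dictionary, not as keyword arguments.
        return {"location": location, "size": len(upload), "extras": extras}


class SketchStorage:
    def __init__(self) -> None:
        self.uploader = SketchUploader()

    def upload(self, location: str, upload: bytes, **kwargs: Any) -> Dict[str, Any]:
        # Keyword arguments are collected and forwarded as one dictionary.
        return self.uploader.upload(location, upload, kwargs)


result = SketchStorage().upload("file.txt", b"hello", overwrite=True)
```

Keeping extras as one dictionary lets every adapter accept options it does not know about without changing method signatures.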
multipart_complete(data, extras)
Verify file integrity and finalize incomplete upload.
multipart_refresh(data, extras)
Show details of the incomplete upload.
multipart_start(location, data, extras)
Prepare everything for multipart (resumable) upload.
multipart_update(data, extras)
Add data to the incomplete upload.
upload(location, upload, extras)
Upload file using single stream.
Reader
Bases: StorageService
Service responsible for reading data from the storage.
Storage internally calls methods of this service. For example, Storage.stream(data, **kwargs) results in Reader.stream(data, kwargs).
Example
class MyReader(Reader):
    def stream(
        self, data: FileData, extras: dict[str, Any]
    ) -> Iterable[bytes]:
        return open(data.location, "rb")
content(data, extras)
Return file content as a single byte object.
one_time_link(data, extras)
Return one-time download link.
permanent_link(data, extras)
Return permanent download link.
public_link(data, extras)
Return public link.
range(data, start, end, extras)
Return byte-stream of the file content.
stream(data, extras)
Return byte-stream of the file content.
temporal_link(data, extras)
Return temporary download link.
extras["ttl"] controls the lifetime of the link (30 seconds by default).
Manager
Bases: StorageService
Service responsible for file maintenance operations.
Storage internally calls methods of this service. For example, Storage.remove(data, **kwargs) results in Manager.remove(data, kwargs).
Example
class MyManager(Manager):
    def remove(
        self, data: FileData | MultipartData, extras: dict[str, Any]
    ) -> bool:
        os.remove(data.location)
        return True
analyze(location, extras)
Return all details about filename.
append(data, upload, extras)
Append content to existing file.
compose(datas, location, extras)
Combine multiple files inside the storage into a new one.
copy(data, location, extras)
Copy file inside the storage.
exists(data, extras)
Check if file exists in the storage.
move(data, location, extras)
Move file to a different location inside the storage.
remove(data, extras)
Remove file from the storage.
scan(extras)
List all locations (filenames) in storage.
add_task(task)
Add task to the current task queue.
This function can be called only inside a task queue context. Such a context is initialized automatically inside functions decorated with with_task_queue:
Example
@with_task_queue
def task_producer():
    add_task(...)
task_producer()
A task queue context can be initialized manually using TaskQueue and the with statement:
Example
queue = TaskQueue()
with queue:
    add_task(...)
queue.process(execution_data)
with_task_queue(func, name=None)
Decorator for functions that schedule tasks.
The decorated function automatically initializes a separate task queue, which is processed when the function finishes. All tasks receive the function's result as execution data (the first argument of Task.run).
Without this decorator, you have to manually create a task queue context before queuing tasks.
Example
@with_task_queue
def my_action(context, data_dict):
    ...
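The behaviour of the decorator can be approximated in a few lines. The sketch below is a simplified, single-threaded stand-in and not the extension's implementation: it ignores the optional name argument, represents tasks as plain callables that take only the result, and raises RuntimeError where the real code would presumably use one of its own exceptions.

```python
import functools
from typing import Any, Callable, List, Optional

# Queue of the innermost active context; None means "no context".
_current_queue: Optional[List[Callable[[Any], None]]] = None


def add_task(task: Callable[[Any], None]) -> None:
    if _current_queue is None:
        raise RuntimeError("add_task called outside of task queue context")
    _current_queue.append(task)


def with_task_queue(func: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        global _current_queue
        # Start a separate queue and remember the outer one, if any.
        previous, _current_queue = _current_queue, []
        try:
            result = func(*args, **kwargs)
            tasks = _current_queue
        finally:
            _current_queue = previous
        # Tasks run after the function finished and receive its result.
        for task in tasks:
            task(result)
        return result

    return wrapper


log = []


@with_task_queue
def create_file(name: str) -> dict:
    add_task(lambda result: log.append(("notify", result["name"])))
    return {"name": name}


created = create_file("file.txt")
```

If the decorated function raises, the queued tasks are discarded in this sketch, because there is no result to pass to them.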
Task
Bases: ABC
Base task for TaskQueue.
The only requirement for subclasses is implementing Task.run.
extract(source, path)
staticmethod
Extract value from dictionary using FlattenKey from validators.
PARAMETER | DESCRIPTION |
---|---|
source | dictionary with data |
path | path to the extracted member from source |
Example
data = {"a": {"b": {"c": 42}}}
assert Task.extract(data, ("a", "b", "c")) == 42
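Extraction by a tuple path amounts to walking nested containers one key at a time. The function below is a minimal standalone sketch of that idea; it is not Task.extract itself and makes no claim about how the real method treats missing keys.

```python
from typing import Any, Iterable


def extract(source: dict, path: Iterable[Any]) -> Any:
    """Walk nested containers following the keys in `path`."""
    value: Any = source
    for key in path:
        value = value[key]
    return value


data = {"a": {"b": {"c": 42}}, "items": [{"id": "x"}]}
answer = extract(data, ("a", "b", "c"))
first_id = extract(data, ("items", 0, "id"))
```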
run(result, idx, prev)
abstractmethod
Execute task.
result is arbitrary data passed into every task in the queue. idx reflects the current task's position in the queue. prev contains the result of the previous task, or Task.NO_PREVIOUS_TASK if the current task is the first in the queue.
TaskQueue
Thread-safe context for managing tasks.
Example
queue = TaskQueue()
with queue:
    function_that_adds_tasks_to_queue()
data_passed_into_tasks = ...
queue.process(data_passed_into_tasks)
process(data)
Execute queued tasks in FIFO order.
data is passed to every task as the first argument. In addition, each task receives its position in the execution queue and the result of the previous task. The first task receives Task.NO_PREVIOUS_TASK as the third argument, because there is no result from a previous task yet.
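The calling convention described above can be sketched with a deque and plain functions in place of Task objects. The sentinel `NO_PREVIOUS_TASK`, the `process` function and the returned list of results are assumptions of this sketch; the text does not say whether the real method returns anything.

```python
from collections import deque
from typing import Any, Callable, Deque, List

# Sentinel that marks "there was no previous task".
NO_PREVIOUS_TASK = object()

TaskFn = Callable[[Any, int, Any], Any]


def process(queue: Deque[TaskFn], data: Any) -> List[Any]:
    """Run tasks in FIFO order, threading each result into the next task."""
    results = []
    prev: Any = NO_PREVIOUS_TASK
    idx = 0
    while queue:
        task = queue.popleft()
        prev = task(data, idx, prev)
        results.append(prev)
        idx += 1
    return results


def first(data: Any, idx: int, prev: Any) -> Any:
    return (idx, prev is NO_PREVIOUS_TASK)


def second(data: Any, idx: int, prev: Any) -> Any:
    return (idx, data["id"], prev)


queue = deque([first, second])
results = process(queue, {"id": "file-1"})
```

A dedicated sentinel is used instead of None so that a task which legitimately returns None can be told apart from the absence of a previous task.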
types
Types for the extension.
config
Configuration readers of the extension.
This module contains functions that simplify access to configuration options from the CKAN config file.
It's recommended to use these functions instead of accessing config options by name, if you want your code to be more compatible with different versions of the extension.
exc
Exception definitions for the extension.
Hierarchy:
- Exception
- FilesError
- QueueError
- OutOfQueueError
- StorageError
- UnknownAdapterError
- UnknownStorageError
- UnsupportedOperationError
- PermissionError
- MissingFileError
- ExistingFileError
- ExtrasError
- MissingExtrasError
- InvalidStorageConfigurationError
- MissingStorageConfigurationError
- UploadError
- WrongUploadTypeError
- NameStrategyError
- ContentError
- LargeUploadError
- UploadOutOfBoundError
- UploadMismatchError
- UploadTypeMismatchError
- UploadHashMismatchError
- UploadSizeMismatchError