Task queue

Warning

This feature is experimental and may change in the future. Keep track of the changelog if you use tasks or validators that schedule tasks, such as files_transfer_ownership.

One of the challenges introduced by independently managed files relates to file ownership. As long as you call files_transfer_ownership manually, things are transparent. But as soon as you add a custom file field to a dataset, you probably want to automatically transfer ownership of the file referred to by this custom field.

Imagine that you have a PDF file owned by you. One day you specify the ID of this file in the attachment_id field of a dataset, and now a download link for this file is shown on the dataset page. But if the file is owned by you, nobody else can download it. So you decide to transfer file ownership to the dataset, allowing anyone who can see the dataset to see the file as well.

You prefer to avoid two independent API calls (update the dataset, then transfer ownership), because there is a time window between these two actions during which the data is not valid. Or even worse, after updating the dataset you may lose your internet connection and be unable to finish the transfer.

Nor can you transfer ownership first and then update the dataset. attachment_id may have additional validators, and you don't know in advance whether you'll be able to update the dataset successfully after the transfer.

This problem can be solved by queuing additional tasks inside the action. For example, a validator that checks whether a certain file ID can be used as an attachment_id can also schedule the ownership transfer. If the dataset update completes without errors, the queued task is executed automatically and the dataset becomes the owner of the file.

A task is queued via the ckanext.files.shared.add_task function, which accepts callables that represent the task. A task callable has the signature (result: Any, idx: int, prev: Any) -> Any. It receives the result of the action that caused task execution, the task's position in the queue, and the result of the previous task.
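To make the three arguments concrete, here is a toy model of how a queue could call its tasks. It is only an illustration and not the extension's implementation: run_tasks and both task functions are hypothetical names, and it assumes that tasks run in the order they were queued and that the first task receives None as prev.

```python
from typing import Any, Callable

# A task takes (action result, position in the queue, previous task's result).
Task = Callable[[Any, int, Any], Any]


def run_tasks(result: Any, tasks: list[Task]) -> Any:
    """Toy queue processing: call every task in order, chaining results."""
    prev: Any = None  # assumption: the first task has no previous result
    for idx, task in enumerate(tasks):
        prev = task(result, idx, prev)
    return prev


def remember_id(result: dict[str, Any], idx: int, prev: Any) -> Any:
    # Uses only the action result.
    return [result["id"]]


def append_position(result: dict[str, Any], idx: int, prev: Any) -> Any:
    # Uses the previous task's result and its own position in the queue.
    return prev + [idx]


final = run_tasks({"id": "dataset-1"}, [remember_id, append_position])
print(final)
```

Most tasks, like the ownership transfer, need only the first argument; idx and prev matter when several queued tasks depend on each other.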

Example

One of the attachment_id validators can queue the following task via add_task(transfer_attachment_to_dataset_task) if the attachment_id field of the dataset contains the ID of a file that requires transfer after successful dataset modification:

from typing import Any

import ckan.plugins.toolkit as tk


def transfer_attachment_to_dataset_task(dataset: dict[str, Any], idx: int, prev: Any) -> Any:
    # `dataset` is the result of the action that triggered the task.
    return tk.get_action("files_transfer_ownership")(
        {"ignore_auth": True},
        {
            "id": dataset["attachment_id"],
            "owner_type": "package",
            "owner_id": dataset["id"],
            "pin": True,
        },
    )

As its first argument, the task receives the result of the action that was called. Right now, only the following actions support tasks:

  • package_create
  • package_update
  • resource_create
  • resource_update
  • group_create
  • group_update
  • organization_create
  • organization_update
  • user_create
  • user_update

If you want to enable task support for your custom action, decorate it with the ckanext.files.shared.with_task_queue decorator:

from ckanext.files.shared import with_task_queue

@with_task_queue
def my_action(context, data_dict):
    # You can call `add_task` inside this action's stack frame.
    ...
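The following sketch shows one way such a decorator could scope a queue to a single action call. It is a toy model built on assumptions, not the code of ckanext.files: toy_with_task_queue and toy_add_task are hypothetical stand-ins, and the sketch assumes that queued tasks run only when the action returns without raising.

```python
import contextvars
import functools

# Hypothetical stand-in for the queue that belongs to the running action.
_queue: contextvars.ContextVar = contextvars.ContextVar("task_queue")


def toy_add_task(task):
    """Queue a task; fails when no decorated action is running."""
    try:
        queue = _queue.get()
    except LookupError:
        raise RuntimeError("no task queue: call this inside a decorated action") from None
    queue.append(task)


def toy_with_task_queue(action):
    """Give the action its own queue and run the queue after success."""

    @functools.wraps(action)
    def wrapper(context, data_dict):
        token = _queue.set([])
        try:
            # If the action raises, the loop below is never reached.
            result = action(context, data_dict)
            prev = None
            for idx, task in enumerate(_queue.get()):
                prev = task(result, idx, prev)
            return result
        finally:
            # Drop the queue so tasks cannot leak into other calls.
            _queue.reset(token)

    return wrapper


calls = []


@toy_with_task_queue
def my_action(context, data_dict):
    toy_add_task(lambda result, idx, prev: calls.append((result["id"], idx)))
    return {"id": data_dict["id"]}


outcome = my_action({}, {"id": "abc"})
print(outcome, calls)
```

In this model, calling toy_add_task outside a decorated action raises an error, which mirrors the idea that a task can only be queued while an action that supports tasks is running.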

A good example of a validator that uses tasks is the files_transfer_ownership validator factory. It can be added to a metadata schema as files_transfer_ownership(owner_type, name_of_id_field). For example, if you are adding this validator to a resource, call it as files_transfer_ownership("resource", "id"). The second argument is the name of the ID field. As in most cases it's id, you can omit it and rely on the default value:

  • organization: files_transfer_ownership("organization")
  • dataset: files_transfer_ownership("package")
  • user: files_transfer_ownership("user")

Alternative

Instead of using tasks, you can use CKAN Signals. Create a listener for the action_succeeded signal and transfer ownership inside the listener.

This approach resembles the logic of the activity extension, which creates activity records after an API action finishes its execution.
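A minimal sketch of such a listener is shown below. It assumes that the signal passes the action name as the sender and the action's return value as the result keyword argument, and that the dataset stores the file ID in attachment_id as in the example above. make_transfer_listener and fake_get_action are hypothetical helpers: the action getter is injected so that the sketch runs without CKAN.

```python
from typing import Any, Callable


def make_transfer_listener(get_action: Callable[[str], Callable[..., Any]]):
    """Build a listener that transfers the attachment after dataset changes."""

    def listener(sender: str, **kwargs: Any) -> None:
        # Assumption: `sender` is the action name, `result` its return value.
        if sender not in ("package_create", "package_update"):
            return
        result = kwargs.get("result") or {}
        file_id = result.get("attachment_id")
        if not file_id:
            return
        get_action("files_transfer_ownership")(
            {"ignore_auth": True},
            {
                "id": file_id,
                "owner_type": "package",
                "owner_id": result["id"],
                "pin": True,
            },
        )

    return listener


# Stand-in for tk.get_action that records calls instead of running actions.
recorded = []


def fake_get_action(name: str):
    def action(context, data_dict):
        recorded.append((name, data_dict))

    return action


listener = make_transfer_listener(fake_get_action)
listener("package_update", context={}, data_dict={},
         result={"id": "ds-1", "attachment_id": "file-1"})
listener("user_update", context={}, data_dict={}, result={"id": "u-1"})
print(recorded)
```

In a real plugin you would build the listener with tk.get_action and subscribe it to action_succeeded, for example through the ISignal interface, if your CKAN version provides it. Keep the listener in a module-level variable: signal receivers are usually held by weak reference, so a listener that nothing else references may be garbage-collected and silently disconnected.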