Skip to content

Task queue

One of the challenges introduced by independently managed files is related to file ownership. As long as you can call files_transfer_ownership manually, things are transparent. But as soon as you add custom file field to dataset, you probably want to automatically transfer ownership of the file refered by this custom field.

Imagine, that you have PDF file owned by you. And you specify ID of this file in the attachment_id field of the dataset. You want to show download link for this file on the dataset page. But if file owned by you, nobody will be able to download the file. So you decide to transfer file ownership to dataset, so that anyone who sees dataset, can see the file as well.

You cannot update dataset and transfer ownership after it, because there will be a time window between these two actions, when data is not valid. Or even worse, after updating dataset you'll lose internet connection and won't be able to finish the transfer.

Neither you can transfer ownership first and then update the dataset. attachment_id may have additional validators and you don't know in advance, whether you'll be able to successfully update dataset after the transfer.

This problem can be solved via queuing additional tasks inside the action. For example, validator that checks if certain file ID can be used as attachment_id can queue ownership transfer. If dataset update completed without errors, queued task is executed automatically and dataset becomes the owner of the file.

Task is queued via ckanext.files.shared.add_task function, which accepts objects inherited from ckanext.files.shared.Task. Task class requires implementing abstract method run(result: Any, idx: int, prev: Any), which is called when task is executed. This method receives the result of action which caused task execution, task's position in queue and the result of previous task.

Example

One of attachment_id validators can queue the following MyTask via add_task(MyTask(file_id)) to transfer file_id ownership to the updated dataset:

from ckanext.files.shared import Task

class MyTask(Task):
    def __init__(self, file_id):
        self.file_id = file_id

    def run(self, dataset, idx, prev):
        return tk.get_action("files_transfer_ownership")(
            {"ignore_auth": True},
            {
                "id": self.file_id,
                "owner_type": "package",
                "owner_id": dataset["id"],
                "pin": True,
            },
        )

As the first argument, Task.run receives the result of action which was called. Right now only following actions support tasks:

  • package_create
  • packaage_update
  • resource_create
  • resource_update
  • group_create
  • group_update
  • organization_create
  • organization_update
  • user_create
  • user_update

If you want to enable tasks support for your custom action, decorate it with ckanext.files.shared.with_task_queue decorator:

from ckanext.files.shared import with_task_queue

@with_task_queue
def my_action(context, data_dict)
    # you can call `add_task` inside this action's stack frame.
    ...

Good example of validator using tasks is files_transfer_ownership validator factory. It can be added to metadata schema as files_transfer_ownership(owner_type, name_of_id_field). For example, if you are adding this validator to resource, call it as files_transfer_ownership("resource", "id"). The second argument is the name of the ID field. As in most cases it's id, you can omit the second argument:

  • organization: files_transfer_ownership("organization")
  • dataset: files_transfer_ownership("package")
  • user: files_transfer_ownership("user")