Skip to content

Accessing data

This page contains examples of building collections using standard utilities and reading data from such collections.

Get records from the collection

Collection definition:

This example uses collection of numbers from 1 to 100

from ckan import model
from ckanext.collection.shared import collection, data, pager

class Example(collection.Collection):
    DataFactory = data.StaticData.with_attributes(data=range(1, 100))

Collection itself is iterable, so you can access records using for-loop:

col = Example()

for pkg in col:
    assert isinstance(pkg, int)

Or you can convert collection into a list:

items = list(col)

When you iterate over collection, only current page of records is yielded. The current page is controlled by pager service. Our example does not configures pager, so we are using default values. Default pager shows the first page, that contains 10 items from the data source.

>>> len(items)
10

To change the page you can initialize a new instance of the collection and modify pager settings

>>> col = Example(pager_settings={"page": 3})
>>> list(col)
[21, 22, 23, 24, 25, 26, 27, 28, 29, 30]

or you can replace pager service of the collection. To achieve it, initialize a new pager instance, pass collection to its constructor as a first positional argument, and specify pager settings as named arguments. You don't need to save a new pager instance inside the collection properties. You don't need to store pager instance inside any variable at all - when initialized, pager(and any other service) is injected into collection automatically.

Collections use pager.ClassicPager by default, and that's what we'll use here

>>> pager.ClassicPager(col, page=2)
>>> list(col)
[11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

If you want to modify size of the page, change value of row_per_page option of the pager. In the following snippet we don't specify page, so it's set to 1

>>> pager.ClassicPager(col, rows_per_page=2)
>>> list(col)
[1, 2]

Instead of pager.ClassicPager, which relies on page number and page size, you can use pager.OffsetPager, which is controlled by limit and offset. Results are the same, only concept is different.

>>> pager.OffsetPager(col, offset=14, limit=3)
>>> list(col)
[15, 16, 17]

If it's not enough, you can access data service of the collection directly as col.data. Data service exposes two ways of accessing the records:

  • data service itself is an iterable, that yields all available records
  • data service has range(start, end) method that returns slice of all available items.

If you want to process all records in a loop, or transform them into a list, data service is exactly what you need

for item in col.data:
    ...

items = list(col.data)

Data service ignores the pager. No matter what page and page size are configured, data service always returns all records when you transform it into a list. Big data sources can consume a lot of memory in this case, so avoid this transformation unless you are sure, your data sample is relatively small.

If you want to receive a slice of data source(and don't want to use pager.OffsetPager which does exactly this thing), you can call col.data.range(start, end). To get 5 items starting from 10th:

>>> list(col.data.range(10, 15))
[11, 12, 13, 14, 15]

range method of the data service is called by collection when it's used as iterable. Collection gets position of the first and last elements from the pager service and passes these parameters to data.range.

Implementation of range is provided by the data service. Some services accepts only positive integers as start and end point. Other services may accept negative offsets, float numbers or even dates. We are using StaticData in this example, which transform range calls into slicing of the interal data. I.e, as we are using squence of numbers as data, when range(x, y) is called, internally it's transformed into sequence[x:y].

It means, we can use negative indexes and None with the example collection

>>> list(col.data.range(-5, None))
[95, 96, 97, 98, 99]

But other data services may not support this type of invocation. It's always a good idea to check specification of the service, before using it.

Get data from DB using SQLAlchemy model

Collection definition:

This example uses collection of all packages from the DB

from ckan import model
from ckanext.collection.shared import collection, data

class Example(collection.Collection):
    DataFactory = data.ModelData.with_attributes(model=model.Package)

When you want fetch data from DB using a single model(something similar to model.Session.query(MODEL)), using data.ModelData is often the simplest option.

This service has one mandatory option - model. data.ModelData pulls all records of the specified model from the DB and, by default, returns tuple-like representation of each record.

>>> col = Example()
>>> list(col)[0]
('id-123', 'name', 'title')

If you prefer working with model instances(model.Package in our example), enable is_scalar option of the data.ModelData service.

>>> col = Example(data_settings={"is_scalar": True})
>>> list(col)[0]
<Package id=123 name=hehe>

To filter records, specify static_filters option of the collection. It accepts a collection of conditions that can be used with Query.filter/Select.where methods of SQLAlchemy.

All datasets with type=dataset that were crated during last 30 days are described by this collection:

from datetime import date, timedelta

filter_type = model.Package.type == "dataset"
filter_date = model.Package.metadata_created > date.today() - timedelta(days=30)

col = Example(
    data_settings={
        "is_scalar": True,
        "static_filters": [filter_type, filter_date],
    },
)

for item in col:
    assert item.type == "dataset"
    assert date.today() - item.metadata_created < timedelta(days=30)

Get data from API using package_search-like action

Collection definition:

This example uses collection of all packages from search API

from ckan import model
from ckanext.collection.shared import collection, data

class Example(collection.Collection):
    DataFactory = data.ApiSearchData.with_attributes(action="package_search")

Any action that accepts start and rows parameters and returns result as dictionary with count and results items, can be used with data.ApiSearchData.

This service automatically moves through results when you iterate over data. And you can loop through data service and process all the items:

col = Example()

for item in col.data:
    ...

If you are working with package_search action directly, you have to write an additional loop that moves offset further and fetches new portion of datasets:

start = 0
search = tk.get_action("package_search")

while True:
    resp = search({}, {"start": start})["results"]

    if not results:
        break

    for item in results:
        ...

    start += len(results)

And with data.ApiSearchData this happens behind the scene and you just access all the items using iteration over data service.

To set the user for the context, or enable ignore_auth flag, pass corresponding options to the data service:

col = Example(data_settings={
    "user": "custom-user",
    "ignore_auth": True,
})

And if you want to override search parameters or set the number of items processed at once, use payload option of data.ApiSearchData.

col = Example(data_settings={
    "payload": {
        "q": "test",
        "fq": "field:value",
        "rows": 100,
    },
})

Get data from DB using arbitrary select statement

Collection definition:

This example uses collection of all packages from search API

import sqlalchemy as sa

from ckan import model
from ckanext.collection.shared import collection, data

class Example(collection.Collection):
    DataFactory = data.StatementSaData

data.StatementSaData is a low-level version of data.ModelData. Instead of working with SQLAlchemy model, it accepts any SQL statement(sqlalchemy.select) and returns all records from it.

Select statement is controlled by statement option of the service.

>>> col = Example(data_settings={
>>>     "statement": sa.select(model.User.name, model.User.sysadmin).where(
>>>         model.User.email.endswith("gmail.com"),
>>>     ),
>>> })
>>> list(col)
[('first-user', False), ('second-user', True)]

Records returned as tuple-like object by default. If you select only one column, you still have to work with tuple-like object that contains a single item. To make your life easier, consider enabling is_scalar flag, to return only first column from every row of the data source.

>>> col = Example(data_settings={
>>>     "statement": sa.select(model.User.name, model.User.sysadmin).where(
>>>         model.User.email.endswith("gmail.com"),
>>>     ),
>>>     "is_scalar": True
>>> })
>>> list(col)
['first-user', 'second-user']

Select statement has no restrictions regarding its size or complexity. If required, use joins, group-by/having, subqueries, CTE, etc.

For example, a bit more complex collection that shows users and resources inside packages created by them.

col = Example(data_settings={
    "statement": sa.select(
        model.User.name,
        sa.func.count(model.Resource.id).label("res_number"),
        sa.func.array_agg(model.Package.name.distinct()).label("packages"),
        sa.func.array_agg(model.Resource.id).label("resources"),
    ).join(
        model.Package, model.Package.creator_user_id == model.User.id,
    ).join(
        model.Resource, model.Package.id == model.Resource.package_id
    ).group_by(model.User.id)
})
>>> list(col)
[('default', 2, ['dataset1'], ['123', '456']),
 ('another-user', 1, ['dataset2'], ['888-888'])]