Data

Overview

This service produces the data for the collection. Every data service must:

  • be Iterable
  • yield all existing records during iteration, i.e. if the data service produces datasets from the package_search API, list(data) must contain all datasets from the search index, not only the first 10 or 20
  • define a total property that reflects the number of available records, so that len(list(data)) == data.total
  • define a range(start: Any, end: Any) method that returns a slice of the data

The base class for data services, Data, already contains a simple version of this logic. Just override compute_data() and return a sequence of records from it to satisfy the minimal requirements of the data service.

Example

class MyData(data.Data):
    def compute_data(self):
        return "abcdefghijklmnopqrstuvwxyz"
>>> col = collection.Collection(data_factory=MyData)
>>> list(col)
["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
>>> col.data.total
26
>>> col.data.range(-3, None)
"xyz"

Using compute_data simplifies defining data services, but it's not required. You can implement all the methods explicitly.

Example

class MyData(data.Data):
    letters = "abcdefghijklmnopqrstuvwxyz"

    @property
    def total(self):
        return len(self.letters)

    def __iter__(self):
        yield from self.letters

    def range(self, start, end):
        return self.letters[start:end]
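
Such a service behaves the same way as the compute_data-based version:

>>> col = collection.Collection(data_factory=MyData)
>>> col.data.total
26
>>> col.data.range(0, 3)
"abc"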

Base Data class

This class defines a couple of standard helpers in addition to the minimal requirements of the data service.

Most importantly, it caches the result of compute_data when the data or its length is accessed. Because of this, the items and the length of the data service are not updated at runtime.

Example

In the following example, the items of the data service and its length do not change after assigning to items, because compute_data is called only during the first access to the data. After this point, the data service uses the cached result of the first compute_data call.

class MyData(data.Data):
    items = [1, 2, 3]

    def compute_data(self):
        return self.items
>>> col = collection.Collection(data_factory=MyData)
>>> list(col.data)
[1, 2, 3]
>>> col.data.total
3
>>> col.data.items = [] # (1)!
>>> list(col.data)
[1, 2, 3]
>>> col.data.total
3

  1. This has no effect, because the data is already cached and the items attribute will not be read again.

To reset the cache and call compute_data again, use the refresh_data() method of the data service.

>>> col.data.items = "hello"
>>> col.data.refresh_data()
>>> list(col.data)
["h", "e", "l", "l", "o"]
>>> col.data.total
5

The base Data class expects compute_data to return a collections.abc.Sequence. With this expectation, it implements range(start, end), which returns a slice of the data, and at(index), which returns the element at the specified index.

Example

class MyData(data.Data):
    def compute_data(self):
       return "hello world"
>>> col = collection.Collection(data_factory=MyData)
>>> col.data.at(4)
"o"
>>> col.data.range(6, None)
"world"

These methods are also accessible via the index operator.

>>> col.data[4]
"o"
>>> col.data[6:]
"world"

If you are not going to rely on compute_data when extending the Data class, implement your own caching logic and index access, if you need them.
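
For instance, here is a minimal sketch of a service that manages its own cache and index access. The _cache attribute and the produce_records helper are illustrative assumptions, not part of the base class:

class MyCachedData(data.Data):
    _cache = None

    def produce_records(self):
        # hypothetical source of the records
        yield from "abcdefghijklmnopqrstuvwxyz"

    def _load(self):
        # compute and remember the records on first use
        if self._cache is None:
            self._cache = list(self.produce_records())
        return self._cache

    @property
    def total(self):
        return len(self._load())

    def __iter__(self):
        yield from self._load()

    def range(self, start, end):
        return self._load()[start:end]

    def __getitem__(self, key):
        # index access: support both data[4] and data[6:]
        if isinstance(key, slice):
            return self.range(key.start, key.stop)
        return self._load()[key]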

Available data factories

These factories are available at ckanext.collection.shared.data.

Data

Base data source for collection.

This class defines an outline of the data service. In the basic case, a subclass should override the compute_data method and return a Sequence from it to keep all methods functional.

Example
class MyData(data.Data):
    def compute_data(self):
        return range(1, 20)

StaticData

Static data source.

This class produces items from its data attribute. Use any sequence as the value of data during initialization.

Warning

Iteration and size measurement use a cached version of data. If the data attribute was overridden after service initialization, call the refresh_data() method of the service to reset the cache.

Attributes:

  • data (Iterable[TData]): sequence of items produced by the service

Example

NumericData = data.StaticData.with_attributes(data=range(1, 20))

UppercaseData = data.StaticData.with_attributes(
    data="ABCDEFGHIJKLMNOPQRSTUVWXYZ",
)
>>> col = collection.Collection(data_factory=NumericData)
>>> list(col)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
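
As the warning above notes, reassigning data after initialization only takes effect once the cache is reset. A minimal sketch, assuming the data attribute can be reassigned directly on the service:

>>> col = collection.Collection(data_factory=UppercaseData)
>>> col.data.total
26
>>> col.data.data = "XYZ"  # the cached result is still used at this point
>>> col.data.total
26
>>> col.data.refresh_data()
>>> col.data.total
3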

CsvFileData

Data source for CSV file.

The CSV file at the path specified by the source attribute of the service is read into memory, and every row is transformed into a dictionary.

Warning

Iteration and size measurement use a cached version of the source's content. If the source attribute was overridden or its content was modified after service initialization, call the refresh_data() method of the service to reset the cache.

Attributes:

  • source (str): path to the CSV source

Example
>>> col = collection.Collection(
>>>     data_factory=data.CsvFileData,
>>>     data_settings={"source": "/path/to/file.csv"},
>>> )
>>> list(col)
[
    {"column_1": "value_1", "column_2": "value_2"},
    ...
]

ApiData

API data source.

This base class is suitable for building API calls. Its compute_data makes a single request to the specified API action and yields items from the response.

Warning

Iteration and size measurement use a cached version of the action's result. To call the action again and use the fresh result, use the refresh_data() method of the service.

Attributes:

  • action (str): API action that returns the data
  • payload (dict[str, Any]): parameters passed to the action
  • ignore_auth (bool): skip authorization checks
  • user (str): name of the user for the action. Default: tk.current_user.name

Example
>>> col = collection.Collection(
>>>     data_factory=data.ApiData,
>>>     data_settings={"action": "group_list_authz", "user": "default"},
>>> )
>>> list(col)
[{...}, {...}]
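
The payload and ignore_auth settings follow the same pattern. A hypothetical sketch that passes extra parameters to the action and skips authorization checks (the action name and payload here are placeholders):

>>> col = collection.Collection(
>>>     data_factory=data.ApiData,
>>>     data_settings={
>>>         "action": "package_collaborator_list",
>>>         "payload": {"id": "pkg-id"},
>>>         "ignore_auth": True,
>>>     },
>>> )
>>> list(col)
[{...}, {...}]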

ApiSearchData

Bases: ApiData[TData, TDataCollection]

API data source optimized for package_search-like actions.

This class expects the API action to accept start and rows parameters that control the offset and limit, and the action's result to contain count and results keys.

This data service can iterate over a huge number of items, reading just a few of them into memory at once. It means you can iterate over thousands of datasets in an efficient way, without messing with offsets and limits:

packages = collection.Collection(
    data_factory=data.ApiSearchData,
    data_settings={"action": "package_search"},
)

# the following loop goes over every available package, no matter how many
# of them are available inside the search index
for pkg in packages.data: # (1)!
    ...

# total number of items processed in the previous step is the same as
# number of public datasets
assert packages.data.total == tk.get_action("package_search")({}, {})["count"]

  1. The data service is iterated directly to access all the items. Iterating over packages itself would yield just the first 10 items because of pagination.
Warning

Size measurement uses a cached number of records. To refresh the total number of records, call the refresh_data() method of the service. This is useful, for example, when a new item was created after the collection was initialized.

The records themselves are not cached, to reduce memory usage. Every separate iteration over the data service and every range call initiates a fresh API request.

Example
>>> col = collection.Collection(
>>>     data_factory=data.ApiSearchData,
>>>     data_settings={
>>>         "action": "package_search",
>>>         "payload": {"q": "res_format:CSV"},
>>>     },
>>> )
>>> list(col)
[{...}, {...}]

BaseSaData

Abstract data source for SQL statements.

This class can be extended to build a data source over an SQL statement. Its compute_data calls 4 methods:

  • get_base_statement: produces the initial statement
  • alter_statement(stmt): modifies the statement or replaces it completely
  • statement_with_filters(stmt): applies WHERE and HAVING conditions
  • statement_with_sorting(stmt): applies ORDER BY

These methods do nothing by default, but can be overridden in subclasses to build the SQL statement gradually, as in the sketch below.
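
A minimal sketch of such a gradual build, assuming the standard CKAN model is available (the filter and sorting conditions are illustrative):

import sqlalchemy as sa
from ckan import model

class ActiveUserData(data.BaseSaData):
    def get_base_statement(self):
        # initial statement: names of all users
        return sa.select(model.User.name)

    def statement_with_filters(self, stmt):
        # add a WHERE condition on top of the base statement
        return stmt.where(model.User.state == "active")

    def statement_with_sorting(self, stmt):
        # apply ORDER BY as the final step
        return stmt.order_by(model.User.name)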

Warning

The final statement produced by compute_data and the total number of results are cached. Call the refresh_data() method of the service to rebuild the statement and refresh the number of rows.

Attributes:

  • use_naive_filters (bool): search by filterable columns from params. Default: true
  • use_naive_search (bool): if params contains q, ILIKE it against searchable columns. Default: true
  • session (AlchemySession): SQLAlchemy session
  • is_scalar (bool): return only the first column from each row

Example
import sqlalchemy as sa
from ckan import model

class UserData(data.BaseSaData):
    def get_base_statement(self):
        return sa.select(model.User.name)
>>> col = collection.Collection(data_factory=UserData)
>>> list(col)
[("default",), (...,)]

TemporalSaData

Bases: BaseSaData[TStatement, TData, TDataCollection]

Data source that supports pagination by datetime column.

Attributes:

  • temporal_column (ColumnElement[Any]): column used for pagination

Example

This class can be used as a base for SQL statement based data services. A collection that uses such a data service must also use pager.TemporalPager instead of the default pager.ClassicPager.

import sqlalchemy as sa
from datetime import datetime, timedelta
from ckan import model

class TemporalPackageData(data.TemporalSaData):
    def get_base_statement(self):
        return sa.select(model.Package.name, model.Package.metadata_created)
>>> col = collection.Collection(
>>>     data_factory=TemporalPackageData,
>>>     data_settings={"temporal_column": model.Package.metadata_created},
>>>     pager_factory=pager.TemporalPager,
>>>     pager_settings={"since": datetime.now() - timedelta(days=40)},
>>> )
>>> list(col)
[("pkg1", datetime.datetime(2024, 6, 13, 10, 40, 22, 518511)), ...]

StatementSaData

Bases: BaseSaData[Select, TData, TDataCollection]

Data source for arbitrary SQL statement.

Warning

The final statement produced by compute_data and the total number of results are cached. Call the refresh_data() method of the service to rebuild the statement and refresh the number of rows.

Attributes:

  • statement (Select): select statement

Example
>>> col = collection.Collection(
>>>     data_factory=data.StatementSaData,
>>>     data_settings={"statement": sa.select(model.User.name)},
>>> )
>>> list(col)
[("default",), (...,)]

UnionSaData

Bases: BaseSaData[Select, TData, TDataCollection]

Data source for multiple SQL statements merged with UNION ALL.

Warning

The final statement produced by compute_data and the total number of results are cached. Call the refresh_data() method of the service to rebuild the statement and refresh the number of rows.

Attributes:

  • statements (list[Select]): select statements

Example
>>> col = collection.Collection(
>>>     data_factory=data.UnionSaData,
>>>     data_settings={"statements": [
>>>         sa.select(model.User.name, sa.literal("user")),
>>>         sa.select(model.Package.name, sa.literal("package")),
>>>         sa.select(model.Group.name, sa.literal("group")),
>>>     ]},
>>> )
>>> list(col)
[("default", "user"),
("warandpeace", "package"),
("my-cool-group", "group")]

ModelData

Bases: BaseSaData[Select, TData, TDataCollection]

Data source for SQLAlchemy model.

Warning

The final statement produced by compute_data and the total number of results are cached. Call the refresh_data() method of the service to rebuild the statement and refresh the number of rows.

Attributes:

  • model (Any): main model used by the data source
  • is_scalar (bool): return model instances instead of tuples of columns
  • static_columns (list[Column[Any] | Label[Any]]): select only the specified columns. If the is_scalar flag is enabled, only the first column from this list is returned
  • static_filters (list[Any]): apply filters to the select statement

Example

A non-scalar collection returns rows as tuples of columns

>>> col = collection.Collection(
>>>     data_factory=data.ModelData,
>>>     data_settings={
>>>         "model": model.User,
>>>         "is_scalar": False,
>>>     },
>>> )
>>> list(col)
[("id-123-123", "user-name", ...), ...]

while a scalar collection yields model instances

>>> col = collection.Collection(
>>>     data_factory=data.ModelData,
>>>     data_settings={
>>>         "model": model.User,
>>>         "is_scalar": True,
>>>     },
>>> )
>>> list(col)
[<User id=id-123-123 name=user-name>, ...]

A non-scalar collection with static_columns produces tuples with the selected values

>>> col = collection.Collection(
>>>     data_factory=data.ModelData,
>>>     data_settings={
>>>         "model": model.User,
>>>         "is_scalar": False,
>>>         "static_columns": [model.User.name, model.User.sysadmin],
>>>     },
>>> )
>>> list(col)
[("default", True), ...]

and a scalar collection with static_columns yields values of the first column

>>> col = collection.Collection(
>>>     data_factory=data.ModelData,
>>>     data_settings={
>>>         "model": model.User,
>>>         "is_scalar": True,
>>>         "static_columns": [model.User.name, model.User.sysadmin],
>>>     },
>>> )
>>> list(col)
["default", ...]

Values in static_filters are produced by operations with model columns, the same thing you'd pass into the .filter/.where methods.

>>> col = collection.Collection(
>>>     data_factory=data.ModelData,
>>>     data_settings={
>>>         "model": model.User,
>>>         "is_scalar": True,
>>>         "static_filters": [model.User.sysadmin == True],
>>>     },
>>> )
>>> list(col)
[<User sysadmin=True ...>, ...]

Example
>>> col = collection.Collection(
>>>     data_factory=data.ModelData,
>>>     data_settings={"model": model.User, "is_scalar": True},
>>> )
>>> list(col)
[<User ...>, ...]