# Data

## Overview

This service produces the data for the collection. Every data service must:

- be `Iterable`
- yield all existing records during iteration. I.e., if the data service produces datasets from the `package_search` API, `list(data)` must contain all datasets from the search index, not only the first 10 or 20
- define a `total` property that reflects the number of available records, so that `len(list(data)) == data.total`
- define a `range(start: Any, end: Any)` method that returns a slice of the data

The base class for data services - `Data` - already contains a simple version of this logic. Just override `compute_data()` and return a sequence of records from it to satisfy the minimal requirements of the data service.
Example

```python
class MyData(data.Data):
    def compute_data(self):
        return "abcdefghijklmnopqrstuvwxyz"
```

```pycon
>>> col = collection.Collection(data_factory=MyData)
>>> list(col)
["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]
>>> col.data.total
26
>>> col.data.range(-3, None)
"xyz"
```
Using `compute_data` simplifies defining data services, but it's not required. You can explicitly implement all the methods instead.
Example

```python
class MyData(data.Data):
    letters = "abcdefghijklmnopqrstuvwxyz"

    @property
    def total(self):
        return len(self.letters)

    def __iter__(self):
        yield from self.letters

    def range(self, start, end):
        return self.letters[start:end]
```
## Base Data class

This class defines a couple of standard helpers in addition to the minimal requirements of a data service.

Most importantly, it caches the result of `compute_data` when the data or the data length is accessed. Because of this, the items and length of the data service are not updated at runtime.
Example

In the following example, the items of the data service and its length do not change after assigning to `items`, because `compute_data` is called only during the first access to the data. After this point, the data service uses the cached result of the first `compute_data` call.
```python
class MyData(data.Data):
    items = [1, 2, 3]

    def compute_data(self):
        return self.items
```

```pycon
>>> col = collection.Collection(data_factory=MyData)
>>> list(col.data)
[1, 2, 3]
>>> col.data.total
3
>>> col.data.items = []  # (1)!
>>> list(col.data)
[1, 2, 3]
>>> col.data.total
3
```
- This has no effect, because the data is already cached and the `items` attribute will not be used anymore.
To reset the cache and use `compute_data` again, call the `refresh_data()` method of the data service.
```pycon
>>> col.data.items = "hello"
>>> col.data.refresh_data()
>>> list(col.data)
["h", "e", "l", "l", "o"]
>>> col.data.total
5
```
The base `Data` class expects that `compute_data` returns a `collections.abc.Sequence`. With this expectation, it implements `range(start, end)`, which returns a slice of the data, and `at(index)`, which returns the element at the specified index.
Example

```python
class MyData(data.Data):
    def compute_data(self):
        return "hello world"
```

```pycon
>>> col = collection.Collection(data_factory=MyData)
>>> col.data.at(4)
"o"
>>> col.data.range(6, None)
"world"
```
These methods are also accessible via the index operator.

```pycon
>>> col.data[4]
"o"
>>> col.data[6:]
"world"
```
If you are not going to rely on `compute_data` when extending the `Data` class, implement your own caching logic and index access, if you need them.
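For instance, here is a minimal sketch of a data service that manages its own cache instead of relying on `compute_data`. The `_cache` attribute and the `load_letters` helper are hypothetical names introduced for illustration only:

```python
class SelfCachingData(data.Data):
    _cache = None  # hypothetical attribute holding the cached sequence

    def load_letters(self):
        # hypothetical helper that produces the records
        return "abcdefghijklmnopqrstuvwxyz"

    def _get_cache(self):
        # populate the cache on first access and reuse it afterwards
        if self._cache is None:
            self._cache = self.load_letters()
        return self._cache

    @property
    def total(self):
        return len(self._get_cache())

    def __iter__(self):
        yield from self._get_cache()

    def range(self, start, end):
        return self._get_cache()[start:end]

    def __getitem__(self, key):
        # index access via data[i] and data[i:j]
        return self._get_cache()[key]
```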
## Available data factories

These factories are available at `ckanext.collection.shared.data`.
### Data

Base data source for collection.

This class defines an outline of the data service. In the basic case, a subclass should override the `compute_data` method and return a Sequence from it to keep all methods functional.
Example

```python
class MyData(data.Data):
    def compute_data(self):
        return range(1, 20)
```
### StaticData

Static data source.

This class produces items from its `data` attribute. Use any sequence as a value for `data` during initialization.

Warning

Iteration and size measurement use a cached version of `data`. If the `data` attribute was overridden after service initialization, call the `refresh_data()` method of the service to reset the cache.
| Attribute | Description |
|---|---|
| `data` | sequence of items produced by the service |
Example

```python
NumericData = data.StaticData.with_attributes(data=range(1, 20))

UppercaseData = data.StaticData.with_attributes(
    data="ABCDEFGHIJKLMNOPQRSTUVWXYZ",
)
```

```pycon
>>> col = collection.Collection(data_factory=NumericData)
>>> list(col)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```
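As a follow-up to the warning above, a sketch of resetting the cache after replacing the `data` attribute, assuming the collection from the previous example:

```pycon
>>> col.data.data = range(100, 120)  # replace the source sequence
>>> list(col)                        # still served from the cache
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> col.data.refresh_data()          # reset the cache
>>> list(col)
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
```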
### CsvFileData

Data source for a CSV file.

The CSV file available at the path specified by the `source` attribute of the service is read into memory, and every row is transformed into a dictionary.

Warning

Iteration and size measurement use a cached version of the source's content. If the `source` attribute was overridden or its content was modified after service initialization, call the `refresh_data()` method of the service to reset the cache.
| Attribute | Description |
|---|---|
| `source` | path to the CSV source |
Example

```pycon
>>> col = collection.Collection(
>>>     data_factory=data.CsvFileData,
>>>     data_settings={"source": "/path/to/file.csv"},
>>> )
>>> list(col)
[
    {"column_1": "value_1", "column_2": "value_2"},
    ...
]
```
### ApiData

API data source.

This base class is suitable for building API calls. Its `compute_data` makes a single request to the specified API action and yields items from the response.

Warning

Iteration and size measurement use a cached version of the action's result. To call the action again and use the fresh result, use the `refresh_data()` method of the service.
| Attribute | Description |
|---|---|
| `action` | API action that returns the data |
| `payload` | parameters passed to the action |
| `ignore_auth` | skip authorization checks |
| `user` | name of the user for the action |
Example

```pycon
>>> col = collection.Collection(
>>>     data_factory=data.ApiData,
>>>     data_settings={"action": "group_list_authz", "user": "default"},
>>> )
>>> list(col)
[{...}, {...}]
```
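The remaining attributes from the table above can be combined in the same way. A sketch that passes extra parameters to the action and skips authorization checks; the action and payload are illustrative, not prescribed by the library:

```pycon
>>> col = collection.Collection(
>>>     data_factory=data.ApiData,
>>>     data_settings={
>>>         "action": "organization_list_for_user",
>>>         "payload": {"permission": "read"},
>>>         "ignore_auth": True,
>>>     },
>>> )
>>> list(col)
[{...}, {...}]
```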
### ApiSearchData

Bases: `ApiData[TData, TDataCollection]`

API data source optimized for package_search-like actions.

This class expects that the API action accepts `start` and `rows` parameters that control offset and limit, and that the result of the action contains `count` and `results` keys.
This data service can iterate over a huge number of items, reading just a few of them into memory at once. It means you can iterate over thousands of datasets in an efficient way without messing with offsets and limits:
```python
packages = collection.Collection(
    data_factory=data.ApiSearchData,
    data_settings={"action": "package_search"},
)

# the following loop goes over every available package, no matter how many
# of them are available inside the search index
for pkg in packages.data:  # (1)!
    ...

# the total number of items processed in the previous step is the same as
# the number of public datasets
assert packages.data.total == tk.get_action("package_search")({}, {})["count"]
```
- The `data` service is iterated directly to access all the items. Iterating over `packages` would yield just the first 10 items because of pagination.
Warning

Size measurement uses a cached number of records. To refresh the total number of records, call the `refresh_data()` method of the service. This can be useful if a new item was created after initialization of the collection.

The records themselves are not cached, to reduce memory usage. Every separate iteration over the data service and every `range` call initiates a fresh API request.
Example

```pycon
>>> col = collection.Collection(
>>>     data_factory=data.ApiSearchData,
>>>     data_settings={
>>>         "action": "package_search",
>>>         "payload": {"q": "res_format:CSV"},
>>>     },
>>> )
>>> list(col)
[{...}, {...}]
```
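Because records are not cached, `range` can be used to fetch arbitrary windows of results on demand. A sketch, assuming the collection from the previous example; each call below triggers its own API request:

```pycon
>>> col.data.range(0, 5)      # first 5 matching datasets, fresh request
[{...}, {...}, {...}, {...}, {...}]
>>> col.data.range(100, 105)  # another window, another request
[{...}, {...}, {...}, {...}, {...}]
```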
### BaseSaData

Abstract data source for SQL statements.

This class can be extended to build a data source over an SQL statement. Its `compute_data` calls 4 methods:

- `get_base_statement`: produces the initial statement
- `alter_statement(stmt)`: modifies the statement or replaces it completely
- `statement_with_filters(stmt)`: applies `WHERE` and `HAVING` conditions
- `statement_with_sorting(stmt)`: applies `ORDER BY`

These methods do nothing by default, but can be overridden in subclasses to build the SQL statement gradually, as shown in the second sketch below.
Warning

The final statement produced by `compute_data` and the total number of results are cached. Call the `refresh_data()` method of the service to rebuild the statement and refresh the number of rows.
| Attribute | Description |
|---|---|
| `use_naive_filters` | search by filterable columns from |
| `use_naive_search` | if |
| `session` | SQLAlchemy session |
| `is_scalar` | return only the first column from each row |
Example

```python
import sqlalchemy as sa
from ckan import model


class UserData(data.BaseSaData):
    def get_base_statement(self):
        return sa.select(model.User.name)
```

```pycon
>>> col = collection.Collection(data_factory=UserData)
>>> list(col)
[("default",), (...,)]
```
### TemporalSaData

Bases: `BaseSaData[TStatement, TData, TDataCollection]`

Data source that supports pagination by a datetime column.
| Attribute | Description |
|---|---|
| `temporal_column` | column used for pagination |
Example

This class can be used as a base for SQL-statement-based data services. A collection that uses such a data service must also use `pager.TemporalPager` instead of the default `pager.ClassicPager`.
```python
import sqlalchemy as sa
from datetime import datetime, timedelta
from ckan import model


class TemporalPackageData(data.TemporalSaData):
    def get_base_statement(self):
        return sa.select(model.Package.name, model.Package.metadata_created)
```

```pycon
>>> col = collection.Collection(
>>>     data_factory=TemporalPackageData,
>>>     data_settings={"temporal_column": model.Package.metadata_created},
>>>     pager_factory=pager.TemporalPager,
>>>     pager_settings={"since": datetime.now() - timedelta(days=40)},
>>> )
>>> list(col)
[("pkg1", datetime.datetime(2024, 6, 13, 10, 40, 22, 518511)), ...]
```
### StatementSaData

Bases: `BaseSaData[Select, TData, TDataCollection]`

Data source for an arbitrary SQL statement.

Warning

The final statement produced by `compute_data` and the total number of results are cached. Call the `refresh_data()` method of the service to rebuild the statement and refresh the number of rows.

| Attribute | Description |
|---|---|
| `statement` | select statement |
Example

```pycon
>>> col = collection.Collection(
>>>     data_factory=data.StatementSaData,
>>>     data_settings={"statement": sa.select(model.User.name)},
>>> )
>>> list(col)
[("default",), (...,)]
```
### UnionSaData

Bases: `BaseSaData[Select, TData, TDataCollection]`

Data source for multiple SQL statements merged with UNION ALL.

Warning

The final statement produced by `compute_data` and the total number of results are cached. Call the `refresh_data()` method of the service to rebuild the statement and refresh the number of rows.

| Attribute | Description |
|---|---|
| `statements` | select statements |
Example

```pycon
>>> col = collection.Collection(
>>>     data_factory=data.UnionSaData,
>>>     data_settings={"statements": [
>>>         sa.select(model.User.name, sa.literal("user")),
>>>         sa.select(model.Package.name, sa.literal("package")),
>>>         sa.select(model.Group.name, sa.literal("group")),
>>>     ]},
>>> )
>>> list(col)
[("default", "user"),
 ("warandpeace", "package"),
 ("my-cool-group", "group")]
```
### ModelData

Bases: `BaseSaData[Select, TData, TDataCollection]`

Data source for a SQLAlchemy model.

Warning

The final statement produced by `compute_data` and the total number of results are cached. Call the `refresh_data()` method of the service to rebuild the statement and refresh the number of rows.

| Attribute | Description |
|---|---|
| `model` | main model used by the data source |
| `is_scalar` | return model instances instead of rows of columns. A non-scalar collection returns rows as tuples of columns; a scalar collection yields model instances |
| `static_columns` | select only the specified columns |
| `static_filters` | apply filters to the select statement; filters are values produced by operations with model columns |
Example

```pycon
>>> col = collection.Collection(
>>>     data_factory=data.ModelData,
>>>     data_settings={"model": model.User, "is_scalar": True},
>>> )
>>> list(col)
[<User ...>, ...]
```