Skip to content

Resource Data Sources

Resource data sources load tabular data from CKAN resources. They are used automatically by the Resource View feature but can also be instantiated manually when you need to build a table backed by a resource file or the Datastore.

All file-based sources extend BaseResourceDataSource, which handles source path resolution (local upload vs. remote URL) and pluggable caching. The CKAN Datastore source (DataStoreDataSource) is separate and queries the Datastore API directly without any caching.


File-based sources

CsvUrlDataSource

Reads a CSV file from a local path or remote URL. The delimiter is detected automatically.

from ckanext.tables.shared import CsvUrlDataSource

# From a direct URL
source = CsvUrlDataSource(url="https://example.com/data.csv")

# From a CKAN resource dict (resolves upload path automatically)
source = CsvUrlDataSource(resource=resource_dict)
Source code in ckanext/tables/data_sources.py
414
415
416
417
418
419
420
class CsvUrlDataSource(BaseResourceDataSource):
    """Read a CSV file from a local path or remote URL.

    The delimiter is sniffed automatically by pandas (``sep=None``
    requires the slower ``python`` parsing engine).
    """

    def fetch_dataframe(self) -> pd.DataFrame:
        """Return the CSV contents, or an empty DataFrame on any read error."""
        # Resolve the path once, before the try: the original read error is
        # then logged instead of being masked by a second resolution failure
        # inside the except handler.
        path = self.get_source_path()
        try:
            return pd.read_csv(path, sep=None, engine="python")
        except Exception:
            log.exception("Error fetching CSV from %s", path)
            return pd.DataFrame()

XlsxUrlDataSource

Reads the first sheet of an Excel workbook (.xlsx).

from ckanext.tables.shared import XlsxUrlDataSource

source = XlsxUrlDataSource(url="https://example.com/report.xlsx")
Source code in ckanext/tables/data_sources.py
423
424
425
426
427
428
429
class XlsxUrlDataSource(BaseResourceDataSource):
    """Read the first sheet of an Excel workbook (``.xlsx``)."""

    def fetch_dataframe(self) -> pd.DataFrame:
        """Return the workbook's first sheet, or an empty DataFrame on error."""
        # Resolve the path once so a resolution failure cannot re-raise
        # inside the except handler and mask the original read error.
        path = self.get_source_path()
        try:
            return pd.read_excel(path)
        except Exception:
            log.exception("Error fetching XLSX from %s", path)
            return pd.DataFrame()

OrcUrlDataSource

Reads an Apache ORC columnar file.

from ckanext.tables.shared import OrcUrlDataSource

source = OrcUrlDataSource(url="https://example.com/data.orc")
Source code in ckanext/tables/data_sources.py
432
433
434
435
436
437
438
class OrcUrlDataSource(BaseResourceDataSource):
    """Read an Apache ORC columnar file."""

    def fetch_dataframe(self) -> pd.DataFrame:
        """Return the ORC contents, or an empty DataFrame on any read error."""
        # Resolve the path once so a resolution failure cannot re-raise
        # inside the except handler and mask the original read error.
        path = self.get_source_path()
        try:
            return pd.read_orc(path)
        except Exception:
            log.exception("Error fetching ORC from %s", path)
            return pd.DataFrame()

ParquetUrlDataSource

Reads an Apache Parquet columnar file.

from ckanext.tables.shared import ParquetUrlDataSource

source = ParquetUrlDataSource(url="https://example.com/data.parquet")
Source code in ckanext/tables/data_sources.py
441
442
443
444
445
446
447
class ParquetUrlDataSource(BaseResourceDataSource):
    """Read an Apache Parquet columnar file."""

    def fetch_dataframe(self) -> pd.DataFrame:
        """Return the Parquet contents, or an empty DataFrame on any read error."""
        # Resolve the path once so a resolution failure cannot re-raise
        # inside the except handler and mask the original read error.
        path = self.get_source_path()
        try:
            return pd.read_parquet(path)
        except Exception:
            log.exception("Error fetching Parquet from %s", path)
            return pd.DataFrame()

FeatherUrlDataSource

Reads an Apache Arrow Feather file.

from ckanext.tables.shared import FeatherUrlDataSource

source = FeatherUrlDataSource(url="https://example.com/data.feather")
Source code in ckanext/tables/data_sources.py
450
451
452
453
454
455
456
class FeatherUrlDataSource(BaseResourceDataSource):
    """Read an Apache Arrow Feather file."""

    def fetch_dataframe(self) -> pd.DataFrame:
        """Return the Feather contents, or an empty DataFrame on any read error."""
        # Resolve the path once so a resolution failure cannot re-raise
        # inside the except handler and mask the original read error.
        path = self.get_source_path()
        try:
            return pd.read_feather(path)
        except Exception:
            log.exception("Error fetching Feather from %s", path)
            return pd.DataFrame()

Caching

All file-based sources inherit from BaseResourceDataSource, which caches the fetched DataFrame to avoid re-downloading on every request. The cache backend and TTL are controlled globally via configuration (see Configuration):

ckanext.tables.cache.backend = pickle   # or "redis"
ckanext.tables.cache.pickle.cache_dir = /var/cache/ckanext-tables

You can override the backend or TTL per instance:

from ckanext.tables.shared import CsvUrlDataSource, RedisCacheBackend

source = CsvUrlDataSource(
    url="https://example.com/data.csv",
    cache_backend=RedisCacheBackend(),
    cache_ttl=120,  # seconds
)

BaseResourceDataSource — a data source that loads resource data from a file or URL.

The cache backend defaults to the value of ckanext.tables.cache.backend ("pickle" by default). Pass an explicit cache_backend to override for a specific instance.

Override cache_ttl on a subclass or pass it to the constructor to change the expiry.

PARAMETER DESCRIPTION
url

Direct URL to fetch data from.

TYPE: str | None DEFAULT: None

resource

The CKAN resource dictionary.

TYPE: dict[str, Any] | None DEFAULT: None

cache_backend

Override the configured cache backend for this instance.

TYPE: CacheBackend | None DEFAULT: None

cache_ttl

Override the default TTL (seconds) for this instance.

TYPE: int DEFAULT: 600

Source code in ckanext/tables/data_sources.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
class BaseResourceDataSource(CachedDataSourceMixin, PandasDataSource):
    """A data source that loads resource data from a file or URL.

    The cache backend defaults to the value of ``ckanext.tables.cache.backend``
    (``"pickle"`` by default). Pass an explicit *cache_backend* to
    override for a specific instance.

    Override ``cache_ttl`` on a subclass or pass it to the constructor to
    change the expiry.

    Args:
        url: Direct URL to fetch data from.
        resource: The CKAN resource dictionary.
        cache_backend: Override the configured cache backend for this instance.
        cache_ttl: Override the default TTL (seconds) for this instance.
            Pass ``None`` to keep the class-level default.

    Raises:
        ValueError: If neither *url* nor *resource* is provided.
    """

    def __init__(
        self,
        url: str | None = None,
        resource: dict[str, Any] | None = None,
        cache_backend: CacheBackend | None = None,
        cache_ttl: int | None = 600,
    ):
        super().__init__()

        if not url and not resource:
            raise ValueError(  # noqa: TRY003
                "Either url or resource must be provided"
            )

        self.url = url
        self.resource = resource
        # Memoized by get_source_path(); empty string means "not resolved yet".
        self._source_path: str = ""
        self.cache_backend = (
            cache_backend if cache_backend is not None else get_cache_backend()
        )

        # None deliberately leaves the class-level cache_ttl untouched.
        if cache_ttl is not None:
            self.cache_ttl = cache_ttl

    def get_cache_key(self) -> str:
        """Key the cache by resource id when available, otherwise by URL."""
        return f"resource-{self.resource['id']}" if self.resource else f"url-{self.url}"

    def get_source_path(self) -> str:
        """Resolve and memoize the local path or URL to read data from.

        Uploaded resources (``url_type == "upload"``) resolve to their
        on-disk path via the CKAN uploader; otherwise the resource URL is
        used, then the explicit *url* as a last resort.

        Returns:
            The resolved filesystem path or URL.

        Raises:
            ValueError: If no path or URL could be resolved.
        """
        if self._source_path:
            return self._source_path

        if self.resource:
            try:
                if self.resource.get("url_type") == "upload":
                    upload = uploader.get_resource_uploader(self.resource)
                    self._source_path = upload.get_path(self.resource["id"])
                    return self._source_path

                if self.resource.get("url"):
                    self._source_path = self.resource["url"]
                    return self._source_path

            except (OSError, TypeError, tk.ValidationError, tk.ObjectNotFound):
                # Bug fix: self.resource_id never existed on this class;
                # read the id from the resource dict instead.
                log.warning(
                    "Failed to resolve path for resource %s, falling back to provided url",
                    self.resource.get("id"),
                    exc_info=True,
                )

        if self.url:
            self._source_path = self.url
            return self._source_path

        raise ValueError("Could not resolve source path")  # noqa: TRY003

DataStoreDataSource

Queries the CKAN Datastore API directly. This source is used automatically when a resource has datastore_active = True. It does not use any caching, as the data is already stored in the database.

from ckanext.tables.shared import DataStoreDataSource

source = DataStoreDataSource(resource_id="<resource-id>")

Filtering, sorting, and pagination are translated into datastore_search parameters:

  • = → exact match filter
  • like → full-text search (partial word match via PostgreSQL FTS with :*)
  • Other comparison operators are not supported by datastore_search and are silently ignored.

Note

DataStoreDataSource requires the datastore plugin to be enabled in CKAN. If it is not active, all methods return empty results gracefully.

A data source that fetches records directly from the CKAN Datastore.

Source code in ckanext/tables/data_sources.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
class DataStoreDataSource(BaseDataSource):
    """A data source that fetches records directly from the CKAN Datastore.

    No caching is applied; every call issues a fresh ``datastore_search``
    request. If the datastore plugin is not loaded, all methods return
    empty results gracefully.
    """

    def __init__(self, resource_id: str):
        self.resource_id = resource_id

        self._filters: dict[str, Any] = {}
        self._q: dict[str, str] = {}
        self._sort: str | None = None
        self._limit: int | None = None
        self._offset: int | None = None

        # Public methods degrade to empty results when datastore is absent.
        self._datastore_enabled = "datastore" in tk.g.plugins

    def filter(self, filters: list[FilterItem]) -> Self:
        """Translate filter items into ``datastore_search`` parameters.

        ``=`` becomes an exact-match filter; ``like`` becomes a full-text
        query. Other operators would require ``datastore_search_sql`` and
        are silently ignored.
        """
        self._filters = {}
        self._q = {}

        for filter_item in filters:
            if filter_item.operator == "=":
                self._filters[filter_item.field] = filter_item.value
            elif filter_item.operator == "like":
                # replace non-alphanumeric characters (except dots) with FTS wildcard (_)
                v = str(filter_item.value)
                v = re.sub(r"[^\w\-\.]+", "_", v)
                # append ':*' so we can do partial FTS searches
                self._q[filter_item.field] = v + ":*"
            # Other operators like <, > might require datastore_search_sql
            # which is more complex, so we skip them unless necessary.
        return self

    def sort(self, sort_by: str | None, sort_order: str | None) -> Self:
        """Set the sort clause; order defaults to ascending."""
        self._sort = f"{sort_by} {sort_order or 'asc'}" if sort_by else None
        return self

    def paginate(self, page: int, size: int) -> Self:
        """Set limit/offset from 1-based *page* and page *size*."""
        if page and size:
            self._limit = size
            self._offset = (page - 1) * size
        return self

    def _apply_search_params(self, data_dict: dict[str, Any]) -> None:
        """Copy accumulated filter/FTS state into *data_dict* in place.

        Shared by :meth:`all` and :meth:`count` so both queries always
        apply identical filtering.
        """
        if self._filters:
            data_dict["filters"] = self._filters

        if self._q:
            data_dict["q"] = json.dumps(self._q)
            data_dict["plain"] = False
            data_dict["language"] = "simple"

    def all(self) -> list[dict[str, Any]]:
        """Return the matching records, or ``[]`` if datastore is unavailable."""
        if not self._datastore_enabled:
            return []

        data_dict: dict[str, Any] = {"resource_id": self.resource_id}
        self._apply_search_params(data_dict)

        if self._sort:
            data_dict["sort"] = self._sort

        if self._limit is not None:
            data_dict["limit"] = self._limit

        if self._offset is not None:
            data_dict["offset"] = self._offset

        try:
            result = tk.get_action("datastore_search")({}, data_dict)
            return result.get("records", [])
        except (tk.ObjectNotFound, tk.NotAuthorized):
            return []

    def count(self) -> int:
        """Return the total number of matching records (0 on failure)."""
        if not self._datastore_enabled:
            return 0

        # limit=0 fetches no records but still reports the total.
        data_dict: dict[str, Any] = {"resource_id": self.resource_id, "limit": 0}
        self._apply_search_params(data_dict)

        try:
            result = tk.get_action("datastore_search")({}, data_dict)
            return result.get("total", 0)
        except (tk.ObjectNotFound, tk.NotAuthorized):
            return 0

    def get_columns(self) -> list[str]:
        """Return the datastore field names, excluding the internal ``_id``."""
        if not self._datastore_enabled:
            return []

        data_dict = {"resource_id": self.resource_id, "limit": 0}

        try:
            result = tk.get_action("datastore_search")({}, data_dict)
            return [f["id"] for f in result.get("fields", []) if f["id"] != "_id"]
        except (tk.ObjectNotFound, tk.NotAuthorized):
            return []