Skip to content

DatabaseDataSource

The DatabaseDataSource uses SQLAlchemy statements to fetch data from the database. This is the most common data source for production use.

Basic Usage

from sqlalchemy import select

from ckan import model

from ckanext.tables.shared import DatabaseDataSource


data_source = DatabaseDataSource(
    stmt=select(
        model.User.id,
        model.User.email,
        model.User.name,
        model.User.state,
    ).order_by(model.User.created.desc()),
    model=model.User,
)

Definition

A data source that uses a SQLAlchemy statement as the data source.

PARAMETER DESCRIPTION
stmt

The SQLAlchemy statement to use as the data source

TYPE: Select

model

The model class to use for filtering and sorting, e.g. model.User

TYPE: type[Any]

Source code in ckanext/tables/data_sources.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class DatabaseDataSource(BaseDataSource):
    """A data source that uses a SQLAlchemy statement as the data source.

    Args:
        stmt: The SQLAlchemy statement to use as the data source
        model: The model class to use for filtering and sorting, e.g. `model.User`
    """

    def __init__(self, stmt: Select, model: type[Any]):
        self.base_stmt = stmt
        self.stmt = stmt
        self.model = model

    def filter(
        self, field: str | None, operator: str | None, value: str | None
    ) -> Self:
        self.stmt = self.base_stmt

        if field and hasattr(self.model, field) and value and operator:
            col = getattr(self.model, field)
            expr = self.build_filter(col, operator, value)

            if expr is not None:
                self.stmt = self.stmt.where(expr)

        return self

    def build_filter(
        self, column: ColumnElement, operator: str, value: str
    ) -> BinaryExpression | ClauseElement | None:
        try:
            if isinstance(column.type, Boolean):
                casted_value = value.lower() in ("true", "1", "yes", "y")
            elif isinstance(column.type, Integer):
                casted_value = int(value)
            elif isinstance(column.type, DateTime):
                casted_value = datetime.fromisoformat(value)
            else:
                casted_value = str(value)
        except ValueError:
            return None

        operators: dict[
            str,
            Callable[[ColumnElement, Any], BinaryExpression | ClauseElement | None],
        ] = {
            "=": lambda col, val: col == val,
            "<": lambda col, val: col < val,
            "<=": lambda col, val: col <= val,
            ">": lambda col, val: col > val,
            ">=": lambda col, val: col >= val,
            "!=": lambda col, val: col != val,
            "like": lambda col, val: (
                col.ilike(f"%{val}%") if isinstance(val, str) else None
            ),
        }

        func = operators.get(operator)
        return func(column, casted_value) if func else None

    def sort(self, sort_by: str | None, sort_order: str | None) -> Self:
        if not sort_by or not hasattr(self.model, sort_by):
            return self

        col = getattr(self.model, sort_by)

        # Clear existing order_by clauses
        self.stmt = self.stmt.order_by(None)

        if sort_order and sort_order.lower() == "desc":
            self.stmt = self.stmt.order_by(col.desc())
        else:
            self.stmt = self.stmt.order_by(col.asc())

        return self

    def paginate(self, page: int, size: int) -> Self:
        if page and size:
            self.stmt = self.stmt.limit(size).offset((page - 1) * size)

        return self

    def all(self):
        return [dict(row) for row in model.Session.execute(self.stmt).mappings().all()]

    def count(self):
        return model.Session.execute(
            select(func.count()).select_from(self.stmt.subquery())
        ).scalar_one()

Best Practices

  • Use selective queries
  • Add database indexes for commonly filtered/sorted columns
# Good: Select only needed columns
stmt = select(model.User.id, model.User.name, model.User.email)

# Avoid: Selecting everything
stmt = select(model.User)