bootstack.data.BaseDataSource#

class bootstack.data.BaseDataSource(page_size=10)#

Bases: ABC

Abstract base class for datasource implementations.

Provides shared utilities and enforces the DataSourceProtocol interface through abstract methods. Subclasses implement storage-specific logic while inheriting common functionality.

Parameters:

page_size (int) – Number of records per page (default: 10)

page_size#

Current page size setting

Example

class RedisDataSource(BaseDataSource):
    def __init__(self, redis_client, page_size=10):
        super().__init__(page_size)
        self.redis = redis_client

    def load(self, records):
        pass  # Redis-specific implementation

    def page(self, page=None):
        pass  # Redis-specific implementation
abstract property count: int#

Total number of records matching the current filter.

property id_field: str#

Name of the record field that holds the stable row identity.

abstract property selected_count: int#

Number of selected records.

close()#

Release any resources held by this data source.

The default is a no-op — in-memory sources hold none. Sources backed by a database connection or file handle override this to release it. Safe to call more than once.

abstractmethod delete(record_id)#

Delete record by ID.

Parameters:

record_id (Any) – Unique identifier of the record

Returns:

True if record was deleted, False if not found

Return type:

bool

abstractmethod deselect(record_id)#

Mark record as unselected.

Parameters:

record_id (Any) – Unique identifier of the record

Returns:

True if record was deselected, False if not found

Return type:

bool

abstractmethod deselect_all(current_page_only=False)#

Deselect all records (optionally only current page).

Parameters:

current_page_only (bool) – If True, deselect only records on current page

Returns:

Number of records deselected

Return type:

int

export_csv(filepath, include_all=True)#

Export records to a CSV file (streamed).

Parameters:
  • filepath (str) – Path to output CSV file

  • include_all (bool) – If True, export all records; if False, export only selected

abstractmethod get(record_id)#

Retrieve single record by ID.

Parameters:

record_id (Any) – Unique identifier of the record

Returns:

Record dictionary or None if not found

Return type:

Record | None

abstractmethod has_next_page()#

Check if more pages exist after current page.

Returns:

True if next page exists, False otherwise

Return type:

bool

abstractmethod insert(record)#

Create new record and return its ID.

Parameters:

record (Dict[str, Any]) – Dictionary with record data

Returns:

The ID assigned to the new record

Return type:

int

abstractmethod is_selected(record_id)#

Check whether a record is currently selected.

Parameters:

record_id (Any) – Unique identifier of the record

Returns:

True if the record is selected, False otherwise (including missing records)

Return type:

bool

abstractmethod load(records)#

Load data records into the datasource.

Parameters:

records (Sequence[Primitive] | Sequence[Mapping[str, Any]]) – Sequence of records (dicts) or primitives (auto-wrapped)

Returns:

Self for method chaining

Return type:

BaseDataSource

move(record_id, target_index)#

Reorder a record to a new position.

Default returns False (not supported). Subclasses that maintain an explicit ordering should override.

Parameters:
  • record_id (Any) – Unique identifier of the record to move

  • target_index (int) – Zero-based destination index (clamped to valid range)

Returns:

True if the record was moved, False if not supported or not found

Return type:

bool

abstractmethod next_page()#

Advance to next page and return its records.

Returns:

List of record dictionaries for the new page

Return type:

List[Record]

observe(condition=None, *order)#

Observe a live result set for a where/order query.

Returns a Stream that emits the matching records immediately, then a fresh result set whenever a relevant change occurs. Each subscriber observes its own slice — declare the query once, react to its results over time (the “observable query” pattern).

Selection toggles do not re-emit (selection is not a row-set change). Unlike where/order, observing does not disturb the source’s own pagination view.

Performance: each relevant change re-runs the whole query and re-emits the full result set. Use observe for small derived sets — dashboard metrics, a short pinned list, a filtered side panel. For large or virtualized views (Table, ListView) do NOT observe the full set; bind those widgets to the source directly — they listen via on_change and refetch only their visible window with page/page_slice.

Parameters:
  • condition (Condition | None) – Filter condition built with col (or None for all rows).

  • order (str | Column | SortKey) – Sort keys — column names, "-name" for descending, or col(...) specs.

Returns:

A Stream of result sets (each a list of record dictionaries).

Return type:

Any

Example

ds.observe(col("status") == "active", "-created").listen(
    lambda rows: gauge.set_value(len(rows))
)
on_change(handler=None)#

Subscribe to changes to this source.

Call with no argument to get a composable Stream of coarse change events; chain map/filter/debounce and listen to drive any widget (for example, a dashboard badge bound to the row count). Call with a handler to subscribe directly and get back a cancellable subscription.

The handler receives a DataChangeEvent. Rapid mutations are coalesced into a single notification per event-loop turn, and mutations made from a background thread are delivered on the main thread automatically — so a bound widget can refresh from a worker-thread feed with no extra work.

Parameters:

handler (Callable[[Any], Any] | None) – Change handler. Omit to receive a Stream instead.

Returns:

A Stream when handler is omitted, otherwise a cancellable subscription handle.

Return type:

Any

Example

ds.on_change(lambda e: print("changed:", e.kind))

# Feed a dashboard badge with the live row count.
ds.on_change().map(lambda e: ds.count).listen(badge.set_value)
abstractmethod order(*keys)#

Sort rows by one or more keys.

Parameters:

keys (str | Column | SortKey) – Column names ("name"), descending names ("-name"), or col(...) specs. With no arguments, clears sorting.

Returns:

Self for method chaining

Return type:

BaseDataSource

abstractmethod page(page=None)#

Get records for specified page (or current page if None).

Parameters:

page (int | None) – Page number (0-indexed); updates current page if provided

Returns:

List of record dictionaries for the page

Return type:

List[Record]

abstractmethod page_slice(start_index, count)#

Get records by start index and count (respects filter/sort).

Parameters:
  • start_index (int) – Starting record index

  • count (int) – Number of records to retrieve

Returns:

List of record dictionaries

Return type:

List[Record]

abstractmethod prev_page()#

Move to previous page and return its records.

Returns:

List of record dictionaries for the new page

Return type:

List[Record]

reload()#

Re-read data from the underlying source.

Default is a no-op suitable for in-memory implementations. File- and database-backed sources should override to re-query.

save(path, *, selected_only=False, format=None, config=None)#

Export records to a file, choosing the format by extension.

Records are streamed into the writer, so a large export does not materialize the whole dataset. The active where/order view is respected — what you export is what the source currently shows.

Parameters:
  • path (str) – Destination file path; its extension selects the format (CSV, TSV, JSON, JSONL, XML, and — with the extras — Parquet, Feather, HDF5).

  • selected_only (bool) – Export only selected records instead of all.

  • format (str | None) – Explicit format name overriding the path extension.

  • config (Any) – Optional FileSourceConfig controlling encoding/delimiter/etc.

abstractmethod select(record_id)#

Mark record as selected.

Parameters:

record_id (Any) – Unique identifier of the record

Returns:

True if record was selected, False if not found

Return type:

bool

abstractmethod select_all(current_page_only=False)#

Select all records (optionally only current page).

Parameters:

current_page_only (bool) – If True, select only records on current page

Returns:

Number of records selected

Return type:

int

abstractmethod selected(page=None)#

Get selected records, optionally paginated.

Parameters:

page (int | None) – Optional page number for paginating selected records

Returns:

List of selected record dictionaries

Return type:

List[Record]

abstractmethod update(record_id, updates)#

Update record fields by ID.

Parameters:
  • record_id (Any) – Unique identifier of the record

  • updates (Dict[str, Any]) – Dictionary with fields to update

Returns:

True if record was updated, False if not found

Return type:

bool

abstractmethod where(condition=None)#

Filter rows by a condition built with col.

Parameters:

condition (Condition | None) – A filter condition (e.g. col("age") >= 25), or None / no argument to clear the filter.

Returns:

Self for method chaining

Return type:

BaseDataSource