bootstack.data.SqliteDataSource#

class bootstack.data.SqliteDataSource(name=':memory:', page_size=10, id_field='id')#

Bases: BaseDataSource

SQLite-backed data manager with pagination, filtering, sorting, and CRUD operations.

Provides persistent storage using SQLite database with automatic schema inference. Filtering and sorting use the col expression API (rendered to parameterized SQL internally). Supports all operations defined in DataSourceProtocol.

Parameters:
  • name (str) – Database file path or “:memory:” for in-memory database (default: “:memory:”)

  • page_size (int) – Number of records per page (default: 10)

conn#

SQLite database connection

page_size#

Current page size setting

Example

ds = SqliteDataSource("data.db", page_size=20)
ds.load([{"name": "Alice", "age": 30}])
ds.where(col("age") > 25)
first = ds.page(0)

Note

  • The database connection persists for the lifetime of the object

  • Close the connection explicitly with conn.close() if needed

  • Schema is inferred from first record’s data types

  • An internal row-identity column (_bs_row_id) is added automatically as PRIMARY KEY

  • An internal selection column (_bs_selected) is added automatically for selection tracking

  • An internal JSON column (_bs_data) carries non-scalar record fields (the data bag)

  • These internal columns are filtered out of the display column list automatically

property count: int#

Total number of records matching the current filter.

property id_field: str#

Name of the record field that holds the stable row identity.

property selected_count: int#

Number of selected records.

close()#

Close the underlying SQLite connection. Idempotent.

Use a with block to close automatically:

with SqliteDataSource("app.db") as ds:
    ds.load(rows)
delete(record_id)#

Delete record by ID.

deselect(record_id)#

Mark record as unselected.

deselect_all(current_page_only=False)#

Deselect all records (optionally only current page).

export_csv(filepath, include_all=True)#

Export records to a CSV file (streamed).

Parameters:
  • filepath (str) – Path to output CSV file

  • include_all (bool) – If True, export all records; if False, export only selected

get(record_id)#

Retrieve single record by ID.

get_distinct_values(column, limit=1000)#

Get distinct values for a column.

Parameters:
  • column (str) – Column name to get distinct values from.

  • limit (int) – Maximum number of distinct values to return.

Returns:

List of distinct values sorted alphabetically.

Return type:

List[Any]

has_next_page()#

Check if more pages exist after current page.

insert(record)#

Create new record and return its ID.

is_selected(record_id)#

Check whether a record is currently selected.

load(records, column_keys=None, *, chunk_size=10000, on_progress=None)#

Load records into the database, creating the table with an inferred schema.

Records are consumed as a stream and inserted in chunks within a single transaction, so a lazy iterator of millions of rows loads with bounded memory — only chunk_size rows are held at a time. The whole load is atomic: if any row fails (for example a duplicate id), the table is rolled back to its prior state.

Parameters:
  • records (Iterable[Primitive] | Iterable[dict[str, Any]] | Iterable[Sequence[Any]]) – An iterable of dicts, primitives, or row sequences — a list, a generator, or any other iterable.

  • column_keys (Sequence[str] | None) – Optional column names when supplying row sequences (lists/tuples).

  • chunk_size (int) – Number of rows buffered per executemany batch.

  • on_progress (Callable[[int], None] | None) – Optional callback invoked after each chunk with the running count of rows inserted so far.

Returns:

Self for method chaining

Return type:

SqliteDataSource

move(record_id, target_index)#

Reorder a record to a new position.

Default returns False (not supported). Subclasses that maintain an explicit ordering should override.

Parameters:
  • record_id (Any) – Unique identifier of the record to move

  • target_index (int) – Zero-based destination index (clamped to valid range)

Returns:

True if the record was moved, False if not supported or not found

Return type:

bool

next_page()#

Advance to next page and return its records.

observe(condition=None, *order)#

Observe a live result set for a where/order query.

Returns a Stream that emits the matching records immediately, then a fresh result set whenever a relevant change occurs. Each subscriber observes its own slice — declare the query once, react to its results over time (the “observable query” pattern).

Selection toggles do not re-emit (selection is not a row-set change). Unlike where/order, observing does not disturb the source’s own pagination view.

Performance: each relevant change re-runs the whole query and re-emits the full result set. Use observe for small derived sets — dashboard metrics, a short pinned list, a filtered side panel. For large or virtualized views (Table, ListView) do NOT observe the full set; bind those widgets to the source directly — they listen via on_change and refetch only their visible window with page/page_slice.

Parameters:
  • condition (Condition | None) – Filter condition built with col (or None for all rows).

  • order (str | Column | SortKey) – Sort keys — column names, "-name" for descending, or col(...) specs.

Returns:

A Stream of result sets (each a list of record dictionaries).

Return type:

Any

Example

ds.observe(col("status") == "active", "-created").listen(
    lambda rows: gauge.set_value(len(rows))
)
on_change(handler=None)#

Subscribe to changes to this source.

Call with no argument to get a composable Stream of coarse change events; chain map/filter/debounce and listen to drive any widget (for example, a dashboard badge bound to the row count). Call with a handler to subscribe directly and get back a cancellable subscription.

The handler receives a DataChangeEvent. Rapid mutations are coalesced into a single notification per event-loop turn, and mutations made from a background thread are delivered on the main thread automatically — so a bound widget can refresh from a worker-thread feed with no extra work.

Parameters:

handler (Callable[[Any], Any] | None) – Change handler. Omit to receive a Stream instead.

Returns:

A Stream when handler is omitted, otherwise a cancellable subscription handle.

Return type:

Any

Example

ds.on_change(lambda e: print("changed:", e.kind))

# Feed a dashboard badge with the live row count.
ds.on_change().map(lambda e: ds.count).listen(badge.set_value)
order(*keys)#

Sort rows by one or more keys (no arguments clears sorting).

page(page=None)#

Get records for specified page.

page_slice(start_index, count)#

Get records by start index and count (respects filter/sort).

prev_page()#

Move to previous page and return its records.

reload()#

Re-read data from the underlying source.

Default is a no-op suitable for in-memory implementations. File- and database-backed sources should override to re-query.

save(path, *, selected_only=False, format=None, config=None)#

Export records to a file, choosing the format by extension.

Records are streamed into the writer, so a large export does not materialize the whole dataset. The active where/order view is respected — what you export is what the source currently shows.

Parameters:
  • path (str) – Destination file path; its extension selects the format (CSV, TSV, JSON, JSONL, XML, and — with the extras — Parquet, Feather, HDF5).

  • selected_only (bool) – Export only selected records instead of all.

  • format (str | None) – Explicit format name overriding the path extension.

  • config (Any) – Optional FileSourceConfig controlling encoding/delimiter/etc.

select(record_id)#

Mark record as selected.

select_all(current_page_only=False)#

Select all records (optionally only current page).

selected(page=None)#

Get selected records, optionally paginated.

update(record_id, updates)#

Update record fields by ID.

where(condition=None)#

Filter rows by a condition built with col (None clears the filter).

The condition is rendered to a parameterized query — values are always bound, never interpolated — so user input cannot inject SQL.