bootstack.data.SqliteDataSource#
- class bootstack.data.SqliteDataSource(name=':memory:', page_size=10, id_field='id')#
Bases: BaseDataSource
SQLite-backed data manager with pagination, filtering, sorting, and CRUD operations.
Provides persistent storage in a SQLite database with automatic schema inference. Filtering and sorting use the col expression API (rendered to parameterized SQL internally). Supports all operations defined in DataSourceProtocol.
- Parameters:
- conn#
SQLite database connection
- page_size#
Current page size setting
Example
ds = SqliteDataSource("data.db", page_size=20)
ds.load([{"name": "Alice", "age": 30}])
ds.where(col("age") > 25)
first = ds.page(0)
Note
The database connection persists for the lifetime of the object
Close the connection explicitly with conn.close() if needed
Schema is inferred from first record’s data types
An internal row-identity column (_bs_row_id) is added automatically as PRIMARY KEY
An internal selection column (_bs_selected) is added automatically for selection tracking
An internal JSON column (_bs_data) carries non-scalar record fields (the data bag)
These internal columns are filtered out of the display column list automatically
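The notes above can be illustrated with plain sqlite3. This is a minimal sketch of first-record schema inference, not bootstack's actual implementation: the type mapping, the helper names (create_table_from_first_record, insert_record, display_columns), and the exact column definitions are assumptions made for illustration. Only the internal column names come from the notes.

```python
import json
import sqlite3

# Assumed mapping from Python scalar types to SQLite column types.
_SQL_TYPES = {bool: "INTEGER", int: "INTEGER", float: "REAL", str: "TEXT"}


def create_table_from_first_record(conn, table, record):
    """Create `table` from one sample record; return the scalar column names."""
    columns = [
        "_bs_row_id INTEGER PRIMARY KEY",   # internal row identity
        "_bs_selected INTEGER DEFAULT 0",   # internal selection flag
        "_bs_data TEXT",                    # JSON bag for non-scalar fields
    ]
    scalar_keys = []
    for key, value in record.items():
        sql_type = _SQL_TYPES.get(type(value))
        if sql_type is None:
            continue  # non-scalar (list, dict, None, ...): kept in the JSON bag
        columns.append(f'"{key}" {sql_type}')
        scalar_keys.append(key)
    conn.execute(f'CREATE TABLE "{table}" ({", ".join(columns)})')
    return scalar_keys


def insert_record(conn, table, scalar_keys, record):
    """Insert one record, routing non-scalar fields into _bs_data."""
    bag = {k: v for k, v in record.items() if k not in scalar_keys}
    names = [f'"{k}"' for k in scalar_keys] + ["_bs_data"]
    marks = ", ".join(["?"] * len(names))
    values = [record[k] for k in scalar_keys] + [json.dumps(bag)]
    cur = conn.execute(
        f'INSERT INTO "{table}" ({", ".join(names)}) VALUES ({marks})', values
    )
    return cur.lastrowid


def display_columns(conn, table):
    """List column names, hiding the internal _bs_ columns."""
    info = conn.execute(f'PRAGMA table_info("{table}")').fetchall()
    return [row[1] for row in info if not row[1].startswith("_bs_")]
```

In this sketch a field whose first value is a list or dict never becomes a column; it travels in the JSON bag instead, which is one plausible reading of the "data bag" note.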
- close()#
Close the underlying SQLite connection. Idempotent.
Use a with block to close automatically:
with SqliteDataSource("app.db") as ds:
    ds.load(rows)
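An idempotent close paired with context-manager support can be sketched as follows. ManagedSource is a hypothetical stand-in written against the standard sqlite3 module. It is not the SqliteDataSource class, and its internals are assumptions.

```python
import sqlite3


class ManagedSource:
    """Hypothetical wrapper showing an idempotent close() and `with` support."""

    def __init__(self, name=":memory:"):
        self.conn = sqlite3.connect(name)
        self._closed = False

    def close(self):
        # A second call is a no-op, so callers need not track state.
        if not self._closed:
            self.conn.close()
            self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # never swallow exceptions from the block
```

Because close() checks its own flag, calling it again after the with block has exited is harmless.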
- delete(record_id)#
Delete record by ID.
- deselect(record_id)#
Mark record as unselected.
- deselect_all(current_page_only=False)#
Deselect all records (optionally only current page).
- export_csv(filepath, include_all=True)#
Export records to a CSV file (streamed).
- get(record_id)#
Retrieve single record by ID.
- get_distinct_values(column, limit=1000)#
Get distinct values for a column.
- has_next_page()#
Check if more pages exist after current page.
- insert(record)#
Create new record and return its ID.
- is_selected(record_id)#
Check whether a record is currently selected.
- load(records, column_keys=None, *, chunk_size=10000, on_progress=None)#
Load records into the database, creating the table with an inferred schema.
Records are consumed as a stream and inserted in chunks within a single transaction, so a lazy iterator of millions of rows loads with bounded memory: only chunk_size rows are held at a time. The whole load is atomic: if any row fails (for example a duplicate id), the table is rolled back to its prior state.
- Parameters:
records (Iterable[Primitive] | Iterable[dict[str, Any]] | Iterable[Sequence[Any]]) – An iterable of dicts, primitives, or row sequences: a list, a generator, or any other iterable.
column_keys (Sequence[str] | None) – Optional column names when supplying row sequences (lists/tuples).
chunk_size (int) – Number of rows buffered per executemany batch.
on_progress (Callable[[int], None] | None) – Optional callback invoked after each chunk with the running count of rows inserted so far.
- Returns:
Self for method chaining
- Return type:
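The chunked, atomic loading described for load can be sketched with the standard sqlite3 module. This is an illustration of the technique under stated assumptions, not the library's code: the people table, its two columns, and the load_in_chunks helper are hypothetical.

```python
import sqlite3
from itertools import islice


def load_in_chunks(conn, rows, chunk_size=10000, on_progress=None):
    """Insert (id, name) tuples from any iterable in one transaction."""
    iterator = iter(rows)
    inserted = 0
    # As a context manager the connection commits on success and
    # rolls back if an exception escapes the block.
    with conn:
        while True:
            # Only chunk_size rows are materialized at a time.
            chunk = list(islice(iterator, chunk_size))
            if not chunk:
                break
            conn.executemany(
                "INSERT INTO people (id, name) VALUES (?, ?)", chunk
            )
            inserted += len(chunk)
            if on_progress is not None:
                on_progress(inserted)  # running total so far
    return inserted
```

If a later chunk raises (for example sqlite3.IntegrityError on a duplicate id), the rows from earlier chunks of the same call are rolled back as well, which mirrors the all-or-nothing behavior described above.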
- move(record_id, target_index)#
Reorder a record to a new position.
Default returns False (not supported). Subclasses that maintain an explicit ordering should override.
- next_page()#
Advance to next page and return its records.
- observe(condition=None, *order)#
Observe a live result set for a where/order query.
Returns a Stream that emits the matching records immediately, then a fresh result set whenever a relevant change occurs. Each subscriber observes its own slice: declare the query once, react to its results over time (the “observable query” pattern).
Selection toggles do not re-emit (selection is not a row-set change). Unlike where/order, observing does not disturb the source’s own pagination view.
Performance: each relevant change re-runs the whole query and re-emits the full result set. Use observe for small derived sets, such as dashboard metrics, a short pinned list, or a filtered side panel. For large or virtualized views (Table, ListView) do NOT observe the full set; bind those widgets to the source directly. They listen via on_change and refetch only their visible window with page/page_slice.
- Parameters:
- Returns:
A Stream of result sets (each a list of record dictionaries).
- Return type:
Any
Example
ds.observe(col("status") == "active", "-created").listen(
    lambda rows: gauge.set_value(len(rows))
)
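The observable query pattern mentioned above can be sketched independently of the library. ObservedQuery below is a hypothetical class over a raw sqlite3 connection. It is not bootstack's Stream, and the explicit notify_changed call stands in for whatever change detection the source performs internally.

```python
import sqlite3


class ObservedQuery:
    """Hypothetical: re-runs one SELECT and pushes the full result to listeners."""

    def __init__(self, conn, sql, params=()):
        self._conn = conn
        self._sql = sql
        self._params = tuple(params)
        self._listeners = []

    def _run(self):
        return self._conn.execute(self._sql, self._params).fetchall()

    def listen(self, callback):
        self._listeners.append(callback)
        callback(self._run())  # emit the current matches immediately
        return self

    def notify_changed(self):
        # Every change re-runs the whole query, which is why the pattern
        # suits small result sets rather than large tables.
        rows = self._run()
        for callback in self._listeners:
            callback(rows)
```

The full re-run on each change is the cost the performance note warns about: it is cheap for a handful of rows and wasteful for a virtualized view that shows only a window.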
- on_change(handler=None)#
Subscribe to changes to this source.
Call with no argument to get a composable Stream of coarse change events; chain map/filter/debounce and listen to drive any widget (for example, a dashboard badge bound to the row count). Call with a handler to subscribe directly and get back a cancellable subscription.
The handler receives a DataChangeEvent. Rapid mutations are coalesced into a single notification per event-loop turn, and mutations made from a background thread are delivered on the main thread automatically, so a bound widget can refresh from a worker-thread feed with no extra work.
- Parameters:
handler (Callable[[Any], Any] | None) – Change handler. Omit to receive a Stream instead.
- Returns:
A Stream when handler is omitted, otherwise a cancellable subscription handle.
- Return type:
Example
ds.on_change(lambda e: print("changed:", e.kind))
# Feed a dashboard badge with the live row count.
ds.on_change().map(lambda e: ds.count).listen(badge.set_value)
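Coalescing rapid mutations into one notification per event-loop turn can be sketched as below. This assumes asyncio as the event loop purely for illustration; the source does not say which loop the library uses. CoalescingNotifier and demo are hypothetical names.

```python
import asyncio


class CoalescingNotifier:
    """Hypothetical: batches mutations made within one loop turn."""

    def __init__(self, handler):
        self._handler = handler
        self._pending = 0

    def mutated(self):
        self._pending += 1
        # Schedule a flush only for the first mutation of the turn;
        # later ones just bump the counter.
        if self._pending == 1:
            asyncio.get_running_loop().call_soon(self._flush)

    def _flush(self):
        count, self._pending = self._pending, 0
        self._handler(count)


async def demo():
    received = []
    notifier = CoalescingNotifier(received.append)
    for _ in range(3):
        notifier.mutated()      # three mutations in the same turn
    await asyncio.sleep(0)      # yield so the scheduled flush runs
    notifier.mutated()          # one mutation in a later turn
    await asyncio.sleep(0)
    return received
```

Here the handler receives the number of mutations folded into each notification. A real change event would carry richer information, such as the kind field used in the example above.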
- order(*keys)#
Sort rows by one or more keys (no arguments clears sorting).
- page(page=None)#
Get records for specified page.
- page_slice(start_index, count)#
Get records by start index and count (respects filter/sort).
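Page and slice fetches that respect the active filter and sort are commonly expressed with LIMIT and OFFSET. The sketch below shows that approach with plain sqlite3. It is an assumption about the general technique, not a description of how page or page_slice are implemented, and the people table, the min_age filter, and the helper names are hypothetical.

```python
import sqlite3


def fetch_slice(conn, start_index, count, min_age=None):
    """Return `count` rows starting at `start_index` of the filtered, sorted view."""
    sql = "SELECT id, name, age FROM people"
    params = []
    if min_age is not None:
        sql += " WHERE age > ?"          # filter applied before slicing
        params.append(min_age)
    # A stable sort (ties broken by id) keeps slices consistent across calls.
    sql += " ORDER BY age DESC, id LIMIT ? OFFSET ?"
    params += [count, start_index]
    return conn.execute(sql, params).fetchall()


def fetch_page(conn, page, page_size=10, min_age=None):
    """A page is a slice whose start is page * page_size."""
    return fetch_slice(conn, page * page_size, page_size, min_age)
```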
- prev_page()#
Move to previous page and return its records.
- reload()#
Re-read data from the underlying source.
Default is a no-op suitable for in-memory implementations. File- and database-backed sources should override to re-query.
- save(path, *, selected_only=False, format=None, config=None)#
Export records to a file, choosing the format by extension.
Records are streamed into the writer, so a large export does not materialize the whole dataset. The active where/order view is respected: what you export is what the source currently shows.
- Parameters:
path (str) – Destination file path; its extension selects the format (CSV, TSV, JSON, JSONL, XML, and, with the extras, Parquet, Feather, HDF5).
selected_only (bool) – Export only selected records instead of all.
format (str | None) – Explicit format name overriding the path extension.
config (Any) – Optional FileSourceConfig controlling encoding/delimiter/etc.
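Choosing a writer from the file extension while streaming records can be sketched with the standard library alone. save_records is a hypothetical helper covering only CSV, TSV, and JSONL. It does not reproduce the library's writers or its FileSourceConfig handling.

```python
import csv
import json
import os
import tempfile


def save_records(records, path, format=None):
    """Stream an iterable of dicts to `path`; return the number written."""
    # An explicit format wins; otherwise fall back to the extension.
    fmt = (format or os.path.splitext(path)[1].lstrip(".")).lower()
    count = 0
    if fmt in ("csv", "tsv"):
        delimiter = "\t" if fmt == "tsv" else ","
        with open(path, "w", newline="", encoding="utf-8") as fh:
            writer = None
            for record in records:
                if writer is None:
                    # Header is taken from the first record's keys.
                    writer = csv.DictWriter(
                        fh, fieldnames=list(record), delimiter=delimiter
                    )
                    writer.writeheader()
                writer.writerow(record)
                count += 1
    elif fmt == "jsonl":
        with open(path, "w", encoding="utf-8") as fh:
            for record in records:
                fh.write(json.dumps(record) + "\n")
                count += 1
    else:
        raise ValueError(f"unsupported format: {fmt!r}")
    return count


# Usage: the generator is consumed one record at a time.
out_dir = tempfile.mkdtemp()
csv_path = os.path.join(out_dir, "people.csv")
written = save_records(({"id": i, "name": f"n{i}"} for i in range(3)), csv_path)
```

Because the writer pulls from the iterable row by row, memory use stays flat regardless of how many records the source yields.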
- select(record_id)#
Mark record as selected.
- select_all(current_page_only=False)#
Select all records (optionally only current page).
- selected(page=None)#
Get selected records, optionally paginated.
- update(record_id, updates)#
Update record fields by ID.
- where(condition=None)#
Filter rows by a condition built with col (None clears the filter).
The condition is rendered to a parameterized query (values are always bound, never interpolated), so user input cannot inject SQL.
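Rendering a column expression to parameterized SQL can be sketched as follows. Col and Condition here are hypothetical stand-ins that show the idea of binding values through placeholders. They are not the library's col API, whose internals the source does not describe.

```python
import sqlite3


class Condition:
    """SQL text with placeholders plus the values to bind."""

    def __init__(self, sql, params):
        self.sql = sql
        self.params = params


class Col:
    """Hypothetical column-expression builder."""

    def __init__(self, name):
        # Column names cannot be bound, so they are validated instead.
        if not name.isidentifier():
            raise ValueError(f"invalid column name: {name!r}")
        self.name = name

    def _compare(self, op, value):
        # The value never enters the SQL text; only a placeholder does.
        return Condition(f'"{self.name}" {op} ?', [value])

    def __gt__(self, value):
        return self._compare(">", value)

    def __eq__(self, value):
        return self._compare("=", value)


def select_where(conn, condition):
    """Run the condition against a hypothetical `people` table."""
    sql = f"SELECT name, age FROM people WHERE {condition.sql}"
    return conn.execute(sql, condition.params).fetchall()
```

A hostile value such as "x' OR '1'='1" is compared as an ordinary string and matches nothing, because the database receives it as a bound parameter rather than as part of the statement.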