bootstack.data.FileDataSource#
- class bootstack.data.FileDataSource(filepath, config=None, page_size=10, *, cache=None, id_field='id')#
Bases: SqliteDataSource
A SqliteDataSource whose data is streamed in from a file.
Reads a CSV / TSV / JSON / JSONL file (or an optional Parquet / Feather / HDF5 file) and ingests it chunk-by-chunk into a SQLite working store, so even a multi-million-row file loads with bounded memory. Once loaded it behaves exactly like a SqliteDataSource: fast SQL paging, filtering, sorting, CRUD.
The original file is read-only input; edits live in the working store and are never written back. To keep changes, export them with save() or export_csv().
- Parameters:
filepath (str | Path) – Path to the data file.
config (Optional[FileSourceConfig]) – Optional FileSourceConfig for parsing and transforms.
page_size (int) – Number of records returned per page.
cache (Optional[str]) – Working store location. None (default) uses a temporary on-disk file removed on close(). A path names a persistent store whose data survives restarts (and is reused without re-ingest while it is newer than the source). ":memory:" keeps the store in memory.
id_field (str) – Record field used as the stable row identity.
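The cache parameter says a persistent store is reused "while it is newer than the source", but does not say how freshness is measured. The sketch below assumes a plain modification-time comparison; cache_is_fresh and the file names are hypothetical and are not part of bootstack.

```python
import os
import tempfile
import time
from pathlib import Path


def cache_is_fresh(source: Path, cache: Path) -> bool:
    """Return True when the cache exists and is newer than the source file."""
    if not cache.exists():
        return False
    # Assumption: "newer" means a later modification time.
    return cache.stat().st_mtime > source.stat().st_mtime


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "people.csv"
    cache = Path(tmp) / "people.sqlite"
    source.write_text("id,name\n1,Ada\n")

    no_cache = cache_is_fresh(source, cache)   # cache file does not exist yet

    cache.write_bytes(b"")
    now = time.time()
    os.utime(source, (now - 60, now - 60))     # source last edited before caching
    os.utime(cache, (now, now))
    fresh = cache_is_fresh(source, cache)

    os.utime(source, (now + 60, now + 60))     # source edited after caching
    stale = cache_is_fresh(source, cache)
```

Under this assumption, a missing or stale cache would lead to a full re-ingest, while a fresh one could be adopted as-is.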
- filepath#
Path to the data file.
- config#
Active configuration.
- is_loaded#
Whether the file has been ingested (or a fresh cache adopted).
Example
with FileDataSource("people.csv") as ds:
    ds.load()
    ds.where(col("age") > 25)
    first = ds.page(0)
Note
reload() re-ingests from the file.
Background-thread ingest and progressive display are a planned follow-up; load() is currently synchronous (but streamed, so memory stays bounded).
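To illustrate what "streamed, so memory stays bounded" can mean in practice, here is a minimal sketch of chunked CSV ingest into SQLite using only the standard library. It is not bootstack's implementation; ingest_csv, the records table name, and the chunk size are assumptions made for the example.

```python
import csv
import io
import sqlite3
from itertools import islice


def ingest_csv(handle, conn, table="records", chunk_size=1000):
    """Stream rows from a CSV handle into SQLite, one chunk at a time."""
    reader = csv.reader(handle)
    header = next(reader)
    # Column names come from the file header; values are always bound with "?".
    columns = ", ".join(f'"{name}"' for name in header)
    placeholders = ", ".join("?" for _ in header)
    conn.execute(f"CREATE TABLE {table} ({columns})")
    total = 0
    while True:
        chunk = list(islice(reader, chunk_size))  # at most chunk_size rows in memory
        if not chunk:
            break
        conn.executemany(f"INSERT INTO {table} VALUES ({placeholders})", chunk)
        total += len(chunk)
    conn.commit()
    return total


# 2500 synthetic rows, ingested in chunks of 1000.
data = io.StringIO(
    "id,name,age\n" + "".join(f"{i},user{i},{20 + i % 30}\n" for i in range(2500))
)
conn = sqlite3.connect(":memory:")
loaded = ingest_csv(data, conn, chunk_size=1000)
row_count = conn.execute("SELECT COUNT(*) FROM records").fetchone()[0]
```

Peak memory in this sketch depends on chunk_size rather than on the size of the file, which is the property the note describes.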
- close()#
Close the working store, removing the temporary file if one was used.
Idempotent. A temporary store is also cleaned up automatically if this is never called (see the finalizer registered in __init__).
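One common way to get both properties (an idempotent close() and automatic cleanup when close() is never called) is weakref.finalize from the standard library. The sketch below assumes that approach; TempStore and _remove_file are hypothetical names, and the documentation does not confirm which mechanism bootstack uses.

```python
import gc
import os
import tempfile
import weakref


def _remove_file(path):
    """Delete the temporary store file if it is still present."""
    if os.path.exists(path):
        os.remove(path)


class TempStore:
    """Hypothetical stand-in for a source that owns a temporary store file."""

    def __init__(self):
        fd, self.path = tempfile.mkstemp(suffix=".sqlite")
        os.close(fd)
        # The callback must not reference self, or the object could never be collected.
        self._finalizer = weakref.finalize(self, _remove_file, self.path)

    def close(self):
        # A finalizer runs at most once, so repeated close() calls are harmless.
        self._finalizer()


store = TempStore()
existed = os.path.exists(store.path)
store.close()
store.close()                      # second call is a no-op
removed = not os.path.exists(store.path)

# Without close(), dropping the last reference triggers the same cleanup.
leaked = TempStore()
leaked_path = leaked.path
del leaked
gc.collect()
```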
- delete(record_id)#
Delete record by ID.
- deselect(record_id)#
Mark record as unselected.
- deselect_all(current_page_only=False)#
Deselect all records (optionally only current page).
- export_csv(filepath, include_all=True)#
Export records to a CSV file (streamed).
- get(record_id)#
Retrieve single record by ID.
- get_distinct_values(column, limit=1000)#
Get distinct values for a column.
- has_next_page()#
Check if more pages exist after current page.
- insert(record)#
Create new record and return its ID.
- is_selected(record_id)#
Check whether a record is currently selected.
- load(*, force=False)#
Ingest the file into the working store (streamed, in chunks).
Unlike other sources, a FileDataSource draws its records from the file given at construction, so load() takes no records. It is a no-op when a fresh persistent cache was adopted, unless force=True.
- Parameters:
force (bool) – Re-ingest even if data is already present (used by reload).
- Returns:
Self for method chaining.
- Return type:
- move(record_id, target_index)#
Reorder a record to a new position.
The default implementation returns False (not supported). Subclasses that maintain an explicit ordering should override it.
- next_page()#
Advance to next page and return its records.
- observe(condition=None, *order)#
Observe a live result set for a where/order query.
Returns a Stream that emits the matching records immediately, then a fresh result set whenever a relevant change occurs. Each subscriber observes its own slice: declare the query once, react to its results over time (the “observable query” pattern).
Selection toggles do not re-emit (selection is not a row-set change). Unlike where/order, observing does not disturb the source’s own pagination view.
Performance: each relevant change re-runs the whole query and re-emits the full result set. Use observe for small derived sets, such as dashboard metrics, a short pinned list, or a filtered side panel. For large or virtualized views (Table, ListView) do NOT observe the full set; bind those widgets to the source directly. They listen via on_change and refetch only their visible window with page/page_slice.
- Parameters:
- Returns:
A Stream of result sets (each a list of record dictionaries).
- Return type:
Any
Example
ds.observe(col("status") == "active", "-created").listen(
    lambda rows: gauge.set_value(len(rows))
)
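The "observable query" pattern and its performance caveat can be illustrated with a small standalone sketch. ObservedQuery and notify_changed are hypothetical names written against sqlite3 directly; they are not bootstack's Stream API.

```python
import sqlite3


class ObservedQuery:
    """Hypothetical helper: re-runs one query and pushes results to listeners."""

    def __init__(self, conn, sql, params=()):
        self.conn, self.sql, self.params = conn, sql, params
        self.listeners = []

    def listen(self, callback):
        self.listeners.append(callback)
        callback(self._run())          # emit the current result set immediately
        return self

    def notify_changed(self):
        rows = self._run()             # the whole query re-runs on every change
        for callback in self.listeners:
            callback(rows)

    def _run(self):
        return self.conn.execute(self.sql, self.params).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO tasks (status) VALUES (?)", [("active",), ("done",)])

counts = []
query = ObservedQuery(conn, "SELECT id FROM tasks WHERE status = ?", ("active",))
query.listen(lambda rows: counts.append(len(rows)))

conn.execute("INSERT INTO tasks (status) VALUES (?)", ("active",))
query.notify_changed()
```

Because every change re-runs the query and delivers the full list, the cost grows with the size of the result set, which is why the pattern suits small derived sets.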
- on_change(handler=None)#
Subscribe to changes to this source.
Call with no argument to get a composable Stream of coarse change events; chain map/filter/debounce and listen to drive any widget (for example, a dashboard badge bound to the row count). Call with a handler to subscribe directly and get back a cancellable subscription.
The handler receives a DataChangeEvent. Rapid mutations are coalesced into a single notification per event-loop turn, and mutations made from a background thread are delivered on the main thread automatically, so a bound widget can refresh from a worker-thread feed with no extra work.
- Parameters:
handler (Callable[[Any], Any] | None) – Change handler. Omit to receive a Stream instead.
- Returns:
A Stream when handler is omitted, otherwise a cancellable subscription handle.
- Return type:
Example
ds.on_change(lambda e: print("changed:", e.kind))
# Feed a dashboard badge with the live row count.
ds.on_change().map(lambda e: ds.count).listen(badge.set_value)
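Coalescing "into a single notification per event-loop turn" can be pictured with asyncio, assuming that is the kind of loop involved (the documentation does not say which event loop bootstack uses). CoalescingNotifier is a hypothetical helper, not a bootstack class.

```python
import asyncio


class CoalescingNotifier:
    """Hypothetical helper: many mutations in one loop turn, one notification."""

    def __init__(self, handler):
        self.handler = handler
        self.pending = 0
        self.scheduled = False

    def mutated(self):
        self.pending += 1
        if not self.scheduled:
            # Schedule a single flush; later mutations in this turn piggyback on it.
            self.scheduled = True
            asyncio.get_running_loop().call_soon(self._flush)

    def _flush(self):
        count, self.pending, self.scheduled = self.pending, 0, False
        self.handler(count)


async def demo():
    received = []
    notifier = CoalescingNotifier(received.append)
    for _ in range(5):
        notifier.mutated()          # burst of mutations within a single turn
    await asyncio.sleep(0)          # yield so the scheduled flush runs
    notifier.mutated()
    await asyncio.sleep(0)
    return received


received = asyncio.run(demo())
```

The handler is called twice here: once for the burst of five mutations and once for the single mutation that follows.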
- order(*keys)#
Sort rows by one or more keys (no arguments clears sorting).
- page(page=None)#
Get records for specified page.
- page_slice(start_index, count)#
Get records by start index and count (respects filter/sort).
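A windowed fetch that respects the current filter and sort maps naturally onto SQL LIMIT/OFFSET. The sketch below shows that idea against sqlite3 directly; the people table, the fixed filter, and this standalone page_slice function are assumptions for illustration, not the method's actual implementation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO people (name, age) VALUES (?, ?)",
    [(f"user{i}", 20 + i) for i in range(30)],   # ages 20..49
)


def page_slice(conn, start_index, count, min_age):
    """Fetch `count` rows starting at `start_index` of the filtered, sorted view."""
    sql = (
        "SELECT id, name, age FROM people "
        "WHERE age > ? ORDER BY age DESC LIMIT ? OFFSET ?"
    )
    return conn.execute(sql, (min_age, count, start_index)).fetchall()


# Rows 5..7 of the view "age > 25, oldest first".
window = page_slice(conn, start_index=5, count=3, min_age=25)
ages = [row[2] for row in window]
```

Only the requested window is read, which is what lets a virtualized widget refetch its visible rows cheaply.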
- prev_page()#
Move to previous page and return its records.
- reload()#
Re-ingest the file from disk, replacing the working store’s contents.
- save(path, *, selected_only=False, format=None, config=None)#
Export records to a file, choosing the format by extension.
Records are streamed into the writer, so a large export does not materialize the whole dataset. The active where/order view is respected: what you export is what the source currently shows.
- Parameters:
path (str) – Destination file path; its extension selects the format (CSV, TSV, JSON, JSONL, XML, and, with the extras, Parquet, Feather, HDF5).
selected_only (bool) – Export only selected records instead of all.
format (str | None) – Explicit format name overriding the path extension.
config (Any) – Optional FileSourceConfig controlling encoding/delimiter/etc.
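The two ideas in save() (the extension picks the format unless format overrides it, and records are written one at a time) can be sketched as follows. This is a simplified stand-in covering only CSV and JSONL; the WRITERS table, the writer functions, and the extra handle argument (used here to avoid touching the file system) are assumptions, not bootstack's API.

```python
import csv
import io
import json
from pathlib import Path


def write_csv(records, handle):
    writer = None
    for record in records:             # one record at a time, never the full list
        if writer is None:
            writer = csv.DictWriter(handle, fieldnames=list(record))
            writer.writeheader()
        writer.writerow(record)


def write_jsonl(records, handle):
    for record in records:
        handle.write(json.dumps(record) + "\n")


WRITERS = {"csv": write_csv, "jsonl": write_jsonl}


def save(records, path, handle, format=None):
    """Pick a writer from `format`, falling back to the path's extension."""
    name = (format or Path(path).suffix.lstrip(".")).lower()
    if name not in WRITERS:
        raise ValueError(f"unsupported format: {name!r}")
    WRITERS[name](records, handle)


rows = ({"id": i, "name": f"user{i}"} for i in range(3))   # generator, consumed lazily
out = io.StringIO()
save(rows, "export.jsonl", out)                            # format from the extension
jsonl_lines = out.getvalue().splitlines()

out = io.StringIO()
save(iter([{"id": 1, "name": "Ada"}]), "export.dat", out, format="csv")  # explicit override
csv_lines = out.getvalue().splitlines()
```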
- select(record_id)#
Mark record as selected.
- select_all(current_page_only=False)#
Select all records (optionally only current page).
- selected(page=None)#
Get selected records, optionally paginated.
- update(record_id, updates)#
Update record fields by ID.
- where(condition=None)#
Filter rows by a condition built with col (None clears the filter).
The condition is rendered to a parameterized query. Values are always bound, never interpolated, so user input cannot inject SQL.
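As a rough illustration of why bound values prevent injection, the sketch below renders a comparison into a SQL fragment with a ? placeholder and passes the value separately. Column and Condition are hypothetical stand-ins for whatever col actually returns; only the sqlite3 calls are real library API.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class Condition:
    """Hypothetical condition: a SQL fragment plus the values to bind to it."""
    sql: str
    params: tuple


class Column:
    """Hypothetical column reference that builds conditions via operators."""

    def __init__(self, name):
        self.name = name               # identifier chosen by the developer, not user input

    def __gt__(self, value):
        return Condition(f'"{self.name}" > ?', (value,))

    def __eq__(self, value):
        return Condition(f'"{self.name}" = ?', (value,))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (name TEXT, age INTEGER)")
conn.executemany("INSERT INTO people VALUES (?, ?)", [("Ada", 36), ("Bob", 19)])

# A hostile value is compared as a literal string, so it matches nothing.
hostile = "Ada' OR '1'='1"
cond = Column("name") == hostile
injected = conn.execute(f"SELECT name FROM people WHERE {cond.sql}", cond.params).fetchall()

cond = Column("age") > 25
adults = conn.execute(f"SELECT name FROM people WHERE {cond.sql}", cond.params).fetchall()
```

Had the hostile string been interpolated into the SQL text instead, the OR clause would have matched every row.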