bootstack.data.FileDataSource#

class bootstack.data.FileDataSource(filepath, config=None, page_size=10, *, cache=None, id_field='id')#

Bases: SqliteDataSource

A SqliteDataSource whose data is streamed in from a file.

Reads a CSV, TSV, JSON, or JSONL file (or, with the optional extras, a Parquet, Feather, or HDF5 file) and ingests it chunk by chunk into a SQLite working store, so even a multi-million-row file loads with bounded memory. Once loaded, it behaves exactly like a SqliteDataSource: fast SQL paging, filtering, sorting, and CRUD.

The original file is read-only input; edits live in the working store and are never written back. To keep changes, export them with save() or export_csv().
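The chunk-by-chunk ingest described above can be sketched with the standard library alone. This is an illustration of the general technique, not bootstack's implementation: the helper name ingest_csv, the table name records, and the chunk size are assumptions, and every column is stored as untyped text.

```python
import csv
import io
import sqlite3
from itertools import islice


def ingest_csv(lines, conn, table="records", chunk_size=1000):
    """Stream CSV rows into SQLite, holding at most chunk_size rows in memory.

    Hypothetical helper for illustration; column names come from the header
    row and are assumed to be trusted.
    """
    reader = csv.reader(lines)
    header = next(reader)
    columns = ", ".join(f'"{name}"' for name in header)
    placeholders = ", ".join("?" for _ in header)
    conn.execute(f"CREATE TABLE {table} ({columns})")
    insert_sql = f"INSERT INTO {table} VALUES ({placeholders})"

    total = 0
    while True:
        chunk = list(islice(reader, chunk_size))  # only one chunk is resident
        if not chunk:
            break
        conn.executemany(insert_sql, chunk)
        total += len(chunk)
    conn.commit()
    return total


# Usage with an in-memory file and an in-memory database.
sample = io.StringIO("id,name,age\n1,Ada,36\n2,Linus,28\n3,Grace,45\n")
conn = sqlite3.connect(":memory:")
loaded = ingest_csv(sample, conn, chunk_size=2)
```

Because only one chunk is materialized at a time, peak memory depends on chunk_size rather than on the size of the file.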

Parameters:
  • filepath (str | Path) – Path to the data file.

  • config (Optional[FileSourceConfig]) – Optional FileSourceConfig for parsing and transforms.

  • page_size (int) – Number of records returned per page.

  • cache (Optional[str]) – Working store location. None (default) uses a temporary on-disk file removed on close(). A path names a persistent store whose data survives restarts (and is reused without re-ingest while it is newer than the source). ":memory:" keeps the store in memory.

  • id_field (str) – Record field used as the stable row identity.
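The reuse rule for a persistent cache ("newer than the source") can be pictured as a modification-time comparison. How bootstack actually decides freshness is not stated here, so treat the helper name cache_is_fresh and the mtime-based check below as assumptions.

```python
import os
import tempfile


def cache_is_fresh(source_path, cache_path):
    """Return True when cache_path exists and is newer than source_path."""
    if not os.path.exists(cache_path):
        return False
    return os.path.getmtime(cache_path) > os.path.getmtime(source_path)


with tempfile.TemporaryDirectory() as tmp:
    source = os.path.join(tmp, "people.csv")
    cache = os.path.join(tmp, "people.db")
    for path in (source, cache):
        open(path, "w").close()
    os.utime(source, (1_000, 1_000))  # source last modified at t=1000
    os.utime(cache, (2_000, 2_000))   # cache written later
    fresh_before_edit = cache_is_fresh(source, cache)
    os.utime(source, (3_000, 3_000))  # source edited after the cache was built
    fresh_after_edit = cache_is_fresh(source, cache)
```

Under this sketch, editing the source file after the cache was built makes the cache stale, which would trigger a re-ingest.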

filepath#

Path to the data file.

config#

Active configuration.

is_loaded#

Whether the file has been ingested (or a fresh cache adopted).

Example

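from bootstack.data import FileDataSource

# col is the condition builder described under where().
with FileDataSource("people.csv") as ds:
    ds.load()
    ds.where(col("age") > 25)  # keep rows with age above 25
    first = ds.page(0)         # first page of the filtered view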

Note

  • reload() re-ingests from the file.

  • Background-thread ingest and progressive display are a planned follow-up; load() is currently synchronous (but streamed, so memory stays bounded).

property count: int#

Total number of records matching the current filter.

property id_field: str#

Name of the record field that holds the stable row identity.

property selected_count: int#

Number of selected records.

close()#

Close the working store, removing the temporary file if one was used.

Idempotent. A temporary store is also cleaned automatically if this is never called (see the finalizer registered in __init__).
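One common way to get an idempotent close() with automatic cleanup is weakref.finalize from the standard library. The sketch below shows that pattern; the class TempStore and helper _remove_quietly are hypothetical, and nothing here confirms that bootstack uses this exact mechanism.

```python
import os
import tempfile
import weakref


def _remove_quietly(path):
    """Delete path, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


class TempStore:
    """Hypothetical owner of a temporary file, for illustration only."""

    def __init__(self):
        fd, self.path = tempfile.mkstemp(suffix=".db")
        os.close(fd)
        # weakref.finalize runs the callback at most once: on close(),
        # on garbage collection, or at interpreter exit.
        self._finalizer = weakref.finalize(self, _remove_quietly, self.path)

    def close(self):
        self._finalizer()  # safe to call repeatedly


store = TempStore()
path = store.path
existed = os.path.exists(path)
store.close()
store.close()  # second call is a no-op
removed = not os.path.exists(path)
```

The callback receives only the path, not the object itself, so the finalizer does not keep the store alive.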

delete(record_id)#

Delete a record by its ID.

deselect(record_id)#

Mark a record as unselected.

deselect_all(current_page_only=False)#

Deselect all records (optionally only those on the current page).

export_csv(filepath, include_all=True)#

Export records to a CSV file (streamed).

Parameters:
  • filepath (str) – Path to the output CSV file.

  • include_all (bool) – If True, export all records; if False, export only selected records.
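A streamed CSV export can be sketched by pulling rows from a SQLite cursor in batches and writing each batch before fetching the next. The function below is a stand-alone illustration with an assumed table name (records) and batch size; it is not the library's export_csv.

```python
import csv
import io
import sqlite3


def export_csv(conn, out, table="records", batch_size=500):
    """Write every row of `table` to `out` as CSV, one batch at a time."""
    cursor = conn.execute(f"SELECT * FROM {table} ORDER BY rowid")
    writer = csv.writer(out, lineterminator="\n")
    # The header row comes from the cursor's column metadata.
    writer.writerow([column[0] for column in cursor.description])
    written = 0
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        writer.writerows(batch)
        written += len(batch)
    return written


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO records VALUES (?, ?)", [(1, "Ada"), (2, "Linus")])
buffer = io.StringIO()
row_count = export_csv(conn, buffer)
```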

get(record_id)#

Retrieve a single record by its ID.

get_distinct_values(column, limit=1000)#

Get distinct values for a column.

Parameters:
  • column (str) – Column name to get distinct values from.

  • limit (int) – Maximum number of distinct values to return.

Returns:

List of distinct values sorted alphabetically.

Return type:

List[Any]
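In SQL terms, a capped list of distinct values is a SELECT DISTINCT with ORDER BY and LIMIT. The sketch below assumes a table named records and trusted column names; note that SQLite's default ordering is byte-wise, so this version is case-sensitive, which may differ from the library's notion of alphabetical.

```python
import sqlite3


def distinct_values(conn, table, column, limit=1000):
    """Return up to `limit` distinct values of `column`, sorted ascending."""
    # Identifiers cannot be bound as parameters, so they are quoted here
    # (and assumed trusted); the limit is a value and is bound.
    sql = f'SELECT DISTINCT "{column}" FROM "{table}" ORDER BY "{column}" LIMIT ?'
    return [row[0] for row in conn.execute(sql, (limit,))]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (city TEXT)")
conn.executemany(
    "INSERT INTO records VALUES (?)",
    [("Oslo",), ("Lima",), ("Oslo",), ("Cairo",), ("Lima",)],
)
cities = distinct_values(conn, "records", "city")
first_two = distinct_values(conn, "records", "city", limit=2)
```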

has_next_page()#

Check whether more pages exist after the current page.
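The check reduces to simple arithmetic on the page index, the page size, and the filtered record count. The stand-alone function below assumes zero-based page numbers (as the ds.page(0) example suggests); it is a sketch, not the method's actual body.

```python
def has_next_page(page, page_size, count):
    """Return True if any record lies beyond the end of the given page."""
    # Records on pages 0..page occupy indices below (page + 1) * page_size.
    return (page + 1) * page_size < count


more_after_first = has_next_page(page=0, page_size=10, count=25)
more_after_last = has_next_page(page=2, page_size=10, count=25)
```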

insert(record)#

Create a new record and return its ID.

is_selected(record_id)#

Check whether a record is currently selected.

load(*, force=False)#

Ingest the file into the working store (streamed, in chunks).

Unlike other sources, a FileDataSource draws its records from the file given at construction, so load() takes no records. It is a no-op when a fresh persistent cache was adopted, unless force=True.

Parameters:

force (bool) – Re-ingest even if data is already present (used by reload).

Returns:

Self for method chaining.

Return type:

FileDataSource

move(record_id, target_index)#

Reorder a record to a new position.

The default implementation returns False (not supported). Subclasses that maintain an explicit ordering should override it.

Parameters:
  • record_id (Any) – Unique identifier of the record to move.

  • target_index (int) – Zero-based destination index (clamped to the valid range).

Returns:

True if the record was moved; False if moving is not supported or the record was not found.

Return type:

bool
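For a subclass that does keep an explicit ordering, the clamping contract might look like the following. This is a hypothetical list-based sketch of the documented behavior (clamp the index, return False when the record is missing), not code from bootstack.

```python
def move(order, record_id, target_index):
    """Move record_id within the list `order`; return True on success."""
    if record_id not in order:
        return False
    order.remove(record_id)
    # After removal, valid insert positions run from 0 to len(order).
    target_index = max(0, min(target_index, len(order)))
    order.insert(target_index, record_id)
    return True


ids = ["a", "b", "c", "d"]
moved_to_end = move(ids, "a", 99)  # index clamped to the last position
```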

next_page()#

Advance to the next page and return its records.

observe(condition=None, *order)#

Observe a live result set for a where/order query.

Returns a Stream that emits the matching records immediately, then a fresh result set whenever a relevant change occurs. Each subscriber observes its own slice — declare the query once, react to its results over time (the “observable query” pattern).

Selection toggles do not re-emit (selection is not a row-set change). Unlike where/order, observing does not disturb the source’s own pagination view.

Performance: each relevant change re-runs the whole query and re-emits the full result set. Use observe for small derived sets — dashboard metrics, a short pinned list, a filtered side panel. For large or virtualized views (Table, ListView) do NOT observe the full set; bind those widgets to the source directly — they listen via on_change and refetch only their visible window with page/page_slice.

Parameters:
  • condition (Condition | None) – Filter condition built with col (or None for all rows).

  • order (str | Column | SortKey) – Sort keys — column names, "-name" for descending, or col(...) specs.

Returns:

A Stream of result sets (each a list of record dictionaries).

Return type:

Any

Example

ds.observe(col("status") == "active", "-created").listen(
    lambda rows: gauge.set_value(len(rows))
)

on_change(handler=None)#

Subscribe to changes to this source.

Call with no argument to get a composable Stream of coarse change events; chain map/filter/debounce and listen to drive any widget (for example, a dashboard badge bound to the row count). Call with a handler to subscribe directly and get back a cancellable subscription.

The handler receives a DataChangeEvent. Rapid mutations are coalesced into a single notification per event-loop turn, and mutations made from a background thread are delivered on the main thread automatically — so a bound widget can refresh from a worker-thread feed with no extra work.

Parameters:

handler (Callable[[Any], Any] | None) – Change handler. Omit to receive a Stream instead.

Returns:

A Stream when handler is omitted, otherwise a cancellable subscription handle.

Return type:

Any
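Coalescing several mutations into one notification per event-loop turn can be illustrated with asyncio. The class CoalescingNotifier below is hypothetical, the use of asyncio is an assumption about the event loop, and the handler here receives a simple count rather than a DataChangeEvent.

```python
import asyncio


class CoalescingNotifier:
    """Hypothetical notifier: many mutations, one callback per loop turn."""

    def __init__(self, handler):
        self._handler = handler
        self._pending = 0
        self._scheduled = False

    def notify(self):
        self._pending += 1
        if not self._scheduled:
            # Schedule a single flush; later calls in this turn just count.
            self._scheduled = True
            asyncio.get_running_loop().call_soon(self._flush)

    def _flush(self):
        count, self._pending = self._pending, 0
        self._scheduled = False
        self._handler(count)


async def demo():
    received = []
    notifier = CoalescingNotifier(received.append)
    for _ in range(5):
        notifier.notify()
    await asyncio.sleep(0)  # yield so the scheduled flush can run
    return received


batches = asyncio.run(demo())
```

Five notify() calls in one turn produce a single handler call. Delivering from a worker thread would additionally need a thread-safe hand-off such as loop.call_soon_threadsafe, which this sketch leaves out.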

Example

ds.on_change(lambda e: print("changed:", e.kind))

# Feed a dashboard badge with the live row count.
ds.on_change().map(lambda e: ds.count).listen(badge.set_value)

order(*keys)#

Sort rows by one or more keys (no arguments clears sorting).

page(page=None)#

Get the records for the specified page.

page_slice(start_index, count)#

Get records by start index and count (respects filter/sort).
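Fetching a window by start index and count maps naturally onto SQL LIMIT and OFFSET applied after the filter and sort. The sketch below hard-codes an example filter (age above a threshold) and sort (by name) on an assumed records table to show the shape of such a query.

```python
import sqlite3


def page_slice(conn, start_index, count, min_age):
    """Return `count` names starting at `start_index` in the filtered, sorted view."""
    sql = (
        "SELECT name FROM records WHERE age > ? "
        "ORDER BY name LIMIT ? OFFSET ?"
    )
    return [row[0] for row in conn.execute(sql, (min_age, count, start_index))]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (id INTEGER, name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO records VALUES (?, ?, ?)",
    [(1, "Ada", 36), (2, "Linus", 28), (3, "Grace", 45), (4, "Alan", 41), (5, "Tim", 19)],
)
# Filtered and sorted view: Ada, Alan, Grace, Linus. Take two rows from index 1.
window = page_slice(conn, start_index=1, count=2, min_age=25)
```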

prev_page()#

Move to the previous page and return its records.

reload()#

Re-ingest the file from disk, replacing the working store’s contents.

save(path, *, selected_only=False, format=None, config=None)#

Export records to a file, choosing the format by extension.

Records are streamed into the writer, so a large export does not materialize the whole dataset. The active where/order view is respected — what you export is what the source currently shows.

Parameters:
  • path (str) – Destination file path; its extension selects the format (CSV, TSV, JSON, JSONL, XML, and — with the extras — Parquet, Feather, HDF5).

  • selected_only (bool) – Export only selected records instead of all.

  • format (str | None) – Explicit format name overriding the path extension.

  • config (Any) – Optional FileSourceConfig controlling encoding, delimiter, and similar options.
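Choosing a format from the file extension, with an explicit override, can be sketched as a suffix lookup. The mapping and the helper name resolve_format below are assumptions for illustration; the exact extensions bootstack recognizes are not listed here.

```python
from pathlib import Path

# Assumed suffix-to-format table; the real set of extensions may differ.
FORMATS_BY_SUFFIX = {
    ".csv": "csv",
    ".tsv": "tsv",
    ".json": "json",
    ".jsonl": "jsonl",
    ".xml": "xml",
    ".parquet": "parquet",
    ".feather": "feather",
    ".h5": "hdf5",
    ".hdf5": "hdf5",
}


def resolve_format(path, format=None):
    """Return the export format: the explicit one if given, else by suffix."""
    if format is not None:
        return format.lower()
    suffix = Path(path).suffix.lower()
    try:
        return FORMATS_BY_SUFFIX[suffix]
    except KeyError:
        raise ValueError(f"cannot infer export format from {path!r}") from None


inferred = resolve_format("out/people.JSONL")
overridden = resolve_format("people.dat", format="CSV")
```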

select(record_id)#

Mark a record as selected.

select_all(current_page_only=False)#

Select all records (optionally only those on the current page).

selected(page=None)#

Get selected records, optionally paginated.

update(record_id, updates)#

Update a record's fields by its ID.

where(condition=None)#

Filter rows by a condition built with col (None clears the filter).

The condition is rendered to a parameterized query — values are always bound, never interpolated — so user input cannot inject SQL.
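The difference between binding and interpolating can be shown with a tiny condition object that renders to a SQL fragment plus a parameter list. The Comparison class is hypothetical and far simpler than a real condition builder (column names and operators are assumed trusted), but it shows why a hostile value stays inert.

```python
import sqlite3
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Comparison:
    """Hypothetical single-column condition, for illustration only."""

    column: str
    operator: str
    value: Any

    def render(self):
        """Return (sql, params) with the value bound, never interpolated."""
        return f'"{self.column}" {self.operator} ?', [self.value]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (name TEXT, age INTEGER)")
conn.executemany("INSERT INTO records VALUES (?, ?)", [("Ada", 36), ("Tim", 19)])

# A value crafted to break out of a quoted string if it were interpolated.
hostile = "x' OR '1'='1"
sql, params = Comparison("name", "=", hostile).render()
rows = conn.execute(f"SELECT name FROM records WHERE {sql}", params).fetchall()
```

Because the value travels as a bound parameter, SQLite compares the name column against the literal string and matches nothing, instead of evaluating the injected OR clause.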