Lakehouse and Warehouse I/O
Lakehouse filesystem discovery utilities.
- fabrictools.io.discovery.filter_pipeline_discovered_tables(relative_paths: list[str]) -> list[str]
Remove internal fabrictools tables and schema snapshot paths from a path list.
Drops names ending with _schema_snapshot and the fixed names pipeline_audit_log, prefix_rules, and profiling_cache.
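The drop rule is simple enough to sketch in plain Python (a simplified reimplementation for illustration, not the library's actual code):

```python
# Fixed internal table names dropped by the filter, per the description above.
INTERNAL_TABLES = {"pipeline_audit_log", "prefix_rules", "profiling_cache"}

def filter_pipeline_discovered_tables_sketch(relative_paths):
    """Drop internal fabrictools tables and *_schema_snapshot paths."""
    kept = []
    for path in relative_paths:
        # Compare against the leaf (table) name, not the full relative path.
        leaf = path.rstrip("/").rsplit("/", 1)[-1]
        if leaf.endswith("_schema_snapshot") or leaf in INTERNAL_TABLES:
            continue
        kept.append(path)
    return kept
```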
- fabrictools.io.discovery.get_fs_entry_name(fs_entry: Any) -> str
Return a directory or file name from a notebookutils.fs.ls entry.
- Parameters:
fs_entry (Any) – Object with name or path as provided by Fabric APIs.
- Returns:
Normalized leaf name, or empty string if none.
- Return type:
str
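A rough sketch of the normalization described above (the attribute-probing order is an assumption; real notebookutils.fs.ls entries expose both name and path):

```python
from types import SimpleNamespace

def get_fs_entry_name_sketch(fs_entry):
    """Return the leaf name from an object exposing ``name`` or ``path``."""
    for attr in ("name", "path"):
        value = getattr(fs_entry, attr, None)
        if value:
            # Strip a trailing slash, then keep only the last path segment.
            return str(value).rstrip("/").rsplit("/", 1)[-1]
    return ""
```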
- fabrictools.io.discovery.list_lakehouse_tables(lakehouse_name: str, include_schemas: List[str] | None = None, exclude_tables: List[str] | None = None) -> List[str]
List table paths under a Lakehouse as Tables/<schema>/<table>.
Uses filesystem listing under <abfs>/Tables/<schema>/<table>.
- Parameters:
lakehouse_name (str) – Lakehouse display name.
include_schemas (list[str] | None) – If set, only these schema names (case-insensitive).
exclude_tables (list[str] | None) – Table or schema.table names to skip (case-insensitive).
- Returns:
Sorted relative paths.
- Return type:
List[str]
- Raises:
ValueError – When notebookutils is unavailable (not in Fabric).
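The include/exclude semantics can be illustrated with an in-memory stand-in for the filesystem listing (the helper and its schemas mapping are hypothetical; the real function lists <abfs>/Tables via notebookutils):

```python
def list_tables_from_listing(schemas, include_schemas=None, exclude_tables=None):
    """Mimic the documented filtering over a {schema: [table, ...]} mapping."""
    include = {s.lower() for s in include_schemas} if include_schemas else None
    exclude = {t.lower() for t in (exclude_tables or [])}
    paths = []
    for schema, tables in schemas.items():
        if include is not None and schema.lower() not in include:
            continue
        for table in tables:
            # Exclusions match either the bare table name or schema.table.
            if table.lower() in exclude or f"{schema}.{table}".lower() in exclude:
                continue
            paths.append(f"Tables/{schema}/{table}")
    return sorted(paths)
```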
- fabrictools.io.discovery.list_lakehouse_tables_for_pipeline(lakehouse_name: str, include_schemas: List[str] | None = None, exclude_tables: List[str] | None = None) -> List[str]
Like list_lakehouse_tables(), then filter_pipeline_discovered_tables(). Bulk pipelines use this to skip internal metadata and schema snapshot tables.
Lakehouse I/O facade module.
- fabrictools.io.lakehouse.delete_all_lakehouse_tables(lakehouse_name: str, include_schemas: List[str] | None = None, exclude_tables: List[str] | None = None, continue_on_error: bool = False) -> dict[str, Any]
Hard-delete all discovered Lakehouse table folders.
Tables are discovered as Tables/<schema>/<table> and removed with notebookutils.fs.rm(<abfs>/Tables/<schema>/<table>, recurse=True).
- Parameters:
lakehouse_name (str) – Lakehouse display name to purge.
include_schemas (list[str] | None) – If set, only these schema names (case-insensitive).
exclude_tables (list[str] | None) – Table or schema.table names to skip (case-insensitive).
continue_on_error (bool) – If False (default), stop on the first delete failure.
- Returns:
Summary with counts and per-table relative_path / errors.
- Return type:
dict[str, Any]
- Raises:
ValueError – When notebookutils is unavailable (not in Fabric).
Example
>>> summary = delete_all_lakehouse_tables(
...     "DevLakehouse",
...     include_schemas=["dbo"],
...     exclude_tables=["dbo.KeepThis"],
...     continue_on_error=True,
... )
- fabrictools.io.lakehouse.merge_lakehouse(source_df: pyspark.sql.DataFrame, lakehouse_name: str, relative_path: str, merge_condition: str, update_set: dict | None = None, insert_set: dict | None = None, spark: pyspark.sql.SparkSession | None = None) -> None
Upsert (merge) a DataFrame into an existing Delta table in a Lakehouse.
Uses Delta Lake DeltaTable.forPath. When update_set and/or insert_set are None, whenMatchedUpdateAll / whenNotMatchedInsertAll are used.
- Parameters:
source_df (DataFrame) – Rows to merge into the target table.
lakehouse_name (str) – Lakehouse display name holding the target table.
relative_path (str) – Path of the Delta table inside the Lakehouse (same rules as write_lakehouse(), including schema.table with spaces).
merge_condition (str) – SQL predicate joining source and target (e.g. "src.id = tgt.id").
update_set (dict | None) – {target_col: source_expr} for matched rows, or None to update all columns.
insert_set (dict | None) – {target_col: source_expr} for new rows, or None to insert all columns.
spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.
Example
>>> merge_lakehouse(
...     new_df,
...     "SilverLakehouse",
...     "sales_clean",
...     merge_condition="src.id = tgt.id",
... )
- fabrictools.io.lakehouse.read_lakehouse(lakehouse_name: str, relative_path: str, spark: pyspark.sql.SparkSession | None = None) -> pyspark.sql.DataFrame
Read a dataset from a Fabric Lakehouse.
Tries formats in order: Delta → Parquet → CSV. The first format that succeeds is used; the detected format is logged with the resulting shape.
- Parameters:
lakehouse_name (str) – Display name of the Lakehouse (e.g. "BronzeLakehouse").
relative_path (str) – Path inside the Lakehouse root, relative to the ABFS base (e.g. "sales/2024", "Tables/customers", or SQL-style "dbo.MyTable" / "dbo.PdC Extraction" with spaces in the table name).
spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.
- Returns:
Loaded dataframe.
- Return type:
DataFrame
- Raises:
RuntimeError – When none of the supported formats can be read from the path.
Example
>>> df = read_lakehouse("BronzeLakehouse", "sales/2024")
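The Delta → Parquet → CSV fallback is a try-in-order pattern; a generic sketch of it (helper name hypothetical, not part of the fabrictools API):

```python
def read_first_format(loaders):
    """Try ``(format_name, loader)`` pairs in order; return the first success.

    Raises RuntimeError when every loader fails, mirroring the
    read_lakehouse behaviour described above.
    """
    errors = []
    for fmt, load in loaders:
        try:
            return fmt, load()
        except Exception as exc:  # a real reader would catch narrower errors
            errors.append(f"{fmt}: {exc}")
    raise RuntimeError("No supported format could be read: " + "; ".join(errors))
```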
- fabrictools.io.lakehouse.resolve_lakehouse_read_candidate(lakehouse_name: str, relative_path: str, spark: pyspark.sql.SparkSession | None = None) -> str
Resolve the best candidate relative path for a Lakehouse read.
If candidate generation yields a single path, return it directly. If multiple candidates exist, try each path and return the first readable one.
- Parameters:
lakehouse_name (str) – Display name of the Lakehouse.
relative_path (str) – Requested path inside the Lakehouse root (same rules as read_lakehouse()).
spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.
- Returns:
Relative path string that was verified readable.
- Return type:
str
- Raises:
RuntimeError – When no candidate path can be read.
Example
>>> resolved = resolve_lakehouse_read_candidate(
...     "BronzeLakehouse", "dbo.SalesOrders"
... )
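Candidate generation is internal to fabrictools, but for SQL-style inputs it plausibly expands "dbo.MyTable" into Tables/dbo/MyTable-style paths. A purely illustrative sketch, under that assumption (the actual rules may differ):

```python
def candidate_paths_sketch(relative_path):
    """Expand a requested path into plausible on-disk candidates."""
    candidates = [relative_path]
    if "/" not in relative_path and "." in relative_path:
        # SQL-style "schema.table" -> filesystem "Tables/schema/table".
        schema, table = relative_path.split(".", 1)
        candidates.append(f"Tables/{schema}/{table}")
    if not relative_path.startswith("Tables/"):
        candidates.append(f"Tables/{relative_path}")
    return candidates
```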
- fabrictools.io.lakehouse.write_lakehouse(df: pyspark.sql.DataFrame, lakehouse_name: str, relative_path: str, mode: str = 'overwrite', partition_by: List[str] | None = None, format: str = 'delta', spark: pyspark.sql.SparkSession | None = None, *, normalize_column_names: bool = True, enable_column_mapping: bool = False, auto_partition: bool = True, auto_partition_threshold_bytes: int = 1073741824) -> None
Write a DataFrame to a Fabric Lakehouse (default format: Delta).
- Parameters:
df (DataFrame) – DataFrame to persist.
lakehouse_name (str) – Display name of the target Lakehouse.
relative_path (str) – Destination path inside the Lakehouse (e.g. "sales_clean", "Tables/sales_clean", or "dbo.PdC Extraction").
mode (str) – Spark write mode: "overwrite" (default), "append", "ignore", or "error".
partition_by (list[str] | None) – Optional column names to partition by. Each name is resolved like fabrictools.clean_data() / fabrictools.merge_dataframes() (physical name, normalized unique label, or snake_case). Auto-detected date partitions are appended when present on df.
format (str) – "delta" (default), "parquet", or "csv".
spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.
normalize_column_names (bool) – If True (default), run fabrictools.rename_columns_normalized() before resolving partition_by and writing. If False, keep physical column names unchanged.
enable_column_mapping (bool) – If True and format="delta", writes table properties required for Delta column mapping (mode name), allowing column names with spaces or special characters.
auto_partition (bool) – If True (default), automatically partition the data by detected date columns if they exist.
Example
>>> write_lakehouse(
...     df, "SilverLakehouse", "sales_clean", mode="overwrite", partition_by=["year"]
... )
Warehouse I/O facade module.
- fabrictools.io.warehouse.read_warehouse(warehouse_name: str, query: str, spark: pyspark.sql.SparkSession | None = None) -> pyspark.sql.DataFrame
Run a SQL query on a Fabric Warehouse and return the result as a DataFrame.
The JDBC URL is resolved from the warehouse display name via notebookutils. Authentication uses the signed-in Fabric user token.
- Parameters:
warehouse_name (str) – Warehouse display name (e.g. "MyWarehouse").
query (str) – SQL text (e.g. "SELECT * FROM dbo.sales"). Wrap subqueries in parentheses when needed, e.g. "(SELECT id, name FROM dbo.sales WHERE year = 2024) t".
spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.
- Returns:
Query result.
- Return type:
DataFrame
Example
>>> df = read_warehouse("MyWarehouse", "SELECT * FROM dbo.sales")
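Spark's JDBC reader accepts either a table name or a parenthesised subquery with an alias as its dbtable target, which is why the parameter note above asks you to wrap subqueries. A hypothetical helper sketching that wrapping rule (not part of fabrictools):

```python
def as_jdbc_dbtable(query):
    """Wrap a bare SELECT as a parenthesised derived table with an alias."""
    q = query.strip()
    if q.startswith("("):
        return q  # already wrapped by the caller
    return f"({q}) q"
```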
- fabrictools.io.warehouse.write_warehouse(df: pyspark.sql.DataFrame, warehouse_name: str, table: str, mode: str = 'overwrite', batch_size: int = 10000, spark: pyspark.sql.SparkSession | None = None) -> None
Write a DataFrame to a Fabric Warehouse table via JDBC.
- Parameters:
df (DataFrame) – DataFrame to persist.
warehouse_name (str) – Target Warehouse display name.
table (str) – Fully-qualified table name (e.g. "dbo.sales_clean").
mode (str) – Spark write mode: "overwrite" (default), "append", "ignore", or "error".
batch_size (int) – Rows per JDBC batch (default 10000).
spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.
Example
>>> write_warehouse(df, "MyWarehouse", "dbo.sales_clean", mode="append")
I/O adapters for Fabric Lakehouse and Warehouse (see fabrictools.io.lakehouse, fabrictools.io.warehouse, fabrictools.io.discovery).