Lakehouse and Warehouse I/O
Functions re-exported by fabrictools for reading, writing, and merging in Fabric Lakehouses and Warehouses.
- fabrictools.read_lakehouse(lakehouse_name: str, relative_path: str, spark: pyspark.sql.SparkSession | None = None) pyspark.sql.DataFrame
Read a dataset from a Fabric Lakehouse.
Tries formats in order: Delta → Parquet → CSV. The first format that succeeds is used; the detected format is logged with the resulting shape.
- Parameters:
lakehouse_name (str) – Display name of the Lakehouse (e.g. "BronzeLakehouse").
relative_path (str) – Path inside the Lakehouse root, relative to the ABFS base (e.g. "sales/2024", "Tables/customers", or SQL-style "dbo.MyTable" / "dbo.PdC Extraction" with spaces in the table name).
spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.
- Returns:
Loaded dataframe.
- Return type:
DataFrame
- Raises:
RuntimeError – When none of the supported formats can be read from the path.
Example
>>> df = read_lakehouse("BronzeLakehouse", "sales/2024")
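The Delta → Parquet → CSV fallback can be pictured with a short sketch. The reader callables below are hypothetical stand-ins, not fabrictools internals; only the try-in-order pattern and the RuntimeError on total failure come from the documented behavior.

```python
# Illustrative sketch of the format fallback performed by read_lakehouse:
# try each reader in order and keep the first one that succeeds.
# The readers here are made-up stand-ins, not the library's code.

def read_with_fallback(readers):
    """readers: list of (format_name, zero-arg callable). Returns (format, data)."""
    errors = []
    for fmt, reader in readers:
        try:
            return fmt, reader()
        except Exception as exc:  # a real reader would catch narrower errors
            errors.append(f"{fmt}: {exc}")
    raise RuntimeError("none of the supported formats could be read: " + "; ".join(errors))

def _delta_reader():
    raise ValueError("not a Delta table")

fmt, rows = read_with_fallback([
    ("delta", _delta_reader),
    ("parquet", lambda: [{"id": 1, "amount": 9.5}]),
    ("csv", lambda: []),
])
print(fmt)  # parquet
```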
- fabrictools.write_lakehouse(df: pyspark.sql.DataFrame, lakehouse_name: str, relative_path: str, mode: str = 'overwrite', partition_by: List[str] | None = None, format: str = 'delta', spark: pyspark.sql.SparkSession | None = None, *, normalize_column_names: bool = True, enable_column_mapping: bool = False, auto_partition: bool = True, auto_partition_threshold_bytes: int = 1073741824) None
Write a DataFrame to a Fabric Lakehouse (default format: Delta).
- Parameters:
df (DataFrame) – DataFrame to persist.
lakehouse_name (str) – Display name of the target Lakehouse.
relative_path (str) – Destination path inside the Lakehouse (e.g. "sales_clean", "Tables/sales_clean", or "dbo.PdC Extraction").
mode (str) – Spark write mode: "overwrite" (default), "append", "ignore", or "error".
partition_by (list[str] | None) – Optional column names to partition by. Each name is resolved like fabrictools.clean_data() / fabrictools.merge_dataframes() (physical name, normalized unique label, or snake_case). Auto-detected date partitions are appended when present on df.
format (str) – "delta" (default), "parquet", or "csv".
spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.
normalize_column_names (bool) – If True (default), run fabrictools.rename_columns_normalized() before resolving partition_by and writing. If False, keep physical column names unchanged.
enable_column_mapping (bool) – If True and format="delta", writes the table properties required for Delta column mapping (mode name), allowing column names with spaces or special characters.
auto_partition – If True (default), automatically partition the data by detected date columns if they exist.
Example
>>> write_lakehouse(
...     df, "SilverLakehouse", "sales_clean", mode="overwrite", partition_by=["year"]
... )
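A hedged sketch of a fuller call, combining explicit partition columns with Delta column mapping. The write itself only runs inside a Fabric notebook with a live SparkSession and a DataFrame `df`, so it is shown commented; only documented parameters are used, and the path and column names are made-up examples.

```python
# Hedged usage sketch: write_lakehouse with explicit partition columns and
# Delta column mapping enabled (for column names containing spaces).
# The call requires a Fabric runtime, so it is commented out.

target_path = "Tables/sales_clean"   # schema-style paths like "dbo.sales_clean" also work
partition_cols = ["year", "month"]   # resolved like clean_data(): physical,
                                     # normalized, or snake_case names

# In a Fabric notebook:
# from fabrictools import write_lakehouse
# write_lakehouse(
#     df, "SilverLakehouse", target_path,
#     mode="append",
#     partition_by=partition_cols,
#     enable_column_mapping=True,    # keep columns with spaces/special chars
#     normalize_column_names=False,  # leave physical column names unchanged
# )
```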
- fabrictools.merge_lakehouse(source_df: pyspark.sql.DataFrame, lakehouse_name: str, relative_path: str, merge_condition: str, update_set: dict | None = None, insert_set: dict | None = None, spark: pyspark.sql.SparkSession | None = None) None
Upsert (merge) a DataFrame into an existing Delta table in a Lakehouse.
Uses Delta Lake DeltaTable.forPath. When update_set and/or insert_set are None, whenMatchedUpdateAll / whenNotMatchedInsertAll are used.
- Parameters:
source_df (DataFrame) – Rows to merge into the target table.
lakehouse_name (str) – Lakehouse display name holding the target table.
relative_path (str) – Path of the Delta table inside the Lakehouse (same rules as write_lakehouse(), including schema.table with spaces).
merge_condition (str) – SQL predicate joining source and target (e.g. "src.id = tgt.id").
update_set (dict | None) – {target_col: source_expr} for matched rows, or None to update all columns.
insert_set (dict | None) – {target_col: source_expr} for new rows, or None to insert all columns.
spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.
Example
>>> merge_lakehouse(
...     new_df,
...     "SilverLakehouse",
...     "sales_clean",
...     merge_condition="src.id = tgt.id",
... )
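A sketch of an upsert with explicit column mappings instead of the update-all / insert-all defaults. The dicts use the documented {target_col: source_expr} shape; the column names and expressions are made-up examples, and the merge call itself needs a Fabric runtime, so it is shown commented.

```python
# Hedged sketch: merge_lakehouse with explicit update_set / insert_set.
# Keys are target columns, values are Spark SQL expressions over the source.

merge_condition = "src.id = tgt.id"
update_set = {                      # applied to matched rows only
    "amount": "src.amount",
    "updated_at": "current_timestamp()",
}
insert_set = {                      # applied to new rows only
    "id": "src.id",
    "amount": "src.amount",
    "updated_at": "current_timestamp()",
}

# In a Fabric notebook:
# from fabrictools import merge_lakehouse
# merge_lakehouse(new_df, "SilverLakehouse", "sales_clean",
#                 merge_condition=merge_condition,
#                 update_set=update_set, insert_set=insert_set)
```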
- fabrictools.delete_all_lakehouse_tables(lakehouse_name: str, include_schemas: List[str] | None = None, exclude_tables: List[str] | None = None, continue_on_error: bool = False) dict[str, Any]
Hard-delete all discovered Lakehouse table folders.
Tables are discovered as Tables/<schema>/<table> and removed with notebookutils.fs.rm(<abfs>/Tables/<schema>/<table>, recurse=True).
- Parameters:
lakehouse_name (str) – Lakehouse display name to purge.
include_schemas (list[str] | None) – If set, only tables in these schemas are processed (case-insensitive).
exclude_tables (list[str] | None) – Table or schema.table names to skip (case-insensitive).
continue_on_error (bool) – If False (default), stop on the first delete failure.
- Returns:
Summary with counts and per-table relative_path / errors.
- Return type:
dict[str, Any]
- Raises:
ValueError – When notebookutils is unavailable (i.e. not running in Fabric).
Example
>>> summary = delete_all_lakehouse_tables(
...     "DevLakehouse",
...     include_schemas=["dbo"],
...     exclude_tables=["dbo.KeepThis"],
...     continue_on_error=True,
... )
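The documented case-insensitive include/exclude matching can be illustrated with plain Python. The table names below are made up, and this is only a sketch of the filtering rule, not the library's internal code.

```python
# Illustrative sketch of the case-insensitive filtering documented for
# delete_all_lakehouse_tables; the discovered table names are examples.

discovered = ["dbo.KeepThis", "dbo.DropMe", "stg.Temp", "audit.Log"]
include_schemas = {"dbo", "stg"}     # compared case-insensitively
exclude_tables = {"dbo.keepthis"}    # "dbo.KeepThis" matches this entry

to_delete = [
    t for t in discovered
    if t.split(".")[0].lower() in include_schemas
    and t.lower() not in exclude_tables
]
print(to_delete)  # ['dbo.DropMe', 'stg.Temp']
```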
- fabrictools.read_warehouse(warehouse_name: str, query: str, spark: pyspark.sql.SparkSession | None = None) pyspark.sql.DataFrame
Run a SQL query on a Fabric Warehouse and return the result as a DataFrame.
The JDBC URL is resolved from the warehouse display name via notebookutils. Authentication uses the signed-in Fabric user token.
- Parameters:
warehouse_name (str) – Warehouse display name (e.g. "MyWarehouse").
query (str) – SQL text (e.g. "SELECT * FROM dbo.sales"). Wrap subqueries in parentheses when needed, e.g. "(SELECT id, name FROM dbo.sales WHERE year = 2024) t".
spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.
- Returns:
Query result.
- Return type:
DataFrame
Example
>>> df = read_warehouse("MyWarehouse", "SELECT * FROM dbo.sales")
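The documented subquery wrapping can be built as an ordinary string: the inner SELECT is parenthesized and aliased so the JDBC source can treat it as a table. The inner query is a made-up example.

```python
# Sketch of the documented subquery wrapping for read_warehouse: wrap the
# inner SELECT in parentheses and give it an alias ("t" here).

inner = "SELECT id, name FROM dbo.sales WHERE year = 2024"
wrapped = f"({inner}) t"
print(wrapped)

# In a Fabric notebook:
# df = read_warehouse("MyWarehouse", wrapped)
```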
- fabrictools.write_warehouse(df: pyspark.sql.DataFrame, warehouse_name: str, table: str, mode: str = 'overwrite', batch_size: int = 10000, spark: pyspark.sql.SparkSession | None = None) None
Write a DataFrame to a Fabric Warehouse table via JDBC.
- Parameters:
df (DataFrame) – DataFrame to persist.
warehouse_name (str) – Target Warehouse display name.
table (str) – Fully qualified table name (e.g. "dbo.sales_clean").
mode (str) – Spark write mode: "overwrite" (default), "append", "ignore", or "error".
batch_size (int) – Rows per JDBC batch (default 10000).
spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.
Example
>>> write_warehouse(df, "MyWarehouse", "dbo.sales_clean", mode="append")
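To get a feel for batch_size, a back-of-the-envelope calculation: with the default of 10000 rows per JDBC batch, a 125000-row DataFrame is sent in 13 batches, the last one partially filled. This is purely illustrative arithmetic, not library internals.

```python
# Batching arithmetic for write_warehouse's batch_size parameter.
import math

rows = 125_000
batch_size = 10_000                              # the documented default
n_batches = math.ceil(rows / batch_size)         # 12 full batches + 1 partial
last_batch = rows - (n_batches - 1) * batch_size
print(n_batches, last_batch)  # 13 5000
```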