Lakehouse and Warehouse I/O

Functions re-exported by fabrictools for reading from, writing to, and merging into Fabric Lakehouses and Warehouses.

fabrictools.read_lakehouse(lakehouse_name: str, relative_path: str, spark: pyspark.sql.SparkSession | None = None) pyspark.sql.DataFrame

Read a dataset from a Fabric Lakehouse.

Tries formats in order: Delta → Parquet → CSV. The first format that succeeds is used; the detected format is logged with the resulting shape.

Parameters:
  • lakehouse_name (str) – Display name of the Lakehouse (e.g. "BronzeLakehouse").

  • relative_path (str) – Path inside the Lakehouse root, relative to the ABFS base (e.g. "sales/2024", "Tables/customers", or SQL-style "dbo.MyTable" / "dbo.PdC Extraction" with spaces in the table name).

  • spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.

Returns:

Loaded dataframe.

Return type:

DataFrame

Raises:

RuntimeError – When none of the supported formats can be read from the path.

Example

>>> df = read_lakehouse("BronzeLakehouse", "sales/2024")
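The Delta → Parquet → CSV fallback described above can be sketched as a simple try-in-order loop. This is a minimal illustration, not the library's internals: the reader callables below are placeholders where the real function would use Spark readers against the Lakehouse ABFS path.

```python
def read_with_fallback(path, readers):
    """Try each (format_name, reader) pair in order; return the first success."""
    errors = {}
    for fmt, reader in readers:
        try:
            return fmt, reader(path)
        except Exception as exc:  # a real implementation would catch narrower errors
            errors[fmt] = str(exc)
    # Mirrors the documented RuntimeError when every format fails.
    raise RuntimeError(f"No supported format readable at {path!r}: {errors}")

def failing_reader(path):
    raise ValueError("not a Delta table")

# Stand-in readers: Delta fails, Parquet succeeds, CSV is never tried.
readers = [
    ("delta", failing_reader),
    ("parquet", lambda path: {"rows": 3}),
    ("csv", lambda path: {"rows": 3}),
]
fmt, data = read_with_fallback("sales/2024", readers)
# fmt is "parquet": the first format that succeeded
```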
fabrictools.write_lakehouse(df: pyspark.sql.DataFrame, lakehouse_name: str, relative_path: str, mode: str = 'overwrite', partition_by: List[str] | None = None, format: str = 'delta', spark: pyspark.sql.SparkSession | None = None, *, normalize_column_names: bool = True, enable_column_mapping: bool = False, auto_partition: bool = True, auto_partition_threshold_bytes: int = 1073741824) None

Write a DataFrame to a Fabric Lakehouse (default format: Delta).

Parameters:
  • df (DataFrame) – DataFrame to persist.

  • lakehouse_name (str) – Display name of the target Lakehouse.

  • relative_path (str) – Destination path inside the Lakehouse (e.g. "sales_clean", "Tables/sales_clean", or "dbo.PdC Extraction").

  • mode (str) – Spark write mode: "overwrite" (default), "append", "ignore", or "error".

  • partition_by (list[str] | None) – Optional column names to partition by. Each name is resolved like fabrictools.clean_data() / fabrictools.merge_dataframes() (physical name, normalized unique label, or snake_case). Auto-detected date partitions are appended when present on df.

  • format (str) – "delta" (default), "parquet", or "csv".

  • spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.

  • normalize_column_names (bool) – If True (default), run fabrictools.rename_columns_normalized() before resolving partition_by and writing. If False, keep physical column names unchanged.

  • enable_column_mapping (bool) – If True and format="delta", sets the Delta table properties that enable column mapping (delta.columnMapping.mode = "name"), allowing column names with spaces or special characters.

  • auto_partition (bool) – If True (default), automatically partition the data by detected date columns when they exist.

  • auto_partition_threshold_bytes (int) – Byte-size threshold used by the auto-partition heuristic (default 1073741824, i.e. 1 GiB).

Example

>>> write_lakehouse(
...     df, "SilverLakehouse", "sales_clean", mode="overwrite", partition_by=["year"]
... )
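The partition_by resolution described above (physical name, normalized label, or snake_case) can be sketched with a small matcher. This is a hypothetical approximation of that lookup, not fabrictools code; the helper names are illustrative.

```python
import re

def to_snake_case(name):
    """Lowercase, replace runs of non-alphanumerics with underscores."""
    s = re.sub(r"[^0-9a-zA-Z]+", "_", name.strip())
    return s.strip("_").lower()

def resolve_partition_columns(requested, physical_cols):
    """Map each requested name to a physical column: exact match first,
    then a snake_case match. Unresolved names raise KeyError."""
    by_snake = {to_snake_case(c): c for c in physical_cols}
    resolved = []
    for name in requested:
        if name in physical_cols:
            resolved.append(name)
        elif to_snake_case(name) in by_snake:
            resolved.append(by_snake[to_snake_case(name)])
        else:
            raise KeyError(f"partition column {name!r} not found")
    return resolved

# "order_year" resolves to the physical column "Order Year".
cols = resolve_partition_columns(["order_year"], ["Order Year", "Region"])
```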
fabrictools.merge_lakehouse(source_df: pyspark.sql.DataFrame, lakehouse_name: str, relative_path: str, merge_condition: str, update_set: dict | None = None, insert_set: dict | None = None, spark: pyspark.sql.SparkSession | None = None) None

Upsert (merge) a DataFrame into an existing Delta table in a Lakehouse.

Uses Delta Lake DeltaTable.forPath. When update_set and/or insert_set are None, whenMatchedUpdateAll / whenNotMatchedInsertAll are used.

Parameters:
  • source_df (DataFrame) – Rows to merge into the target table.

  • lakehouse_name (str) – Lakehouse display name holding the target table.

  • relative_path (str) – Path of the Delta table inside the Lakehouse (same rules as write_lakehouse(), including schema.table with spaces).

  • merge_condition (str) – SQL predicate joining source and target (e.g. "src.id = tgt.id").

  • update_set (dict | None) – {target_col: source_expr} for matched rows, or None to update all columns.

  • insert_set (dict | None) – {target_col: source_expr} for new rows, or None to insert all columns.

  • spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.

Example

>>> merge_lakehouse(
...     new_df,
...     "SilverLakehouse",
...     "sales_clean",
...     merge_condition="src.id = tgt.id",
... )
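Per the description above, the merge roughly corresponds to the following delta-spark call chain (path resolution from lakehouse_name/relative_path to the ABFS location is elided). This is a sketch of the equivalent Delta Lake API usage, not the library's actual implementation; imports are deferred so the sketch stays importable outside a Spark environment.

```python
def merge_into_delta(spark, source_df, table_path, merge_condition,
                     update_set=None, insert_set=None):
    """Rough equivalent of the documented merge behaviour using delta-spark."""
    from delta.tables import DeltaTable

    target = DeltaTable.forPath(spark, table_path)
    builder = target.alias("tgt").merge(source_df.alias("src"), merge_condition)
    # None -> update/insert all columns, as documented.
    builder = (builder.whenMatchedUpdateAll() if update_set is None
               else builder.whenMatchedUpdate(set=update_set))
    builder = (builder.whenNotMatchedInsertAll() if insert_set is None
               else builder.whenNotMatchedInsert(values=insert_set))
    builder.execute()
```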
fabrictools.delete_all_lakehouse_tables(lakehouse_name: str, include_schemas: List[str] | None = None, exclude_tables: List[str] | None = None, continue_on_error: bool = False) dict[str, Any]

Hard-delete all discovered Lakehouse table folders.

Tables are discovered as Tables/<schema>/<table> and removed with notebookutils.fs.rm(<abfs>/Tables/<schema>/<table>, recurse=True).

Parameters:
  • lakehouse_name (str) – Lakehouse display name to purge.

  • include_schemas (list[str] | None) – If set, only these schema names (case-insensitive).

  • exclude_tables (list[str] | None) – Table or schema.table names to skip (case-insensitive).

  • continue_on_error (bool) – If False (default), stop at the first delete failure; if True, record the failure and continue with the remaining tables.

Returns:

Summary dict with deletion counts plus each table's relative_path and any errors.

Return type:

dict

Raises:

ValueError – When notebookutils is unavailable (not in Fabric).

Example

>>> summary = delete_all_lakehouse_tables(
...     "DevLakehouse",
...     include_schemas=["dbo"],
...     exclude_tables=["dbo.KeepThis"],
...     continue_on_error=True,
... )
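The case-insensitive include/exclude filtering described above can be sketched in plain Python over (schema, table) pairs discovered under Tables/<schema>/<table>. This is an illustrative approximation of the documented semantics, not fabrictools code.

```python
def select_tables_for_deletion(discovered, include_schemas=None, exclude_tables=None):
    """Apply case-insensitive include/exclude filters to (schema, table) pairs
    and return the folder paths that would be removed."""
    include = {s.lower() for s in include_schemas} if include_schemas else None
    exclude = {t.lower() for t in (exclude_tables or [])}
    kept = []
    for schema, table in discovered:
        if include is not None and schema.lower() not in include:
            continue  # schema not selected
        if table.lower() in exclude or f"{schema}.{table}".lower() in exclude:
            continue  # explicitly excluded, bare or schema-qualified
        kept.append(f"Tables/{schema}/{table}")
    return kept

paths = select_tables_for_deletion(
    [("dbo", "KeepThis"), ("dbo", "Old"), ("stg", "Tmp")],
    include_schemas=["dbo"],
    exclude_tables=["dbo.KeepThis"],
)
# -> ["Tables/dbo/Old"]
```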
fabrictools.read_warehouse(warehouse_name: str, query: str, spark: pyspark.sql.SparkSession | None = None) pyspark.sql.DataFrame

Run a SQL query on a Fabric Warehouse and return the result as a DataFrame.

The JDBC URL is resolved from the warehouse display name via notebookutils. Authentication uses the signed-in Fabric user token.

Parameters:
  • warehouse_name (str) – Warehouse display name (e.g. "MyWarehouse").

  • query (str) – SQL text (e.g. "SELECT * FROM dbo.sales"). Wrap subqueries in parentheses when needed, e.g. "(SELECT id, name FROM dbo.sales WHERE year = 2024) t".

  • spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.

Returns:

Query result.

Return type:

DataFrame

Example

>>> df = read_warehouse("MyWarehouse", "SELECT * FROM dbo.sales")
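The note about wrapping subqueries in parentheses follows from how Spark's JDBC reader treats its table expression: a bare SELECT must become a parenthesized, aliased subquery. A hypothetical helper (not part of fabrictools, which accepts the raw SQL directly) makes the rule concrete:

```python
def as_dbtable(query):
    """Wrap a bare SELECT as a parenthesized, aliased subquery so it can be
    used where a JDBC table expression is expected; pass table names through."""
    q = query.strip()
    if q.lower().startswith("select"):
        return f"({q}) q"
    return q

expr = as_dbtable("SELECT id, name FROM dbo.sales WHERE year = 2024")
# -> "(SELECT id, name FROM dbo.sales WHERE year = 2024) q"
```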
fabrictools.write_warehouse(df: pyspark.sql.DataFrame, warehouse_name: str, table: str, mode: str = 'overwrite', batch_size: int = 10000, spark: pyspark.sql.SparkSession | None = None) None

Write a DataFrame to a Fabric Warehouse table via JDBC.

Parameters:
  • df (DataFrame) – DataFrame to persist.

  • warehouse_name (str) – Target Warehouse display name.

  • table (str) – Fully-qualified table name (e.g. "dbo.sales_clean").

  • mode (str) – Spark write mode: "overwrite" (default), "append", "ignore", or "error".

  • batch_size (int) – Rows per JDBC batch (default 10000).

  • spark (SparkSession | None) – Optional SparkSession; when omitted the active session is used.

Example

>>> write_warehouse(df, "MyWarehouse", "dbo.sales_clean", mode="append")
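The batch_size parameter controls how many rows travel per JDBC insert batch. The grouping itself happens inside the JDBC driver, but the effect can be illustrated with a plain slicing generator:

```python
def batches(rows, batch_size):
    """Yield consecutive slices of at most batch_size rows, mimicking how
    rows are grouped into JDBC insert batches."""
    for i in range(0, len(rows), batch_size):
        yield rows[i : i + batch_size]

# 25 rows with batch_size=10 -> batches of 10, 10, and 5 rows.
sizes = [len(b) for b in batches(list(range(25)), 10)]
# -> [10, 10, 5]
```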