# Source code for fabrictools.io.lakehouse

"""Lakehouse I/O facade module."""

from __future__ import annotations

from typing import Any, List, Optional

from pyspark.sql import DataFrame, SparkSession  # type: ignore[reportMissingImports]
from pyspark.sql.types import IntegralType  # type: ignore[reportMissingImports]

from fabrictools.core import log
from fabrictools.core import (
    build_lakehouse_read_path_candidates,
    build_lakehouse_write_path,
    get_lakehouse_abfs_path,
)
from fabrictools.core import get_spark
from fabrictools.io.discovery import list_lakehouse_tables

# ── Read ─────────────────────────────────────────────────────────────────────


def read_lakehouse(
    lakehouse_name: str,
    relative_path: str,
    spark: Optional[SparkSession] = None,
) -> DataFrame:
    """Read a dataset from a Fabric Lakehouse.

    Formats are attempted in order **Delta → Parquet → CSV**; the first
    format that loads successfully wins, and the detected format plus the
    resulting column count are logged.

    :param lakehouse_name: Display name of the Lakehouse (e.g. ``"BronzeLakehouse"``).
    :param relative_path: Path inside the Lakehouse root, relative to the ABFS base
        (e.g. ``"sales/2024"``, ``"Tables/customers"``, or SQL-style ``"dbo.MyTable"`` /
        ``"dbo.PdC Extraction"`` with spaces in the table name).
    :param spark: Optional ``SparkSession``; when omitted the active session is used.
    :type lakehouse_name: str
    :type relative_path: str
    :type spark: ~pyspark.sql.SparkSession | None

    :returns: Loaded dataframe.
    :rtype: ~pyspark.sql.DataFrame

    :raises RuntimeError: When none of the supported formats can be read from the path.

    .. rubric:: Example

    >>> df = read_lakehouse("BronzeLakehouse", "sales/2024")  # doctest: +SKIP
    """
    session = spark or get_spark()
    root = get_lakehouse_abfs_path(lakehouse_name)
    candidates = build_lakehouse_read_path_candidates(relative_path)

    errors: list[str] = []
    for candidate in candidates:
        target = f"{root}/{candidate}"
        try:
            frame = _try_read_formats(session, target)
            if candidate != relative_path:
                log(f"  Resolved relative_path '{relative_path}' -> '{candidate}'")
            log(f"  {len(frame.columns)} columns")
            return frame
        except RuntimeError as exc:
            errors.append(f"{target} ({exc})")

    tried = ", ".join(f"'{root}/{candidate}'" for candidate in candidates)
    raise RuntimeError(
        f"Could not read from any candidate path for relative_path='{relative_path}'. "
        f"Tried: {tried}. Details: {' | '.join(errors)}"
    )


def resolve_lakehouse_read_candidate(
    lakehouse_name: str,
    relative_path: str,
    spark: Optional[SparkSession] = None,
) -> str:
    """Resolve the best candidate relative path for a Lakehouse read.

    If candidate generation yields a single path, return it directly. If
    multiple candidates exist, try each path and return the first readable one.

    :param lakehouse_name: Display name of the Lakehouse.
    :param relative_path: Logical path under the Lakehouse root (slash path or
        SQL-style ``schema.table``, e.g. ``dbo.PdC Extraction``).
    :param spark: Optional ``SparkSession``; when omitted the active session is used.
    :type lakehouse_name: str
    :type relative_path: str
    :type spark: ~pyspark.sql.SparkSession | None

    :returns: Relative path string that was verified readable.
    :rtype: str

    :raises RuntimeError: When no candidate path can be read.

    .. rubric:: Example

    >>> resolved = resolve_lakehouse_read_candidate(  # doctest: +SKIP
    ...     "BronzeLakehouse", "dbo.SalesOrders"
    ... )
    """
    _spark = spark or get_spark()
    base = get_lakehouse_abfs_path(lakehouse_name)
    candidate_relative_paths = build_lakehouse_read_path_candidates(relative_path)

    # Fast path: a single candidate needs no probing.
    if len(candidate_relative_paths) == 1:
        return candidate_relative_paths[0]

    failures: list[str] = []
    for candidate_relative_path in candidate_relative_paths:
        full_path = f"{base}/{candidate_relative_path}"
        try:
            # A successful read of any supported format proves the path works;
            # the loaded frame itself is discarded.
            _try_read_formats(_spark, full_path)
            if candidate_relative_path != relative_path:
                log(
                    f"  Resolved relative_path '{relative_path}' -> "
                    f"'{candidate_relative_path}'"
                )
            return candidate_relative_path
        except RuntimeError as exc:
            failures.append(f"{full_path} ({exc})")

    attempted_paths = ", ".join(
        f"'{base}/{candidate}'" for candidate in candidate_relative_paths
    )
    raise RuntimeError(
        f"Could not resolve a readable candidate for relative_path='{relative_path}'. "
        f"Tried: {attempted_paths}. Details: {' | '.join(failures)}"
    )
def _try_read_formats(spark: SparkSession, full_path: str) -> DataFrame: """Attempt Delta → Parquet → CSV, return the first successful DataFrame.""" # Delta (preferred in Fabric) try: df = spark.read.format("delta").load(full_path) log(" Format detected: Delta") return df except Exception: pass # Parquet try: df = ( spark.read.option("datetimeRebaseMode", "CORRECTED") .format("parquet") .load(full_path) ) log(" Format detected: Parquet") return df except Exception: pass # CSV — last resort try: df = ( spark.read.option("header", "true") .option("inferSchema", "true") .option("multiLine", "true") .option("escape", '"') .csv(full_path) ) log(" Format detected: CSV") return df except Exception as exc: raise RuntimeError( f"Could not read '{full_path}' as Delta, Parquet, or CSV: {exc}" ) from exc # ── Write ──────────────────────────────────────────────────────────────────── def _dedupe_preserve_order(values: list[str]) -> list[str]: """Return a list without duplicates while preserving insertion order.""" seen: set[str] = set() deduped: list[str] = [] for value in values: if value not in seen: seen.add(value) deduped.append(value) return deduped def _enable_delta_column_mapping_on_path(spark: SparkSession, full_path: str) -> None: """Upgrade an existing Delta table path to column mapping mode ``name``.""" escaped_path = full_path.replace("`", "``") spark.sql( f""" ALTER TABLE delta.`{escaped_path}` SET TBLPROPERTIES ( 'delta.columnMapping.mode' = 'name', 'delta.minReaderVersion' = '2', 'delta.minWriterVersion' = '5' ) """ ) def _detect_partition_columns( df: DataFrame, threshold_bytes: int = 1_073_741_824 ) -> list[str]: """ Auto-detect best partition columns (like year/month or categorical columns) only if the dataset is large enough and the column has low cardinality. """ import pyspark.sql.functions as F # 1. 
Vérification de la volumétrie (Fast Fail) if threshold_bytes > 0: try: size_in_bytes = int( df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes() ) if size_in_bytes < threshold_bytes: return [] except Exception: pass # Si l'estimation échoue, on continue # 2. Identification des candidats par heuristique (noms) # On regarde les types de base from pyspark.sql.types import DateType, StringType candidates = [] # On garde les préférences temporelles en priorité time_keywords = {"year", "annee", "month", "mois"} categorical_keywords = {"country", "region", "status", "type", "category"} for f in df.schema.fields: if not isinstance(f.dataType, (DateType, IntegralType, StringType)): continue name_lower = f.name.lower() # Temporel ? if any(kw in name_lower for kw in time_keywords) and not name_lower.endswith( "_id" ): candidates.append(f.name) continue # Catégoriel ? if any( kw in name_lower for kw in categorical_keywords ) and not name_lower.endswith("_id"): candidates.append(f.name) if not candidates: return [] # 3. Évaluation de la Cardinalité (Le test décisif) # On limite à 5 candidats pour ne pas trop pénaliser les performances candidates_to_test = candidates[:5] try: exprs = [F.approx_count_distinct(c).alias(c) for c in candidates_to_test] cardinalities = df.agg(*exprs).collect()[0].asDict() except Exception: return [] # En cas d'erreur de calcul, on préfère ne pas partitionner # 4. 
Sélection Finale valid_columns = [] for col, count in cardinalities.items(): if 1 < count < 1000: valid_columns.append(col) if not valid_columns: return [] # Trier pour prioriser l'année, puis le mois, puis les autres def sort_key(col_name: str) -> int: name_lower = col_name.lower() if "year" in name_lower or "annee" in name_lower: return 0 if "month" in name_lower or "mois" in name_lower: return 1 return 2 valid_columns.sort(key=sort_key) # On ne retourne pas plus de 2 colonnes pour éviter de trop scinder return valid_columns[:2] def write_lakehouse( df: DataFrame, lakehouse_name: str, relative_path: str, mode: str = "overwrite", partition_by: Optional[List[str]] = None, format: str = "delta", spark: Optional[SparkSession] = None, *, normalize_column_names: bool = True, enable_column_mapping: bool = False, auto_partition: bool = True, auto_partition_threshold_bytes: int = 1_073_741_824, ) -> None: """Write a ``DataFrame`` to a Fabric Lakehouse (default format: Delta). :param df: DataFrame to persist. :param lakehouse_name: Display name of the target Lakehouse. :param relative_path: Destination path inside the Lakehouse (e.g. ``"sales_clean"``, ``"Tables/sales_clean"``, or ``"dbo.PdC Extraction"``). :param mode: Spark write mode: ``"overwrite"`` (default), ``"append"``, ``"ignore"``, or ``"error"``. :param partition_by: Optional column names to partition by. Each name is resolved like :py:func:`fabrictools.clean_data` / :py:func:`fabrictools.merge_dataframes` (physical name, normalized unique label, or snake_case). Auto-detected date partitions are appended when present on ``df``. :param format: ``"delta"`` (default), ``"parquet"``, or ``"csv"``. :param spark: Optional ``SparkSession``; when omitted the active session is used. :param normalize_column_names: If ``True`` (default), run :py:func:`fabrictools.rename_columns_normalized` before resolving ``partition_by`` and writing. If ``False``, keep physical column names unchanged. 
:param enable_column_mapping: If ``True`` and ``format="delta"``, writes table properties required for Delta column mapping (mode ``name``), allowing column names with spaces or special characters. :param auto_partition: If ``True`` (default), automatically partition the data by detected date columns if they exist. :type df: ~pyspark.sql.DataFrame :type lakehouse_name: str :type relative_path: str :type mode: str :type partition_by: list[str] | None :type format: str :type spark: ~pyspark.sql.SparkSession | None :type normalize_column_names: bool :type enable_column_mapping: bool .. rubric:: Example >>> write_lakehouse( # doctest: +SKIP ... df, "SilverLakehouse", "sales_clean", mode="overwrite", partition_by=["year"] ... ) """ _ = spark or get_spark() # validates spark availability early base = get_lakehouse_abfs_path(lakehouse_name) resolved_relative_path = build_lakehouse_write_path(relative_path) full_path = f"{base}/{resolved_relative_path}" if resolved_relative_path != relative_path: log( f"Auto-corrected write relative_path '{relative_path}' " f"-> '{resolved_relative_path}'" ) log( f"Writing to Lakehouse '{lakehouse_name}' → {full_path} " f"[format={format}, mode={mode}]" ) # Lazy import: fabrictools.transform.columns → quality.clean → fabrictools.io # would otherwise create an import cycle while io.__init__ loads lakehouse. 
from fabrictools.transform.columns import ( # noqa: PLC0415 _resolve_column_name, rename_columns_normalized, ) if normalize_column_names: original_cols = list(df.columns) df = rename_columns_normalized(df) if list(df.columns) != original_cols: log(" Column names normalized (clean_data-style) before write") user_partitions = [ p for p in ( _resolve_column_name(df, col, side="DataFrame") for col in (partition_by or []) ) if p is not None ] auto_detected_partitions = ( _detect_partition_columns(df, threshold_bytes=auto_partition_threshold_bytes) if auto_partition else [] ) effective_partition_by = _dedupe_preserve_order( user_partitions + auto_detected_partitions ) writer = df.write.format(format).option("overwriteSchema", "true").mode(mode) if format.lower() == "parquet": writer = writer.option("datetimeRebaseMode", "CORRECTED") elif format.lower() == "delta" and enable_column_mapping: # If target already exists as a Delta table, upgrade protocol first so # overwrite with business-friendly names (spaces, capitals, etc.) works. try: from delta.tables import DeltaTable # type: ignore[import-untyped] # noqa: PLC0415 if DeltaTable.isDeltaTable(_, full_path): _enable_delta_column_mapping_on_path(_, full_path) except Exception: # Non-blocking: the write options below still apply for new tables. 
pass writer = ( writer.option("delta.columnMapping.mode", "name") .option("delta.minReaderVersion", "2") .option("delta.minWriterVersion", "5") ) if effective_partition_by: writer = writer.partitionBy(*effective_partition_by) if auto_detected_partitions: log(" Auto-detected partitions: " + ", ".join(auto_detected_partitions)) log(" Partition columns: " + ", ".join(effective_partition_by)) writer.save(full_path) log(f" Write complete → {full_path}") # ── Merge (upsert) ──────────────────────────────────────────────────────────── def merge_lakehouse( source_df: DataFrame, lakehouse_name: str, relative_path: str, merge_condition: str, update_set: Optional[dict] = None, insert_set: Optional[dict] = None, spark: Optional[SparkSession] = None, ) -> None: """Upsert (merge) a ``DataFrame`` into an existing Delta table in a Lakehouse. Uses Delta Lake ``DeltaTable.forPath``. When ``update_set`` and/or ``insert_set`` are ``None``, ``whenMatchedUpdateAll`` / ``whenNotMatchedInsertAll`` are used. :param source_df: Rows to merge into the target table. :param lakehouse_name: Lakehouse display name holding the target table. :param relative_path: Path of the Delta table inside the Lakehouse (same rules as :py:func:`write_lakehouse`, including ``schema.table`` with spaces). :param merge_condition: SQL predicate joining source and target (e.g. ``"src.id = tgt.id"``). :param update_set: ``{target_col: source_expr}`` for matched rows, or ``None`` to update all columns. :param insert_set: ``{target_col: source_expr}`` for new rows, or ``None`` to insert all columns. :param spark: Optional ``SparkSession``; when omitted the active session is used. :type source_df: ~pyspark.sql.DataFrame :type lakehouse_name: str :type relative_path: str :type merge_condition: str :type update_set: dict | None :type insert_set: dict | None :type spark: ~pyspark.sql.SparkSession | None .. rubric:: Example >>> merge_lakehouse( # doctest: +SKIP ... new_df, ... "SilverLakehouse", ... "sales_clean", ... 
merge_condition="src.id = tgt.id", ... ) """ from delta.tables import DeltaTable # type: ignore[import-untyped] # noqa: PLC0415 _spark = spark or get_spark() base = get_lakehouse_abfs_path(lakehouse_name) resolved_relative_path = build_lakehouse_write_path(relative_path) full_path = f"{base}/{resolved_relative_path}" if resolved_relative_path != relative_path: log( f"Auto-corrected merge relative_path '{relative_path}' " f"-> '{resolved_relative_path}'" ) log(f"Merging into Lakehouse '{lakehouse_name}' → {full_path}") log(f" Condition: {merge_condition}") target = DeltaTable.forPath(_spark, full_path) merge_builder = target.alias("tgt").merge(source_df.alias("src"), merge_condition) if update_set is not None: merge_builder = merge_builder.whenMatchedUpdate(set=update_set) else: merge_builder = merge_builder.whenMatchedUpdateAll() if insert_set is not None: merge_builder = merge_builder.whenNotMatchedInsert(values=insert_set) else: merge_builder = merge_builder.whenNotMatchedInsertAll() merge_builder.execute() log(" Merge complete") def delete_all_lakehouse_tables( lakehouse_name: str, include_schemas: Optional[List[str]] = None, exclude_tables: Optional[List[str]] = None, continue_on_error: bool = False, ) -> dict[str, Any]: """Hard-delete all discovered Lakehouse table folders. Tables are discovered as ``Tables/<schema>/<table>`` and removed with ``notebookutils.fs.rm(<abfs>/Tables/<schema>/<table>, recurse=True)``. :param lakehouse_name: Lakehouse display name to purge. :param include_schemas: If set, only these schema names (case-insensitive). :param exclude_tables: Table or ``schema.table`` names to skip (case-insensitive). :param continue_on_error: If ``False`` (default), stop on first delete failure. :type lakehouse_name: str :type include_schemas: list[str] | None :type exclude_tables: list[str] | None :type continue_on_error: bool :returns: Summary with counts and per-table ``relative_path`` / errors. 
:rtype: dict :raises ValueError: When ``notebookutils`` is unavailable (not in Fabric). .. rubric:: Example >>> summary = delete_all_lakehouse_tables( # doctest: +SKIP ... "DevLakehouse", ... include_schemas=["dbo"], ... exclude_tables=["dbo.KeepThis"], ... continue_on_error=True, ... ) """ try: import notebookutils # type: ignore[import-untyped] # noqa: PLC0415 except ImportError as exc: raise ValueError( "notebookutils is not available — are you running inside " f"Microsoft Fabric? ({exc})" ) from exc base = get_lakehouse_abfs_path(lakehouse_name) table_paths = list_lakehouse_tables( lakehouse_name=lakehouse_name, include_schemas=include_schemas, exclude_tables=exclude_tables, ) if not table_paths: log( f"No tables found in Lakehouse '{lakehouse_name}' for purge.", level="warning", ) return { "total_tables": 0, "deleted_tables": 0, "failed_tables": 0, "tables": [], "failures": [], } deleted_entries: list[dict[str, str]] = [] failure_entries: list[dict[str, str]] = [] total_tables = len(table_paths) for index, table_relative_path in enumerate(table_paths, start=1): try: full_path = f"{base}/{table_relative_path}" log(f"[{index}/{total_tables}] Hard-deleting table path '{full_path}'...") notebookutils.fs.rm(full_path, recurse=True) deleted_entries.append( { "relative_path": table_relative_path, "path": full_path, } ) except Exception as exc: failure_entries.append( { "relative_path": table_relative_path, "path": f"{base}/{table_relative_path}", "error": str(exc), } ) log( f"[{index}/{total_tables}] Failed to hard-delete '{table_relative_path}': {exc}", level="warning", ) if not continue_on_error: raise return { "total_tables": total_tables, "deleted_tables": len(deleted_entries), "failed_tables": len(failure_entries), "tables": deleted_entries, "failures": failure_entries, } __all__ = [ "read_lakehouse", "resolve_lakehouse_read_candidate", "write_lakehouse", "merge_lakehouse", "delete_all_lakehouse_tables", ]