Data quality and Silver pipelines

Cleaning, Silver metadata, scans, and bronze-to-silver orchestration.

fabrictools.clean_data(df: pyspark.sql.DataFrame, drop_duplicates: bool = True, drop_all_null_rows: bool = True, verbose: bool = False) pyspark.sql.DataFrame

Normalize column names, convert blank strings to null, infer types, and optionally dedupe.

Renames columns to unique snake_case (via internal helpers), replaces blank strings with null on string columns, runs detect_and_cast_columns(), then optionally drops duplicate rows and rows that are all-null.

Parameters:
  • df (DataFrame) – Input dataframe.

  • drop_duplicates (bool) – If True, call dropDuplicates() after cleaning.

  • drop_all_null_rows (bool) – If True, call dropna(how="all").

  • verbose (bool) – If True, log progress details.

Returns:

Cleaned dataframe.

Return type:

DataFrame

Example

>>> cleaned = clean_data(raw_df, drop_duplicates=True, drop_all_null_rows=True)
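
As a rough illustration of the renaming step (plain Python, not fabrictools' internal helpers, which are hypothetical names here), unique snake_case normalization can be sketched like this:

```python
import re

def to_snake_case(name: str) -> str:
    """Illustrative snake_case normalization (not the library's actual rule)."""
    # Insert underscores at lower->upper case boundaries, then collapse
    # any run of non-alphanumeric characters into a single underscore.
    s = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    s = re.sub(r"[^0-9a-zA-Z]+", "_", s)
    return s.strip("_").lower()

def dedupe_names(names: list[str]) -> list[str]:
    """Suffix repeated names so every output column name is unique."""
    seen: dict[str, int] = {}
    out = []
    for n in names:
        key = to_snake_case(n)
        if key in seen:
            seen[key] += 1
            out.append(f"{key}_{seen[key]}")
        else:
            seen[key] = 0
            out.append(key)
    return out
```

On a Spark dataframe, clean_data applies this kind of renaming before the blank-to-null replacement and type casting.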
fabrictools.add_silver_metadata(df: pyspark.sql.DataFrame, source_lakehouse_name: str, source_relative_path: str, source_layer: str = 'bronze', ingestion_timestamp_col: str = 'ingestion_timestamp', source_layer_col: str = 'ingestion_source_layer', source_path_col: str = 'ingestion_source_path', year_col: str = 'ingestion_year', month_col: str = 'ingestion_month', day_col: str = 'ingestion_day', spark: pyspark.sql.SparkSession | None = None, verbose: bool = False) pyspark.sql.DataFrame

Add Silver-layer metadata columns (ingestion time, source path, date parts).

Resolves source_relative_path with fabrictools.io.lakehouse.resolve_lakehouse_read_candidate(). Date partition columns (year_col / month_col / day_col) are derived from the current ingestion date.

Parameters:
  • df (DataFrame) – Bronze or intermediate dataframe.

  • source_lakehouse_name (str) – Source Lakehouse display name.

  • source_relative_path (str) – Source path passed to path resolution.

  • source_layer (str) – Literal stored in source_layer_col (default bronze).

  • ingestion_timestamp_col (str) – Column name for current_timestamp().

  • source_layer_col (str) – Column name for the layer literal.

  • source_path_col (str) – Column name for the resolved relative path string.

  • year_col (str) – Partition year column name.

  • month_col (str) – Partition month column name.

  • day_col (str) – Partition day-of-month column name.

  • spark (SparkSession | None) – Optional SparkSession for path resolution.

  • verbose (bool) – If True, log progress details.

Returns:

df with metadata and partition columns appended/overwritten.

Return type:

DataFrame

Example

>>> silver_df = add_silver_metadata(
...     bronze_df,
...     source_lakehouse_name="BronzeLakehouse",
...     source_relative_path="dbo.RawOrders",
... )
fabrictools.scan_data_errors(df: pyspark.sql.DataFrame, include_samples: bool = True, display_results: bool = True) dict[str, Any]

Summarize nulls, blanks, duplicates, and normalized-name collisions.

Optionally builds a Plotly figure (requires optional dependency plotly). When display_results is True, shows the summary dataframe and chart in notebook environments via display / figure.show().

Parameters:
  • df (DataFrame) – Dataset to profile.

  • include_samples (bool) – If True, add sample_rows (up to 10 dicts) to the report.

  • display_results (bool) – If True, render summary_df and the chart when available.

Returns:

Dict with keys summary_df, figure, issue_totals, collisions, and optionally sample_rows (Spark dataframe, plotly figure, lists, dict).

Return type:

dict

Example

>>> report = scan_data_errors(df, include_samples=True, display_results=False)
>>> assert "summary_df" in report
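
The issue counts in the report can be illustrated on plain Python rows (a simplified sketch; the real function profiles a Spark dataframe and additionally reports normalized-name collisions):

```python
def scan_rows(rows: list[dict]) -> dict:
    """Count the issue types scan_data_errors summarizes (plain-Python sketch)."""
    nulls = sum(1 for r in rows for v in r.values() if v is None)
    blanks = sum(1 for r in rows for v in r.values()
                 if isinstance(v, str) and v.strip() == "")
    seen, dupes = set(), 0
    for r in rows:
        key = tuple(sorted(r.items()))  # row fingerprint for duplicate detection
        dupes += key in seen
        seen.add(key)
    return {"nulls": nulls, "blanks": blanks, "duplicate_rows": dupes}
```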
fabrictools.clean_and_write_data(source_lakehouse_name: str, source_relative_path: str, target_lakehouse_name: str, target_relative_path: str, mode: str = 'overwrite', partition_by: list[str] | None = None, auto_partition: bool = True, auto_partition_threshold_bytes: int = 1073741824, spark: pyspark.sql.SparkSession | None = None, verbose: bool = False) pyspark.sql.DataFrame

Read one Lakehouse path, clean it, add Silver metadata, and write to the target path.

Parameters:
  • source_lakehouse_name (str) – Bronze (or source) Lakehouse name.

  • source_relative_path (str) – Source Tables/... or logical path.

  • target_lakehouse_name (str) – Silver (or target) Lakehouse name.

  • target_relative_path (str) – Destination path for the write.

  • mode (str) – Spark write mode (e.g. overwrite, append).

  • partition_by (list[str] | None) – Optional partition columns for fabrictools.write_lakehouse().

  • auto_partition (bool) – If True (default), automatically partition the data by detected date columns if they exist.

  • auto_partition_threshold_bytes (int) – Size threshold in bytes that triggers auto-partitioning (default 1073741824, i.e. 1 GiB).

  • spark (SparkSession | None) – Optional SparkSession.

  • verbose (bool) – If True, log progress details.

Returns:

The Silver dataframe that was written.

Return type:

DataFrame

Example

>>> silver_df = clean_and_write_data(
...     "BronzeLakehouse",
...     "dbo.Orders",
...     "SilverLakehouse",
...     "Tables/dbo/Cleaned_Orders",
...     mode="overwrite",
...     partition_by=["ingestion_year", "ingestion_month"],
... )
fabrictools.clean_and_write_all_tables(source_lakehouse_name: str, target_lakehouse_name: str, mode: str = 'overwrite', partition_by: list[str] | None = None, auto_partition: bool = True, auto_partition_threshold_bytes: int = 1073741824, tables_config: list[dict[str, Any]] | None = None, include_schemas: list[str] | None = None, exclude_tables: list[str] | None = None, continue_on_error: bool = False, spark: pyspark.sql.SparkSession | None = None, verbose: bool = False) dict[str, Any]

Bulk clean/write (or merge) using discovery or an explicit tables_config.

When tables_config is omitted, jobs are built from fabrictools.io.discovery.list_lakehouse_tables_for_pipeline(); target paths use a Cleaned_ leaf (PascalCase from the source table), e.g. Tables/dbo/projets table → Tables/dbo/Cleaned_ProjetsTable.
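
The Cleaned_ leaf derivation can be sketched as follows (illustrative only; the library's exact PascalCase rules may differ):

```python
def cleaned_target_path(source_path: str) -> str:
    """Derive the Cleaned_ target leaf from a source table path (illustrative)."""
    parts = source_path.split("/")          # e.g. ["Tables", "dbo", "projets table"]
    leaf = parts[-1]
    # PascalCase the leaf: split on spaces/underscores, capitalize each word.
    pascal = "".join(w.capitalize() for w in leaf.replace("_", " ").split())
    return "/".join(parts[:-1] + [f"Cleaned_{pascal}"])
```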

Parameters:
  • source_lakehouse_name (str) – Lakehouse to read from.

  • target_lakehouse_name (str) – Lakehouse to write or merge into.

  • mode (str) – Default mode when not overridden per table (overwrite, append, merge).

  • partition_by (list[str] | None) – Default partition columns for writes.

  • auto_partition (bool) – If True (default), automatically partition the data by detected date columns if they exist.

  • auto_partition_threshold_bytes (int) – Size threshold in bytes that triggers auto-partitioning (default 1073741824, i.e. 1 GiB).

  • tables_config (list[dict] | None) – Optional list of per-table job dicts (see pipelines.config).

  • include_schemas (list[str] | None) – Discovery filter: schema allow-list.

  • exclude_tables (list[str] | None) – Discovery filter: table deny-list.

  • continue_on_error (bool) – If True, record failures and keep processing remaining tables; if False, stop on the first failure.

  • spark (SparkSession | None) – Optional SparkSession.

  • verbose (bool) – If True, log progress details.

Returns:

Summary dict with total_tables, successful_tables, failed_tables, tables, failures.

Return type:

dict

Example

>>> summary = clean_and_write_all_tables(
...     "BronzeLakehouse",
...     "SilverLakehouse",
...     mode="overwrite",
...     include_schemas=["dbo"],
...     exclude_tables=["dbo.LegacyArchive"],
... )
>>> summary["successful_tables"]
8
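
A typical way to consume the returned summary (keys as documented above; the sample values in the test are made up):

```python
def summarize(summary: dict) -> str:
    """One-line status from a clean_and_write_all_tables-style summary dict."""
    return (f"{summary['successful_tables']}/{summary['total_tables']} tables cleaned, "
            f"{summary['failed_tables']} failed")
```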