Data quality and Silver pipelines
Cleaning, Silver metadata, error scans, and bronze-to-silver orchestration.
- fabrictools.clean_data(df: pyspark.sql.DataFrame, drop_duplicates: bool = True, drop_all_null_rows: bool = True, verbose: bool = False) → pyspark.sql.DataFrame
Normalize names, trim empty strings to null, infer types, optionally dedupe.
Renames columns to unique snake_case (via internal helpers), replaces blank strings with null on string columns, runs detect_and_cast_columns(), then optionally drops duplicate rows and rows that are all-null.
- Parameters:
df (DataFrame) – Input dataframe.
drop_duplicates (bool) – Drop exact duplicate rows (default True).
drop_all_null_rows (bool) – Drop rows whose values are all null (default True).
verbose (bool) – Print progress details.
- Returns:
Cleaned dataframe.
- Return type:
DataFrame
Example
>>> cleaned = clean_data(raw_df, drop_duplicates=True, drop_all_null_rows=True)
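The internal renaming helpers are not part of the public API; as an illustration only, the "unique snake_case" convention can be sketched in plain Python (the exact splitting and collision-suffix policy of fabrictools is an assumption here):

```python
import re

def to_snake_case(name: str) -> str:
    """Approximate the column renaming clean_data applies: lower snake_case."""
    # Split CamelCase boundaries, then collapse non-alphanumerics to underscores.
    s = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    s = re.sub(r"[^0-9a-zA-Z]+", "_", s).strip("_").lower()
    return s

def unique_snake_case(names: list[str]) -> list[str]:
    """Disambiguate collisions by suffixing _2, _3, ... (illustrative policy)."""
    seen: dict[str, int] = {}
    out = []
    for n in names:
        base = to_snake_case(n)
        seen[base] = seen.get(base, 0) + 1
        out.append(base if seen[base] == 1 else f"{base}_{seen[base]}")
    return out

print(unique_snake_case(["Order ID", "orderId", "Total (€)"]))
# ['order_id', 'order_id_2', 'total']
```

Note that "Order ID" and "orderId" normalize to the same base name, which is exactly the kind of collision scan_data_errors (below) reports.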
- fabrictools.add_silver_metadata(df: pyspark.sql.DataFrame, source_lakehouse_name: str, source_relative_path: str, source_layer: str = 'bronze', ingestion_timestamp_col: str = 'ingestion_timestamp', source_layer_col: str = 'ingestion_source_layer', source_path_col: str = 'ingestion_source_path', year_col: str = 'ingestion_year', month_col: str = 'ingestion_month', day_col: str = 'ingestion_day', spark: pyspark.sql.SparkSession | None = None, verbose: bool = False) → pyspark.sql.DataFrame
Add Silver-layer metadata columns (ingestion time, source path, date parts).
Resolves source_relative_path with fabrictools.io.lakehouse.resolve_lakehouse_read_candidate(). Date partition columns (year_col/month_col/day_col) are derived from the current ingestion date.
- Parameters:
df (DataFrame) – Bronze or intermediate dataframe.
source_lakehouse_name (str) – Source Lakehouse display name.
source_relative_path (str) – Source path passed to path resolution.
source_layer (str) – Literal stored in source_layer_col (default 'bronze').
ingestion_timestamp_col (str) – Column name for current_timestamp().
source_layer_col (str) – Column name for the layer literal.
source_path_col (str) – Column name for the resolved relative path string.
year_col (str) – Partition year column name.
month_col (str) – Partition month column name.
day_col (str) – Partition day-of-month column name.
spark (SparkSession | None) – Optional SparkSession for path resolution.
- Returns:
df with metadata and partition columns appended/overwritten.
- Return type:
DataFrame
Example
>>> silver_df = add_silver_metadata(
...     bronze_df,
...     source_lakehouse_name="BronzeLakehouse",
...     source_relative_path="dbo.RawOrders",
... )
- fabrictools.scan_data_errors(df: pyspark.sql.DataFrame, include_samples: bool = True, display_results: bool = True) → dict[str, Any]
Summarize nulls, blanks, duplicates, and normalized-name collisions.
Optionally builds a Plotly figure (requires the optional dependency plotly). When display_results is True, shows the summary dataframe and chart in notebook environments via display()/figure.show().
- Parameters:
df (DataFrame) – Dataframe to scan.
include_samples (bool) – Collect sample rows for flagged issues (default True).
display_results (bool) – Render the summary dataframe and chart (default True).
- Returns:
Dict with keys summary_df, figure, issue_totals, collisions, and optionally sample_rows (Spark dataframe, plotly figure, lists, dict).
- Return type:
dict[str, Any]
Example
>>> report = scan_data_errors(df, include_samples=True, display_results=False)
>>> assert "summary_df" in report
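The checks themselves are simple to reason about; here is a toy version over plain dicts (the real function works on Spark dataframes, and lowercasing stands in for the actual snake_case normalization):

```python
from collections import Counter

def scan_rows(rows: list[dict]) -> dict:
    """Toy counterpart of scan_data_errors: per-column null and blank-string
    counts, duplicate-row count, and column-name collisions after a
    simplified normalization (illustrative only)."""
    cols = list(rows[0]) if rows else []
    nulls = {c: sum(r[c] is None for r in rows) for c in cols}
    blanks = {c: sum(isinstance(r[c], str) and r[c].strip() == "" for r in rows)
              for c in cols}
    # Rows are duplicates when all their key/value pairs match.
    dup_rows = len(rows) - len({tuple(sorted(r.items())) for r in rows})
    name_counts = Counter(c.lower() for c in cols)
    collisions = [n for n, k in name_counts.items() if k > 1]
    return {"nulls": nulls, "blanks": blanks,
            "duplicate_rows": dup_rows, "collisions": collisions}
```

Running this before clean_data shows exactly which of its fixes (blank-to-null, dedupe, renaming) will fire.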
- fabrictools.clean_and_write_data(source_lakehouse_name: str, source_relative_path: str, target_lakehouse_name: str, target_relative_path: str, mode: str = 'overwrite', partition_by: list[str] | None = None, auto_partition: bool = True, auto_partition_threshold_bytes: int = 1073741824, spark: pyspark.sql.SparkSession | None = None, verbose: bool = False) → pyspark.sql.DataFrame
Read one Lakehouse path, clean, add Silver metadata, and write the target path.
- Parameters:
source_lakehouse_name (str) – Bronze (or source) Lakehouse name.
source_relative_path (str) – Source Tables/... or logical path.
target_lakehouse_name (str) – Silver (or target) Lakehouse name.
target_relative_path (str) – Destination path for the write.
mode (str) – Spark write mode (e.g. overwrite, append).
partition_by (list[str] | None) – Optional partition columns for fabrictools.write_lakehouse().
auto_partition (bool) – If True (default), automatically partition the data by detected date columns if they exist.
auto_partition_threshold_bytes (int) – Threshold in bytes to trigger auto-partitioning.
spark (SparkSession | None) – Optional SparkSession.
- Returns:
The Silver dataframe that was written.
- Return type:
DataFrame
Example
>>> silver_df = clean_and_write_data(
...     "BronzeLakehouse",
...     "dbo.Orders",
...     "SilverLakehouse",
...     "Tables/dbo/Cleaned_Orders",
...     mode="overwrite",
...     partition_by=["ingestion_year", "ingestion_month"],
... )
- fabrictools.clean_and_write_all_tables(source_lakehouse_name: str, target_lakehouse_name: str, mode: str = 'overwrite', partition_by: list[str] | None = None, auto_partition: bool = True, auto_partition_threshold_bytes: int = 1073741824, tables_config: list[dict[str, Any]] | None = None, include_schemas: list[str] | None = None, exclude_tables: list[str] | None = None, continue_on_error: bool = False, spark: pyspark.sql.SparkSession | None = None, verbose: bool = False) → dict[str, Any]
Bulk clean/write (or merge) using discovery or an explicit tables_config.
When tables_config is omitted, jobs are built from fabrictools.io.discovery.list_lakehouse_tables_for_pipeline(); target paths use a Cleaned_ leaf (PascalCase from the source table), e.g. Tables/dbo/projets table → Tables/dbo/Cleaned_ProjetsTable.
- Parameters:
source_lakehouse_name (str) – Lakehouse to read from.
target_lakehouse_name (str) – Lakehouse to write or merge into.
mode (str) – Default mode when not overridden per table (
overwrite,append,merge).partition_by (list[str] | None) – Default partition columns for writes.
auto_partition – If
True(default), automatically partition the data by detected date columns if they exist.auto_partition_threshold_bytes – Threshold in bytes to trigger auto-partitioning.
tables_config (list[dict] | None) – Optional list of per-table job dicts (see pipelines.config).
include_schemas (list[str] | None) – Discovery filter: schema allow-list.
exclude_tables (list[str] | None) – Discovery filter: table deny-list.
continue_on_error (bool) – If False (default), stop on the first failure; if True, record the failure and continue with the remaining tables.
spark (SparkSession | None) – Optional SparkSession.
- Returns:
Summary dict with total_tables, successful_tables, failed_tables, tables, failures.
- Return type:
dict[str, Any]
Example
>>> summary = clean_and_write_all_tables(
...     "BronzeLakehouse",
...     "SilverLakehouse",
...     mode="overwrite",
...     include_schemas=["dbo"],
...     exclude_tables=["dbo.LegacyArchive"],
... )
>>> summary["successful_tables"]
8
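The default Cleaned_ target-path rule can be reproduced in a few lines, consistent with the Tables/dbo/projets table → Tables/dbo/Cleaned_ProjetsTable example above; the exact word-splitting rule (whitespace and underscores) is an assumption:

```python
def cleaned_target_path(schema: str, table: str) -> str:
    """Build the default Silver target path: a Cleaned_ leaf with the source
    table name converted to PascalCase (illustrative reconstruction of the
    documented naming convention)."""
    parts = [p for p in table.replace("_", " ").split() if p]
    leaf = "Cleaned_" + "".join(p.capitalize() for p in parts)
    return f"Tables/{schema}/{leaf}"

print(cleaned_target_path("dbo", "projets table"))
# Tables/dbo/Cleaned_ProjetsTable
```

Supplying tables_config overrides this convention per table when you need different target names or per-table write modes.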