fabrictools.quality.clean

Pure DataFrame cleaning helpers.

fabrictools.quality.clean.add_silver_metadata(df: pyspark.sql.DataFrame, source_lakehouse_name: str, source_relative_path: str, source_layer: str = 'bronze', ingestion_timestamp_col: str = 'ingestion_timestamp', source_layer_col: str = 'ingestion_source_layer', source_path_col: str = 'ingestion_source_path', year_col: str = 'ingestion_year', month_col: str = 'ingestion_month', day_col: str = 'ingestion_day', spark: pyspark.sql.SparkSession | None = None, verbose: bool = False) → pyspark.sql.DataFrame

Add Silver-layer metadata columns (ingestion time, source path, date parts).

Resolves source_relative_path with fabrictools.io.lakehouse.resolve_lakehouse_read_candidate(). Date partition columns (year_col / month_col / day_col) are derived from the current ingestion date.

Parameters:
  • df (DataFrame) – Bronze or intermediate dataframe.

  • source_lakehouse_name (str) – Source Lakehouse display name.

  • source_relative_path (str) – Source path passed to path resolution.

  • source_layer (str) – Literal stored in source_layer_col (default bronze).

  • ingestion_timestamp_col (str) – Column name for current_timestamp().

  • source_layer_col (str) – Column name for the layer literal.

  • source_path_col (str) – Column name for the resolved relative path string.

  • year_col (str) – Partition year column name.

  • month_col (str) – Partition month column name.

  • day_col (str) – Partition day-of-month column name.

  • spark (SparkSession | None) – Optional SparkSession for path resolution.

Returns:

df with metadata and partition columns appended/overwritten.

Return type:

DataFrame

Example

>>> silver_df = add_silver_metadata(
...     bronze_df,
...     source_lakehouse_name="BronzeLakehouse",
...     source_relative_path="dbo.RawOrders",
... )
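The derivation of the date partition columns described above can be sketched in plain Python. This is an illustrative stand-in only: the column names match the defaults (year_col / month_col / day_col), but the helper function and the use of datetime here are assumptions; the real implementation derives the parts from Spark's current ingestion timestamp.

```python
from datetime import datetime, timezone


def derive_date_parts(ingestion_time: datetime) -> dict:
    """Split an ingestion timestamp into the year/month/day values that
    add_silver_metadata stores in its partition columns (hypothetical sketch)."""
    return {
        "ingestion_year": ingestion_time.year,
        "ingestion_month": ingestion_time.month,
        "ingestion_day": ingestion_time.day,
    }


parts = derive_date_parts(datetime(2024, 3, 7, tzinfo=timezone.utc))
# parts == {"ingestion_year": 2024, "ingestion_month": 3, "ingestion_day": 7}
```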
fabrictools.quality.clean.clean_data(df: pyspark.sql.DataFrame, drop_duplicates: bool = True, drop_all_null_rows: bool = True, verbose: bool = False) → pyspark.sql.DataFrame

Normalize names, trim empty strings to null, infer types, optionally dedupe.

Renames columns to unique snake_case (via internal helpers), replaces blank strings with null on string columns, runs detect_and_cast_columns(), then optionally drops duplicate rows and rows that are all-null.

Parameters:
  • df (DataFrame) – Input dataframe.

  • drop_duplicates (bool) – If True, call dropDuplicates() after cleaning.

  • drop_all_null_rows (bool) – If True, call dropna(how="all").

Returns:

Cleaned dataframe.

Return type:

DataFrame

Example

>>> cleaned = clean_data(raw_df, drop_duplicates=True, drop_all_null_rows=True)
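The name normalization and blank-trimming steps can be illustrated with a minimal pure-Python sketch. Both helper names are hypothetical; the library's internal helpers additionally guarantee that the resulting column names are unique.

```python
import re


def to_snake_case(name: str) -> str:
    """Rough snake_case normalization (assumed behaviour): split camelCase,
    collapse non-alphanumeric runs to underscores, lowercase."""
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    name = re.sub(r"[^0-9a-zA-Z]+", "_", name)
    return name.strip("_").lower()


def blank_to_null(value):
    """Blank or whitespace-only strings become None, mirroring what
    clean_data does for string columns."""
    if isinstance(value, str) and value.strip() == "":
        return None
    return value


print(to_snake_case("Order ID"))      # order_id
print(to_snake_case("customerName"))  # customer_name
print(blank_to_null("   "))           # None
```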
fabrictools.quality.clean.detect_and_cast_columns(df: pyspark.sql.DataFrame, verbose: bool = False) → pyspark.sql.DataFrame

Infer primitive types from string columns and cast when the column is uniform.

Order of detection (first match wins):
  • date – uniform non-null success of a to_date / to_timestamp chain over several patterns, with European forms tried before US for ambiguous day/month. Strings with a trailing time-of-day may still yield a calendar day and are cast to date, dropping the time part; US slash dates with a 12-hour clock and AM/PM suffix are handled via h:mm[:ss] a patterns.

  • timestamp – to_timestamp with several patterns, including US 12-hour with AM/PM, 24-hour, and ISO T forms.

  • integer – the full string matches ^[+-]?\d+$.

  • double – decimal or scientific notation.

  • otherwise – the column remains string.

Columns that are all-null are skipped; null cells are preserved through casts.
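The numeric part of the first-match-wins chain can be illustrated with plain regular expressions. The integer pattern is the one quoted above; the double pattern is an assumed sketch of "decimal or scientific notation" and may differ from the library's exact expression.

```python
import re

# Integer pattern as documented; double pattern is illustrative.
INT_RE = re.compile(r"^[+-]?\d+$")
DOUBLE_RE = re.compile(r"^[+-]?(\d+\.\d*|\.\d+|\d+)([eE][+-]?\d+)?$")


def classify(value: str) -> str:
    """First match wins: integer is checked before double, so a bare
    digit string never falls through to the double branch."""
    if INT_RE.fullmatch(value):
        return "integer"
    if DOUBLE_RE.fullmatch(value):
        return "double"
    return "string"


print(classify("42"))       # integer
print(classify("-3.14"))    # double
print(classify("6.02e23"))  # double
print(classify("abc"))      # string
```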

Sets spark.sql.legacy.timeParserPolicy to LEGACY for the duration of the call and restores the previous session value afterward.
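The set-then-restore behaviour follows the standard try/finally pattern. A sketch with a plain dict standing in for the session configuration (the real code works against spark.conf; the helper name here is hypothetical):

```python
def with_legacy_time_parser(conf: dict, work):
    """Set spark.sql.legacy.timeParserPolicy to LEGACY while `work` runs,
    then restore whatever value (or absence) was there before."""
    key = "spark.sql.legacy.timeParserPolicy"
    previous = conf.get(key)
    conf[key] = "LEGACY"
    try:
        return work()
    finally:
        if previous is None:
            conf.pop(key, None)
        else:
            conf[key] = previous


conf = {"spark.sql.legacy.timeParserPolicy": "CORRECTED"}
with_legacy_time_parser(conf, lambda: None)
# conf is back to {"spark.sql.legacy.timeParserPolicy": "CORRECTED"}
```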

Parameters:
  • df (DataFrame) – Input dataframe.

Returns:

Dataframe with qualifying string columns cast.

Return type:

DataFrame