fabrictools.quality.clean
Pure DataFrame cleaning helpers.
- fabrictools.quality.clean.add_silver_metadata(df: pyspark.sql.DataFrame, source_lakehouse_name: str, source_relative_path: str, source_layer: str = 'bronze', ingestion_timestamp_col: str = 'ingestion_timestamp', source_layer_col: str = 'ingestion_source_layer', source_path_col: str = 'ingestion_source_path', year_col: str = 'ingestion_year', month_col: str = 'ingestion_month', day_col: str = 'ingestion_day', spark: pyspark.sql.SparkSession | None = None, verbose: bool = False) pyspark.sql.DataFrame
Add Silver-layer metadata columns (ingestion time, source path, date parts).
Resolves source_relative_path with fabrictools.io.lakehouse.resolve_lakehouse_read_candidate(). Date partition columns (year_col/month_col/day_col) are derived from the current ingestion date.
- Parameters:
df (DataFrame) – Bronze or intermediate dataframe.
source_lakehouse_name (str) – Source Lakehouse display name.
source_relative_path (str) – Source path passed to path resolution.
source_layer (str) – Literal stored in source_layer_col (default bronze).
ingestion_timestamp_col (str) – Column name for current_timestamp().
source_layer_col (str) – Column name for the layer literal.
source_path_col (str) – Column name for the resolved relative path string.
year_col (str) – Partition year column name.
month_col (str) – Partition month column name.
day_col (str) – Partition day-of-month column name.
spark (SparkSession | None) – Optional SparkSession for path resolution.
verbose (bool) – If True, print progress details.
- Returns:
df with metadata and partition columns appended/overwritten.
- Return type:
DataFrame
Example
>>> silver_df = add_silver_metadata(
...     bronze_df,
...     source_lakehouse_name="BronzeLakehouse",
...     source_relative_path="dbo.RawOrders",
... )
- fabrictools.quality.clean.clean_data(df: pyspark.sql.DataFrame, drop_duplicates: bool = True, drop_all_null_rows: bool = True, verbose: bool = False) pyspark.sql.DataFrame
Normalize names, trim empty strings to null, infer types, optionally dedupe.
Renames columns to unique snake_case (via internal helpers), replaces blank strings with null on string columns, runs detect_and_cast_columns(), then optionally drops duplicate rows and rows that are all-null.
- Parameters:
df (DataFrame) – Input dataframe.
drop_duplicates (bool) – If True, drop duplicate rows.
drop_all_null_rows (bool) – If True, drop rows in which every column is null.
verbose (bool) – If True, print progress details.
- Returns:
Cleaned dataframe.
- Return type:
DataFrame
Example
>>> cleaned = clean_data(raw_df, drop_duplicates=True, drop_all_null_rows=True)
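The first two steps (snake_case renaming and blank-string-to-null) can be sketched in pure Python. This assumes a simple camelCase/punctuation-splitting rule; the library's internal helpers may differ in detail.

```python
import re


def to_snake_case(name: str) -> str:
    """Normalize a column name to snake_case (sketch of the rename step)."""
    # Insert an underscore at lower-to-upper camelCase boundaries.
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    # Collapse spaces and punctuation runs into single underscores.
    name = re.sub(r"[^0-9a-zA-Z]+", "_", name)
    return name.strip("_").lower()


def blank_to_null(value):
    """Treat empty or whitespace-only strings as null (None)."""
    if isinstance(value, str) and value.strip() == "":
        return None
    return value
```

In the real API the null replacement applies only to string-typed columns and runs as a Spark expression, not per cell.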
- fabrictools.quality.clean.detect_and_cast_columns(df: pyspark.sql.DataFrame, verbose: bool = False) pyspark.sql.DataFrame
Infer primitive types from string columns and cast when the column is uniform.
Order of detection (first match wins): date (uniform non-null success of a to_date/to_timestamp chain over several patterns; European forms are tried before US for ambiguous day/month; strings with a trailing time-of-day may still yield a calendar day and are cast to date, dropping the time part; US slash dates with a 12-hour clock and AM/PM suffix are handled via h:mm[:ss] a patterns), timestamp (to_timestamp with several patterns including US 12h + AM/PM, 24h, plus ISO T), integer (full string matches ^[+-]?\d+$), double (decimal/scientific notation), else the column remains string. Columns that are all-null are skipped; null cells are kept through casts.
Sets spark.sql.legacy.timeParserPolicy to LEGACY for the duration of the call and restores the previous session value afterward.
- Parameters:
df (DataFrame) – Input dataframe.
- Returns:
Dataframe with qualifying string columns cast.
- Return type:
DataFrame
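The integer/double stage of the detection order above can be illustrated in plain Python. The sketch mirrors the documented rules (integer checked before double, every non-null cell must match, all-null columns skipped), but it is not the library's implementation and omits the date/timestamp stages.

```python
import re

# Full-string integer match, as documented: ^[+-]?\d+$
INT_RE = re.compile(r"^[+-]?\d+$")
# Decimal or scientific notation, e.g. "1.5", ".5", "2e3".
DOUBLE_RE = re.compile(r"^[+-]?(\d+\.\d*|\.\d+|\d+)([eE][+-]?\d+)?$")


def infer_numeric_type(values):
    """Return the inferred type name for a column of string cells."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return "string"  # all-null columns are skipped
    # First match wins: integer before double, else keep string.
    if all(INT_RE.match(v) for v in non_null):
        return "integer"
    if all(DOUBLE_RE.match(v) for v in non_null):
        return "double"
    return "string"
```

Because integer is tested first, a column of plain digit strings is never downgraded to double even though the double pattern would also accept it.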