DataFrame Transformations
Filters, renames, joins, and helpers for "wide month" columns and text.
- fabrictools.filter_by_value_list(df: pyspark.sql.DataFrame, column: str, values: Sequence[Any], *, exclude: bool = True) → pyspark.sql.DataFrame
Keep or drop rows where `column` is in `values` (no column cast). For string-like dtypes, compares `trim(column)` to `values`; `str` entries in `values` are stripped.
- Parameters:
df (DataFrame) – Input dataframe.
column (str) – Logical or physical column name (resolved like `fabrictools.resolve_dataframe_column()`). If it does not resolve, `df` is returned unchanged.
values (collections.abc.Sequence) – Membership list; non-strings kept as-is.
exclude (bool) – If `True` (default), drop rows in `values`; if `False`, keep only those rows.
- Returns:
Filtered dataframe.
- Return type:
DataFrame
Example
>>> filtered = filter_by_value_list(
...     df, "status", ["VOID", "CANCELLED"], exclude=True
... )
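Because string-like columns are compared after `trim`, padded cell values still match; a minimal sketch with illustrative data (assumes an active `spark` session):
>>> df = spark.createDataFrame(
...     [("a", " VOID "), ("b", "OPEN")], ["id", "status"]
... )
>>> filter_by_value_list(df, "status", ["VOID"]).show()  # row "a" is dropped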
- fabrictools.drop_rows_over_empty_percent(df: pyspark.sql.DataFrame, max_empty_percent: float, *, columns: Sequence[str] | None = None) → pyspark.sql.DataFrame
Drop rows where the fraction of empty cells (see `fabrictools.empty_or_null()`) exceeds `max_empty_percent`.
- Parameters:
df (DataFrame) – Input dataframe.
max_empty_percent (float) – Upper bound in `[0, 1]`; rows with an empty ratio strictly greater than this are removed.
columns (collections.abc.Sequence[str] | None) – Columns to score; `None` means all columns. Names resolved like `fabrictools.resolve_dataframe_column()`; unknown labels are skipped, and if none remain all columns are used.
- Returns:
Filtered dataframe.
- Return type:
DataFrame
- Raises:
ValueError – If `max_empty_percent` is outside `[0, 1]`, if `columns` is an empty sequence, or if no columns remain to score.
Example
>>> pruned = drop_rows_over_empty_percent(
...     df, 0.5, columns=["col_a", "col_b", "col_c"]
... )
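For intuition: with three scored columns, a row with two empty cells has ratio 2/3 ≈ 0.67 > 0.5 and is removed, while one empty cell gives 1/3 and survives. A sketch with illustrative data (assumes an active `spark` session):
>>> df = spark.createDataFrame(
...     [("x", None, ""), ("y", "ok", None)], ["col_a", "col_b", "col_c"]
... )
>>> drop_rows_over_empty_percent(df, 0.5).show()  # only ("y", "ok", None) remains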
- fabrictools.merge_dataframes(main: pyspark.sql.DataFrame, join_df: pyspark.sql.DataFrame, join_columns: Sequence[str], keys: Sequence[tuple[str, str]], how: str = 'left', *, join_prefix: str | None = None, join_column_names: Sequence[str] | None = None) → pyspark.sql.DataFrame
Left-join `main` to `join_df` and project right columns as `{prefix}_{suffix}`. The prefix is snake_case from, in order: the inferred `join_df` variable name at the call site, else the first `SubqueryAlias` on `join_df`'s analyzed plan, else `join` (see `DEFAULT_JOIN_PREFIX`). Pass `join_prefix` to force a value. Suffixes match `fabrictools.clean_data()` uniqueness rules.
- Parameters:
main (DataFrame) – Left dataframe.
join_df (DataFrame) – Right dataframe (only `join_columns` projected, plus key temps).
join_columns (collections.abc.Sequence[str]) – Right-side columns to expose with prefixed names; labels that do not resolve on `join_df` are omitted.
join_column_names (collections.abc.Sequence[str] | None) – Optional output names for `join_columns` (same order, same length). If omitted, `join_columns` names are reused.
keys (collections.abc.Sequence[tuple[str, str]]) – `(main_col, join_col)` pairs for the join predicate (AND-ed); names resolved per frame. Pairs where either side does not resolve are skipped; if none resolve, `main` is returned unchanged.
how (str) – Spark join type (e.g. `left`, `inner`).
join_prefix (str | None) – Optional explicit prefix (snake_cased); overrides inference.
- Returns:
Joined dataframe with temporary key columns dropped.
- Return type:
DataFrame
- Raises:
ValueError – If `keys` is empty.
Example
>>> out = merge_dataframes(
...     orders,
...     customers,
...     join_columns=["name", "segment"],
...     keys=[("customer_id", "id")],
...     join_prefix="cust",
... )
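Under the `{prefix}_{suffix}` rule, the call above should expose the right columns as `cust_name` and `cust_segment`; assuming `orders` carries `order_id` and `customer_id` (hypothetical schema), the output would look like:
>>> out.columns  # illustrative; exact order not guaranteed
['order_id', 'customer_id', 'cust_name', 'cust_segment']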
- fabrictools.remove_columns(df: pyspark.sql.DataFrame, *columns: str) → pyspark.sql.DataFrame
Drop columns by physical name or by the same resolution rules as `fabrictools.merge_dataframes()`.
- Parameters:
df (DataFrame) – Input dataframe.
columns (str) – One or more labels; duplicates resolving to the same physical column are dropped once. Labels that do not resolve to a column on `df` are ignored.
- Returns:
`df` without the resolved columns (unchanged if every label is unknown).
- Return type:
DataFrame
- Raises:
ValueError – If no names are passed.
Example
>>> slim = remove_columns(df, "temp_flag", "raw_json_blob")
- fabrictools.rename_columns_normalized(df: pyspark.sql.DataFrame) → pyspark.sql.DataFrame
Rename every column to snake_case with `_2`, `_3`, … disambiguation. Uses the same name scheme as the rename step in `fabrictools.clean_data()`. Does not cast types, replace blanks, deduplicate rows, or drop rows.
- Parameters:
df (DataFrame) – Input dataframe.
- Returns:
Dataframe with updated column names where needed.
- Return type:
DataFrame
Example
>>> renamed = rename_columns_normalized(messy_cols_df)
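A before/after sketch of the naming scheme (the exact normalizer rules live in `fabrictools.clean_data()`; this assumes the usual lowercase-and-underscore form):
>>> messy = spark.createDataFrame(
...     [(1, 2, 3)], ["Customer ID", "Customer ID", "Order Date"]
... )
>>> rename_columns_normalized(messy).columns
['customer_id', 'customer_id_2', 'order_date']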
- fabrictools.rename_columns_pq_serial_to_dates(df: pyspark.sql.DataFrame, *, date_format: str = '%Y-%m-%d', prefix: str = '', include_suffix_in_name: bool = True) → pyspark.sql.DataFrame
Rename columns whose names embed a Power Query / Excel day serial (epoch `PQ_EPOCH`). Non-matching columns are unchanged. Target collisions get `_2`, `_3`, … suffixes.
- Parameters:
- Returns:
Dataframe with renamed columns.
- Return type:
DataFrame
Example
>>> dated = rename_columns_pq_serial_to_dates(
...     pq_wide_df, date_format="%Y-%m-%d", prefix="d_"
... )
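For orientation, a day serial counts days from an epoch; `PQ_EPOCH` is defined by the library, and the common Excel convention is 1899-12-30 (an assumption here). A plain-Python sketch of the serial-to-date conversion:
>>> from datetime import date, timedelta
>>> pq_epoch_assumed = date(1899, 12, 30)  # assumption: usual Excel epoch
>>> pq_epoch_assumed + timedelta(days=45292)
datetime.date(2024, 1, 1)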
- fabrictools.rename_columns_pq_serial_to_mois_annee(df: pyspark.sql.DataFrame, *, prefix: str = '', include_suffix_in_name: bool = True, capitalize_month: bool = True) → pyspark.sql.DataFrame
Like `rename_columns_pq_serial_to_dates()` but labels use the French "mois année" form (e.g. `janvier_2024`).
- Parameters:
- Returns:
Renamed dataframe.
- Return type:
DataFrame
Example
>>> labeled = rename_columns_pq_serial_to_mois_annee(
...     pq_wide_df, prefix="m_", capitalize_month=True
... )
- fabrictools.rename_columns_month_year_block_labels(df: pyspark.sql.DataFrame, *, labels: Sequence[str] = ('Coûts prévisionnels (par mois)', 'Coûts prévisionnels cumulés', 'Avancement prévisionnel', 'CA prévisionnel cumulé', 'CA Monthly'), exclude_columns: Collection[str] = ('__spark_row_order__',)) → pyspark.sql.DataFrame
Rename contiguous French "mois année" column blocks using ordered `labels` (projection-style). Order follows `df.columns` after `exclude_columns` is removed. Rename targets disambiguate with `__2`, `__3`, … among the new names, then `_2`, `_3`, … against the rest of the schema.
- Parameters:
df (DataFrame) – Input wide dataframe.
labels (collections.abc.Sequence[str]) – Block markers in column order (defaults to built-in forecast / CA block set).
exclude_columns (collections.abc.Collection[str]) – Column names ignored when scanning contiguous runs.
- Returns:
Dataframe with renamed month columns.
- Return type:
DataFrame
Example
>>> tagged = rename_columns_month_year_block_labels(
...     wide_projection_df, labels=("Block A", "Block B")
... )
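A hedged illustration of the block scan, assuming the renamed form appends the label as a bracketed suffix (the shape consumed by `month_start_from_ca_monthly_col()`; the source does not spell out the output format):
>>> wide = spark.createDataFrame(
...     [(1, 1.0, 2.0, 3.0, 4.0)],
...     ["id", "janvier_2024", "juin_2024", "janvier_2024_2", "juin_2024_2"],
... )
>>> rename_columns_month_year_block_labels(wide, labels=("Block A", "Block B")).columns  # illustrative
['id', 'janvier_2024 [Block A]', 'juin_2024 [Block A]', 'janvier_2024 [Block B]', 'juin_2024 [Block B]']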
- fabrictools.month_start_from_ca_monthly_col(col_name: str) → date | None
Parse a first-of-month date from a column name: French "mois année" head, with an optional bracketed label suffix (e.g. ` [CA Monthly]`) stripped.
- Parameters:
col_name (str) – Wide column name (e.g. `janvier_2024 [CA Monthly]`).
- Returns:
Parsed month start, or `None` if parsing fails.
- Return type:
datetime.date | None
Example
>>> d0 = month_start_from_ca_monthly_col("janvier_2024 [CA Monthly]")
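For the example above, the parsed value would be the first of January 2024:
>>> d0
datetime.date(2024, 1, 1)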
- fabrictools.resolve_dataframe_column(df: pyspark.sql.DataFrame, name: str) → str | None
Resolve `name` to the physical column name on `df`. Accepts the physical name, a `fabrictools.clean_data()`-style normalized label, or snake_case (same rules as `fabrictools.merge_dataframes()` / `fabrictools.remove_columns()`).
- Parameters:
df (DataFrame) – Dataframe whose schema is searched.
name (str) – Logical, normalized, or physical column label.
- Returns:
Physical column name present on `df`, or `None` if `name` cannot be resolved.
- Return type:
str | None
Example
>>> physical = resolve_dataframe_column(df, "Customer ID")
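All accepted label forms resolve to the same physical name; a sketch with an illustrative single-column frame:
>>> df = spark.createDataFrame([(1,)], ["Customer ID"])
>>> resolve_dataframe_column(df, "customer_id")
'Customer ID'
>>> resolve_dataframe_column(df, "Not A Column") is None
True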
- fabrictools.wide_value_columns(df: pyspark.sql.DataFrame, *, suffix: str, exclude: Collection[str] = ()) → list[str]
List physical columns whose names end with `suffix` and are not in `exclude`.
- Parameters:
df (DataFrame) – Wide dataframe.
suffix (str) – Suffix substring to match (e.g. block label including leading space if stored that way).
exclude (collections.abc.Collection[str]) – Column names to skip.
- Returns:
Ordered column names from `df.columns`.
- Return type:
list[str]
Example
>>> cols = wide_value_columns(df, suffix=" [CA Monthly]")
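Per the contract above, the result preserves `df.columns` order and should be equivalent to a plain comprehension (a sketch, not the library's implementation):
>>> cols == [c for c in df.columns if c.endswith(" [CA Monthly]")]
True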
- fabrictools.dataframe_unpivot_wide_month_suffix(df: pyspark.sql.DataFrame, *, id_columns: collections.abc.Sequence[str], value_columns_suffix: str | None = None, value_columns: collections.abc.Sequence[str] | None = None, exclude_columns: collections.abc.Collection[str] = (), variable_column: str = 'MoisCol', value_column: str = 'Valeur', month_start_column: str = 'MonthStart', month_start_from_column_name: collections.abc.Callable[[str], datetime.date | None] = month_start_from_ca_monthly_col) → pyspark.sql.DataFrame
Unpivot wide month columns to long form and parse `month_start_column` from the variable name. If `value_columns` is set, it takes precedence over `value_columns_suffix`.
- Parameters:
df (DataFrame) – Wide dataframe.
id_columns (collections.abc.Sequence[str]) – Identifier columns kept as-is; labels that do not resolve on `df` are omitted.
value_columns_suffix (str | None) – Suffix selecting value columns (via `wide_value_columns()`).
value_columns (collections.abc.Sequence[str] | None) – Explicit list of value column names (optional).
exclude_columns (collections.abc.Collection[str]) – Excluded from value detection when using suffix.
variable_column (str) – Unpivot variable column name.
value_column (str) – Unpivot value column name.
month_start_column (str) – Output column for parsed month start dates.
month_start_from_column_name (collections.abc.Callable[[str], date | None]) – Callable mapping a variable name to a `date` (default: `fabrictools.month_start_from_ca_monthly_col()`).
- Returns:
Long dataframe with ids, variable, value, and month start.
- Return type:
DataFrame
Example
>>> long_df = dataframe_unpivot_wide_month_suffix(
...     wide_df,
...     id_columns=["project_id"],
...     value_columns_suffix=" [CA Monthly]",
... )
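With the default output names, the long frame carries the resolved ids plus `MoisCol`, `Valeur`, and `MonthStart` (column order illustrative):
>>> long_df.columns
['project_id', 'MoisCol', 'Valeur', 'MonthStart']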
- fabrictools.dataframe_last_nonnull_wide_month_from_long(long_df: pyspark.sql.DataFrame, *, order_column: str, variable_column: str = 'MoisCol', value_column: str = 'Valeur', month_start_column: str = 'MonthStart', output_month_start: str = 'MonthStart', output_year: str = 'Year', output_month: str = 'Month', output_value: str = 'Value') → pyspark.sql.DataFrame
For each distinct `variable_column`, keep the row with the greatest `order_column` where `value_column` is non-null; emit typed month/value columns.
- Parameters:
long_df (DataFrame) – Long dataframe (e.g. from `dataframe_unpivot_wide_month_suffix()`).
order_column (str) – Tie-break column (descending); must exist on `long_df`.
variable_column (str) – Month variable name column.
value_column (str) – Measure column.
month_start_column (str) – Parsed month start on `long_df`.
output_month_start (str) – Output date column name.
output_year (str) – Output year column name.
output_month (str) – Output month-of-year column name.
output_value (str) – Output numeric value column name.
- Returns:
One row per `variable_column` with cast types, or an empty schema if inputs are missing.
- Return type:
DataFrame
Example
>>> latest = dataframe_last_nonnull_wide_month_from_long(
...     long_df, order_column="as_of_date"
... )
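The keep-latest-non-null rule is the standard window pattern; a sketch of equivalent logic under the default column names (not necessarily the library's code path):
>>> from pyspark.sql import Window, functions as F
>>> w = Window.partitionBy("MoisCol").orderBy(F.col("as_of_date").desc())
>>> latest_sketch = (
...     long_df.filter(F.col("Valeur").isNotNull())
...     .withColumn("rn", F.row_number().over(w))
...     .filter(F.col("rn") == 1)
... )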
- fabrictools.dataframe_pivot_category_wide_month_from_long(long_df: pyspark.sql.DataFrame, *, category_column: str, pivot_categories: Sequence[str], fill_value: float = 0.0, variable_column: str = 'MoisCol', value_column: str = 'Valeur', month_start_column: str = 'MonthStart', output_year: str = 'Year', output_month: str = 'Month', montant_column: str = 'Montant') → pyspark.sql.DataFrame
Sum `value_column` by `month_start_column` and `category_column`, pivot categories wide, and add year/month columns.
- Parameters:
long_df (DataFrame) – Long dataframe with month, category, and value.
category_column (str) – Dimension to pivot.
pivot_categories (collections.abc.Sequence[str]) – Category values that become column names.
fill_value (float) – Fill null pivot cells after aggregation.
variable_column (str) – Variable column name (must exist on `long_df` for early-exit checks).
value_column (str) – Measure to sum.
month_start_column (str) – Date key for grouping.
output_year (str) – Name of year output column.
output_month (str) – Name of month output column.
montant_column (str) – Internal aggregate column name before pivot.
- Returns:
Wide dataframe with `Year`, `Month`, and one column per category.
- Return type:
DataFrame
- Raises:
ValueError – If `pivot_categories` is empty.
Example
>>> wide = dataframe_pivot_category_wide_month_from_long(
...     long_df,
...     category_column="cost_type",
...     pivot_categories=("Actual", "Forecast"),
... )
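The core aggregation is a plain group-pivot-sum; a sketch of the equivalent PySpark under the default column names (the library adds the year/month columns and the fill on top):
>>> from pyspark.sql import functions as F
>>> wide_sketch = (
...     long_df.groupBy("MonthStart")
...     .pivot("cost_type", ["Actual", "Forecast"])
...     .agg(F.sum("Valeur"))
...     .na.fill(0.0)
... )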
- fabrictools.transform_wide_month_suffix(df: pyspark.sql.DataFrame, *, id_columns: collections.abc.Sequence[str], aggregation: Literal['last_nonnull', 'pivot_sum'], value_columns_suffix: str | None = None, value_columns: collections.abc.Sequence[str] | None = None, exclude_columns: collections.abc.Collection[str] = (), variable_column: str = 'MoisCol', value_column: str = 'Valeur', month_start_column: str = 'MonthStart', month_start_from_column_name: collections.abc.Callable[[str], datetime.date | None] = month_start_from_ca_monthly_col, order_column: str | None = None, output_value: str = 'Value', output_month_start: str = 'MonthStart', output_year: str = 'Year', output_month: str = 'Month', category_column: str | None = None, pivot_categories: collections.abc.Sequence[str] | None = None, fill_value: float = 0.0, montant_column: str = 'Montant') → pyspark.sql.DataFrame
Run `dataframe_unpivot_wide_month_suffix()` then a `last_nonnull` or `pivot_sum` aggregation.
- Parameters:
df (DataFrame) – Wide source dataframe.
id_columns (collections.abc.Sequence[str]) – Passed through to unpivot.
aggregation (Literal['last_nonnull', 'pivot_sum']) – `last_nonnull` (needs `order_column`) or `pivot_sum` (needs `category_column` and `pivot_categories`).
value_columns_suffix (str | None) – Passed through to unpivot.
value_columns (collections.abc.Sequence[str] | None) – Passed through to unpivot.
exclude_columns (collections.abc.Collection[str]) – Passed through to unpivot.
variable_column (str) – Long-form variable column name.
value_column (str) – Long-form value column name.
month_start_column (str) – Long-form month start column name.
month_start_from_column_name (collections.abc.Callable[[str], date | None]) – Parser for month start from variable name.
order_column (str | None) – Source-wide column for `last_nonnull` ordering (resolved on `df`). If it does not resolve, the long unpivot result is returned unchanged.
output_value (str) – Output value column for `last_nonnull`.
output_month_start (str) – Output month start for `last_nonnull`.
output_year (str) – Output year for both aggregations where applicable.
output_month (str) – Output month for both aggregations where applicable.
category_column (str | None) – Source column for `pivot_sum` (resolved on `df`). If it does not resolve, the long unpivot result is returned unchanged.
pivot_categories (collections.abc.Sequence[str] | None) – Category list for `pivot_sum`.
fill_value (float) – Pivot fill for `pivot_sum`.
montant_column (str) – Internal sum column name for the pivot path.
- Returns:
Aggregated dataframe for the selected mode, or only the long unpivot result when `order_column` / `category_column` does not resolve, as described above.
- Return type:
DataFrame
- Raises:
ValueError – If `aggregation` is unknown or required parameters are missing.
Example
>>> summary = transform_wide_month_suffix(
...     wide_df,
...     id_columns=["project_id"],
...     aggregation="last_nonnull",
...     value_columns_suffix=" [CA Monthly]",
...     order_column="snapshot_date",
... )
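The `pivot_sum` mode swaps `order_column` for `category_column` / `pivot_categories` (same wide input; only parameters documented above are used):
>>> pivoted = transform_wide_month_suffix(
...     wide_df,
...     id_columns=["project_id"],
...     aggregation="pivot_sum",
...     value_columns_suffix=" [CA Monthly]",
...     category_column="cost_type",
...     pivot_categories=("Actual", "Forecast"),
... )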
- fabrictools.norm_text(expr: pyspark.sql.Column | str) → pyspark.sql.Column
Lowercase string with control characters stripped and spaces removed (Power Query `Text.Clean` style). If `expr` is a `str`, it is wrapped with `F.lit`.
- Parameters:
expr (Column | str) – Spark column or string literal.
- Returns:
Transformed column expression.
- Return type:
Column
Example
>>> df.withColumn("key_norm", norm_text("Customer Name"))
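A rough plain-Spark equivalent of the documented behavior (lowercase, strip control characters, drop whitespace); the exact character classes are an assumption:
>>> from pyspark.sql import functions as F
>>> norm_sketch = F.regexp_replace(
...     F.lower(F.col("Customer Name").cast("string")), r"[\p{Cc}\s]", ""
... )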
- fabrictools.empty_or_null(c: pyspark.sql.Column) → pyspark.sql.Column
Boolean column: true if `c` is null or blank after string cast and trim.
- Parameters:
c (Column) – Input column expression.
- Returns:
Boolean `Column`.
- Return type:
Column
Example
>>> df.filter(empty_or_null(F.col("notes")))
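An equivalent predicate, matching the description above (a sketch):
>>> from pyspark.sql import functions as F
>>> c = F.col("notes")
>>> is_empty = c.isNull() | (F.trim(c.cast("string")) == "")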
- fabrictools.coalesce_dim(src: pyspark.sql.Column) → pyspark.sql.Column
String cast of `src`; null or blank becomes the string literal `0` (dimension-friendly).
- Parameters:
src (Column) – Source column.
- Returns:
String `Column`.
- Return type:
Column
Example
>>> df.withColumn("dim_id", coalesce_dim(F.col("legacy_code")))
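An equivalent expression per the description (a sketch, assuming the same null/blank test as `empty_or_null()`):
>>> from pyspark.sql import functions as F
>>> src = F.col("legacy_code")
>>> dim = F.when(
...     src.isNull() | (F.trim(src.cast("string")) == ""), F.lit("0")
... ).otherwise(src.cast("string"))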