Transformations DataFrame

Filtres, renommages, jointures et helpers pour colonnes « wide month » et texte.

fabrictools.filter_by_value_list(df: pyspark.sql.DataFrame, column: str, values: Sequence[Any], *, exclude: bool = True) pyspark.sql.DataFrame

Keep or drop rows where column is in values (no column cast).

For string-like dtypes, compares trim(column) to values. str entries in values are stripped.

Parameters:
  • df (DataFrame) – Input dataframe.

  • column (str) – Logical or physical column name (resolved like fabrictools.resolve_dataframe_column()). If it does not resolve, df is returned unchanged.

  • values (collections.abc.Sequence) – Membership list; non-strings kept as-is.

  • exclude (bool) – If True (default), drop rows in values; if False, keep only those rows.

Returns:

Filtered dataframe.

Return type:

DataFrame

Example

>>> filtered = filter_by_value_list(
...     df, "status", ["VOID", "CANCELLED"], exclude=True
... )
fabrictools.drop_rows_over_empty_percent(df: pyspark.sql.DataFrame, max_empty_percent: float, *, columns: Sequence[str] | None = None) pyspark.sql.DataFrame

Drop rows where the fraction of empty cells (see fabrictools.empty_or_null()) exceeds max_empty_percent.

Parameters:
  • df (DataFrame) – Input dataframe.

  • max_empty_percent (float) – Upper bound in [0, 1]; rows with empty ratio strictly greater than this are removed.

  • columns (collections.abc.Sequence[str] | None) – Columns to score; None means all columns. Names resolved like fabrictools.resolve_dataframe_column(); unknown labels are skipped, and if none remain all columns are used.

Returns:

Filtered dataframe.

Return type:

DataFrame

Raises:

ValueError – If max_empty_percent is outside [0, 1], if columns is an empty sequence, or if no columns remain to score.

Example

>>> pruned = drop_rows_over_empty_percent(
...     df, 0.5, columns=["col_a", "col_b", "col_c"]
... )
fabrictools.merge_dataframes(main: pyspark.sql.DataFrame, join_df: pyspark.sql.DataFrame, join_columns: Sequence[str], keys: Sequence[tuple[str, str]], how: str = 'left', *, join_prefix: str | None = None, join_column_names: Sequence[str] | None = None) pyspark.sql.DataFrame

Left-join main to join_df and project right columns as {prefix}_{suffix}.

Prefix is snake_case from, in order: inferred join_df variable name at the call site, else first SubqueryAlias on join_df’s analyzed plan, else join (see DEFAULT_JOIN_PREFIX). Pass join_prefix to force a value. Suffixes match fabrictools.clean_data() uniqueness rules.

Parameters:
  • main (DataFrame) – Left dataframe.

  • join_df (DataFrame) – Right dataframe (only join_columns projected, plus key temps).

  • join_columns (collections.abc.Sequence[str]) – Right-side columns to expose with prefixed names; labels that do not resolve on join_df are omitted.

  • join_column_names (collections.abc.Sequence[str] | None) – Optional output names for join_columns (same order, same length). If omitted, join_columns names are reused.

  • keys (collections.abc.Sequence[tuple[str, str]]) – (main_col, join_col) pairs for the join predicate (AND); names resolved per frame. Pairs where either side does not resolve are skipped; if none resolve, main is returned unchanged.

  • how (str) – Spark join type (e.g. left, inner).

  • join_prefix (str | None) – Optional explicit prefix (snake_cased); overrides inference.

Returns:

Joined dataframe with temporary key columns dropped.

Return type:

DataFrame

Raises:

ValueError – If keys is empty.

Example

>>> out = merge_dataframes(
...     orders,
...     customers,
...     join_columns=["name", "segment"],
...     keys=[("customer_id", "id")],
...     join_prefix="cust",
... )
fabrictools.remove_columns(df: pyspark.sql.DataFrame, *columns: str) pyspark.sql.DataFrame

Drop columns by physical name or by the same resolution rules as fabrictools.merge_dataframes().

Parameters:
  • df (DataFrame) – Input dataframe.

  • columns (str) – One or more labels; duplicates resolving to the same physical column are dropped once. Labels that do not resolve to a column on df are ignored.

Returns:

df without the resolved columns (unchanged if every label is unknown).

Return type:

DataFrame

Raises:

ValueError – If no names are passed.

Example

>>> slim = remove_columns(df, "temp_flag", "raw_json_blob")
fabrictools.rename_columns_normalized(df: pyspark.sql.DataFrame) pyspark.sql.DataFrame

Rename every column to snake_case with _2, _3, … disambiguation.

Uses the same name scheme as the rename step in fabrictools.clean_data(). Does not cast types, replace blanks, deduplicate rows, or drop rows.

Parameters:

df (DataFrame) – Input dataframe.

Returns:

Dataframe with updated column names where needed.

Return type:

DataFrame

Example

>>> renamed = rename_columns_normalized(messy_cols_df)
fabrictools.rename_columns_pq_serial_to_dates(df: pyspark.sql.DataFrame, *, date_format: str = '%Y-%m-%d', prefix: str = '', include_suffix_in_name: bool = True) pyspark.sql.DataFrame

Rename columns whose names embed a Power Query / Excel day serial (epoch PQ_EPOCH).

Non-matching columns are unchanged. Target collisions get _2, _3, … suffixes.

Parameters:
  • df (DataFrame) – Input dataframe.

  • date_format (str) – strftime format for the date portion of new names.

  • prefix (str) – Text prepended before the formatted date.

  • include_suffix_in_name (bool) – If True, append parsed numeric suffix after the serial segment.

Returns:

Dataframe with renamed columns.

Return type:

DataFrame

Example

>>> dated = rename_columns_pq_serial_to_dates(
...     pq_wide_df, date_format="%Y-%m-%d", prefix="d_"
... )
fabrictools.rename_columns_pq_serial_to_mois_annee(df: pyspark.sql.DataFrame, *, prefix: str = '', include_suffix_in_name: bool = True, capitalize_month: bool = True) pyspark.sql.DataFrame

Like rename_columns_pq_serial_to_dates() but labels use French mois année (e.g. janvier_2024).

Parameters:
  • df (DataFrame) – Input dataframe.

  • prefix (str) – Prepended before the month-year token.

  • include_suffix_in_name (bool) – Append _{suffix} when a numeric suffix follows the serial in the source name.

  • capitalize_month (bool) – If True, capitalize the month word (e.g. Janvier_2024).

Returns:

Renamed dataframe.

Return type:

DataFrame

Example

>>> labeled = rename_columns_pq_serial_to_mois_annee(
...     pq_wide_df, prefix="m_", capitalize_month=True
... )
fabrictools.rename_columns_month_year_block_labels(df: pyspark.sql.DataFrame, *, labels: Sequence[str] = ('Coûts prévisionnels (par mois)', 'Coûts prévisionnels cumulés', 'Avancement prévisionnel', 'CA prévisionnel cumulé', 'CA Monthly'), exclude_columns: Collection[str] = ('__spark_row_order__',)) pyspark.sql.DataFrame

Rename contiguous French mois année column blocks using ordered labels (projection-style).

Order follows df.columns after exclude_columns. Rename targets disambiguate with __2, __3, … among new names, then _2, _3, … against the rest of the schema.

Parameters:
Returns:

Dataframe with renamed month columns.

Return type:

DataFrame

Example

>>> tagged = rename_columns_month_year_block_labels(
...     wide_projection_df, labels=("Block A", "Block B")
... )
fabrictools.month_start_from_ca_monthly_col(col_name: str) date | None

Parse first-of-month from a column name: French mois année head, optional `` [label]`` suffix stripped.

Parameters:

col_name (str) – Wide column name (e.g. janvier_2024 [CA Monthly]).

Returns:

Parsed month start, or None if parsing fails.

Return type:

datetime.date | None

Example

>>> d0 = month_start_from_ca_monthly_col("janvier_2024 [CA Monthly]")
fabrictools.resolve_dataframe_column(df: pyspark.sql.DataFrame, name: str) str | None

Resolve name to the physical column name on df.

Accepts the physical name, a fabrictools.clean_data()-style normalized label, or snake_case (same rules as fabrictools.merge_dataframes() / fabrictools.remove_columns()).

Parameters:
  • df (DataFrame) – Dataframe whose schema is searched.

  • name (str) – Logical, normalized, or physical column label.

Returns:

Physical column name present on df, or None if name cannot be resolved.

Return type:

str | None

Example

>>> physical = resolve_dataframe_column(df, "Customer ID")
fabrictools.wide_value_columns(df: pyspark.sql.DataFrame, *, suffix: str, exclude: Collection[str] = ()) list[str]

List physical columns whose names end with suffix and are not in exclude.

Parameters:
  • df (DataFrame) – Wide dataframe.

  • suffix (str) – Suffix substring to match (e.g. block label including leading space if stored that way).

  • exclude (collections.abc.Collection[str]) – Column names to skip.

Returns:

Ordered column names from df.columns.

Return type:

list[str]

Example

>>> cols = wide_value_columns(df, suffix=" [CA Monthly]")
fabrictools.dataframe_unpivot_wide_month_suffix(df: pyspark.sql.DataFrame, *, id_columns: ~collections.abc.Sequence[str], value_columns_suffix: str | None = None, value_columns: ~collections.abc.Sequence[str] | None = None, exclude_columns: ~collections.abc.Collection[str] = (), variable_column: str = 'MoisCol', value_column: str = 'Valeur', month_start_column: str = 'MonthStart', month_start_from_column_name: ~collections.abc.Callable[[str], ~datetime.date | None] = <function month_start_from_ca_monthly_col>) pyspark.sql.DataFrame

Unpivot wide month columns to long form and parse month_start_column from the variable name.

If value_columns is set, it takes precedence over value_columns_suffix.

Parameters:
Returns:

Long dataframe with ids, variable, value, and month start.

Return type:

DataFrame

Example

>>> long_df = dataframe_unpivot_wide_month_suffix(
...     wide_df,
...     id_columns=["project_id"],
...     value_columns_suffix=" [CA Monthly]",
... )
fabrictools.dataframe_last_nonnull_wide_month_from_long(long_df: pyspark.sql.DataFrame, *, order_column: str, variable_column: str = 'MoisCol', value_column: str = 'Valeur', month_start_column: str = 'MonthStart', output_month_start: str = 'MonthStart', output_year: str = 'Year', output_month: str = 'Month', output_value: str = 'Value') pyspark.sql.DataFrame

For each distinct variable_column, keep the row with greatest order_column where value_column is non-null; emit typed month/value columns.

Parameters:
  • long_df (DataFrame) – Long dataframe (e.g. from dataframe_unpivot_wide_month_suffix()).

  • order_column (str) – Tie-break column (descending); must exist on long_df.

  • variable_column (str) – Month variable name column.

  • value_column (str) – Measure column.

  • month_start_column (str) – Parsed month start on long_df.

  • output_month_start (str) – Output date column name.

  • output_year (str) – Output year column name.

  • output_month (str) – Output month-of-year column name.

  • output_value (str) – Output numeric value column name.

Returns:

One row per variable_column with cast types, or empty schema if inputs missing.

Return type:

DataFrame

Example

>>> latest = dataframe_last_nonnull_wide_month_from_long(
...     long_df, order_column="as_of_date"
... )
fabrictools.dataframe_pivot_category_wide_month_from_long(long_df: pyspark.sql.DataFrame, *, category_column: str, pivot_categories: Sequence[str], fill_value: float = 0.0, variable_column: str = 'MoisCol', value_column: str = 'Valeur', month_start_column: str = 'MonthStart', output_year: str = 'Year', output_month: str = 'Month', montant_column: str = 'Montant') pyspark.sql.DataFrame

Sum value_column by month_start_column and category_column, pivot categories wide, add year/month columns.

Parameters:
  • long_df (DataFrame) – Long dataframe with month, category, and value.

  • category_column (str) – Dimension to pivot.

  • pivot_categories (collections.abc.Sequence[str]) – Category values that become column names.

  • fill_value (float) – Fill null pivot cells after aggregation.

  • variable_column (str) – Variable column name (must exist on long_df for early-exit checks).

  • value_column (str) – Measure to sum.

  • month_start_column (str) – Date key for grouping.

  • output_year (str) – Name of year output column.

  • output_month (str) – Name of month output column.

  • montant_column (str) – Internal aggregate column name before pivot.

Returns:

Wide dataframe Year, Month, one column per category.

Return type:

DataFrame

Raises:

ValueError – If pivot_categories is empty.

Example

>>> wide = dataframe_pivot_category_wide_month_from_long(
...     long_df,
...     category_column="cost_type",
...     pivot_categories=("Actual", "Forecast"),
... )
fabrictools.transform_wide_month_suffix(df: pyspark.sql.DataFrame, *, id_columns: ~collections.abc.Sequence[str], aggregation: ~typing.Literal['last_nonnull', 'pivot_sum'], value_columns_suffix: str | None = None, value_columns: ~collections.abc.Sequence[str] | None = None, exclude_columns: ~collections.abc.Collection[str] = (), variable_column: str = 'MoisCol', value_column: str = 'Valeur', month_start_column: str = 'MonthStart', month_start_from_column_name: ~collections.abc.Callable[[str], ~datetime.date | None] = <function month_start_from_ca_monthly_col>, order_column: str | None = None, output_value: str = 'Value', output_month_start: str = 'MonthStart', output_year: str = 'Year', output_month: str = 'Month', category_column: str | None = None, pivot_categories: ~collections.abc.Sequence[str] | None = None, fill_value: float = 0.0, montant_column: str = 'Montant') pyspark.sql.DataFrame

Run dataframe_unpivot_wide_month_suffix() then last_nonnull or pivot_sum aggregation.

Parameters:
  • df (DataFrame) – Wide source dataframe.

  • id_columns (collections.abc.Sequence[str]) – Passed through to unpivot.

  • aggregation (Literal['last_nonnull', 'pivot_sum']) – last_nonnull (needs order_column) or pivot_sum (needs category_column and pivot_categories).

  • value_columns_suffix (str | None) – Passed through to unpivot.

  • value_columns (collections.abc.Sequence[str] | None) – Passed through to unpivot.

  • exclude_columns (collections.abc.Collection[str]) – Passed through to unpivot.

  • variable_column (str) – Long-form variable column name.

  • value_column (str) – Long-form value column name.

  • month_start_column (str) – Long-form month start column name.

  • month_start_from_column_name (collections.abc.Callable[[str], date | None]) – Parser for month start from variable name.

  • order_column (str | None) – Source-wide column for last_nonnull ordering (resolved on df). If it does not resolve, the long unpivot result is returned unchanged.

  • output_value (str) – Output value column for last_nonnull.

  • output_month_start (str) – Output month start for last_nonnull.

  • output_year (str) – Output year for both aggregations where applicable.

  • output_month (str) – Output month for both aggregations where applicable.

  • category_column (str | None) – Source column for pivot_sum (resolved on df). If it does not resolve, the long unpivot result is returned unchanged.

  • pivot_categories (collections.abc.Sequence[str] | None) – Category list for pivot_sum.

  • fill_value (float) – Pivot fill for pivot_sum.

  • montant_column (str) – Internal sum column name for pivot path.

Returns:

Aggregated dataframe per selected mode, or the long unpivot only when order_column / category_column does not resolve as above.

Return type:

DataFrame

Raises:

ValueError – If aggregation is unknown or required parameters are missing.

Example

>>> summary = transform_wide_month_suffix(
...     wide_df,
...     id_columns=["project_id"],
...     aggregation="last_nonnull",
...     value_columns_suffix=" [CA Monthly]",
...     order_column="snapshot_date",
... )
fabrictools.norm_text(expr: pyspark.sql.Column | str) pyspark.sql.Column

Lowercase string with control chars stripped and spaces removed (Power Query Text.Clean style).

If expr is a str, it is wrapped with F.lit.

Parameters:

expr (Column | str) – Spark column or string literal.

Returns:

Transformed column expression.

Return type:

Column

Example

>>> df.withColumn("key_norm", norm_text("Customer Name"))
fabrictools.empty_or_null(c: pyspark.sql.Column) pyspark.sql.Column

Boolean column: true if c is null or blank after string cast and trim.

Parameters:

c (Column) – Input column expression.

Returns:

Boolean Column.

Return type:

Column

Example

>>> df.filter(empty_or_null(F.col("notes")))
fabrictools.coalesce_dim(src: pyspark.sql.Column) pyspark.sql.Column

String cast of src; null or blank becomes the literal 0 as string (dimension-friendly).

Parameters:

src (Column) – Source column.

Returns:

String Column.

Return type:

Column

Example

>>> df.withColumn("dim_id", coalesce_dim(F.col("legacy_code")))