Prepared layer

Schema, column resolution, transformations, aggregations, and semantic publication.

fabrictools.snapshot_source_schema(source_lakehouse_name: str, source_relative_path: str, spark: pyspark.sql.SparkSession | None = None) str

Persist per-column profile stats and optional Delta metadata beside the source table.

Writes to {source_relative_path}_schema_snapshot on the source Lakehouse. The returned hash keys profiling cache rows in fabrictools.resolve_columns().
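The returned hash is described as an MD5 of name:type pairs; a minimal sketch of that idea (the pair separator and joining order are assumptions, not the fabrictools internals):

```python
import hashlib

def sketch_schema_hash(fields):
    """Hash ordered (name, type) pairs rendered as 'name:type' strings.

    Illustrative only: the separator and canonicalization used by
    fabrictools.snapshot_source_schema are assumptions.
    """
    joined = "|".join(f"{name}:{dtype}" for name, dtype in fields)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

h1 = sketch_schema_hash([("invoice_id", "bigint"), ("amount", "decimal(18,2)")])
h2 = sketch_schema_hash([("invoice_id", "string"), ("amount", "decimal(18,2)")])
# A type change produces a different hash, which is what lets
# resolve_columns() invalidate stale profiling cache rows.
```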

Parameters:
  • source_lakehouse_name (str) – Lakehouse containing the source table.

  • source_relative_path (str) – Source table path.

  • spark (SparkSession | None) – Optional SparkSession.

Returns:

MD5 hash of name:type pairs for the source schema.

Return type:

str

Example

>>> schema_hash = snapshot_source_schema(
...     "BronzeLakehouse", "dbo.RawInvoices"
... )
fabrictools.resolve_columns(source_lakehouse_name: str, source_relative_path: str, schema_hash: str | None = None, sample_size: int = 500, profiling_confidence_threshold: float = 0.8, unresolved_webhook_url: str | None = None, spark: pyspark.sql.SparkSession | None = None) List[ResolvedColumn]

Resolve each source column to a prepared name and semantic_type (prefix rules, profiling cache, mapping).

Layer 1: prefix rules table. Layer 2: profiling heuristics (cached by schema_hash). Layer 3: explicit mapping rules. Unresolved columns are audited and, optionally, reported to a webhook.
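The three-layer fallback can be sketched in plain Python (the rule-table and cache shapes here are illustrative assumptions, not the fabrictools internals):

```python
def sketch_resolve(column, prefix_rules, profiling_cache, mapping_rules,
                   threshold=0.8):
    """Return a semantic_type for a column, or None if it stays unresolved."""
    # Layer 1: prefix rules win outright.
    for prefix, semantic_type in prefix_rules.items():
        if column.startswith(prefix):
            return semantic_type
    # Layer 2: cached profiling, accepted only above the confidence threshold.
    cached = profiling_cache.get(column)
    if cached is not None and cached["confidence"] >= threshold:
        return cached["semantic_type"]
    # Layer 3: explicit mapping rules; None means unresolved (audited upstream).
    return mapping_rules.get(column)

prefix_rules = {"dt_": "DATE", "amt_": "AMOUNT"}
profiling_cache = {"region": {"semantic_type": "CATEGORY", "confidence": 0.92}}
mapping_rules = {"legacy_code": "CODE"}
```

A low-confidence cache hit falls through to layer 3, which is the role the `profiling_confidence_threshold` parameter plays.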

Parameters:
  • source_lakehouse_name (str) – Lakehouse of the source table.

  • source_relative_path (str) – Source table path.

  • schema_hash (str | None) – If None, computed from the live dataframe schema.

  • sample_size (int) – Row sample size for layer-2 profiling.

  • profiling_confidence_threshold (float) – Minimum confidence to accept cached profiling.

  • unresolved_webhook_url (str | None) – Optional HTTP endpoint notified for unresolved columns.

  • spark (SparkSession | None) – Optional SparkSession.

Returns:

One fabrictools.prepare.resolve.ResolvedColumn dict per resolved column. Columns that remain unresolved are omitted, so the list may be shorter than the source schema.

Return type:

list

Example

>>> mappings = resolve_columns(
...     "BronzeLakehouse",
...     "dbo.RawInvoices",
...     schema_hash=known_hash,
...     sample_size=1000,
... )
fabrictools.transform_to_prepared(source_lakehouse_name: str, source_relative_path: str, resolved_mappings: List[ResolvedColumn], spark: pyspark.sql.SparkSession | None = None) pyspark.sql.DataFrame

Build the prepared projection: semantic casts, optional code labels, DATE derivations.

Parameters:
  • source_lakehouse_name (str) – Lakehouse to read the source from.

  • source_relative_path (str) – Source path.

  • resolved_mappings (list) – Output of fabrictools.resolve_columns().

  • spark (SparkSession | None) – Optional SparkSession.

Returns:

DataFrame built from a single select projection, with prepared column names.

Return type:

DataFrame

Example

>>> prepared_df = transform_to_prepared(
...     "BronzeLakehouse", "dbo.RawInvoices", resolved_mappings
... )
fabrictools.write_prepared_table(df: pyspark.sql.DataFrame, resolved_mappings: List[ResolvedColumn], target_lakehouse_name: str, target_relative_path: str, mode: str = 'overwrite', max_partitions_guard: int = 200, vacuum_retention_hours: int = 168, spark: pyspark.sql.SparkSession | None = None) None

Write df to the target Lakehouse path with heuristic Delta partitioning, then optional OPTIMIZE/VACUUM.

Partition columns are chosen so the product of distinct counts lies in [20, min(200, max_partitions_guard)] when possible; otherwise the write omits partitionBy. Maintenance (OPTIMIZE / VACUUM) is best-effort.
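The cardinality guard can be sketched as a search over candidate column subsets (the preference for the smallest valid subset and the tie-breaking order are assumptions; fabrictools may choose differently among valid combinations):

```python
from itertools import combinations
from math import prod

def sketch_choose_partitions(distinct_counts, max_partitions_guard=200):
    """Pick partition columns whose combined distinct-count product lies in
    [20, min(200, max_partitions_guard)]; return [] to omit partitionBy."""
    cap = min(200, max_partitions_guard)
    # Prefer the fewest partition columns (an assumption about tie-breaking).
    for size in range(1, len(distinct_counts) + 1):
        for combo in combinations(sorted(distinct_counts), size):
            combined = prod(distinct_counts[c] for c in combo)
            if 20 <= combined <= cap:
                return list(combo)
    return []

counts = {"region": 8, "status": 5, "country": 120}
sketch_choose_partitions(counts)  # ['country']: 120 alone lands in [20, 200]
```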

Parameters:
  • df (DataFrame) – Prepared dataframe from fabrictools.transform_to_prepared().

  • resolved_mappings (list) – Same mappings used to build df (for partition heuristics).

  • target_lakehouse_name (str) – Target Lakehouse name.

  • target_relative_path (str) – Target Delta path.

  • mode (str) – Spark write mode.

  • max_partitions_guard (int) – Upper cap for combined partition cardinality guard.

  • vacuum_retention_hours (int) – Retention passed to Delta VACUUM.

  • spark (SparkSession | None) – Optional SparkSession.

Example

>>> write_prepared_table(
...     prepared_df,
...     resolved_mappings,
...     "GoldLakehouse",
...     "Tables/dbo/Prepared_Invoices",
...     mode="overwrite",
... )
fabrictools.generate_prepared_aggregations(source_lakehouse_name: str, target_lakehouse_name: str, target_relative_path: str, resolved_mappings: List[ResolvedColumn], spark: pyspark.sql.SparkSession | None = None) dict[str, str]

Build and write three default aggregation tables next to the prepared table.

Writes prepared_agg_day, prepared_agg_week, and prepared_agg_region under a sibling folder derived from target_relative_path. Heuristics pick temporal and categorical dimensions from resolved_mappings and column names.
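The sibling-folder layout can be sketched with plain path arithmetic. Only the prepared_agg_day/week/region names come from the description above; the "_aggregations" folder suffix is a hypothetical convention for illustration:

```python
import posixpath

def sketch_agg_paths(target_relative_path):
    """Derive relative paths for the three default aggregation tables.

    The '_aggregations' sibling-folder suffix is an assumption;
    fabrictools may derive the folder name differently.
    """
    parent, table_name = posixpath.split(target_relative_path)
    sibling = posixpath.join(parent, f"{table_name}_aggregations")
    return {
        name: posixpath.join(sibling, f"prepared_agg_{name}")
        for name in ("day", "week", "region")
    }

sketch_agg_paths("Tables/dbo/Prepared_Sales")
```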

Parameters:
  • source_lakehouse_name (str) – Lakehouse name (used for reads if needed).

  • target_lakehouse_name (str) – Lakehouse where the prepared table and aggregations are stored.

  • target_relative_path (str) – Path to the main prepared Delta table.

  • resolved_mappings (list) – Column resolution rows from fabrictools.resolve_columns().

  • spark (SparkSession | None) – Optional SparkSession.

Returns:

Map of logical aggregation name to written relative path.

Return type:

dict[str, str]

Example

>>> paths = generate_prepared_aggregations(
...     "BronzeLakehouse",
...     "GoldLakehouse",
...     "Tables/dbo/Prepared_Sales",
...     resolved_mappings,
... )
fabrictools.publish_semantic_model(target_lakehouse_name: str, agg_tables: dict[str, str], resolved_mappings: List[ResolvedColumn], semantic_workspace: str | None, semantic_model_name: str = 'fabrictools_prepared_dataset', overwrite_model: bool = True, spark: pyspark.sql.SparkSession | None = None) dict[str, Any]

Publish or replace a semantic model via Semantic Link (sempy / sempy_labs).

Best-effort: returns a status dict instead of raising when prerequisites are missing.
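Because publish is best-effort, callers should branch on the returned dict rather than wrap the call in try/except. A minimal sketch; the "published" success marker is an assumption, since the docs only guarantee a status key plus reason or error:

```python
def sketch_handle_publish(status):
    """Return True when the model was published; otherwise report why not.

    'published' as the success value is an assumption for illustration.
    """
    if status.get("status") == "published":
        return True
    detail = status.get("reason") or status.get("error") or "unknown"
    print(f"Semantic model not published: {detail}")
    return False

sketch_handle_publish({"status": "skipped", "reason": "semantic_workspace not set"})
```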

Parameters:
  • target_lakehouse_name (str) – Lakehouse holding aggregation tables.

  • agg_tables (dict[str, str]) – Map of table label to relative path (from fabrictools.generate_prepared_aggregations()).

  • resolved_mappings (list) – Column metadata for semantic typing.

  • semantic_workspace (str | None) – Fabric workspace name; if empty, publish is skipped.

  • semantic_model_name (str) – Model name in the workspace.

  • overwrite_model (bool) – Replace an existing model when True.

  • spark (SparkSession | None) – Optional SparkSession for reading table schemas.

Returns:

Status dictionary (status, reason or error, counts, etc.).

Return type:

dict

Example

>>> status = publish_semantic_model(
...     "GoldLakehouse",
...     agg_tables,
...     resolved_mappings,
...     semantic_workspace="Analytics WS",
...     semantic_model_name="PreparedSales",
... )
fabrictools.prepare_and_write_data(source_lakehouse_name: str, source_relative_path: str, target_lakehouse_name: str, target_relative_path: str, mode: str = 'overwrite', sample_size: int = 500, profiling_confidence_threshold: float = 0.8, max_partitions_guard: int = 200, vacuum_retention_hours: int = 168, enable_semantic_model_publish: bool = False, semantic_workspace: str | None = None, semantic_model_name: str = 'fabrictools_prepared_dataset', overwrite_semantic_model: bool = True, spark: pyspark.sql.SparkSession | None = None) pyspark.sql.DataFrame

Run the full prepared pipeline for one source table (schema, resolve, transform, write).

Optionally generates aggregations and publishes a semantic model when enable_semantic_model_publish is True (requires Fabric Semantic Link).
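The four core steps compose in sequence. This sketch wires them through injected callables so the flow is visible without a Spark session; parameter threading is simplified relative to the real signature:

```python
def sketch_prepared_pipeline(snapshot, resolve, transform, write,
                             source_lakehouse, source_path,
                             target_lakehouse, target_path):
    """Schema snapshot -> column resolution -> transform -> write.

    The step functions are injected so the sketch stays runnable; in
    fabrictools they are snapshot_source_schema, resolve_columns,
    transform_to_prepared, and write_prepared_table.
    """
    schema_hash = snapshot(source_lakehouse, source_path)
    mappings = resolve(source_lakehouse, source_path, schema_hash=schema_hash)
    prepared = transform(source_lakehouse, source_path, mappings)
    write(prepared, mappings, target_lakehouse, target_path)
    return prepared
```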

Parameters:
  • source_lakehouse_name (str) – Source Lakehouse name.

  • source_relative_path (str) – Source table path.

  • target_lakehouse_name (str) – Target Lakehouse for the prepared Delta table.

  • target_relative_path (str) – Target path for the prepared table.

  • mode (str) – Spark write mode for the prepared table.

  • sample_size (int) – Profiling sample size for fabrictools.resolve_columns().

  • profiling_confidence_threshold (float) – Minimum confidence to trust profiling cache hits.

  • max_partitions_guard (int) – Upper bound for partition column selection (see fabrictools.write_prepared_table()).

  • vacuum_retention_hours (int) – Delta VACUUM retention when maintenance runs.

  • enable_semantic_model_publish (bool) – If True, call fabrictools.publish_semantic_model().

  • semantic_workspace (str | None) – Fabric workspace for semantic publish (required when publish enabled).

  • semantic_model_name (str) – Model display name in the workspace.

  • overwrite_semantic_model (bool) – Replace existing semantic model when publishing.

  • spark (SparkSession | None) – Optional SparkSession.

Returns:

The prepared DataFrame that was written.

Return type:

DataFrame

Example

>>> prepared_df = prepare_and_write_data(
...     "BronzeLakehouse",
...     "dbo.RawInvoices",
...     "GoldLakehouse",
...     "Tables/dbo/Prepared_Invoices",
...     mode="overwrite",
... )
fabrictools.prepare_and_write_all_tables(source_lakehouse_name: str, target_lakehouse_name: str, mode: str = 'overwrite', tables_config: list[dict[str, Any]] | None = None, include_schemas: list[str] | None = None, exclude_tables: list[str] | None = None, sample_size: int = 500, profiling_confidence_threshold: float = 0.8, max_partitions_guard: int = 200, vacuum_retention_hours: int = 168, enable_semantic_model_publish: bool = False, semantic_workspace: str | None = None, semantic_model_name: str = 'fabrictools_prepared_dataset', overwrite_semantic_model: bool = True, continue_on_error: bool = False, spark: pyspark.sql.SparkSession | None = None) dict[str, Any]

Bulk prepared pipeline: table discovery (or an explicit tables_config), then one fabrictools.prepare_and_write_data() call per job.
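The continue_on_error semantics can be sketched as a per-job loop that either aborts or records the failure and moves on (the summary keys here are assumptions; the docs only promise counts plus per-table entries):

```python
def sketch_run_all(jobs, run_one, continue_on_error=False):
    """Run each job, collecting successes and failures into a summary dict."""
    summary = {"succeeded": [], "failed": []}
    for job in jobs:
        try:
            run_one(job)
            summary["succeeded"].append(job)
        except Exception as exc:
            summary["failed"].append({"job": job, "error": str(exc)})
            if not continue_on_error:
                break  # abort on first table failure
    return summary
```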

Parameters:
  • source_lakehouse_name (str) – Source Lakehouse.

  • target_lakehouse_name (str) – Target Lakehouse for prepared outputs.

  • mode (str) – Default write mode for discovered jobs.

  • tables_config (list[dict] | None) – Optional explicit job list (see fabrictools.pipelines.config).

  • include_schemas (list[str] | None) – Discovery schema filter.

  • exclude_tables (list[str] | None) – Discovery exclusion list.

  • sample_size (int) – Forwarded to each fabrictools.prepare_and_write_data() call.

  • profiling_confidence_threshold (float) – Forwarded to each call.

  • max_partitions_guard (int) – Forwarded to each call.

  • vacuum_retention_hours (int) – Forwarded to each call.

  • enable_semantic_model_publish (bool) – Forwarded to each call.

  • semantic_workspace (str | None) – Forwarded to each call.

  • semantic_model_name (str) – Forwarded to each call.

  • overwrite_semantic_model (bool) – Forwarded to each call.

  • continue_on_error (bool) – If False, abort on first table failure.

  • spark (SparkSession | None) – Optional SparkSession.

Returns:

Summary dict with counts and per-table success/failure entries.

Return type:

dict

Example

>>> summary = prepare_and_write_all_tables(
...     "BronzeLakehouse",
...     "GoldLakehouse",
...     mode="overwrite",
...     include_schemas=["dbo"],
... )