Prepared layer
Schema, column resolution, transformations, aggregations, and semantic publishing.
- fabrictools.snapshot_source_schema(source_lakehouse_name: str, source_relative_path: str, spark: pyspark.sql.SparkSession | None = None) str
Persist per-column profile stats and optional Delta metadata beside the source table.
Writes to {source_relative_path}_schema_snapshot on the source Lakehouse. The returned hash keys profiling cache rows in fabrictools.resolve_columns().
- Parameters:
source_lakehouse_name (str) – Lakehouse of the source table.
source_relative_path (str) – Source table path.
spark (SparkSession | None) – Optional SparkSession.
- Returns:
MD5 hash of name:type pairs for the source schema.
- Return type:
str
Example
>>> schema_hash = snapshot_source_schema(
...     "BronzeLakehouse", "dbo.RawInvoices"
... )
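The docs describe the hash only as "MD5 of name:type pairs"; the exact canonical form fabrictools uses is not shown. A minimal sketch of how such a schema fingerprint could be computed, assuming pairs are joined in column order (the function name and pair format here are illustrative):

```python
import hashlib

def schema_fingerprint(fields):
    # Hypothetical sketch: MD5 over ordered name:type pairs, e.g. built from
    # [(f.name, f.dataType.simpleString()) for f in df.schema.fields].
    canonical = ",".join(f"{name}:{dtype}" for name, dtype in fields)
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()

print(schema_fingerprint([("invoice_id", "bigint"), ("amount", "decimal(18,2)")]))
```

Because the pairs are joined in column order, identical schemas always hash to the same value, while reordered or retyped columns produce a different hash and so miss the profiling cache.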
- fabrictools.resolve_columns(source_lakehouse_name: str, source_relative_path: str, schema_hash: str | None = None, sample_size: int = 500, profiling_confidence_threshold: float = 0.8, unresolved_webhook_url: str | None = None, spark: pyspark.sql.SparkSession | None = None) List[ResolvedColumn]
Resolve each source column to a prepared name and semantic_type (prefix rules, profiling cache, mapping).
Layer 1: prefix rules table. Layer 2: profiling heuristics (cached by schema_hash). Layer 3: explicit mapping rules. Unresolved columns are audited (and optionally reported to a webhook).
- Parameters:
source_lakehouse_name (str) – Lakehouse of the source table.
source_relative_path (str) – Source table path.
schema_hash (str | None) – If None, computed from the live dataframe schema.
sample_size (int) – Row sample size for layer-2 profiling.
profiling_confidence_threshold (float) – Minimum confidence to accept cached profiling.
unresolved_webhook_url (str | None) – Optional HTTP endpoint notified for unresolved columns.
spark (SparkSession | None) – Optional SparkSession.
- Returns:
One fabrictools.prepare.resolve.ResolvedColumn per resolved column (not every source column, if some stay unresolved).
- Return type:
List[ResolvedColumn]
Example
>>> mappings = resolve_columns(
...     "BronzeLakehouse",
...     "dbo.RawInvoices",
...     schema_hash=known_hash,
...     sample_size=1000,
... )
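The three resolution layers can be sketched for a single column; this is a simplified, hypothetical model (the rule shapes, dict keys, and the idea that a layer returns a semantic type directly are assumptions, not the library's internal API):

```python
def resolve_column(name, prefix_rules, profiling_cache, mapping_rules,
                   confidence_threshold=0.8):
    # Layer 1: prefix rules table (prefix -> semantic_type).
    for prefix, semantic_type in prefix_rules.items():
        if name.startswith(prefix):
            return {"source": name, "semantic_type": semantic_type}
    # Layer 2: cached profiling heuristics, accepted only at or above the
    # confidence threshold.
    hit = profiling_cache.get(name)
    if hit is not None and hit["confidence"] >= confidence_threshold:
        return {"source": name, "semantic_type": hit["semantic_type"]}
    # Layer 3: explicit mapping rules (source name -> semantic_type).
    if name in mapping_rules:
        return {"source": name, "semantic_type": mapping_rules[name]}
    return None  # unresolved: audited, optionally pushed to a webhook
```

Note how a low-confidence cache hit falls through to layer 3 rather than being accepted, which is the role profiling_confidence_threshold plays in the real function.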
- fabrictools.transform_to_prepared(source_lakehouse_name: str, source_relative_path: str, resolved_mappings: List[ResolvedColumn], spark: pyspark.sql.SparkSession | None = None) pyspark.sql.DataFrame
Build the prepared projection: semantic casts, optional code labels, DATE derivations.
- Parameters:
source_lakehouse_name (str) – Lakehouse to read the source from.
source_relative_path (str) – Source path.
resolved_mappings (list) – Output of fabrictools.resolve_columns().
spark (SparkSession | None) – Optional SparkSession.
- Returns:
Single select result with prepared column names.
- Return type:
DataFrame
Example
>>> prepared_df = transform_to_prepared(
...     "BronzeLakehouse", "dbo.RawInvoices", resolved_mappings
... )
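The "semantic casts + DATE derivations" projection can be illustrated as building Spark SQL select expressions from the mappings. The mapping keys (source, cast_type, prepared) are illustrative assumptions, not the ResolvedColumn schema:

```python
def build_select_exprs(mappings):
    # Hypothetical sketch: one cast-and-rename expression per mapping, plus a
    # derived DATE column for timestamp columns ("DATE derivations" above).
    exprs = []
    for m in mappings:
        exprs.append(f"CAST({m['source']} AS {m['cast_type']}) AS {m['prepared']}")
        if m["cast_type"] == "TIMESTAMP":
            exprs.append(f"CAST({m['source']} AS DATE) AS {m['prepared']}_date")
    return exprs
```

Expressions in this shape could then feed a single df.selectExpr(*exprs), matching the "single select result" return described above.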
- fabrictools.write_prepared_table(df: pyspark.sql.DataFrame, resolved_mappings: List[ResolvedColumn], target_lakehouse_name: str, target_relative_path: str, mode: str = 'overwrite', max_partitions_guard: int = 200, vacuum_retention_hours: int = 168, spark: pyspark.sql.SparkSession | None = None) None
Write df to the target Lakehouse path with heuristic Delta partitioning, then optional OPTIMIZE/VACUUM.
Partition columns are chosen so the product of distinct counts lies in [20, min(200, max_partitions_guard)] when possible; otherwise the write omits partitionBy. Maintenance (OPTIMIZE/VACUUM) is best-effort.
- Parameters:
df (DataFrame) – Prepared dataframe from fabrictools.transform_to_prepared().
resolved_mappings (list) – Same mappings used to build df (for partition heuristics).
target_lakehouse_name (str) – Target Lakehouse name.
target_relative_path (str) – Target Delta path.
mode (str) – Spark write mode.
max_partitions_guard (int) – Upper cap for combined partition cardinality guard.
vacuum_retention_hours (int) – Retention passed to Delta VACUUM.
spark (SparkSession | None) – Optional SparkSession.
Example
>>> write_prepared_table(
...     prepared_df,
...     resolved_mappings,
...     "GoldLakehouse",
...     "Tables/dbo/Prepared_Invoices",
...     mode="overwrite",
... )
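The cardinality guard described above can be sketched as a search over column subsets; the subset-preference order is an assumption (smallest subset first), and only the [20, min(200, max_partitions_guard)] bound comes from the docs:

```python
from itertools import combinations
from math import prod

def choose_partition_columns(distinct_counts, max_partitions_guard=200):
    # Hypothetical sketch: find a column subset whose combined cardinality
    # (product of distinct counts) lies in [20, min(200, guard)], trying the
    # smallest subsets first; return [] to skip partitionBy entirely.
    upper = min(200, max_partitions_guard)
    for size in range(1, len(distinct_counts) + 1):
        for combo in combinations(distinct_counts, size):
            p = prod(distinct_counts[c] for c in combo)
            if 20 <= p <= upper:
                return list(combo)
    return []
```

The lower bound avoids partitioning that adds no pruning benefit; the upper bound avoids the small-file explosion of over-partitioned Delta tables.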
- fabrictools.generate_prepared_aggregations(source_lakehouse_name: str, target_lakehouse_name: str, target_relative_path: str, resolved_mappings: List[ResolvedColumn], spark: pyspark.sql.SparkSession | None = None) dict[str, str]
Build and write three default aggregation tables next to the prepared table.
Writes prepared_agg_day, prepared_agg_week, and prepared_agg_region under a sibling folder derived from target_relative_path. Heuristics pick temporal and categorical dimensions from resolved_mappings and column names.
- Parameters:
source_lakehouse_name (str) – Lakehouse name (used for reads if needed).
target_lakehouse_name (str) – Lakehouse where the prepared table and aggs are stored.
target_relative_path (str) – Path to the main prepared Delta table.
resolved_mappings (list) – Column resolution rows from fabrictools.resolve_columns().
spark (SparkSession | None) – Optional SparkSession.
- Returns:
Map of logical aggregation name to written relative path.
- Return type:
dict[str, str]
Example
>>> paths = generate_prepared_aggregations(
...     "BronzeLakehouse",
...     "GoldLakehouse",
...     "Tables/dbo/Prepared_Sales",
...     resolved_mappings,
... )
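The "sibling folder derived from target_relative_path" idea can be sketched as path manipulation; the actual folder naming fabrictools uses is not documented here, so the layout below is an assumption that only keeps the three documented table names:

```python
import posixpath

def agg_sibling_paths(target_relative_path):
    # Hypothetical sketch: place each aggregation table next to the prepared
    # table, keyed by its logical name (matching the documented return shape).
    parent = posixpath.dirname(target_relative_path)
    return {
        name: posixpath.join(parent, name)
        for name in ("prepared_agg_day", "prepared_agg_week", "prepared_agg_region")
    }
```

The returned dict mirrors the documented "logical aggregation name to written relative path" mapping.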
- fabrictools.publish_semantic_model(target_lakehouse_name: str, agg_tables: dict[str, str], resolved_mappings: List[ResolvedColumn], semantic_workspace: str | None, semantic_model_name: str = 'fabrictools_prepared_dataset', overwrite_model: bool = True, spark: pyspark.sql.SparkSession | None = None) dict[str, Any]
Publish or replace a semantic model via Semantic Link (sempy/sempy_labs).
Best-effort: returns a status dict instead of raising when prerequisites are missing.
- Parameters:
target_lakehouse_name (str) – Lakehouse holding aggregation tables.
agg_tables (dict[str, str]) – Map of table label to relative path (from fabrictools.generate_prepared_aggregations()).
resolved_mappings (list) – Column metadata for semantic typing.
semantic_workspace (str | None) – Fabric workspace name; if empty, publish is skipped.
semantic_model_name (str) – Model name in the workspace.
overwrite_model (bool) – Replace an existing model when True.
spark (SparkSession | None) – Optional SparkSession for reading table schemas.
- Returns:
Status dictionary (status, reason or error, counts, etc.).
- Return type:
dict[str, Any]
Example
>>> status = publish_semantic_model(
...     "GoldLakehouse",
...     agg_tables,
...     resolved_mappings,
...     semantic_workspace="Analytics WS",
...     semantic_model_name="PreparedSales",
... )
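The best-effort contract ("status dict instead of raising") is a general pattern worth seeing in isolation. In this sketch, publish_fn stands in for the real sempy/sempy_labs call, and the exact status keys beyond status/reason/error are assumptions:

```python
def publish_best_effort(publish_fn, semantic_workspace):
    # Hypothetical sketch: every failure mode becomes a status dict.
    if not semantic_workspace:
        return {"status": "skipped", "reason": "no semantic_workspace"}
    try:
        tables = publish_fn()
    except ImportError as exc:        # e.g. sempy / sempy_labs unavailable
        return {"status": "skipped", "reason": str(exc)}
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "published", "table_count": len(tables)}
```

Callers can then branch on status in pipeline code rather than wrapping the publish step in their own try/except.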
- fabrictools.prepare_and_write_data(source_lakehouse_name: str, source_relative_path: str, target_lakehouse_name: str, target_relative_path: str, mode: str = 'overwrite', sample_size: int = 500, profiling_confidence_threshold: float = 0.8, max_partitions_guard: int = 200, vacuum_retention_hours: int = 168, enable_semantic_model_publish: bool = False, semantic_workspace: str | None = None, semantic_model_name: str = 'fabrictools_prepared_dataset', overwrite_semantic_model: bool = True, spark: pyspark.sql.SparkSession | None = None) pyspark.sql.DataFrame
Run the full prepared pipeline for one source table (schema, resolve, transform, write).
Optionally generates aggregations and publishes a semantic model when enable_semantic_model_publish is True (requires Fabric Semantic Link).
- Parameters:
source_lakehouse_name (str) – Source Lakehouse name.
source_relative_path (str) – Source table path.
target_lakehouse_name (str) – Target Lakehouse for the prepared Delta table.
target_relative_path (str) – Target path for the prepared table.
mode (str) – Spark write mode for the prepared table.
sample_size (int) – Profiling sample size for fabrictools.resolve_columns().
profiling_confidence_threshold (float) – Minimum confidence to trust profiling cache hits.
max_partitions_guard (int) – Upper bound for partition column selection (see fabrictools.write_prepared_table()).
vacuum_retention_hours (int) – Delta VACUUM retention when maintenance runs.
enable_semantic_model_publish (bool) – If True, call fabrictools.publish_semantic_model().
semantic_workspace (str | None) – Fabric workspace for semantic publish (required when publish is enabled).
semantic_model_name (str) – Model display name in the workspace.
overwrite_semantic_model (bool) – Replace existing semantic model when publishing.
spark (SparkSession | None) – Optional SparkSession.
- Returns:
The prepared DataFrame that was written.
- Return type:
DataFrame
Example
>>> prepared_df = prepare_and_write_data(
...     "BronzeLakehouse",
...     "dbo.RawInvoices",
...     "GoldLakehouse",
...     "Tables/dbo/Prepared_Invoices",
...     mode="overwrite",
... )
- fabrictools.prepare_and_write_all_tables(source_lakehouse_name: str, target_lakehouse_name: str, mode: str = 'overwrite', tables_config: list[dict[str, Any]] | None = None, include_schemas: list[str] | None = None, exclude_tables: list[str] | None = None, sample_size: int = 500, profiling_confidence_threshold: float = 0.8, max_partitions_guard: int = 200, vacuum_retention_hours: int = 168, enable_semantic_model_publish: bool = False, semantic_workspace: str | None = None, semantic_model_name: str = 'fabrictools_prepared_dataset', overwrite_semantic_model: bool = True, continue_on_error: bool = False, spark: pyspark.sql.SparkSession | None = None) dict[str, Any]
Bulk prepared pipeline: discovery or tables_config, then fabrictools.prepare_and_write_data() per job.
- Parameters:
source_lakehouse_name (str) – Source Lakehouse.
target_lakehouse_name (str) – Target Lakehouse for prepared outputs.
mode (str) – Default write mode for discovered jobs.
tables_config (list[dict] | None) – Optional explicit job list (see fabrictools.pipelines.config).
include_schemas (list[str] | None) – Discovery schema filter.
exclude_tables (list[str] | None) – Discovery exclusion list.
sample_size (int) – Forwarded to each fabrictools.prepare_and_write_data() call.
profiling_confidence_threshold (float) – Forwarded to each call.
max_partitions_guard (int) – Forwarded to each call.
vacuum_retention_hours (int) – Forwarded to each call.
enable_semantic_model_publish (bool) – Forwarded to each call.
semantic_workspace (str | None) – Forwarded to each call.
semantic_model_name (str) – Forwarded to each call.
overwrite_semantic_model (bool) – Forwarded to each call.
continue_on_error (bool) – If False, abort on the first table failure.
spark (SparkSession | None) – Optional SparkSession.
- Returns:
Summary dict with counts and per-table success/failure entries.
- Return type:
dict[str, Any]
Example
>>> summary = prepare_and_write_all_tables(
...     "BronzeLakehouse",
...     "GoldLakehouse",
...     mode="overwrite",
...     include_schemas=["dbo"],
... )
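The continue_on_error policy and the "per-table success/failure entries" summary can be sketched as a simple driver loop; run_one stands in for prepare_and_write_data, and the summary keys are illustrative:

```python
def run_all(jobs, run_one, continue_on_error=False):
    # Hypothetical sketch of the bulk driver's error policy: record each
    # table's outcome, aborting on the first failure unless asked to continue.
    summary = {"succeeded": [], "failed": []}
    for table in jobs:
        try:
            run_one(table)
            summary["succeeded"].append(table)
        except Exception as exc:
            summary["failed"].append({"table": table, "error": str(exc)})
            if not continue_on_error:
                break
    return summary
```

With continue_on_error=True the loop drains every job and reports all failures at once; with the default False it stops at the first bad table, leaving later tables unprocessed.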