"""Lakehouse filesystem discovery utilities."""
from __future__ import annotations
from typing import Any, List, Optional
from fabrictools.core import get_lakehouse_abfs_path
_PIPELINE_DISCOVERY_EXCLUDED_TABLE_NAMES = frozenset(
{
"pipeline_audit_log",
"prefix_rules",
"profiling_cache",
}
)
_SCHEMA_SNAPSHOT_TABLE_SUFFIX = "_schema_snapshot"
[docs]
def filter_pipeline_discovered_tables(relative_paths: list[str]) -> list[str]:
"""Remove internal fabrictools tables and schema snapshot paths from a path list.
Drops names ending with ``_schema_snapshot`` and fixed names
``pipeline_audit_log``, ``prefix_rules``, and ``profiling_cache``.
:param relative_paths: ``Tables/<schema>/<table>`` style relative paths.
:type relative_paths: list[str]
:returns: Filtered paths in original order.
:rtype: list[str]
"""
kept: list[str] = []
for relative_path in relative_paths:
segments = [s for s in relative_path.strip().strip("/").split("/") if s]
if not segments:
continue
table_name_lower = segments[-1].lower()
if table_name_lower.endswith(_SCHEMA_SNAPSHOT_TABLE_SUFFIX):
continue
if table_name_lower in _PIPELINE_DISCOVERY_EXCLUDED_TABLE_NAMES:
continue
kept.append(relative_path)
return kept
[docs]
def get_fs_entry_name(fs_entry: Any) -> str:
"""Return a directory or file name from a ``notebookutils.fs.ls`` entry.
:param fs_entry: Object with ``name`` or ``path`` as provided by Fabric APIs.
:type fs_entry: Any
:returns: Normalized leaf name, or empty string if none.
:rtype: str
"""
raw_name = getattr(fs_entry, "name", "")
if raw_name:
return str(raw_name).strip().strip("/")
raw_path = getattr(fs_entry, "path", "")
if raw_path:
return str(raw_path).strip().strip("/").split("/")[-1]
return ""
[docs]
def list_lakehouse_tables(
lakehouse_name: str,
include_schemas: Optional[List[str]] = None,
exclude_tables: Optional[List[str]] = None,
) -> List[str]:
"""List table paths under a Lakehouse as ``Tables/<schema>/<table>``.
Uses filesystem listing under ``<abfs>/Tables/<schema>/<table>``.
:param lakehouse_name: Lakehouse display name.
:param include_schemas: If set, only these schemas (case-insensitive).
:param exclude_tables: Table names or ``schema.table`` to exclude (case-insensitive).
:type lakehouse_name: str
:type include_schemas: list[str] | None
:type exclude_tables: list[str] | None
:returns: Sorted relative paths.
:rtype: list[str]
:raises ValueError: When ``notebookutils`` is unavailable (not in Fabric).
"""
try:
import notebookutils # type: ignore[import-untyped] # noqa: PLC0415
except ImportError as exc:
raise ValueError(
"notebookutils is not available — are you running inside "
f"Microsoft Fabric? ({exc})"
) from exc
included_schema_names = (
{schema_name.strip().lower() for schema_name in include_schemas}
if include_schemas
else None
)
excluded_table_names = {
table_name.strip().lower()
for table_name in (exclude_tables or [])
}
base = get_lakehouse_abfs_path(lakehouse_name)
tables_root = f"{base}/Tables"
discovered_table_paths: List[str] = []
for schema_entry in notebookutils.fs.ls(tables_root):
schema_name = get_fs_entry_name(schema_entry)
if not schema_name:
continue
schema_name_lower = schema_name.lower()
if included_schema_names is not None and schema_name_lower not in included_schema_names:
continue
schema_path = getattr(schema_entry, "path", f"{tables_root}/{schema_name}")
for table_entry in notebookutils.fs.ls(schema_path):
table_name = get_fs_entry_name(table_entry)
if not table_name:
continue
qualified_table_name = f"{schema_name_lower}.{table_name.lower()}"
if (
table_name.lower() in excluded_table_names
or qualified_table_name in excluded_table_names
):
continue
discovered_table_paths.append(f"Tables/{schema_name}/{table_name}")
return sorted(discovered_table_paths)
[docs]
def list_lakehouse_tables_for_pipeline(
lakehouse_name: str,
include_schemas: Optional[List[str]] = None,
exclude_tables: Optional[List[str]] = None,
) -> List[str]:
"""Like :py:func:`list_lakehouse_tables`, then :py:func:`filter_pipeline_discovered_tables`.
Bulk pipelines use this to skip internal metadata and schema snapshot tables.
:param lakehouse_name: Lakehouse display name.
:param include_schemas: Optional schema allow-list (case-insensitive).
:param exclude_tables: Optional table / ``schema.table`` deny-list.
:type lakehouse_name: str
:type include_schemas: list[str] | None
:type exclude_tables: list[str] | None
:returns: Sorted, pipeline-safe relative table paths.
:rtype: list[str]
"""
return filter_pipeline_discovered_tables(
list_lakehouse_tables(
lakehouse_name=lakehouse_name,
include_schemas=include_schemas,
exclude_tables=exclude_tables,
)
)
__all__ = [
"filter_pipeline_discovered_tables",
"get_fs_entry_name",
"list_lakehouse_tables",
"list_lakehouse_tables_for_pipeline",
]