Enrollment Script

Run once per metastore (executed from a workspace attached to it)

How it works:

  • The script runs from a Databricks workspace (via a Spark session).

  • It operates on the metastore attached to that workspace (you can confirm which one with the check after this list).

  • Unity Catalog permissions are granted at the metastore level.
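
To confirm which metastore the current workspace is attached to, you can run a quick check before enrolling. This is a minimal sketch; current_metastore() is a built-in Databricks SQL function that returns the metastore's ID.

# Show the metastore the current workspace is attached to
print(spark.sql("SELECT current_metastore()").collect()[0][0])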

Practical implications:

  1. If multiple workspaces share the same metastore, run the script once per metastore (from any workspace attached to it). Permissions apply to all workspaces using that metastore.

  2. If each workspace has its own metastore, run it once per workspace (which equals once per metastore).

  3. The script queries SHOW CATALOGS, which returns catalogs from the metastore attached to the current workspace.

In practice:

  • Run the script from a workspace connected to the metastore you want to enroll.

  • First, run with DRY_RUN = True to review the generated GRANT statements before applying them.

  • If you have 3 workspaces sharing 1 metastore, run it once (from any of those workspaces).

  • If you have 3 workspaces with 3 separate metastores, run it 3 times (once per workspace/metastore).
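
To preview the catalogs the script will enroll (and confirm you are on the intended metastore), run this snippet first: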

# Get catalog names as a list
rows = spark.sql("SHOW CATALOGS").collect()
catalog_names = []
for r in rows:
    d = r.asDict()
    catalog_names.append(d.get("catalog") or d.get("name"))

print(catalog_names)
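
# Example output (your catalog names will differ): ['main', 'samples', 'system']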

################################################################################
# Enrollment script
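#
# Run this as a metastore admin (or as an owner of the objects being granted);
# Unity Catalog requires one of those to issue the GRANT statements below.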

principal_app_id = "" # Teleskope SP Client ID

EXCLUDED_CATALOGS = {"samples", "system"} # Catalogs to exclude from enrollment
EXCLUDE_FOREIGN_CATALOGS = True # Exclude foreign catalogs (Delta Shares)
INCLUDE_VIEWS = False # Include views in enrollment
DRY_RUN = True # Dry run mode


def q(name: str) -> str:
    # Backtick-quote an identifier unless it is already quoted.
    return f"`{name}`" if not (name.startswith("`") and name.endswith("`")) else name


def get_catalog_names() -> list[str]:
    rows = spark.sql("SHOW CATALOGS").collect()
    names = []
    for r in rows:
        d = r.asDict()
        names.append(d.get("catalog") or d.get("name"))
    return [n for n in names if n]


def get_schema_names(catalog: str) -> list[str]:
    rows = spark.sql(f"SHOW SCHEMAS IN {q(catalog)}").collect()
    names = []
    for r in rows:
        d = r.asDict()
        names.append(
            d.get("namespace")
            or d.get("databaseName")
            or d.get("schema")
            or d.get("name")
        )
    return [n for n in names if n and n.lower() != "information_schema"]


# Debug helper for inspecting row shapes; not used by the enrollment loop.
def get_first_str_value(row) -> str:
    d = row.asDict()
    for v in d.values():
        if isinstance(v, str):
            return v
    return ""


def is_foreign_catalog(catalog: str) -> bool:
    if not EXCLUDE_FOREIGN_CATALOGS:
        return False
    try:
        # Heuristic: DESCRIBE CATALOG EXTENDED output for foreign / Delta Sharing
        # catalogs mentions "FOREIGN" or "SHARE"; this can false-positive if other
        # metadata (e.g., a catalog comment) contains those words.
        rows = spark.sql(f"DESCRIBE CATALOG EXTENDED {q(catalog)}").collect()
        blob = " ".join(" ".join(str(v) for v in r.asDict().values()) for r in rows).upper()
        if "SHARE" in blob or "FOREIGN" in blob:
            return True
    except Exception:
        pass
    return False


def emit(sql: str):
    # Print the statement in DRY_RUN mode; otherwise execute it against the metastore.
    if DRY_RUN:
        print(sql)
    else:
        spark.sql(sql)


for catalog in get_catalog_names():
    catalog_lower = catalog.lower()

    if catalog_lower in EXCLUDED_CATALOGS:
        continue

    if "sample" in catalog_lower:
        continue

    if is_foreign_catalog(catalog):
        continue

    emit(f"GRANT USE CATALOG ON CATALOG {q(catalog)} TO `{principal_app_id}`;")

    for schema in get_schema_names(catalog):
        full_schema = f"{q(catalog)}.{q(schema)}"
        emit(f"GRANT USE SCHEMA ON SCHEMA {full_schema} TO `{principal_app_id}`;")

        tables = spark.sql(f"SHOW TABLES IN {full_schema}").collect()

        # SHOW TABLES also lists views; when INCLUDE_VIEWS is False, skip names returned by SHOW VIEWS.
        view_names = set()
        if not INCLUDE_VIEWS:
            for v in spark.sql(f"SHOW VIEWS IN {full_schema}").collect():
                vd = v.asDict()
                view_name = vd.get("viewName") or vd.get("name")
                if view_name:
                    view_names.add(view_name)

        for t in tables:
            d = t.asDict()
            table_name = d.get("tableName") or d.get("name")
            if not table_name or table_name in view_names:
                continue
            full_table = f"{full_schema}.{q(table_name)}"
            # Note: SELECT ON TABLE typically works for both tables and views in Unity Catalog;
            # if views fail during scanning, you may need to grant SELECT ON VIEW separately.
            emit(f"GRANT SELECT ON TABLE {full_table} TO `{principal_app_id}`;")

        emit(f"GRANT APPLY TAG ON SCHEMA {full_schema} TO `{principal_app_id}`;")

# Grant access to system catalog for reading tags from system.information_schema
# This is required even though 'system' is in EXCLUDED_CATALOGS, as tag reading
# requires querying system.information_schema.*_tags tables
emit(f"GRANT USE CATALOG ON CATALOG system TO `{principal_app_id}`;")

# Grant access to system.information_schema schema for reading tags, masks, and routines
# Required for: catalog_tags, schema_tags, table_tags, column_tags, volume_tags,
# column_masks, and routines tables
emit(f"GRANT USE SCHEMA ON SCHEMA system.information_schema TO `{principal_app_id}`;")

# Grant SELECT on information_schema views/tables used by Teleskope
# These are views, but SELECT ON TABLE typically works for views in Unity Catalog
information_schema_tables = [
    "catalog_tags",
    "schema_tags", 
    "table_tags",
    "column_tags",
    "volume_tags",
    "column_masks",
    "routines"
]
for table in information_schema_tables:
    emit(f"GRANT SELECT ON TABLE system.information_schema.{q(table)} TO `{principal_app_id}`;")

# Grant access to system.access schema for Access Monitoring features
# Required for query_history and audit logs (may not exist in all Databricks versions)
# Note: if this schema doesn't exist, emit_optional skips the grant instead of failing
emit_optional(f"GRANT USE SCHEMA ON SCHEMA system.access TO `{principal_app_id}`;")

# Grant SELECT on Access Monitoring tables for user activity tracking
# system.query_history - query execution history (location varies by Databricks version)
# system.access.audit - audit logs (Public Preview feature)
# Note: These tables may not exist in all Databricks versions/regions
# If grants fail, check your Databricks version and available system tables

# Try query_history in information_schema (common location)
emit_optional(f"GRANT SELECT ON TABLE system.information_schema.query_history TO `{principal_app_id}`;")

# Try query_history in access schema (some versions)
emit_optional(f"GRANT SELECT ON TABLE system.access.query_history TO `{principal_app_id}`;")

# Grant SELECT on audit logs table
emit_optional(f"GRANT SELECT ON TABLE system.access.audit TO `{principal_app_id}`;")

################################################################################
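
After reviewing the dry-run output, set DRY_RUN = False and re-run the script to apply the grants. To spot-check the result, you can inspect the grants on an enrolled catalog (a quick sketch; replace main with one of your catalog names):

spark.sql("SHOW GRANTS ON CATALOG main").show(truncate=False)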
