-
Notifications
You must be signed in to change notification settings - Fork 51
fix: `tables schema` command: replace manual _delta_log JSON parsing with DeltaTable.schema() from the deltalake library #229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c22f70e
c0dd71b
f0327bc
746e8f6
ca07e17
0d18818
baaba5d
8221723
1012572
3963efa
34c7141
bce23dc
a4c4ba7
55c3515
f206e12
4cda700
c954a2a
0ba9da9
ed43c39
c015864
b6da0e7
7996c2e
6d4ac61
ed998af
18df707
ceba9fc
ee79ee5
d0a7e23
82c732b
b7d0831
3761717
df0360b
eebb6f2
12bfa45
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| kind: fixed | ||
| body: Refactor `fab tables schema` to use the `deltalake` Python library for schema extraction via ABFSS URI instead of manually parsing Delta log commit files | ||
| time: 2026-04-30T13:05:58.364670843Z | ||
| custom: | ||
| Author: pkontek | ||
| AuthorLink: https://github.com/pkontek |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| import json | ||
| from argparse import Namespace | ||
|
|
||
| from deltalake import DeltaTable | ||
| from deltalake.exceptions import DeltaError | ||
|
|
||
| from fabric_cli.core import fab_constant | ||
| from fabric_cli.core.fab_auth import FabAuth | ||
| from fabric_cli.core.fab_exceptions import FabricCLIError | ||
| from fabric_cli.errors import ErrorMessages | ||
|
|
||
# Only these item types expose standard Delta tables under their OneLake
# Tables/ folder. SemanticModel is deliberately left out: its Tables/ entries
# are columnar semantic-model representations, not Delta-compatible parquet.
_DELTA_SUPPORTED_ITEM_TYPES: frozenset[str] = frozenset(
    (
        "Lakehouse",
        "Warehouse",
        "KQLDatabase",
        "MirroredDatabase",
        "SQLDatabase",
    )
)
|
|
||
|
|
||
def get_table_schema(args: Namespace, local_path: str) -> list[dict]:
    """Return schema fields for a Delta table on OneLake.

    Single seam for DeltaTable construction: owns item-type validation,
    URI building, storage_options assembly, token acquisition, and
    exception mapping.

    Args:
        args: Parsed command namespace. ``ws_id`` and ``lakehouse_id`` are
            read to build the table URI; ``item_type`` is validated against
            the supported item types when it is present and not ``None``.
        local_path: Table path appended after the item id in the URI.

    Returns:
        The ``fields`` list decoded from ``DeltaTable.schema().to_json()``.

    Raises:
        FabricCLIError: With ``ERROR_INVALID_ITEM_TYPE`` when ``item_type``
            is set but unsupported; with ``ERROR_AUTHENTICATION_FAILED`` when
            no OneLake token is returned; with ``ERROR_INVALID_DELTA_TABLE``
            when the table cannot be opened or its schema JSON is unusable.
    """
    item_type = getattr(args, "item_type", None)
    if item_type is not None and item_type not in _DELTA_SUPPORTED_ITEM_TYPES:
        raise FabricCLIError(
            ErrorMessages.Table.unsupported_item_type_for_delta(item_type),
            fab_constant.ERROR_INVALID_ITEM_TYPE,
        )

    token = FabAuth().get_access_token(fab_constant.SCOPE_ONELAKE_DEFAULT)
    if token is None:
        raise FabricCLIError(
            ErrorMessages.Auth.access_token_failed(),
            fab_constant.ERROR_AUTHENTICATION_FAILED,
        )

    table_uri = (
        f"abfss://{args.ws_id}@{fab_constant.API_ENDPOINT_ONELAKE}"
        f"/{args.lakehouse_id}/{local_path}"
    )

    try:
        table = DeltaTable(
            table_uri,
            storage_options={
                "bearer_token": token,
                "use_fabric_endpoint": "true",
            },
            # Only the schema is read here, so skip tracking the table's
            # data files when loading it.
            without_files=True,
        )
        schema_dict = json.loads(table.schema().to_json())
        schema_fields = schema_dict.get("fields")
        if not isinstance(schema_fields, list):
            # Raised inside the try on purpose: the handler below maps it to
            # the same user-facing error as any other invalid-table failure.
            raise ValueError(
                "Delta table schema JSON does not contain a valid 'fields' list."
            )
        return schema_fields
    except (DeltaError, json.JSONDecodeError, ValueError) as exc:
        raise FabricCLIError(
            "Failed to extract the table schema. Please ensure the path points to a valid Delta table.",
            fab_constant.ERROR_INVALID_DELTA_TABLE,
        ) from exc
|
pkontek marked this conversation as resolved.
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,80 +1,17 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| import json | ||
| from argparse import Namespace | ||
| from typing import Optional | ||
|
|
||
| from fabric_cli.client import fab_api_onelake as onelake_api | ||
| from fabric_cli.core import fab_constant | ||
| from fabric_cli.core import fab_handle_context as handle_context | ||
| from fabric_cli.core.fab_exceptions import FabricCLIError | ||
| from fabric_cli.core.hiearchy.fab_hiearchy import OneLakeItem | ||
| from fabric_cli.client import fab_delta_client as delta_client | ||
| from fabric_cli.utils import fab_ui | ||
| from fabric_cli.utils import fab_util as utils | ||
|
|
||
|
|
||
def exec_command(args: Namespace) -> None:
    """Print the schema fields of the table addressed by ``args``.

    The fields come from ``_get_table_schema``; they are printed through
    ``fab_ui.print_output_format`` with headers enabled, after a grey
    status line. Any exception raised by ``_get_table_schema`` propagates
    to the caller unchanged.
    """
    schema_fields = _get_table_schema(args)
    fab_ui.print_grey("Schema extracted successfully")
    fab_ui.print_output_format(args, data=schema_fields, show_headers=True)
|
|
||
|
|
||
def _get_commit_logs(args: Namespace) -> Optional[list[str]]:
    """List the JSON entries found in the table's ``_delta_log`` directory.

    Appends ``/_delta_log`` to the last element of ``args.path``, resolves
    that path to a OneLake context, sets ``args.directory`` to the listing
    query and calls ``onelake_api.list_tables_files_recursive``.

    Returns:
        Paths of the listed ``.json`` entries, rewritten as
        ``{workspace_id}/{item_id}/{name without its first segment}`` and
        sorted in descending lexicographic order, or ``None`` when the
        listing response status is not 200/201.
    """
    # Work on a copy: the original mutated ``args.path`` in place, so the
    # caller's path silently gained the "/_delta_log" suffix (and a second
    # call would have appended it twice).
    delta_log_path = list(args.path)
    delta_log_path[-1] = delta_log_path[-1] + "/_delta_log"

    _context = handle_context.get_command_context(delta_log_path, raise_error=True)
    assert isinstance(_context, OneLakeItem)
    onelake: OneLakeItem = _context
    workspace_id = onelake.workspace.id
    item_id = onelake.item.id
    local_path = utils.remove_dot_suffix(onelake.local_path)

    args.directory = f"{workspace_id}/?recursive=false&resource=filesystem&directory={item_id}/{local_path}&getShortcutMetadata=true"
    response = onelake_api.list_tables_files_recursive(args)

    if response.status_code not in {200, 201}:
        return None

    file_names = [entry["name"] for entry in response.json().get("paths", [])]
    # A name ending in ".json" can never equal "_temporary", so the extra
    # comparison the original carried was dead and is omitted.
    # NOTE(review): the first path segment is dropped and replaced by
    # "{workspace_id}/{item_id}" - presumably it is the item identifier; confirm.
    json_files = [
        f"{workspace_id}/{item_id}/{name.split('/', 1)[1]}"
        for name in file_names
        if name.endswith(".json")
    ]
    json_files.sort(reverse=True)
    return json_files
|
|
||
|
|
||
def _extract_schema_from_commit_logs(args: Namespace) -> Optional[str]:
    """Return the ``schemaString`` of the first ``metaData`` action found.

    Walks the commit logs returned by ``_get_commit_logs`` in the order
    given, reading each one through ``onelake_api.read`` (after setting
    ``args.from_path`` and ``args.wait``). Each non-blank line of a
    successfully read log is parsed as one JSON object.

    Returns:
        The ``schemaString`` value of the first object containing a
        ``metaData`` key, or ``None`` when there are no commit logs or none
        of the readable logs contains such an object.
    """
    commit_logs = _get_commit_logs(args)
    if not commit_logs:
        return None

    for log in commit_logs:
        args.from_path = log
        args.wait = True
        response = onelake_api.read(args)

        if response.status_code not in {200, 201}:
            continue

        # Split on "\n" only: str.splitlines() would also break on characters
        # that may legally appear unescaped inside JSON strings.
        for line in response.text.strip().split("\n"):
            if not line.strip():
                # An empty body or blank line is not a JSON document; the
                # original passed it to json.loads and crashed.
                continue
            commit_data = json.loads(line)
            if "metaData" in commit_data:
                return commit_data["metaData"]["schemaString"]

    return None
def _get_table_schema(args: Namespace) -> list[dict]:
    """Return the schema fields for the table at ``args.table_local_path``.

    Delegates to ``delta_client.get_table_schema``; exceptions raised there
    propagate unchanged.
    """
    table_path = args.table_local_path
    return delta_client.get_table_schema(args, table_path)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,8 +19,7 @@ | |
| ) | ||
|
|
||
| API_ENDPOINT_POWER_BI = ( | ||
| validate_and_get_env_variable( | ||
| "FAB_API_ENDPOINT_POWER_BI", "api.powerbi.com") | ||
| validate_and_get_env_variable("FAB_API_ENDPOINT_POWER_BI", "api.powerbi.com") | ||
| + "/v1.0/myorg" | ||
| ) | ||
|
|
||
|
|
@@ -264,7 +263,7 @@ | |
| ERROR_INVALID_OPERATION = "InvalidOperation" | ||
| ERROR_INVALID_PATH = "InvalidPath" | ||
| ERROR_INVALID_PROPERTY = "InvalidProperty" | ||
| ERROR_INVALID_DETLA_TABLE = "InvalidDeltaTable" | ||
| ERROR_INVALID_DELTA_TABLE = "InvalidDeltaTable" | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is fab_constant considered a stable/public interface that external tooling imports directly? If so, a deprecated alias makes sense. If it's internal-only, the sweep was clean and no alias is needed.
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @ayeshurun, what do you think about that?
pkontek marked this conversation as resolved.
|
||
| ERROR_INVALID_QUERY_FIELDS = "InvalidQueryFields" | ||
| ERROR_INVALID_WORKSPACE_TYPE = "InvalidWorkspaceType" | ||
| ERROR_INVALID_QUERY = "InvalidQuery" | ||
|
|
@@ -351,4 +350,3 @@ | |
|
|
||
| # Invalid query parameters for set command across all fabric resources | ||
| SET_COMMAND_INVALID_QUERIES = ["id", "type", "workspaceId", "folderId"] | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.