diff --git a/src/sempy_labs/__init__.py b/src/sempy_labs/__init__.py
index 4f3d14f5..88a6e45e 100644
--- a/src/sempy_labs/__init__.py
+++ b/src/sempy_labs/__init__.py
@@ -120,6 +120,7 @@
     create_environment,
     delete_environment,
     publish_environment,
+    list_environments,
 )
 from sempy_labs._clear_cache import (
     clear_cache,
@@ -558,4 +559,5 @@
     "delete_sql_database",
     "list_sql_databases",
     "delta_analyzer_history",
+    "list_environments",
 ]
diff --git a/src/sempy_labs/_folders.py b/src/sempy_labs/_folders.py
new file mode 100644
index 00000000..57739967
--- /dev/null
+++ b/src/sempy_labs/_folders.py
@@ -0,0 +1,275 @@
+import pandas as pd
+from typing import Optional
+from uuid import UUID
+from sempy_labs._helper_functions import (
+    resolve_workspace_name_and_id,
+    resolve_workspace_id,
+    _base_api,
+    _create_dataframe,
+    _update_dataframe_datatypes,
+    _is_valid_uuid,
+)
+import sempy_labs._icons as icons
+
+
+def list_folders(
+    workspace: Optional[str | UUID] = None,
+    recursive: bool = True,
+    root_folder: Optional[str | UUID] = None,
+) -> pd.DataFrame:
+    """
+    Shows a list of folders from the specified workspace.
+
+    Parameters
+    ----------
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    recursive : bool, default=True
+        If True, lists folders in the root folder and all of its nested folders.
+        If False, lists only the folders directly within the root folder.
+    root_folder : str | uuid.UUID, default=None
+        The folder name or ID to use as the root when filtering folders.
+        If not provided, the workspace itself is used as the root folder.
+
+    Returns
+    -------
+    pandas.DataFrame
+        A pandas dataframe showing a list of folders from the specified workspace.
+    """
+
+    workspace_id = resolve_workspace_id(workspace)
+
+    columns = {
+        "Folder Name": "string",
+        "Folder Id": "string",
+        "Parent Folder Id": "string",
+    }
+
+    df = _create_dataframe(columns=columns)
+
+    url = f"/v1/workspaces/{workspace_id}/folders?recursive={recursive}"
+
+    if root_folder is not None and _is_valid_uuid(root_folder):
+        url += f"&rootFolderId={root_folder}"
+
+    responses = _base_api(
+        request=url,
+        client="fabric_sp",
+        uses_pagination=True,
+    )
+
+    for r in responses:
+        for v in r.get("value", []):
+            new_data = {
+                "Folder Name": v.get("displayName"),
+                "Folder Id": v.get("id"),
+                "Parent Folder Id": v.get("parentFolderId"),
+            }
+
+            df = pd.concat([df, pd.DataFrame(new_data, index=[0])], ignore_index=True)
+
+    _update_dataframe_datatypes(dataframe=df, column_map=columns)
+
+    # Add folder path
+    folder_map = {row["Folder Id"]: row["Folder Name"] for _, row in df.iterrows()}
+
+    def get_folder_path(folder_id):
+        # A parent ID which is not in the map (i.e. missing) marks a root-level folder
+        if folder_id not in folder_map:
+            return ""
+
+        row = df.loc[df["Folder Id"] == folder_id].iloc[0]
+        return get_folder_path(row["Parent Folder Id"]) + "/" + row["Folder Name"]
+
+    # Apply function to create the path column
+    df["Folder Path"] = df["Folder Id"].apply(get_folder_path)
+
+    # Filter the folders if specified
+    if root_folder is not None and not _is_valid_uuid(root_folder):
+        root = df[df["Folder Name"] == root_folder]
+        if root.empty:
+            raise ValueError(f"Folder name '{root_folder}' not found.")
+        root_folder_id = root["Folder Id"].iloc[0]
+        df = df[df["Parent Folder Id"] == root_folder_id]
+
+    return df
+
+
+def create_folder(
+    name: str,
+    workspace: Optional[str | UUID] = None,
+    parent_folder: Optional[str | UUID] = None,
+) -> UUID:
+    """
+    Creates a new folder in the specified workspace.
+
+    Parameters
+    ----------
+    name : str
+        The name of the folder to create.
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    parent_folder : str | uuid.UUID, default=None
+        The name/path or ID of the parent folder. If not provided, the folder is created in the root folder of the workspace.
+
+    Returns
+    -------
+    uuid.UUID
+        The ID of the created folder.
+    """
+
+    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+
+    url = f"/v1/workspaces/{workspace_id}/folders"
+
+    payload = {
+        "displayName": name,
+    }
+
+    if parent_folder:
+        parent_folder_id = resolve_folder_id(folder=parent_folder, workspace=workspace)
+        payload["parentFolderId"] = parent_folder_id
+
+    response = _base_api(
+        request=url,
+        client="fabric_sp",
+        method="post",
+        payload=payload,
+        status_codes=201,
+    )
+
+    print(
+        f"{icons.green_dot} The '{name}' folder has been successfully created within the '{workspace_name}' workspace."
+    )
+
+    return response.json().get("id")
+
+
+def resolve_folder_id(
+    folder: str | UUID, workspace: Optional[str | UUID] = None
+) -> UUID:
+
+    if _is_valid_uuid(folder):
+        return folder
+    else:
+        df = list_folders(workspace=workspace)
+        if not folder.startswith("/"):
+            folder_path = f"/{folder}"
+        else:
+            folder_path = folder
+        df_filt = df[df["Folder Path"] == folder_path]
+        if df_filt.empty:
+            (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+            raise ValueError(
+                f"{icons.red_dot} The '{folder}' folder does not exist within the '{workspace_name}' workspace."
+            )
+        return df_filt["Folder Id"].iloc[0]
+
+
+def delete_folder(folder: str | UUID, workspace: Optional[str | UUID] = None):
+    """
+    Deletes a folder from the specified workspace.
+
+    Parameters
+    ----------
+    folder : str | uuid.UUID
+        The name or ID of the folder to delete. If the folder is a subfolder, specify the path (e.g. "/folder/subfolder").
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    """
+
+    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+
+    folder_id = resolve_folder_id(folder=folder, workspace=workspace)
+
+    _base_api(
+        request=f"/v1/workspaces/{workspace_id}/folders/{folder_id}",
+        client="fabric_sp",
+        method="delete",
+    )
+
+    print(
+        f"{icons.green_dot} The '{folder}' folder has been successfully deleted from the '{workspace_name}' workspace."
+    )
+
+
+def move_folder(
+    folder: str | UUID,
+    target_folder: str | UUID,
+    workspace: Optional[str | UUID] = None,
+):
+    """
+    Moves a folder to a new location in the workspace.
+
+    Parameters
+    ----------
+    folder : str | uuid.UUID
+        The name or ID of the folder to move. If the folder is a subfolder, specify the path (e.g. "/folder/subfolder").
+    target_folder : str | uuid.UUID
+        The name or ID of the target folder to which the folder will be moved. If the target folder is a subfolder, specify the path (e.g. "/folder/subfolder").
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    """
+
+    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+    target_folder_id = resolve_folder_id(folder=target_folder, workspace=workspace)
+    folder_id = resolve_folder_id(folder=folder, workspace=workspace)
+
+    payload = {
+        "targetFolderId": target_folder_id,
+    }
+
+    _base_api(
+        request=f"/v1/workspaces/{workspace_id}/folders/{folder_id}/move",
+        client="fabric_sp",
+        method="post",
+        payload=payload,
+    )
+
+    print(
+        f"{icons.green_dot} The '{folder}' folder has been successfully moved to the '{target_folder}' folder within the '{workspace_name}' workspace."
+    )
+
+
+def update_folder(
+    folder: str | UUID, name: str, workspace: Optional[str | UUID] = None
+):
+    """
+    Updates the name of a folder in the specified workspace.
+
+    Parameters
+    ----------
+    folder : str | uuid.UUID
+        The name/path or ID of the folder to update. If the folder is a subfolder, specify the path (e.g. "/folder/subfolder").
+    name : str
+        The new name for the folder. Must meet the `folder name requirements `_.
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    """
+
+    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+    folder_id = resolve_folder_id(folder=folder, workspace=workspace)
+
+    payload = {
+        "displayName": name,
+    }
+
+    _base_api(
+        request=f"/v1/workspaces/{workspace_id}/folders/{folder_id}",
+        client="fabric_sp",
+        method="patch",
+        payload=payload,
+    )
+
+    print(
+        f"{icons.green_dot} The '{folder}' folder has been successfully updated to '{name}' within the '{workspace_name}' workspace."
+    )
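A sketch of the create/move/rename/delete lifecycle exposed by the helpers above (illustrative only; the workspace and folder names are placeholders, and the functions are imported from the module because this changeset does not re-export them):

```python
from sempy_labs._folders import (
    create_folder,
    delete_folder,
    move_folder,
    update_folder,
)

workspace = "Sales"

create_folder(name="Finance", workspace=workspace)
create_folder(name="Archive", workspace=workspace)
create_folder(name="Reports", workspace=workspace, parent_folder="Finance")

# Subfolders are addressed by path; top-level folders can be addressed by name or path.
move_folder(folder="/Finance/Reports", target_folder="Archive", workspace=workspace)
update_folder(folder="/Archive/Reports", name="Reports 2024", workspace=workspace)
delete_folder(folder="/Archive/Reports 2024", workspace=workspace)
```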
diff --git a/src/sempy_labs/_items.py b/src/sempy_labs/_items.py
new file mode 100644
index 00000000..b617f41d
--- /dev/null
+++ b/src/sempy_labs/_items.py
@@ -0,0 +1,233 @@
+import sempy.fabric as fabric
+import os
+import pandas as pd
+from sempy_labs._helper_functions import (
+    _base_api,
+    resolve_workspace_name_and_id,
+    resolve_lakehouse_name_and_id,
+    _mount,
+)
+from uuid import UUID
+from typing import Optional
+import json
+import sempy_labs._icons as icons
+from sempy_labs.lakehouse._blobs import list_blobs
+from sempy_labs._folders import (
+    list_folders,
+    create_folder,
+)
+import re
+
+
+# Item types which have definitions
+item_list = [
+    "CopyJob",
+    "Eventhouse",
+    "DataPipeline",
+    "KQLDatabase",
+    "KQLDashboard",
+    "KQLQueryset",
+    "MirroredDatabase",
+    "MountedDataFactory",
+    "Environment",
+    "Notebook",
+    "Report",
+    "SemanticModel",
+    "Eventstream",
+    # "Reflex",  # This API is not working
+    "SparkJobDefinition",
+    "VariableLibrary",
+    # Dataflow,
+    # GraphQLApi,
+]
+
+
+def backup_item_definitions(
+    workspace: Optional[str | UUID] = None,
+    lakehouse: Optional[str | UUID] = None,
+    lakehouse_workspace: Optional[str | UUID] = None,
+):
+    """
+    Backs up the item definitions of a workspace to a lakehouse.
+
+    Parameters
+    ----------
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID whose item definitions are backed up.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    lakehouse : str | uuid.UUID, default=None
+        The lakehouse name or ID to which the backup files are written.
+        Defaults to None which resolves to the lakehouse attached to the notebook.
+    lakehouse_workspace : str | uuid.UUID, default=None
+        The workspace name or ID of the lakehouse.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    """
+
+    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
+    (lakehouse_workspace_name, lakehouse_workspace_id) = resolve_workspace_name_and_id(
+        lakehouse_workspace
+    )
+    (lakehouse_name, lakehouse_id) = resolve_lakehouse_name_and_id(
+        lakehouse=lakehouse, workspace=lakehouse_workspace_id
+    )
+    local_path = _mount(lakehouse=lakehouse_id, workspace=lakehouse_workspace_id)
+    path_prefix = f"{local_path}/Files/SLL_backup_item_definitions/{workspace_name}"
+
+    # dfI = fabric.list_items(workspace=workspace)
+    response = _base_api(request=f"/v1/workspaces/{workspace_id}/items?recursive=True")
+    df = pd.json_normalize(response.json()["value"])
+    dfI_filt = df[df["type"].isin(item_list)]
+    # dfI_filt = dfI[dfI["Type"].isin(items)]
+
+    dfF = list_folders(workspace=workspace)
+
+    for _, r in dfI_filt.iterrows():
+        item_name = r["displayName"]
+        item_id = r["id"]
+        description = r["description"]
+        folder_id = r.get("folderId")
+        item_type = r["type"]
+        print(f"{item_name} : {item_type}")
+        definition = _base_api(
+            request=f"/v1/workspaces/{workspace_id}/items/{item_id}/getDefinition",
+            method="post",
+            lro_return_json=True,
+            status_codes=None,
+        )
+
+        # Obtain the folder path
+        folder_path = ""
+        if folder_id:
+            df_filt = dfF[dfF["Folder Id"] == folder_id]
+            if not df_filt.empty:
+                folder_path = df_filt["Folder Path"].iloc[0]
+
+        definition["description"] = description
+        definition["folderPath"] = folder_path
+
+        file_path = f"{path_prefix}/{item_type}/{item_name}.json"
+        os.makedirs(os.path.dirname(file_path), exist_ok=True)
+        with open(file_path, "w") as json_file:
+            json.dump(definition, json_file, indent=4)
+
+        print(
+            f"{icons.green_dot} The '{item_name}' {item_type} definition has been backed up to the Files section of the '{lakehouse_name}' lakehouse within the '{lakehouse_workspace_name}' workspace."
+        )
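A hedged sketch of a backup run (the workspace name is a placeholder; it assumes the notebook has a default lakehouse attached, since `lakehouse` and `lakehouse_workspace` are left to their defaults):

```python
from sempy_labs._items import backup_item_definitions

# Writes one JSON file per supported item to the attached lakehouse, under:
#   Files/SLL_backup_item_definitions/Sales Workspace/<ItemType>/<Item Name>.json
# Each file contains the item definition plus the "description" and "folderPath" keys.
backup_item_definitions(workspace="Sales Workspace")
```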
+
+
+def restore_item_definitions(
+    backup_file_path: str,
+    target_workspace: Optional[str | UUID] = None,
+):
+    """
+    Creates items based on an item definition backup file path.
+
+    Parameters
+    ----------
+    backup_file_path : str
+        The abfss path to the backup folder created by `backup_item_definitions`.
+        For example: "abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/SLL_backup_item_definitions/My Workspace Name"
+    target_workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    """
+
+    (target_workspace_name, target_workspace_id) = resolve_workspace_name_and_id(
+        target_workspace
+    )
+
+    lakehouse_workspace_id = backup_file_path.split("abfss://")[1].split("@")[0]
+    lakehouse_id = backup_file_path.split("microsoft.com/")[1].split("/")[0]
+    folder_path = backup_file_path.split(f"microsoft.com/{lakehouse_id}/")[1]
+
+    blobs = list_blobs(
+        lakehouse=lakehouse_id, workspace=lakehouse_workspace_id, container="Files"
+    )
+    blobs_filt = blobs[
+        (blobs["Blob Name"].str.startswith(f"{lakehouse_id}/{folder_path}"))
+        & (blobs["Blob Name"].str.endswith(".json"))
+    ]
+
+    local_path = _mount(lakehouse=lakehouse_id, workspace=lakehouse_workspace_id)
+
+    # Creating the folder structure
+    def ensure_folder_path_exists(folder_path):
+        # Normalize the paths if necessary
+        existing_paths = set(
+            dfF["Folder Path"].str.strip("/")
+        )  # remove leading/trailing slashes for easier comparison
+
+        parts = folder_path.strip("/").split("/")
+        current_path = ""
+
+        for part in parts:
+            if current_path:
+                current_path += "/" + part
+            else:
+                current_path = part
+
+            if current_path not in existing_paths:
+                # Create the folder since it does not exist;
+                # top-level folders have no parent, so pass None for them
+                parent_folder = (
+                    "/" + "/".join(current_path.split("/")[:-1])
+                    if "/" in current_path
+                    else None
+                )
+                create_folder(
+                    name=part,
+                    workspace=target_workspace_id,
+                    parent_folder=parent_folder,
+                )
+                existing_paths.add(current_path)
+
+    for _, r in blobs_filt.iterrows():
+        blob_name = r["Blob Name"]
+        blob_file = blob_name.split(f"{lakehouse_id}")[1][1:]
+        file_name = os.path.basename(blob_file)
+        # directory = os.path.dirname(blob_file)
+        # folder_structure = os.path.dirname(directory)
+        item_type = os.path.basename(os.path.dirname(blob_file))
+        item_name = os.path.splitext(file_name)[0]
+        definition_file_path = f"{local_path}/{blob_file}"
+        with open(definition_file_path, "r", encoding="utf-8") as file:
+            definition = json.load(file)
+
+        description = definition.get("description")
+        folder_path = definition.get("folderPath")
+        raw_definition = definition.get("definition")
+
+        payload = {
+            "displayName": item_name,
+            "type": item_type,
+            "definition": raw_definition,
+        }
+
+        if description:
+            payload["description"] = description
+        if folder_path:
+            dfF = list_folders(workspace=target_workspace_id)
+            dfF_filt = dfF[dfF["Folder Path"] == folder_path]
+            if not dfF_filt.empty:
+                folder_id = dfF_filt["Folder Id"].iloc[0]
+            else:
+                folder_id = None
+                # Create the folder if it does not exist
+                ensure_folder_path_exists(folder_path)
+                # Get the folder ID again after creating it
+                dfF = list_folders(workspace=target_workspace_id)
+                dfF_filt = dfF[dfF["Folder Path"] == folder_path]
+                if not dfF_filt.empty:
+                    folder_id = dfF_filt["Folder Id"].iloc[0]
+
+            payload["folderId"] = folder_id
+
+        # Create items...
+        _base_api(
+            request=f"/v1/workspaces/{target_workspace_id}/items",
+            method="post",
+            payload=payload,
+            status_codes=[201, 202],
+            lro_return_status_code=True,
+        )
+
+        print(
+            f"{icons.green_dot} Created the '{item_name}' {_split_camel_case(item_type)} within the '{target_workspace_name}' workspace"
+        )
+
+
+def _split_camel_case(text):
+    # Find acronym groups or normal words
+    matches = re.finditer(r"([A-Z]+(?=[A-Z][a-z])|[A-Z][a-z]*)", text)
+    words = [m.group(0) for m in matches]
+
+    # Lowercase normal words, keep acronyms as-is
+    words = [w if w.isupper() else w.lower() for w in words]
+
+    return " ".join(words)
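A matching restore sketch (the GUIDs and names are placeholders; the abfss path follows the format documented on `backup_file_path`, with the workspace ID before the `@` and the lakehouse ID after the host):

```python
from sempy_labs._items import restore_item_definitions

restore_item_definitions(
    backup_file_path=(
        "abfss://11111111-1111-1111-1111-111111111111@onelake.dfs.fabric.microsoft.com/"
        "22222222-2222-2222-2222-222222222222/Files/SLL_backup_item_definitions/Sales Workspace"
    ),
    target_workspace="Sales Restored",
)
```

For reference, `_split_camel_case` only affects the success messages: an item type such as "KQLDatabase" prints as "KQL database" and "SemanticModel" as "semantic model".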