FabricFlow is a code-first Python SDK for building, managing, and automating Microsoft Fabric data pipelines, workspaces, and core items. It provides a high-level, object-oriented interface for interacting with the Microsoft Fabric REST API, enabling you to create, execute, and monitor data pipelines programmatically.
- Pipeline Templates: Easily create data pipelines from reusable templates (e.g., SQL Server to Lakehouse).
- Pipeline Execution: Trigger, monitor, and extract results from pipeline runs.
- Copy & Lookup Activities: Build and execute copy and lookup activities with source/sink abstractions.
- Modular Architecture: Activities, sources, and sinks live in separate modules, keeping the package structure clean.
- Workspace & Item Management: CRUD operations for workspaces and core items.
- Connection & Capacity Utilities: Resolve and manage connections and capacities.
- Logging Utilities: Simple logging setup for consistent diagnostics.
- Service Principal Authentication: Authenticate securely with Microsoft Fabric REST API using Azure Service Principal credentials.
```bash
pip install fabricflow
```

Below is a sample workflow that demonstrates how to use FabricFlow to automate workspace creation, pipeline deployment, and data copy operations in Microsoft Fabric.
```python
from sempy.fabric import FabricRestClient
from fabricflow import create_workspace, create_data_pipeline
from fabricflow.pipeline.activities import Copy, Lookup
from fabricflow.pipeline.sources import (
    SQLServerSource,
    GoogleBigQuerySource,
    PostgreSQLSource,
    FileSystemSource,
)
from fabricflow.pipeline.sinks import (
    LakehouseTableSink,
    ParquetFileSink,
    LakehouseFilesSink,
)
from fabricflow.pipeline.sinks.types import FileCopyBehavior
from fabricflow.pipeline.templates import (
    DataPipelineTemplates,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE_FOR_EACH,
    COPY_FILES_TO_LAKEHOUSE,
    LOOKUP_SQL_SERVER,
    LOOKUP_SQL_SERVER_FOR_EACH,
)

fabric_client = FabricRestClient()
```

Note: If you are using `ServicePrincipalTokenProvider`, make sure your Service Principal has access to the workspace and connections you are using.
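If you want to authenticate as a Service Principal instead of your user identity, a token provider can be passed to `FabricRestClient`. The snippet below is a minimal sketch: the exact constructor arguments of `ServicePrincipalTokenProvider` (`tenant_id`, `client_id`, `client_secret`) are assumptions and should be verified against the FabricFlow documentation.

```python
import os

from sempy.fabric import FabricRestClient
from fabricflow import ServicePrincipalTokenProvider

# Assumed constructor arguments -- verify against the FabricFlow docs.
token_provider = ServicePrincipalTokenProvider(
    tenant_id=os.environ["AZURE_TENANT_ID"],
    client_id=os.environ["AZURE_CLIENT_ID"],
    client_secret=os.environ["AZURE_CLIENT_SECRET"],
)

# sempy's FabricRestClient accepts a token provider for non-interactive auth.
fabric_client = FabricRestClient(token_provider=token_provider)
```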
```python
capacity_name = "FabricFlow"
workspace_name = "FabricFlow"
```

You can create a new workspace, or use an existing one by specifying its name.
```python
create_workspace(fabric_client, workspace_name, capacity_name)
```

You can also create individual data pipeline templates by selecting specific templates from the list.
New Feature: You can now use custom pipeline templates by passing a dict (JSON payload), enum, or file path to `create_data_pipeline`.
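For illustration, the sketch below passes a local JSON file path in place of a built-in enum value. The file path is a placeholder, and the exact shape of a dict payload is not shown here; check the FabricFlow documentation for the expected pipeline definition format.

```python
# Built-in template (enum value), as elsewhere in this README.
create_data_pipeline(fabric_client, COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE, workspace_name)

# Custom template from a local file -- the path below is a placeholder.
create_data_pipeline(fabric_client, "templates/my_custom_pipeline.json", workspace_name)
```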
```python
for template in DataPipelineTemplates:
    create_data_pipeline(
        fabric_client,
        template,
        workspace_name
    )
```

```python
SOURCE_CONNECTION_ID = "your-source-connection-id"
SOURCE_DATABASE_NAME = "AdventureWorks2022"
SINK_WORKSPACE_ID = "your-sink-workspace-id"
SINK_LAKEHOUSE_ID = "your-sink-lakehouse-id"

ITEMS_TO_LOAD = [
    {
        "source_schema_name": "Sales",
        "source_table_name": "SalesOrderHeader",
        "source_query": "SELECT * FROM [Sales].[SalesOrderHeader]",
        "sink_table_name": "SalesOrderHeader",
        "sink_schema_name": "dbo",
        "sink_table_action": "Overwrite",
        "load_type": "Incremental",
        "primary_key_columns": ["SalesOrderID"],
        "skip": True,
        "load_from_timestamp": None,
        "load_to_timestamp": None,
    },
    # Add more items as needed...
]
```

You can copy data using either a single item per pipeline run (Option 1) or multiple items per pipeline run (Option 2). Choose the option that best fits your requirements.
Note: The examples below use the new `Copy` class. You can also use `CopyManager` for backward compatibility, but `Copy` is recommended for new code.
Option 1 – single item per pipeline run:

```python
copy = Copy(
    fabric_client,
    workspace_name,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE
)

source = SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
    source_query=ITEMS_TO_LOAD[0]["source_query"],
)

sink = LakehouseTableSink(
    sink_workspace=SINK_WORKSPACE_ID,
    sink_lakehouse=SINK_LAKEHOUSE_ID,
    sink_table_name=ITEMS_TO_LOAD[0]["sink_table_name"],
    sink_schema_name=ITEMS_TO_LOAD[0]["sink_schema_name"],
    sink_table_action=ITEMS_TO_LOAD[0]["sink_table_action"],
)

result = (
    copy
    .source(source)
    .sink(sink)
    .execute()
)
```

Option 2 – multiple items per pipeline run:

```python
copy = Copy(
    fabric_client,
    workspace_name,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE_FOR_EACH
)

source = SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
)

sink = LakehouseTableSink(
    sink_workspace=SINK_WORKSPACE_ID,
    sink_lakehouse=SINK_LAKEHOUSE_ID,
)

result = (
    copy
    .source(source)
    .sink(sink)
    .items(ITEMS_TO_LOAD)
    .execute()
)
```

FabricFlow now supports lookup operations for data validation and enrichment:
```python
# Single lookup operation
lookup = Lookup(
    fabric_client,
    workspace_name,
    LOOKUP_SQL_SERVER
)

source = SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
    source_query="SELECT COUNT(*) as record_count FROM [Sales].[SalesOrderHeader]",
)

result = (
    lookup
    .source(source)
    .execute()
)

# Multiple lookup operations
lookup_items = [
    {
        "source_query": "SELECT COUNT(*) as order_count FROM [Sales].[SalesOrderHeader]",
        "first_row_only": True,
    },
    {
        "source_query": "SELECT MAX(OrderDate) as latest_order FROM [Sales].[SalesOrderHeader]",
        "first_row_only": True,
    }
]

lookup = Lookup(
    fabric_client,
    workspace_name,
    LOOKUP_SQL_SERVER_FOR_EACH
)

result = (
    lookup
    .source(source)
    .items(lookup_items)
    .execute()
)
```

FabricFlow now supports copying files from file servers directly to the Lakehouse Files area:
```python
copy = Copy(
    fabric_client,
    workspace_name,
    COPY_FILES_TO_LAKEHOUSE
)

# Define file system source with pattern matching and filtering
source = FileSystemSource(
    source_connection_id="your-file-server-connection-id",
    source_folder_pattern="incoming/data/*",       # Wildcard folder pattern
    source_file_pattern="*.csv",                   # File pattern
    source_modified_after="2025-01-01T00:00:00Z",  # Optional date filter
    recursive_search=True,                         # Recursive directory search
    delete_source_after_copy=False,                # Keep source files
    max_concurrent_connections=10,                 # Connection limit
)

# Define lakehouse files sink
sink = LakehouseFilesSink(
    sink_lakehouse="data-lakehouse",
    sink_workspace="analytics-workspace",
    sink_directory="processed/files",                    # Target directory in lakehouse
    copy_behavior=FileCopyBehavior.PRESERVE_HIERARCHY,  # Maintain folder structure
    enable_staging=False,                                # Direct copy without staging
    parallel_copies=4,                                   # Parallel operations
    max_concurrent_connections=10,                       # Connection limit
)

result = (
    copy
    .source(source)
    .sink(sink)
    .execute()
)
```

Below are the main classes and functions available in FabricFlow:
- `DataPipelineExecutor` – Execute data pipelines and monitor their status.
- `DataPipelineError` – Exception class for pipeline errors.
- `PipelineStatus` – Enum for pipeline run statuses.
- `DataPipelineTemplates` – Enum for pipeline templates.
- `get_template` – Retrieve a pipeline template definition.
- `get_base64_str` – Utility for base64 encoding of template files.
- `create_data_pipeline` – Create a new data pipeline from a template.
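As a rough sketch of how the executor pieces could fit together, the snippet below runs a deployed pipeline directly. The import path, constructor arguments, method name, and payload shape are assumptions rather than documented signatures; the `Copy` and `Lookup` builders shown earlier are the recommended entry points.

```python
# Hypothetical sketch -- verify names and signatures against the FabricFlow API reference.
from fabricflow import DataPipelineExecutor, DataPipelineError

executor = DataPipelineExecutor(
    fabric_client,
    workspace_name,
    "CopySQLServerToLakehouseTable",   # hypothetical pipeline name
    payload={},                        # hypothetical run parameters
)

try:
    run = executor.run()               # assumed to trigger the run and poll until completion
    print("Run result:", run)
except DataPipelineError as exc:
    print(f"Pipeline run failed: {exc}")
```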
- `Copy` – Build and execute copy activities (replaces `CopyManager`).
- `Lookup` – Build and execute lookup activities for data validation.
- `SQLServerSource` – Define SQL Server as a data source.
- `GoogleBigQuerySource` – Define Google BigQuery as a data source.
- `PostgreSQLSource` – Define PostgreSQL as a data source.
- `FileSystemSource` – Define a file server as a data source for file-based operations.
- `BaseSource` – Base class for all data sources.
- `LakehouseTableSink` – Define a Lakehouse table as a data sink.
- `ParquetFileSink` – Define a Parquet file as a data sink.
- `LakehouseFilesSink` – Define the Lakehouse Files area as a data sink for file operations.
- `BaseSink` – Base class for all data sinks.
- `SinkType` / `SourceType` – Enums for sink and source types.
- `FileCopyBehavior` – Enum for file copy behavior options.
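The quick-start examples use `SQLServerSource`; the other relational sources follow the same builder pattern. The snippet below is only a sketch: `PostgreSQLSource`'s parameter names are assumed to mirror `SQLServerSource`, and the connection ID is a placeholder.

```python
# Assumed to mirror SQLServerSource's parameters -- verify before use.
pg_source = PostgreSQLSource(
    source_connection_id="your-postgres-connection-id",
    source_database_name="analytics",
    source_query="SELECT * FROM public.orders",
)

# Plug it into the same builder chain shown earlier, with an appropriate template:
# Copy(fabric_client, workspace_name, <template>).source(pg_source).sink(...).execute()
```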
- `FabricCoreItemsManager` – Manage core Fabric items via APIs.
- `FabricWorkspacesManager` – Manage Fabric workspaces via APIs.
- `get_workspace_id` – Get a workspace ID or return the current one.
- `create_workspace` – Create a new workspace and assign it to a capacity.
- `FabricItemType` – Enum for Fabric item types.
- `setup_logging` – Configure logging for diagnostics.
- `resolve_connection_id` – Resolve a connection by name or ID.
- `resolve_capacity_id` – Resolve a capacity by name or ID.
- `ServicePrincipalTokenProvider` – Handles Azure Service Principal authentication.
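As a small usage sketch, these utilities can replace the hard-coded GUIDs in the quick-start code with name lookups. The argument names and order below are assumptions; check the FabricFlow documentation for the exact signatures.

```python
import logging

from fabricflow import setup_logging, resolve_connection_id, resolve_capacity_id

# Assumed keyword argument -- verify against the FabricFlow docs.
setup_logging(level=logging.INFO)

# Resolve human-readable names to IDs instead of hard-coding GUIDs.
SOURCE_CONNECTION_ID = resolve_connection_id(fabric_client, "your-sql-server-connection-name")
capacity_id = resolve_capacity_id(fabric_client, capacity_name)
```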
FabricFlow provides a modular architecture with separate packages for activities, sources, sinks, and templates:
- Activities: `Copy`, `Lookup` – Build and execute pipeline activities
- Sources: `SQLServerSource`, `GoogleBigQuerySource`, `PostgreSQLSource`, `FileSystemSource`, `BaseSource`, `SourceType` – Define data sources
- Sinks: `LakehouseTableSink`, `ParquetFileSink`, `LakehouseFilesSink`, `BaseSink`, `SinkType`, `FileCopyBehavior` – Define data destinations
- Templates: Pre-built pipeline definitions for common patterns
- CopyManager → Copy: The `CopyManager` class has been renamed to `Copy` for consistency. Existing code using `CopyManager` will continue to work (a backward-compatible alias is provided), but new code should use `Copy`.
Read the Contributing file.
Parth Lad