How to Build Microsoft Fabric Monitoring with Automated Alerts – Part 2

Microsoft Fabric monitoring is essential when you’re running production pipelines – you need to know when something breaks before your users do. In my previous post, I walked through how to connect and ingest logs using a notebook. Now let’s take it a step further – automating the entire process with built-in alerting.

By automation, I mean creating a pipeline with a notification system that fires off alerts the moment something goes wrong. This setup enables early detection of anomalies and issues, giving you time to resolve them before they snowball into bigger problems.

To achieve this, I extended my previous notebook and integrated it with Fabric Data Factory pipeline orchestration. The goal was to log every entry from the tenant to monitor pipeline failures, and support team alerts.

Here’s what we’ll cover:

  • Ingesting logs from the Fabric API endpoint
  • Loading them into Lakehouse
  • Setting up alerts when issues are detected

I scheduled the flow to run every 30 minutes, but you can run it more frequently depending on your capacity. I’m using the trial version which corresponds to F64, and the entire flow takes around 3-5 minutes. Below you can find an overall picture of the architecture.

Pasted image 20251216112108.png
Ok, let’s now go to the Fabric workspace to present the technical details.

Setting Up Microsoft Fabric Monitoring Notebook

Starting from the beginning – I’ve extended the notebook to ingest and load data into Lakehouse as follows:

Add Pipeline Run ID Parameter

Start by adding a new cell at the top to capture the pipeline_run_id from the Fabric Pipeline. By default, you can set it to any GUID for testing purposes.

Python
pipeline_rund_id = "1caf67a5-2bba-4085-920c-6b3f07c90523" #You can add any GUID

Heads up: You need to set that cell as a toggle parameter:
Pasted image 20251216085608.png

Import Libraries

Import the necessary libraries: requests for making HTTP requests, PySpark (col, max) for column selection and aggregation, pandas for data manipulation, json for handling JSON data, datetime for date and time manipulation, and UUID for unique identifier generation.

Python
# Import all libraries

from pyspark.sql.functions import col, max
from datetime import datetime, timedelta
import requests
import pandas as pd
import json
import uuid

Execute API Call

Make the API call using the requests.get method and store the response.

Python
# Execute API call and save a response to variable
token = mssparkutils.credentials.getToken("https://api.fabric.microsoft.com")

Define Time Range for Log Ingestion

Define the start and end date for log ingestion. The default setting is to take all logs starting from the last logged item till now. If there’s no table yet (initial run), do ingestion for the last 24 hours.

Python
try:
	fabric_logs_main_df = spark.read.format("delta").load(f'Tables/fabric_logs')
	start_time = fabric_logs_main_df.agg(max("jobStartTimeUtc")).collect()[0][0]
except:
	print('No Table Found - setting yesterday as time to get the logs.')
	start_time = datetime.today() - timedelta(hours=24, minutes=0)

end_time = datetime.today()

Prepare API Request

Define the URL and parameters for the API request. The parameters include filters for the data you want to retrieve – in this case, taking all data from the latest saved logs (based on start_time variable) till current time (based on end_time variable). It also limits the statuses to only finished ones, excluding in-progress, pending, and queued processes.

Python
url = "<YOUR URL>"
params = {
    "limit": 10000, # Maximum number of rows to be retrieved
    "endTime": f"{end_time}", # Limiting range of StartTime of the Job
    "startTime": f"{start_time}", # Limiting range of StartTime of the Job
    "status":"4,6,2,3,5,7,8", # All processed statuses. Excluded all in_progress or pending.
    "Accept": "application/json", # Do not manipulate to get json output
    "accept-encoding": "gzip, deflate, br, zstd", # Do not manipulate
    "activityid": "d6678b61-1c72-4e43-9498-54b715064543", # It could be any GUID
    "accept-language": "en-US,en;q=0.9" # Do not manipulate
}
headers = {
    "Authorization": f"Bearer {token}" # Authorization using the generated token
}

Execute the API Call

Make the API call and store the response.

Python
# Execute API call and save a response to variable
response = requests.get(url, headers=headers, params=params)

Process JSON Data

Extract the JSON data from the response and convert it into a pandas DataFrame for easy manipulation and analysis.

Python
# Take json data from response and save it to variable
data = response.json()
# Open JSON and save it as pandas DF
df = pd.json_normalize(data)

Add Pipeline Run ID Column

Add pipeline_run_id to recognize which pipeline run collected which logs.

Python
df['pipeline_run_id'] = pipeline_rund_id

Check for Captured Logs

Check if the DataFrame is empty and set up the df_processing flag to determine whether to save to the Lakehouse table.

Python
if not df.empty:
    df_processing = True
else:
    df_processing = False

Load DataFrame to Table

Check if the df_processing flag is True, then convert all columns to string. This isn’t the most elegant solution, but since we’re lacking proper documentation from Microsoft to rely on, I’d suggest starting with this approach and then adjusting columns to proper datatypes later. The logs are loaded in append mode with mergeSchema option set to true so you don’t lose any new columns that might appear.

Python
if df_processing:
    for col_name in df.columns:
        df[col_name] = df[col_name].astype('str')
    df = spark.createDataFrame(df)
    df.write.format("delta").mode("append").option('mergeSchema', 'true').save('Tables/fabric_logs')

Create Failures DataFrame

Filter only failed execution logs and store them in a new DataFrame called df_failures.

Python
df_failures = df.filter(df.isSuccessful == False)

Check for Failures

Check if df_failures is empty and set up the df_failures_processing flag to determine whether to create a list of errors for Teams/email notifications.

Python
if not df_failures.rdd.isEmpty():
    df_failures_processing = True
else:
    df_failures_processing = False

Prepare Array for Notifications

Select and rename informative columns from df_failures (these work for me but you can adjust based on your needs), convert the DataFrame to a JSON list, and assign it to result_array. If df_failures_processing is false, result_array is set to an empty list.

Python
if df_failures_processing:
    from pyspark.sql import functions as F
    result_df = df_failures.select(
        df_failures["workspaceName"], # Name of the workspace
        df_failures["workspaceObjectId"].alias("WorkspaceId"), # Workspace ID
        df_failures["artifactName"].alias("FailedItemName"), # Name of the item failed
        df_failures["artifactType"].alias("FailedItemType"), # Type of the failed item (like pipeline, notebook etc.
        df_failures["serviceExceptionJson"].alias("ErrorDetails"), # Json with ErrorDetails
        df_failures["`ownerUser.name`"].alias("OwnerUserName"), # Owner user name of failed item
        df_failures["`ownerUser.userPrincipalName`"].alias("OwnerUserEmail"), # Owner email of failed item
        df_failures["jobScheduleTimeUtc"], # Scheduled time
        df_failures["jobStartTimeUtc"], # Start time
        df_failures["jobEndTimeUtc"] # End time
    )
    # collect failures and format to json
    result_json = result_df.toJSON().collect()
    # format json to a list, to send them separetely.
    result_array = [json.loads(item) for item in result_json]
else:
    result_array=[]

Close Notebook with Output

Pass the array as output from the notebook, which will be used in the Fabric Pipeline to send email/Teams notifications.

Python
mssparkutils.notebook.exit(json.dumps(result_array))

Creating the Microsoft Fabric Monitoring Pipeline

The next thing to prepare is the Fabric Pipeline for orchestrating the whole process.

Create Pipeline and Add Notebook Activity

First, create a new pipeline. In the pipeline, add the Notebook activity and select your prepared notebook:

Pasted image 20251216090222.png

Configure the Notebook Activity

Configure the notebook to be executed:

Pasted image 20251216090238.png
  1. Choose the workspace where you have the notebook created
  2. Choose the notebook for logs ingestion
  3. Add the name of the parameter from the toggle parameter cell – in my case it’s pipeline_run_id
  4. Choose String data type
  5. Click on Add dynamic content and add the following value: @pipeline().RunId. This takes the system variable which is the unique identifier of the running pipeline – you can check the details here: Parameters – Microsoft Fabric | Microsoft Learn

Configuring Alert Notifications

Now it’s time to set up the notifications. First, add a ForEach loop activity, which will execute notifications for each item from the result_array.

Pasted image 20251216090304.png
  1. Add ForEach loop activity
  2. Link it by On Success path
  3. Name it as you wish
Pasted image 20251216090335.png
  1. Go to the Settings tab
  2. Leave sequential option unchecked – this allows ForEach to run in parallel
  3. Set up batch count – this limits the number of parallel runs (I’ve set it to 10)
  4. Add array output from Notebook using: @json(activity('Logs Ingestion Notebook').output.result.exitValue)

Add Email Notification

Now click on “Edit” in the ForEach loop and add Teams or Outlook activities. I’ll set it up for emails.

Pasted image 20251216090435.png

  1. Add Outlook activity
  2. Name it as you wish
Pasted image 20251216090457.png
  1. Go to the Settings tab
  2. Sign in using your account or a prepared technical email for notifications. Once signed in, the activity will show you the menu to set up the email
Pasted image 20251216090526.png
  1. Here the account you’re logged in with will be displayed – this will be used for email notifications
  2. Add the email address where you want to send the notification
  3. Subject of the email – I’ve added an expression to build the subject “Issue spotted in workspace <workspace_name>”. Expression: Issue spotted in workspace @{item().WorkspaceName}
  4. Body of the email – I’ve listed all items from the result_array like this:
HTML
<p>Hello,</p>
<p>The issue have been spotted during jobs execution, you can find a details below:</p>
<ul>
  <li>Workspace Name: @{item().WorkspaceName}</li>
  <li>Workspace Id: @{item().WorkspaceId}</li>
  <li>Failed Item Name: @{item().FailedItemName}</li>
  <li>Failed Item Type: @{item().FailedItemType}</li>
  <li>ErrorMessage: @{item().ErrorMessage}</li>
  <li>Owner User Name: @{item().OwnerUserName}</li>
  <li>Owner User Email: @{item().OwnerUserEmail}</li>
  <li>Scheduled On: @{item().jobScheduleTimeUtc}</li>
  <li>Started On: @{item().jobStartTimeUtc}</li>
  <li>Finished On: @{item().jobEndTimeUtc}</li>
</ul>

GitHub Repository

The whole solution is available on my GitHub Repository: GitHub. You can sync the content directly to your Fabric workspace.

Wrapping Up

Automating log ingestion and monitoring using Fabric Data Factory pipeline orchestration can significantly enhance your ability to detect and resolve issues swiftly. By setting up a robust alerting system, you ensure that anomalies are promptly addressed, minimizing potential disruptions.

The key steps involve:

  • Ingesting logs from the Fabric API endpoint
  • Loading them into Lakehouse
  • Setting up alerts for detected issues

Scheduling the flow to run at regular intervals ensures continuous monitoring and timely detection of any problems.

I hope this guide helps you streamline your log management process and improve your overall system efficiency. If you have any questions or need further assistance, feel free to reach out.

Happy automating!

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *