Microsoft Fabric Updates Blog

Ingest Logs using Logstash into Real-Time Intelligence

Coauthor: Ramachandran G, RTI Engineering Manager, Microsoft

In today’s data-driven world, organizations rely heavily on real-time insights to monitor systems, detect anomalies, and make informed decisions. One of the key challenges in achieving this is efficiently ingesting and transforming log data from diverse sources into a format that can be analyzed instantly.

Real-Time Intelligence

Real-time Intelligence is an end-to-end solution for event-driven scenarios, streaming data, and data logs. It enables the extraction of insights, visualization, and action on data in motion by handling data ingestion, transformation, storage, analytics, visualization, tracking, AI, and real-time actions.

The Eventstream feature in the Microsoft Fabric Real-Time Intelligence experience lets you bring real-time events into Fabric, transform them, and then route them to various destinations without writing any code. Additionally, with Apache Kafka endpoints available on the Eventstream item, you can send or consume real-time events using the Kafka protocol.

Eventhouses provide a solution for handling and analyzing large volumes of data, particularly in scenarios requiring real-time analytics and exploration. They’re designed to handle real-time data streams efficiently, which lets organizations ingest, process, and analyze data in near real-time. These aspects make Eventhouses useful for scenarios where timely insights are crucial.

Logstash

Logstash is an open-source data processing tool that enables the collection, transformation, and forwarding of data from a wide variety of sources. It acts as a data pipeline engine, helping organizations manage and streamline the flow of structured and unstructured data across systems.
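Conceptually, a Logstash pipeline is declared as an input block, an optional filter block, and an output block. A minimal, purely illustrative sketch (stdin to stdout, not part of the walkthrough below) looks like this:

```
input  { stdin { } }
filter { mutate { add_field => { "pipeline" => "demo" } } }
output { stdout { codec => json_lines } }
```

The configurations later in this post follow the same shape, with a file input and a Kusto or Kafka output.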

Whether you’re managing infrastructure logs, application events, or telemetry data, this guide will walk you through setting up a seamless pipeline that bridges raw log data with real-time analytics in Fabric.

Step 1: Download Logstash from the Elastic website (Download Logstash Free | Get Started Now | Elastic)

Step 2: Make sure Logstash is installed in the root drive of the system

Example: On Windows, the C:\ drive.

Step 3: Ready the log producer

In this post, we use a bash script to generate continuous system metrics. Save the script below to a shell file, for example GenerateJsonSample.sh.

#!/usr/bin/env bash
# you will need to install jq for JSON handling

while true
do
    # Generate random IP
    random_ip=$(dd if=/dev/urandom bs=4 count=1 2>/dev/null | od -An -tu1 | sed -e 's/^ *//' -e 's/  */./g')
    
    # Generate random response size and HTTP status
    random_size=$(( (RANDOM % 65535) + 1 ))
    status_codes=(200 201 400 404 500)
    random_status=${status_codes[$RANDOM % ${#status_codes[@]}]}

    # Generate current timestamp in ISO 8601
    timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

    # Random endpoint
    endpoints=("/api/data" "/user/login" "/metrics" "/products/123" "/health")
    random_endpoint=${endpoints[$RANDOM % ${#endpoints[@]}]}

    # Construct JSON log
    json_log=$(jq -c -n --arg ip "$random_ip" --arg ts "$timestamp" --arg endpoint "$random_endpoint" --arg status "$random_status" --arg size "$random_size" '{ip: $ip, timestamp: $ts, endpoint: $endpoint, status: ($status|tonumber), size: ($size|tonumber)}')

    echo "$json_log" | tee -a '/tmp/jsonlogs.txt'

    sleep 0.1
done
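The least obvious line in the script is the IP generator. In isolation (a sketch you can run on its own), it converts four random bytes into a dotted-quad string:

```shell
# 4 random bytes -> unsigned decimal octets (od) -> dots between them (sed)
random_ip=$(dd if=/dev/urandom bs=4 count=1 2>/dev/null | od -An -tu1 | sed -e 's/^ *//' -e 's/  */./g')
echo "$random_ip"
# Each octet is in the range 0-255; the value varies per run
```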

Step 4: Running the script should generate logs in the following format

./GenerateJsonSample.sh 
{"ip":"191.137.181.189","timestamp":"2025-07-08T04:58:04Z","endpoint":"/metrics","status":400,"size":760}
{"ip":"63.27.40.41","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":10908}
{"ip":"28.39.101.198","timestamp":"2025-07-08T04:58:04Z","endpoint":"/health","status":500,"size":6086}
{"ip":"233.220.66.250","timestamp":"2025-07-08T04:58:04Z","endpoint":"/api/data","status":200,"size":5718}
{"ip":"186.6.135.228","timestamp":"2025-07-08T04:58:04Z","endpoint":"/metrics","status":400,"size":4729}
{"ip":"116.63.93.212","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":24704}
{"ip":"206.89.144.138","timestamp":"2025-07-08T04:58:04Z","endpoint":"/products/123","status":404,"size":31}
{"ip":"151.218.40.55","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":24437}
{"ip":"20.9.227.147","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":400,"size":10039}
{"ip":"131.148.221.81","timestamp":"2025-07-08T04:58:05Z","endpoint":"/products/123","status":404,"size":6435}
Example run of the bash script using a Linux distro on Windows
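Before wiring up Logstash, it is worth confirming that each line in the log file is a single JSON object with the expected keys. A quick sanity check (a sketch, assuming the /tmp/jsonlogs.txt path from the script above):

```shell
LOG=/tmp/jsonlogs.txt
# Append one known-good line so the check also works on a fresh file
echo '{"ip":"10.0.0.1","timestamp":"2025-07-08T04:58:04Z","endpoint":"/health","status":200,"size":512}' >> "$LOG"
# The last line should carry the expected fields
tail -n 1 "$LOG" | grep -q '"status"' && tail -n 1 "$LOG" | grep -q '"endpoint"' && echo "log format OK"
# -> log format OK
```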

Note: For next steps,

  • If you want to stream logs to Eventhouse, follow steps 5 to 9.
  • If you want to stream logs to Eventstream, follow steps 10 to 13.

Stream logs to Eventhouse:

Step 5: Create table and ingestion mapping in Eventhouse

// create table to store logs
.create-merge table LogstashWeblogsJson (
    status: int,
    size: int,
    ip: string,
    timestamp: datetime,
    endpoint: string,
    host: dynamic,
    log: dynamic,
    event: dynamic
)


// create json mapping to map incoming data to columns in above table
.create table LogstashWeblogsJson ingestion json mapping "LogstashRawMapping" '[{"column":"status","path":"$.status"},{"column":"size","path":"$.size"},{"column":"ip","path":"$.ip"},{"column":"timestamp","path":"$.timestamp"},{"column":"endpoint","path":"$.endpoint"},{"column":"host","path":"$.host"},{"column":"log","path":"$.log"},{"column":"event","path":"$.event"}]'

Step 6: Get ingestion URL and table details from Eventhouse

From the Overview section of the Eventhouse homepage, copy the ingestion URL.

Step 7: Configure input and output of Logstash

The Logstash configuration file is located in the config folder of the Logstash installation directory. Open it in a text editor and add the input and output blocks.

Note: To authenticate Logstash to Eventhouse, you can use a Service Principal, the Azure CLI, or a Managed Identity. The following example uses CLI authentication with my personal login credentials.

Note: The input should read the logs generated by GenerateJsonSample.sh, so make sure the path in the input block matches the path used in the shell script.

input {
  file {
    path => "/tmp/jsonlogs.txt"
    start_position => "beginning"
    codec => json
  }
}

output {
  kusto {
    path => "C:\Logstash\tmp\kusto\%{+YYYY-MM-dd-HH-mm-ss}.txt"
    ingest_url => "<ingestion url copied in Step 6>"
    cli_auth => true
    database => "Logs_EH"
    table => "LogstashWeblogsJson"
    mapping => "LogstashRawMapping"
  }
}
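Before starting Logstash, two quick checks can save a failed run: Logstash’s own --config.test_and_exit flag validates the config syntax without starting the pipeline, and a shell check confirms the file the input block tails actually exists. A sketch of the latter, assuming the /tmp path from Step 3:

```shell
# Pre-flight check (run on the machine where the producer runs)
LOG=/tmp/jsonlogs.txt
touch "$LOG"   # normally created by GenerateJsonSample.sh; touch here only for the check
[ -r "$LOG" ] && echo "input path OK: $LOG"
# -> input path OK: /tmp/jsonlogs.txt
```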

Step 8: Run Logstash

.\logstash -f "C:\logstash\logstash-9.0.3\config\logstash-sample.conf"

Step 9: Data flowing into Eventhouse

Query the table in Eventhouse to see real-time log data flowing.

LogstashWeblogsJson
| take 10

Stream logs to Eventstream:

Step 10: Get Kafka custom endpoint details

Create a ‘custom endpoint’ source in Eventstream. Once created, click the source node, select ‘Kafka’, and collect the details for the Bootstrap server, Security protocol, Topic name, SASL mechanism, and SASL JAAS config.

Step 11: Configure input and output of Logstash

Similar to Step 7, but the output points to the Eventstream Kafka endpoint collected above.

input {
  file {
    path => "/tmp/jsonlogs.txt"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => json
  }
}

output {
  kafka {
      codec => json
      topic_id => "<Kafka topic name>"
      bootstrap_servers => "<Kafka bootstrap server>"
      security_protocol => "SASL_SSL"
      sasl_mechanism => "PLAIN"
      jaas_path => "<path to>/jaas.conf"
  }
}

Note: The jaas.conf file should be present at a local path and contain the JAAS config copied in Step 10. More details: Kafka output plugin | Logstash Plugins.

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="$ConnectionString"
    password="<SASL JAAS config>";
};
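One way to create the jaas.conf file from a shell is with a quoted heredoc, which keeps the username literal "$ConnectionString" intact (a sketch; replace only the <SASL JAAS config> placeholder with the value collected in Step 10):

```shell
# Quoted 'EOF' prevents the shell from expanding $ConnectionString
cat > jaas.conf <<'EOF'
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="$ConnectionString"
    password="<SASL JAAS config>";
};
EOF
grep -q 'PlainLoginModule' jaas.conf && echo "jaas.conf written"
# -> jaas.conf written
```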

Step 12: Run Logstash

.\logstash -f "C:\logstash\logstash-9.0.3\config\logstash-kafka.conf"

Step 13: Data flowing into Eventstream

Conclusion

Logstash output plugins for Real-Time Intelligence let you ingest logs, sensor data, or telemetry and perform large-scale analytics. Logs can be streamed to either Eventstream or Eventhouse, giving users the flexibility to choose.

Note: Although the documentation points to Azure Data Explorer (ADX), the plugin is fully compatible with Eventhouse.

Need help or want to suggest an improvement?

Reach out to us on RTI Forum: Get Help with Real-Time Intelligence.

Request or upvote a suggestion on Fabric Ideas RTI: Fabric Ideas.

Copy job is the recommended approach in Microsoft Fabric Data Factory for moving data from any sources to any destinations in a simplified and efficient way—whether you’re transferring data across clouds, from on-premises systems, or between services. With native support for multiple delivery patterns, including bulk copy, incremental copy, and change data capture (CDC) replication, … Continue reading “Simplifying Data Ingestion with Copy job – Replicate data from Dataverse through Fabric to multiple destinations”