
Ingest Logs using Logstash into Real-Time Intelligence

Coauthor: Ramachandran G, RTI Engineering Manager, Microsoft

In today’s data-driven world, organizations rely heavily on real-time insights to monitor systems, detect anomalies, and make informed decisions. One of the key challenges in achieving this is efficiently ingesting and transforming log data from diverse sources into a format that can be analyzed instantly.

Real-Time Intelligence

Real-Time Intelligence is an end-to-end solution for event-driven scenarios, streaming data, and data logs. It enables the extraction of insights, visualization, and action on data in motion by handling data ingestion, transformation, storage, analytics, visualization, tracking, AI, and real-time actions.

The Eventstream feature in the Microsoft Fabric Real-Time Intelligence experience lets you bring real-time events into Fabric, transform them, and then route them to various destinations without writing any code (no-code). Additionally, with Apache Kafka endpoints available on the Eventstream item, you can send or consume real-time events using the Kafka protocol.

Eventhouse provides a solution for handling and analyzing large volumes of data, particularly in scenarios requiring real-time analytics and exploration. It’s designed to handle real-time data streams efficiently, which lets organizations ingest, process, and analyze data in near real time. These aspects make Eventhouse useful for scenarios where timely insights are crucial.

Logstash

Logstash is an open-source data processing tool that enables the collection, transformation, and forwarding of data from a wide variety of sources. It acts as a data pipeline engine, helping organizations manage and streamline the flow of structured and unstructured data across systems.
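Every Logstash pipeline has the same shape: an input block, optional filter blocks, and an output block. The minimal pipeline below is purely illustrative (not part of this walkthrough); it reads lines typed into the console, parses them as JSON, and pretty-prints the structured result:

input {
  stdin { }                       # read events from the console
}

filter {
  json { source => "message" }    # parse each incoming line as JSON
}

output {
  stdout { codec => rubydebug }   # pretty-print parsed events for inspection
}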

Whether you’re managing infrastructure logs, application events, or telemetry data, this guide will walk you through setting up a seamless pipeline that bridges raw log data with real-time analytics in Fabric.

Step 1: Download and install Logstash from the Elastic website (Download Logstash Free | Get Started Now | Elastic)

Step 2: Make sure Logstash is installed in the root drive of the system

Ex: For Windows, the C:\ drive
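To confirm the installation, you can run Logstash with the --version flag from the bin folder of the install (the path below is an example; adjust it to your version):

cd C:\logstash\logstash-9.0.3\bin
.\logstash --version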

Step 3: Ready the log producer

In this post, we use a bash script to generate a continuous stream of sample web-access logs in JSON format. Save the script below in a shell file, e.g. GenerateJsonSample.sh.

#!/usr/bin/env bash
# you will need to install jq for JSON handling

while true
do
    # Generate random IP
    random_ip=$(dd if=/dev/urandom bs=4 count=1 2>/dev/null | od -An -tu1 | sed -e 's/^ *//' -e 's/  */./g')
    
    # Generate random response size and HTTP status
    random_size=$(( (RANDOM % 65535) + 1 ))
    status_codes=(200 201 400 404 500)
    random_status=${status_codes[$RANDOM % ${#status_codes[@]}]}

    # Generate current timestamp in ISO 8601
    timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

    # Random endpoint
    endpoints=("/api/data" "/user/login" "/metrics" "/products/123" "/health")
    random_endpoint=${endpoints[$RANDOM % ${#endpoints[@]}]}

    # Construct JSON log
    json_log=$(jq -c -n --arg ip "$random_ip" --arg ts "$timestamp" --arg endpoint "$random_endpoint" --arg status "$random_status" --arg size "$random_size" '{ip: $ip, timestamp: $ts, endpoint: $endpoint, status: ($status|tonumber), size: ($size|tonumber)}')


    echo "$json_log" | tee -a '/tmp/jsonlogs.txt'

    sleep 0.1
done
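Before running it, make the script executable and make sure jq is installed (setup commands; the package name is jq on most distros):

chmod +x GenerateJsonSample.sh
sudo apt-get install -y jq    # Debian/Ubuntu; use your distro's package manager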

Step 4: Running the script generates logs in the following format

./GenerateJsonSample.sh 
{"ip":"191.137.181.189","timestamp":"2025-07-08T04:58:04Z","endpoint":"/metrics","status":400,"size":760}
{"ip":"63.27.40.41","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":10908}
{"ip":"28.39.101.198","timestamp":"2025-07-08T04:58:04Z","endpoint":"/health","status":500,"size":6086}
{"ip":"233.220.66.250","timestamp":"2025-07-08T04:58:04Z","endpoint":"/api/data","status":200,"size":5718}
{"ip":"186.6.135.228","timestamp":"2025-07-08T04:58:04Z","endpoint":"/metrics","status":400,"size":4729}
{"ip":"116.63.93.212","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":24704}
{"ip":"206.89.144.138","timestamp":"2025-07-08T04:58:04Z","endpoint":"/products/123","status":404,"size":31}
{"ip":"151.218.40.55","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":24437}
{"ip":"20.9.227.147","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":400,"size":10039}
{"ip":"131.148.221.81","timestamp":"2025-07-08T04:58:05Z","endpoint":"/products/123","status":404,"size":6435}
Example run of the bash script using a Linux distro on Windows

Note: For the next steps,

  • If you want to stream logs to Eventhouse, follow steps 5 to 9.
  • If you want to stream logs to Eventstream, follow steps 10 to 13.

Stream logs to Eventhouse:

Step 5: Create table and ingestion mapping in Eventhouse

// create table to store logs
.create-merge table LogstashWeblogsJson (
    status: int,
    size: int,
    ip: string,
    timestamp: datetime,
    endpoint: string,
    host: dynamic,
    log: dynamic,
    event: dynamic
)


// create json mapping to map incoming data to columns in above table
.create table LogstashWeblogsJson ingestion json mapping "LogstashRawMapping" '[{"column": "status", "path": "$.status"}, {"column": "size", "path": "$.size"}, {"column": "ip", "path": "$.ip"}, {"column": "timestamp", "path": "$.timestamp"}, {"column": "endpoint", "path": "$.endpoint"}, {"column": "host", "path": "$.host"}, {"column": "log", "path": "$.log"}, {"column": "event", "path": "$.event"}]'
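Before wiring up Logstash, you can sanity-check that both objects exist using standard Kusto commands:

// confirm the table schema
LogstashWeblogsJson
| getschema

// confirm the ingestion mapping was created
.show table LogstashWeblogsJson ingestion json mappings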

Step 6: Get ingestion URL and table details from Eventhouse

From the Overview section of the Eventhouse homepage, copy the ingestion URL.

Step 7: Configure input and output of Logstash

The Logstash config file is in the config folder of the Logstash installation. Open it in a text editor and add the input and output blocks below.

Note: To authenticate Logstash to Eventhouse, you can use a service principal, the Azure CLI, or a managed identity. The following example uses CLI authentication with my personal login credentials.
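With CLI authentication, the plugin relies on an existing Azure CLI session (an assumption to verify against the plugin docs for your version), so sign in before starting Logstash:

az login    # interactive sign-in; Logstash reuses this CLI session for ingestion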

Note: The input should read the logs generated by GenerateJsonSample.sh, so make sure the path in the input block matches the path used in the shell script.

input {
  file {
    path => "/tmp/jsonlogs.txt"        # must match the file written by GenerateJsonSample.sh
    start_position => "beginning"
    codec => json                      # parse each line as a JSON event
  }
}

output {
  kusto {
    path => "C:\Logstash\tmp\kusto\%{+YYYY-MM-dd-HH-mm-ss}.txt"    # temp files the plugin stages locally before ingestion
    ingest_url => "<ingestion url copied in Step 6>"
    cli_auth => true                   # authenticate using the Azure CLI session
    database => "Logs_EH"
    table => "LogstashWeblogsJson"
    mapping => "LogstashRawMapping"
  }
}

Step 8: Run Logstash

From the bin folder of the Logstash installation, run:

.\logstash -f "C:\logstash\logstash-9.0.3\config\logstash-sample.conf"

Step 9: Data flowing into Eventhouse

Query the table in Eventhouse to see real-time log data flowing.

LogstashWeblogsJson
 | take 10
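Once rows appear, typical exploratory aggregations work immediately. For example, to count error responses per endpoint over time (columns as defined in Step 5):

// error responses per endpoint, bucketed by minute
LogstashWeblogsJson
| where status >= 400
| summarize errors = count() by endpoint, bin(timestamp, 1m)
| order by errors desc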

Stream logs to Eventstream:

Step 10: Get Kafka custom endpoint details

Create a ‘custom endpoint’ source in Eventstream. Once it’s created, click the source node, select ‘Kafka’, and collect the details for the bootstrap server, security protocol, topic name, SASL mechanism, and SASL JAAS config.
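For reference, because the custom endpoint exposes an Event Hubs-compatible Kafka interface, the bootstrap server typically looks like <namespace>.servicebus.windows.net:9093 and the SASL username is $ConnectionString (illustrative format; always copy the exact values shown in your Eventstream).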

Step 11: Configure input and output of Logstash

This is similar to Step 7, but the output now points to the Kafka endpoint of Eventstream collected above.

input {
  file {
    path => "/tmp/jsonlogs.txt"
    start_position => "beginning"
    sincedb_path => "/dev/null"        # don't persist the read position (handy for testing)
    codec => json
  }
}

output {
  kafka {
    codec => json
    topic_id => "<Kafka topic name>"
    bootstrap_servers => "<Kafka bootstrap server>"
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    jaas_path => "<path to>/jaas.conf" # see jaas.conf contents below
  }
}

Note: The jaas.conf file must be present at the local path referenced by jaas_path, containing the SASL JAAS config collected in Step 10. More details here – Kafka output plugin | Logstash Plugins

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="$ConnectionString"
    password="<SASL JAAS config>";
};

Step 12: Run Logstash

As in Step 8, run from the bin folder:

.\logstash -f "C:\logstash\logstash-9.0.3\config\logstash-kafka.conf"

Step 13: Data flowing into Eventstream

Open the Eventstream item to see events arriving through the custom endpoint source, ready to be routed to destinations.

Conclusion

Logstash output plugins are available for Real-Time Intelligence, letting you ingest logs, sensor data, or telemetry data and perform large-scale analytics. You can stream the logs to either Eventstream or Eventhouse, giving users the flexibility to choose.

Note: Although the documentation points to Azure Data Explorer (ADX), the plugin is fully compatible with Eventhouse.

Need help or want to suggest an improvement?

Reach out to us on the RTI Forum: Get Help with Real-Time Intelligence.

Request or upvote a suggestion on Fabric Ideas.

Managing data consistency during ETL has always been a challenge for our customers. Dashboards break, KPIs fluctuate, and compliance audits become painful when reporting hits ‘half-loaded’ data. With Warehouse Snapshots, Microsoft Fabric solves this by giving you a stable, read-only view of your warehouse at a specific point in time and now, this capability is … Continue reading “Warehouse Snapshots in Microsoft Fabric (Generally Available)”