Ingest Logs using Logstash into Real-Time Intelligence
Coauthor: Ramachandran G, RTI Engineering Manager, Microsoft
In today’s data-driven world, organizations rely heavily on real-time insights to monitor systems, detect anomalies, and make informed decisions. One of the key challenges in achieving this is efficiently ingesting and transforming log data from diverse sources into a format that can be analysed instantly.
Real-Time Intelligence
Real-Time Intelligence is an end-to-end solution for event-driven scenarios, streaming data, and data logs. It lets you extract insights from, visualize, and act on data in motion by handling data ingestion, transformation, storage, analytics, visualization, tracking, AI, and real-time actions.
The Eventstream feature in the Microsoft Fabric Real-Time Intelligence experience lets you bring real-time events into Fabric, transform them, and then route them to various destinations without writing any code (no-code). Additionally, with Apache Kafka endpoints available on the Eventstream item, you can send or consume real-time events using the Kafka protocol.
Eventhouses provide a solution for handling and analyzing large volumes of data, particularly in scenarios requiring real-time analytics and exploration. They’re designed to handle real-time data streams efficiently, which lets organizations ingest, process, and analyze data in near real time. These aspects make eventhouses useful for scenarios where timely insights are crucial.
Logstash
Logstash is an open-source data processing tool that enables the collection, transformation, and forwarding of data from a wide variety of sources. It acts as a data pipeline engine, helping organizations manage and streamline the flow of structured and unstructured data across systems.
Whether you’re managing infrastructure logs, application events, or telemetry data, this guide will walk you through setting up a seamless pipeline that bridges raw log data with real-time analytics in Fabric.
Step 1: Download Logstash from the Elastic website: Download Logstash Free | Get Started Now | Elastic
Step 2: Make sure Logstash is installed in the root of a drive
Example: on Windows, the C:\ drive

Step 3: Ready the log producer
In this post, we use a bash script to continuously generate sample web access logs in JSON. Save the script below in a shell file, for example GenerateJsonSample.sh.
#!/usr/bin/env bash
# you will need to install jq for JSON handling
while true
do
# Generate random IP
random_ip=$(dd if=/dev/urandom bs=4 count=1 2>/dev/null | od -An -tu1 | sed -e 's/^ *//' -e 's/ \{1,\}/./g')
# Generate random response size and HTTP status
random_size=$(( (RANDOM % 65535) + 1 ))
status_codes=(200 201 400 404 500)
random_status=${status_codes[$RANDOM % ${#status_codes[@]}]}
# Generate current timestamp in ISO 8601
timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
# Random endpoint
endpoints=("/api/data" "/user/login" "/metrics" "/products/123" "/health")
random_endpoint=${endpoints[$RANDOM % ${#endpoints[@]}]}
# Construct JSON log
json_log=$(jq -c -n --arg ip "$random_ip" --arg ts "$timestamp" --arg endpoint "$random_endpoint" --arg status "$random_status" --arg size "$random_size" '{ip: $ip, timestamp: $ts, endpoint: $endpoint, status: ($status|tonumber), size: ($size|tonumber)}')
echo "$json_log" | tee -a '/tmp/jsonlogs.txt'
sleep 0.1
done
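The script relies on jq and a handful of standard tools, so it can help to fail fast when one is missing. The following is a minimal sketch that could be pasted above the loop; have_cmd and check_prereqs are hypothetical helper names, not part of the original script.

```shell
#!/usr/bin/env bash
# have_cmd is a hypothetical helper; `command -v` is the POSIX way to test
# whether a program can be found on PATH.
have_cmd() {
  command -v "$1" >/dev/null 2>&1
}

# Report every missing tool instead of stopping at the first one.
# Returns 0 when all tools are present, 1 otherwise.
check_prereqs() {
  local missing=0 tool
  for tool in "$@"; do
    if ! have_cmd "$tool"; then
      echo "missing: $tool" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, calling check_prereqs jq dd od sed date tee || exit 1 before the while loop stops the generator early with a list of whatever is not installed.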
Step 4: Run the script. It should generate logs in the following format
./GenerateJsonSample.sh
{"ip":"191.137.181.189","timestamp":"2025-07-08T04:58:04Z","endpoint":"/metrics","status":400,"size":760}
{"ip":"63.27.40.41","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":10908}
{"ip":"28.39.101.198","timestamp":"2025-07-08T04:58:04Z","endpoint":"/health","status":500,"size":6086}
{"ip":"233.220.66.250","timestamp":"2025-07-08T04:58:04Z","endpoint":"/api/data","status":200,"size":5718}
{"ip":"186.6.135.228","timestamp":"2025-07-08T04:58:04Z","endpoint":"/metrics","status":400,"size":4729}
{"ip":"116.63.93.212","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":24704}
{"ip":"206.89.144.138","timestamp":"2025-07-08T04:58:04Z","endpoint":"/products/123","status":404,"size":31}
{"ip":"151.218.40.55","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":404,"size":24437}
{"ip":"20.9.227.147","timestamp":"2025-07-08T04:58:04Z","endpoint":"/user/login","status":400,"size":10039}
{"ip":"131.148.221.81","timestamp":"2025-07-08T04:58:05Z","endpoint":"/products/123","status":404,"size":6435}
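Before wiring up Logstash, a quick look at the distribution of status codes in the file confirms that the generator is writing what we expect. This is only a sketch: count_by_status is a hypothetical helper, and it assumes the compact one-object-per-line format that jq -c produces above (no space after the colon).

```shell
#!/usr/bin/env bash
# count_by_status is a hypothetical helper, not part of Logstash or Fabric.
# It prints one "<status> <count>" line per HTTP status found in the file.
count_by_status() {
  local file="$1"
  # Pull out each "status":NNN pair, keep the number, then count occurrences.
  grep -o '"status":[0-9]*' "$file" \
    | cut -d: -f2 \
    | sort -n \
    | uniq -c \
    | awk '{print $2, $1}'
}

# Only run against the sample path if the generator has already created it.
if [ -f /tmp/jsonlogs.txt ]; then
  count_by_status /tmp/jsonlogs.txt
fi
```

Because the generator picks statuses at random from five values, the counts should be roughly even after a few hundred lines.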

Note: For next steps,
- If you want to stream logs to Eventhouse, follow steps 5 to 9.
- If you want to stream logs to Eventstream, follow steps 10 to 13.
Stream logs to Eventhouse:
Step 5: Create table and ingestion mapping in Eventhouse
// create table to store logs
.create-merge table LogstashWeblogsJson (
status: int,
size: int,
ip: string,
timestamp: datetime,
endpoint: string,
host: dynamic,
log: dynamic,
event: dynamic
)
// create json mapping to map incoming data to columns in above table
.create table LogstashWeblogsJson ingestion json mapping "LogstashRawMapping" '[{ "column": "status", "path": "$.status" },{ "column": "size", "path": "$.size" }, { "column": "ip", "path": "$.ip" },{ "column": "timestamp", "path": "$.timestamp" },{ "column": "endpoint", "path": "$.endpoint" }, { "column": "host", "path": "$.host" },{ "column": "log", "path": "$.log" },{ "column": "event", "path": "$.event" }]'
Step 6: Get ingestion URL and table details from Eventhouse
From the Overview section on Eventhouse homepage, copy the ingestion URL.

Step 7: Configure input and output of Logstash
The Logstash config file is in the config folder of the Logstash installation. Open it in a text editor and add the input and output blocks.
Note: Logstash can authenticate to Eventhouse with a Service Principal, the CLI, or a Managed Identity. The following example uses CLI authentication with my personal login credentials.
Note: The input should read the logs generated by GenerateJsonSample.sh, so make sure the path in the input block matches the path in the shell script.
input {
file {
path => "/tmp/jsonlogs.txt"
start_position => "beginning"
codec => json
}
}
output {
kusto {
path => "C:\Logstash\tmp\kusto\%{+YYYY-MM-dd-HH-mm-ss}.txt"
ingest_url => "<ingestion url copied in Step 6>"
cli_auth => true
database => "Logs_EH"
table => "LogstashWeblogsJson"
mapping => "LogstashRawMapping"
}
}
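A mismatch between the path the generator writes and the path the file input reads is easy to miss, because Logstash simply waits for a file that never appears. The sketch below checks it from the shell; first_path_in_conf and paths_match are hypothetical helpers, and the approach assumes the input block comes first in the config, as it does above.

```shell
#!/usr/bin/env bash
# first_path_in_conf is a hypothetical helper: it prints the value of the
# first `path => "..."` setting in a Logstash config. In the config above that
# is the file input's path; the kusto output's path appears later in the file.
first_path_in_conf() {
  sed -n 's/^[[:space:]]*path[[:space:]]*=>[[:space:]]*"\(.*\)"[[:space:]]*$/\1/p' "$1" \
    | head -n 1
}

# Succeeds only when the config reads the same file the generator script writes.
paths_match() {
  local conf="$1" expected="$2"
  [ "$(first_path_in_conf "$conf")" = "$expected" ]
}
```

For example, paths_match logstash-sample.conf /tmp/jsonlogs.txt && echo "input path OK" prints the message only when the two paths agree.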
Step 8: Run Logstash
.\logstash -f "C:\logstash\logstash-9.0.3\config\logstash-sample.conf"
Step 9: Data flowing into Eventhouse
Query the table in Eventhouse to see real-time log data flowing.
LogstashWeblogsJson
| take 10

Stream logs to Eventstream:
Step 10: Get Kafka custom endpoint details
Create a ‘custom endpoint’ source in Eventstream. Once it is created, click the source node, select ‘Kafka’, and collect the following details: Bootstrap server, Security protocol, Topic name, SASL mechanism, and SASL JAAS config.

Step 11: Configure input and output of Logstash
This is similar to Step 7, but the output points to the Kafka endpoint of the Eventstream from Step 10.
input {
file {
path => "/tmp/jsonlogs.txt"
start_position => "beginning"
sincedb_path => "/dev/null"
codec => json
}
}
output {
kafka {
codec => json
topic_id => "<Kafka topic name>"
bootstrap_servers => "<Kafka bootstrap server>"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
jaas_path => "<path to>/jaas.conf"
}
}
Note: The jaas.conf file must exist at the local path given in jaas_path and contain the JAAS config copied in Step 10. More details here: Kafka output plugin | Logstash Plugins
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="$ConnectionString"
password="<SASL JAAS config>";
};
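Rather than pasting the secret into jaas.conf by hand, the file can be generated from a value held in the shell. The following is a sketch under a few assumptions: write_jaas_conf and the EVENTSTREAM_CONNECTION_STRING variable are hypothetical names, and the password is taken to be the connection string from the SASL JAAS config shown in Step 10, with the username kept as the literal text $ConnectionString.

```shell
#!/usr/bin/env bash
# write_jaas_conf is a hypothetical helper. It writes a JAAS file for the
# Logstash kafka output from a connection string passed as an argument.
write_jaas_conf() {
  local out_file="$1" conn_string="$2"
  (
    # Restrict permissions on newly created files, since this one holds a secret.
    umask 077
    # \$ConnectionString is escaped so it is written literally, not expanded.
    cat > "$out_file" <<EOF
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="\$ConnectionString"
  password="${conn_string}";
};
EOF
  )
}

# Assumed environment variable name; nothing is written if it is unset.
if [ -n "${EVENTSTREAM_CONNECTION_STRING:-}" ]; then
  write_jaas_conf ./jaas.conf "$EVENTSTREAM_CONNECTION_STRING"
fi
```

Point jaas_path in the kafka output at the generated file, and keep the file out of source control.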
Step 12: Run Logstash
.\logstash -f "C:\logstash\logstash-9.0.3\config\logstash-kafka.conf"
Step 13: Data flowing into Eventstream

Conclusion
Logstash output plugins let you ingest logs, sensor data, or telemetry into Real-Time Intelligence and run large-scale analytics on it. You can stream the logs to either Eventstream or Eventhouse, whichever fits your scenario.
- To learn about Logstash – Kafka output plugin, visit – Kafka output plugin | Logstash Plugins
- To learn about Logstash – Eventhouse output plugin, visit – Ingest data from Logstash to Azure Data Explorer
Note: Although the documentation points to ADX, the plugin is fully compatible with Eventhouse.
Need help or want to suggest an improvement?
Reach out to us on RTI Forum: Get Help with Real-Time Intelligence.
Request or upvote a suggestion on Fabric Ideas RTI: Fabric Ideas.