Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 169 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,169 @@
# Requirements
# RailGuard: Morocco High Speed Train Hackathon Project

Hello everyone! This project was developed by **Aya Harrak**, **Oumama Lemaoukni**, and **Er-Rougui Saad** as part of the Morocco High Speed Train Hackathon.

This README provides the steps to execute the codes included in this project.

---

## 1) System Requirements

To run this project, ensure you have the following:

- **Operating System**: Ubuntu 22.04
- **ROS2**: Used for fake sensor data publishing.
[Installation Guide for ROS2 on Ubuntu](https://docs.ros.org/en/humble/Installation/Ubuntu-Install-Debians.html)
- **Kafka**: Used for real-time data streaming.
[Apache Kafka Installation Guide]([https://kafka.apache.org/documentation/quickstart](https://hostman.com/tutorials/install-apache-kafka-on-ubuntu-22-04/)
- **Apache Spark**: Used for data processing.
[Apache Spark Installation Guide]([https://spark.apache.org/docs/latest/](https://phoenixnap.com/kb/install-spark-on-ubuntu)
- **Dependencies for Spark**:
Download necessary dependencies (`--jars` files) and place them in the directory `~/spark_jars`.
- **Grafana**: Used for data visualization and dashboards.
[Grafana Installation Guide](https://grafana.com/docs/grafana/latest/setup-grafana/installation/)

---

## 2) Sensors Publisher

To set up the ROS2-based sensor simulation:

1. Navigate to the RailGuards ROS2 workspace:
```bash
cd ~/railGuards/ros2

2. Build the ROS2 package:
```bash
colcon build --symlink-install

2. Source the ROS2 workspace by adding it to your bash configuration:
```bash
echo "source ~/railGuards/ros2/install/setup.bash" >> ~/.bashrc
```

```bash
source ~/.bashrc
```

4. Run the sensor simulator:

```bash
ros2 run sensors_simulator full_sensor_publisher
```

5. Verify the data:

List all ROS2 topics
```bash
ros2 topic list
```

Echo a topic, for example:

```bash
ros2 topic echo /air_temperature
```


## 3) Data Streaming

To set up Kafka for data streaming:

Install Kafka and unzip it into the directory ~/kafka.

Start the Kafka broker and Zookeeper:
```bash
~/kafka/bin/zookeeper-server-start.sh ~/kafka/config/zookeeper.properties
```
In another terminal:
```bash
~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties
```

3. Verify Kafka is running:

Verify Kafka is running:

4. Launch the ROS2-Kafka bridge to send sensor data to Kafka:
```bash
ros2 run sensors_simulator full_ros2_kafka_bridge
```

5. Listen to the Kafka topic (fulldata) to view sensor data being streamed:

```bash
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fulldata
```

## Build

2. Build the ROS2 package:
```bash
colcon build --symlink-install
```
Source the ROS2 workspace by adding it to your bash configuration:

```bash
echo "source ~/railGuards/ros2/install/setup.bash" >> ~/.bashrc
```
```bash
source ~/.bashrc
```

Run the sensor simulator:

```bash
ros2 run sensors_simulator full_sensor_publisher
```

```bash
systemctl status kafka

Launch the ROS2-Kafka bridge to send sensor data to Kafka:

```bash
ros2 run sensors_simulator full_ros2_kafka_bridge
```

Listen to the Kafka topic (fulldata) to view sensor data being streamed:

```bash
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fulldata
```

## 4) Data Processing with Spark
Sensor data contains noise and requires processing. To process data using Apache Spark:

1. Navigate to the data processing directory:
```bash
cd ~/railGuards/data_processing
```

2. Run the Spark job:
```bash
spark-submit --jars ~/spark_jars/<required-jar-files> full_spark_processing.py
```

# 3. Verify that the cleaned data is being written to the Kafka topic (cleaned_sensor_data) and InfluxDB.


## 5) Visualization with Grafana
To visualize the processed data:

Install and set up Grafana.
Connect Grafana to InfluxDB as a data source.
Import the provided Grafana dashboard JSON file or manually create dashboards.
Use InfluxDB queries to visualize:
Sensor metrics (e.g., temperature, pressure).
Maintenance flags and anomaly rates from the Kafka topic future_anomaly_predictions.











132 changes: 132 additions & 0 deletions data_processing/full_spark_kafka_processing copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, when
from pyspark.sql.types import StructType, FloatType, BooleanType, TimestampType
from influxdb import InfluxDBClient
from datetime import datetime
from pyspark.sql.functions import current_timestamp

# InfluxDB 1.x configuration
INFLUXDB_HOST = "localhost"
INFLUXDB_PORT = 8086
INFLUXDB_DATABASE = "sensor_data" # InfluxDB 1.x database name

# Create InfluxDB client (no need for token and org in InfluxDB 1.x)
influx_client = InfluxDBClient(host=INFLUXDB_HOST, port=INFLUXDB_PORT)
# Select the database
influx_client.switch_database(INFLUXDB_DATABASE)

def write_to_influxdb(batch_df):
"""
Write cleaned data from Spark batch to InfluxDB 1.x.
:param batch_df: Spark DataFrame containing cleaned data from the batch.
"""
try:
# Convert each row to a format suitable for InfluxDB
data = []
for row in batch_df.collect():
# Check if timestamp is null
if row["timestamp"] is None:
print("Timestamp is None, skipping this row.")
continue

# Ensure timestamp is in the correct format
# timestamp = int(row["timestamp"]) * 1000000000 # Convert to nanoseconds
timestamp = int(row["timestamp"].timestamp()) * 1000000000
# Construct data in the InfluxDB 1.x format
data.append(
{
"measurement": "cleaned_sensor_data", # This is the measurement name
"tags": {"source": "spark"},
"fields": {
"pressure": row["pressure"],
"oil_temperature": row["oil_temperature"],
"motor_current": row["motor_current"],
"air_temperature": row["air_temperature"],
"humidity": row["humidity"],
"vibration": row["vibration"],
"air_pressure": row["air_pressure"]
},
"time": timestamp
}
)

# Write data to InfluxDB in batch
if data:
influx_client.write_points(data)
print(f"Data written to InfluxDB: {data}")
except Exception as e:
print(f"Failed to write to InfluxDB: {e}")

# Define the schema for combined sensor data
schema = StructType() \
.add("timestamp", TimestampType()) \
.add("pressure", FloatType()) \
.add("oil_temperature", FloatType()) \
.add("motor_current", FloatType()) \
.add("air_temperature", FloatType()) \
.add("humidity", FloatType()) \
.add("vibration", FloatType()) \
.add("air_pressure", FloatType()) \
.add("valve_intake", BooleanType()) \
.add("valve_outlet", BooleanType()) \
.add("compressor_status", BooleanType()) \
.add("filter_status", BooleanType()) \
.add("safety_switch", BooleanType()) \
.add("overload_protection", BooleanType()) \
.add("emergency_stop", BooleanType()) \
.add("door_sensor", BooleanType())

def main():
# Create SparkSession
spark = SparkSession.builder \
.appName("Simple Sensor Data Cleaning") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Read data from Kafka
raw_data = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "fulldata") \
.option("startingOffsets", "earliest") \
.load()

# Parse the JSON data
parsed_data = raw_data.selectExpr("CAST(value AS STRING)") \
.select(from_json(col("value"), schema).alias("data")) \
.select("data.*")

# Ensure the timestamp is not null, or replace it with the current timestamp
parsed_data = parsed_data.withColumn(
"timestamp", when(col("timestamp").isNull(), current_timestamp()).otherwise(col("timestamp"))
)

# Filter invalid data based on defined ranges
cleaned_data = parsed_data.filter(
(col("pressure").between(1.0, 10.0)) &
(col("oil_temperature").between(30.0, 90.0)) &
(col("motor_current").between(0.5, 5.0)) &
(col("air_temperature").between(15.0, 40.0)) &
(col("humidity").between(30.0, 90.0)) &
(col("vibration").between(0.1, 2.0)) &
(col("air_pressure").between(1.0, 10.0))
)

# Write the cleaned data to InfluxDB using foreachBatch
cleaned_data.writeStream \
.foreachBatch(lambda batch_df, batch_id: write_to_influxdb(batch_df)) \
.start()

# Write cleaned data to a new Kafka topic
query = cleaned_data.selectExpr("to_json(struct(*)) AS value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "cleaned_sensor_data") \
.option("checkpointLocation", "/tmp/spark_simple_clean_checkpoint") \
.start()

query.awaitTermination()

if __name__ == "__main__":
main()
Loading