diff --git a/README.md b/README.md index 1fa9034..b6bbbb1 100644 --- a/README.md +++ b/README.md @@ -1 +1,169 @@ -# Requirements +# RailGuard: Morocco High Speed Train Hackathon Project + +Hello everyone! This project was developed by **Aya Harrak**, **Oumama Lemaoukni**, and **Er-Rougui Saad** as part of the Morocco High Speed Train Hackathon. + +This README provides the steps to execute the codes included in this project. + +--- + +## 1) System Requirements + +To run this project, ensure you have the following: + +- **Operating System**: Ubuntu 22.04 +- **ROS2**: Used for fake sensor data publishing. + [Installation Guide for ROS2 on Ubuntu](https://docs.ros.org/en/humble/Installation/Ubuntu-Install-Debians.html) +- **Kafka**: Used for real-time data streaming. + [Apache Kafka Installation Guide]([https://kafka.apache.org/documentation/quickstart](https://hostman.com/tutorials/install-apache-kafka-on-ubuntu-22-04/) +- **Apache Spark**: Used for data processing. + [Apache Spark Installation Guide]([https://spark.apache.org/docs/latest/](https://phoenixnap.com/kb/install-spark-on-ubuntu) +- **Dependencies for Spark**: + Download necessary dependencies (`--jars` files) and place them in the directory `~/spark_jars`. +- **Grafana**: Used for data visualization and dashboards. + [Grafana Installation Guide](https://grafana.com/docs/grafana/latest/setup-grafana/installation/) + +--- + +## 2) Sensors Publisher + +To set up the ROS2-based sensor simulation: + +1. Navigate to the RailGuards ROS2 workspace: + ```bash + cd ~/railGuards/ros2 + +2. Build the ROS2 package: + ```bash + colcon build --symlink-install + +2. Source the ROS2 workspace by adding it to your bash configuration: + ```bash + echo "source ~/railGuards/ros2/install/setup.bash" >> ~/.bashrc +``` + + ```bash + source ~/.bashrc +``` + +4. Run the sensor simulator: + +```bash + ros2 run sensors_simulator full_sensor_publisher + ``` + +5. Verify the data: + +List all ROS2 topics +```bash +ros2 topic list +``` + +Echo a topic, for example: + +```bash +ros2 topic echo /air_temperature +``` + + +## 3) Data Streaming + +To set up Kafka for data streaming: + +Install Kafka and unzip it into the directory ~/kafka. + +Start the Kafka broker and Zookeeper: +```bash +~/kafka/bin/zookeeper-server-start.sh ~/kafka/config/zookeeper.properties +``` +In another terminal: +```bash +~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties +``` + +3. Verify Kafka is running: + +Verify Kafka is running: + +4. Launch the ROS2-Kafka bridge to send sensor data to Kafka: +```bash +ros2 run sensors_simulator full_ros2_kafka_bridge +``` + +5. Listen to the Kafka topic (fulldata) to view sensor data being streamed: + +```bash +~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fulldata +``` + +## Build + +2. Build the ROS2 package: + ```bash + colcon build --symlink-install + ``` +Source the ROS2 workspace by adding it to your bash configuration: + +```bash +echo "source ~/railGuards/ros2/install/setup.bash" >> ~/.bashrc +``` +```bash +source ~/.bashrc +``` + +Run the sensor simulator: + +```bash +ros2 run sensors_simulator full_sensor_publisher +``` + +```bash +systemctl status kafka + +Launch the ROS2-Kafka bridge to send sensor data to Kafka: + +```bash +ros2 run sensors_simulator full_ros2_kafka_bridge +``` + +Listen to the Kafka topic (fulldata) to view sensor data being streamed: + +```bash +~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fulldata +``` + +## 4) Data Processing with Spark +Sensor data contains noise and requires processing. To process data using Apache Spark: + +1. Navigate to the data processing directory: +```bash +cd ~/railGuards/data_processing +``` + +2. Run the Spark job: +```bash +spark-submit --jars ~/spark_jars/ full_spark_processing.py +``` + +# 3. Verify that the cleaned data is being written to the Kafka topic (cleaned_sensor_data) and InfluxDB. + + +## 5) Visualization with Grafana +To visualize the processed data: + +Install and set up Grafana. +Connect Grafana to InfluxDB as a data source. +Import the provided Grafana dashboard JSON file or manually create dashboards. +Use InfluxDB queries to visualize: +Sensor metrics (e.g., temperature, pressure). +Maintenance flags and anomaly rates from the Kafka topic future_anomaly_predictions. + + + + + + + + + + + diff --git a/data_processing/full_spark_kafka_processing copy.py b/data_processing/full_spark_kafka_processing copy.py new file mode 100644 index 0000000..80415b9 --- /dev/null +++ b/data_processing/full_spark_kafka_processing copy.py @@ -0,0 +1,132 @@ +from pyspark.sql import SparkSession +from pyspark.sql.functions import from_json, col, when +from pyspark.sql.types import StructType, FloatType, BooleanType, TimestampType +from influxdb import InfluxDBClient +from datetime import datetime +from pyspark.sql.functions import current_timestamp + +# InfluxDB 1.x configuration +INFLUXDB_HOST = "localhost" +INFLUXDB_PORT = 8086 +INFLUXDB_DATABASE = "sensor_data" # InfluxDB 1.x database name + +# Create InfluxDB client (no need for token and org in InfluxDB 1.x) +influx_client = InfluxDBClient(host=INFLUXDB_HOST, port=INFLUXDB_PORT) +# Select the database +influx_client.switch_database(INFLUXDB_DATABASE) + +def write_to_influxdb(batch_df): + """ + Write cleaned data from Spark batch to InfluxDB 1.x. + :param batch_df: Spark DataFrame containing cleaned data from the batch. + """ + try: + # Convert each row to a format suitable for InfluxDB + data = [] + for row in batch_df.collect(): + # Check if timestamp is null + if row["timestamp"] is None: + print("Timestamp is None, skipping this row.") + continue + + # Ensure timestamp is in the correct format + # timestamp = int(row["timestamp"]) * 1000000000 # Convert to nanoseconds + timestamp = int(row["timestamp"].timestamp()) * 1000000000 + # Construct data in the InfluxDB 1.x format + data.append( + { + "measurement": "cleaned_sensor_data", # This is the measurement name + "tags": {"source": "spark"}, + "fields": { + "pressure": row["pressure"], + "oil_temperature": row["oil_temperature"], + "motor_current": row["motor_current"], + "air_temperature": row["air_temperature"], + "humidity": row["humidity"], + "vibration": row["vibration"], + "air_pressure": row["air_pressure"] + }, + "time": timestamp + } + ) + + # Write data to InfluxDB in batch + if data: + influx_client.write_points(data) + print(f"Data written to InfluxDB: {data}") + except Exception as e: + print(f"Failed to write to InfluxDB: {e}") + +# Define the schema for combined sensor data +schema = StructType() \ + .add("timestamp", TimestampType()) \ + .add("pressure", FloatType()) \ + .add("oil_temperature", FloatType()) \ + .add("motor_current", FloatType()) \ + .add("air_temperature", FloatType()) \ + .add("humidity", FloatType()) \ + .add("vibration", FloatType()) \ + .add("air_pressure", FloatType()) \ + .add("valve_intake", BooleanType()) \ + .add("valve_outlet", BooleanType()) \ + .add("compressor_status", BooleanType()) \ + .add("filter_status", BooleanType()) \ + .add("safety_switch", BooleanType()) \ + .add("overload_protection", BooleanType()) \ + .add("emergency_stop", BooleanType()) \ + .add("door_sensor", BooleanType()) + +def main(): + # Create SparkSession + spark = SparkSession.builder \ + .appName("Simple Sensor Data Cleaning") \ + .getOrCreate() + spark.sparkContext.setLogLevel("WARN") + + # Read data from Kafka + raw_data = spark.readStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "localhost:9092") \ + .option("subscribe", "fulldata") \ + .option("startingOffsets", "earliest") \ + .load() + + # Parse the JSON data + parsed_data = raw_data.selectExpr("CAST(value AS STRING)") \ + .select(from_json(col("value"), schema).alias("data")) \ + .select("data.*") + + # Ensure the timestamp is not null, or replace it with the current timestamp + parsed_data = parsed_data.withColumn( + "timestamp", when(col("timestamp").isNull(), current_timestamp()).otherwise(col("timestamp")) + ) + + # Filter invalid data based on defined ranges + cleaned_data = parsed_data.filter( + (col("pressure").between(1.0, 10.0)) & + (col("oil_temperature").between(30.0, 90.0)) & + (col("motor_current").between(0.5, 5.0)) & + (col("air_temperature").between(15.0, 40.0)) & + (col("humidity").between(30.0, 90.0)) & + (col("vibration").between(0.1, 2.0)) & + (col("air_pressure").between(1.0, 10.0)) + ) + + # Write the cleaned data to InfluxDB using foreachBatch + cleaned_data.writeStream \ + .foreachBatch(lambda batch_df, batch_id: write_to_influxdb(batch_df)) \ + .start() + + # Write cleaned data to a new Kafka topic + query = cleaned_data.selectExpr("to_json(struct(*)) AS value") \ + .writeStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "localhost:9092") \ + .option("topic", "cleaned_sensor_data") \ + .option("checkpointLocation", "/tmp/spark_simple_clean_checkpoint") \ + .start() + + query.awaitTermination() + +if __name__ == "__main__": + main() diff --git a/data_processing/full_spark_kafka_processing.py b/data_processing/full_spark_kafka_processing.py index 80415b9..d221823 100644 --- a/data_processing/full_spark_kafka_processing.py +++ b/data_processing/full_spark_kafka_processing.py @@ -1,85 +1,160 @@ from pyspark.sql import SparkSession -from pyspark.sql.functions import from_json, col, when -from pyspark.sql.types import StructType, FloatType, BooleanType, TimestampType +from pyspark.sql.functions import from_json, col, when, current_timestamp, lit +from pyspark.sql.types import StructType, FloatType, BooleanType, TimestampType , IntegerType from influxdb import InfluxDBClient from datetime import datetime -from pyspark.sql.functions import current_timestamp +from functools import reduce +from pyspark.sql.functions import col +from kafka import KafkaConsumer +import json + # InfluxDB 1.x configuration INFLUXDB_HOST = "localhost" -INFLUXDB_PORT = 8086 -INFLUXDB_DATABASE = "sensor_data" # InfluxDB 1.x database name +INFLUXDB_PORT = 8086 +INFLUXDB_DATABASE = "sensor_data" -# Create InfluxDB client (no need for token and org in InfluxDB 1.x) +# Create InfluxDB client influx_client = InfluxDBClient(host=INFLUXDB_HOST, port=INFLUXDB_PORT) -# Select the database influx_client.switch_database(INFLUXDB_DATABASE) + + +# Kafka Consumer for anomaly data +anomaly_consumer = KafkaConsumer( + 'future_anomaly_predictions', + bootstrap_servers='localhost:9092', + value_deserializer=lambda m: json.loads(m.decode('utf-8')) +) + + +# Cache for maintenance_flag +anomaly_cache = {} + + +def fetch_anomalies(): + """Fetch anomalies from Kafka and cache them by timestamp.""" + for message in anomaly_consumer: + anomaly_data = message.value + timestamp = anomaly_data['timestamp'] + maintenance_flag = anomaly_data.get('maintenance_flag', 0) + anomaly_cache[timestamp] = maintenance_flag + + +# def write_to_influxdb(batch_df): +# """ +# Write cleaned data from Spark batch to InfluxDB 1.x. +# """ +# try: +# data = [] +# for row in batch_df.collect(): +# if row["timestamp"] is None: +# print("Timestamp is None, skipping this row.") +# continue + +# timestamp = int(row["timestamp"].timestamp()) * 1000000000 +# data.append({ +# "measurement": "cleaned_sensor_data", +# "tags": {"source": "spark"}, +# "fields": { +# "TP2": row["TP2"], +# "TP3": row["TP3"], +# "H1": row["H1"], +# "DV_pressure": row["DV_pressure"], +# "Reservoirs": row["Reservoirs"], +# "Oil_temperature": row["Oil_temperature"], +# "Motor_current": row["Motor_current"], +# "COMP": row["COMP"], +# "DV_eletric": row["DV_eletric"], +# "Towers": row["Towers"], +# "MPG": row["MPG"], +# "LPS": row["LPS"], +# "Pressure_switch": row["Pressure_switch"], +# "Oil_level": row["Oil_level"], +# "Caudal_impulses": row["Caudal_impulses"] +# }, +# "time": timestamp +# }) + +# if data: +# influx_client.write_points(data) +# print(f"Data written to InfluxDB: {data}") +# except Exception as e: +# print(f"Failed to write to InfluxDB: {e}") + + def write_to_influxdb(batch_df): """ - Write cleaned data from Spark batch to InfluxDB 1.x. - :param batch_df: Spark DataFrame containing cleaned data from the batch. + Write cleaned data along with anomalies to InfluxDB. """ try: - # Convert each row to a format suitable for InfluxDB data = [] for row in batch_df.collect(): - # Check if timestamp is null if row["timestamp"] is None: print("Timestamp is None, skipping this row.") continue - - # Ensure timestamp is in the correct format - # timestamp = int(row["timestamp"]) * 1000000000 # Convert to nanoseconds - timestamp = int(row["timestamp"].timestamp()) * 1000000000 - # Construct data in the InfluxDB 1.x format - data.append( - { - "measurement": "cleaned_sensor_data", # This is the measurement name - "tags": {"source": "spark"}, - "fields": { - "pressure": row["pressure"], - "oil_temperature": row["oil_temperature"], - "motor_current": row["motor_current"], - "air_temperature": row["air_temperature"], - "humidity": row["humidity"], - "vibration": row["vibration"], - "air_pressure": row["air_pressure"] - }, - "time": timestamp - } - ) - - # Write data to InfluxDB in batch + + timestamp = row["timestamp"] + maintenance_flag = anomaly_cache.pop(timestamp, 0) # Fetch and remove from cache + + # Convert timestamp to nanoseconds + timestamp_ns = int(timestamp.timestamp()) * 1000000000 + + data.append({ + "measurement": "cleaned_sensor_data", + "tags": {"source": "spark"}, + "fields": { + "TP2": row["TP2"], + "TP3": row["TP3"], + "H1": row["H1"], + "DV_pressure": row["DV_pressure"], + "Reservoirs": row["Reservoirs"], + "Oil_temperature": row["Oil_temperature"], + "Motor_current": row["Motor_current"], + "COMP": row["COMP"], + "DV_eletric": row["DV_eletric"], + "Towers": row["Towers"], + "MPG": row["MPG"], + "LPS": row["LPS"], + "Pressure_switch": row["Pressure_switch"], + "Oil_level": row["Oil_level"], + "Caudal_impulses": row["Caudal_impulses"], + "maintenance_flag": maintenance_flag + }, + "time": timestamp_ns + }) + if data: influx_client.write_points(data) print(f"Data written to InfluxDB: {data}") except Exception as e: print(f"Failed to write to InfluxDB: {e}") -# Define the schema for combined sensor data + +# Define schema for sensor data schema = StructType() \ .add("timestamp", TimestampType()) \ - .add("pressure", FloatType()) \ - .add("oil_temperature", FloatType()) \ - .add("motor_current", FloatType()) \ - .add("air_temperature", FloatType()) \ - .add("humidity", FloatType()) \ - .add("vibration", FloatType()) \ - .add("air_pressure", FloatType()) \ - .add("valve_intake", BooleanType()) \ - .add("valve_outlet", BooleanType()) \ - .add("compressor_status", BooleanType()) \ - .add("filter_status", BooleanType()) \ - .add("safety_switch", BooleanType()) \ - .add("overload_protection", BooleanType()) \ - .add("emergency_stop", BooleanType()) \ - .add("door_sensor", BooleanType()) + .add("TP2", FloatType()) \ + .add("TP3", FloatType()) \ + .add("H1", FloatType()) \ + .add("DV_pressure", FloatType()) \ + .add("Reservoirs", FloatType()) \ + .add("Oil_temperature", FloatType()) \ + .add("Motor_current", FloatType()) \ + .add("COMP", FloatType()) \ + .add("DV_eletric", FloatType()) \ + .add("Towers", FloatType()) \ + .add("MPG", FloatType()) \ + .add("LPS", FloatType()) \ + .add("Pressure_switch", FloatType()) \ + .add("Oil_level", FloatType()) \ + .add("Caudal_impulses", FloatType()) def main(): - # Create SparkSession + + spark = SparkSession.builder \ - .appName("Simple Sensor Data Cleaning") \ + .appName("Sensor Data Cleaning") \ .getOrCreate() spark.sparkContext.setLogLevel("WARN") @@ -91,39 +166,112 @@ def main(): .option("startingOffsets", "earliest") \ .load() - # Parse the JSON data + # Parse JSON data parsed_data = raw_data.selectExpr("CAST(value AS STRING)") \ .select(from_json(col("value"), schema).alias("data")) \ .select("data.*") - # Ensure the timestamp is not null, or replace it with the current timestamp + + # Start thread to fetch anomalies + from threading import Thread + anomaly_thread = Thread(target=fetch_anomalies, daemon=True) + anomaly_thread.start() + + + # Ensure the timestamp is valid parsed_data = parsed_data.withColumn( "timestamp", when(col("timestamp").isNull(), current_timestamp()).otherwise(col("timestamp")) ) - - # Filter invalid data based on defined ranges - cleaned_data = parsed_data.filter( - (col("pressure").between(1.0, 10.0)) & - (col("oil_temperature").between(30.0, 90.0)) & - (col("motor_current").between(0.5, 5.0)) & - (col("air_temperature").between(15.0, 40.0)) & - (col("humidity").between(30.0, 90.0)) & - (col("vibration").between(0.1, 2.0)) & - (col("air_pressure").between(1.0, 10.0)) + + # Cast digital variables to integers + digital_columns = [ + "COMP", "DV_eletric", "Towers", "MPG", "LPS", + "Pressure_switch", "Oil_level", "Caudal_impulses" + ] + + for col_name in digital_columns: + parsed_data = parsed_data.withColumn(col_name, col(col_name).cast(IntegerType())) + + # Add maintenance_flag column initialized to 0 + parsed_data = parsed_data.withColumn("maintenance_flag", lit(0).cast(IntegerType())) + + min_values = { + "TP2": -0.032, + "TP3": 0.73, + "H1": -0.036, + "DV_pressure": -0.032, + "Reservoirs": 0.712, + "Oil_temperature": 15.4, + "Motor_current": 0.02, + "COMP": 0.0, + "DV_eletric": 0.0, + "Towers": 0.0, + "MPG": 0.0, + "LPS": 0.0, + "Pressure_switch": 0.0, + "Oil_level": 0.0, + "Caudal_impulses": 0.0 + } + + max_values = { + "TP2": 10.676, + "TP3": 10.302, + "H1": 10.288, + "DV_pressure": 9.844, + "Reservoirs": 10.3, + "Oil_temperature": 89.05, + "Motor_current": 9.295, + "COMP": 1.0, + "DV_eletric": 1.0, + "Towers": 1.0, + "MPG": 1.0, + "LPS": 1.0, + "Pressure_switch": 1.0, + "Oil_level": 1.0, + "Caudal_impulses": 1.0 + } + + # Build filter conditions dynamically and combine them using reduce + conditions = reduce( + lambda a, b: a & b, + [col(feature).between(min_values[feature], max_values[feature]) for feature in min_values.keys()] ) - # Write the cleaned data to InfluxDB using foreachBatch + # Apply the combined condition to filter the data + cleaned_data = parsed_data.filter(conditions) + + # # Filter invalid data + # cleaned_data = parsed_data.filter( + # (col("TP2").between(-10.0, 10.0)) & + # (col("TP3").between(0.0, 20.0)) & + # (col("H1").between(-5.0, 15.0)) & + # (col("DV_pressure").between(0.0, 5.0)) & + # (col("Reservoirs").between(10.0, 50.0)) & + # (col("Oil_temperature").between(20.0, 100.0)) & + # (col("Motor_current").between(0.1, 5.0)) & + # (col("COMP").between(0, 1)) & + # (col("DV_eletric").between(0, 1)) & + # (col("Towers").between(0, 1)) & + # (col("MPG").between(0, 1)) & + # (col("LPS").between(0, 1)) & + # (col("Pressure_switch").between(0, 1)) & + # (col("Oil_level").between(0, 1)) & + # (col("Caudal_impulses").between(0, 1)) + # ) + + + # Write to InfluxDB cleaned_data.writeStream \ .foreachBatch(lambda batch_df, batch_id: write_to_influxdb(batch_df)) \ .start() - - # Write cleaned data to a new Kafka topic + + # Write cleaned data to Kafka query = cleaned_data.selectExpr("to_json(struct(*)) AS value") \ .writeStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("topic", "cleaned_sensor_data") \ - .option("checkpointLocation", "/tmp/spark_simple_clean_checkpoint") \ + .option("checkpointLocation", "/tmp/spark_clean_checkpoint") \ .start() query.awaitTermination() diff --git a/grafana/grafana_dashboard.json b/grafana/grafana_dashboard.json new file mode 100644 index 0000000..0b391b8 --- /dev/null +++ b/grafana/grafana_dashboard.json @@ -0,0 +1,747 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 1, + "links": [], + "panels": [ + { + "datasource": { + "type": "influxdb", + "uid": "ae5p8itirhszka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 1 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 9, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "query": "SELECT \"maintenance_flag\" FROM \"cleaned_sensor_data\"", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series" + } + ], + "title": "maintenance flag", + "type": "stat" + }, + { + "datasource": { + "type": "influxdb", + "uid": "ae5p8itirhszka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "#EAB839", + "value": 60 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "celsius" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "query": "SELECT \"Oil_temperature\" FROM \"cleaned_sensor_data\"\n\n", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series" + } + ], + "title": "Oil Temperature", + "type": "stat" + }, + { + "datasource": { + "type": "influxdb", + "uid": "ae5p8itirhszka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 8, + "options": { + "basemap": { + "config": {}, + "name": "Layer 0", + "type": "osm-standard" + }, + "controls": { + "mouseWheelZoom": true, + "showAttribution": true, + "showDebug": false, + "showMeasure": false, + "showScale": false, + "showZoom": true + }, + "layers": [ + { + "config": { + "rules": [], + "src": "http://localhost:8080/route.geojson", + "style": { + "color": { + "fixed": "#8AB8FF" + }, + "opacity": 0.4, + "rotation": { + "fixed": 0, + "max": 360, + "min": -360, + "mode": "mod" + }, + "size": { + "fixed": 5, + "max": 15, + "min": 2 + }, + "symbol": { + "fixed": "img/icons/marker/circle.svg", + "mode": "fixed" + }, + "symbolAlign": { + "horizontal": "center", + "vertical": "center" + }, + "textConfig": { + "fontSize": 12, + "offsetX": 0, + "offsetY": 0, + "textAlign": "center", + "textBaseline": "middle" + } + } + }, + "location": { + "mode": "auto" + }, + "name": "Layer 1", + "tooltip": true, + "type": "geojson" + } + ], + "tooltip": { + "mode": "details" + }, + "view": { + "allLayers": false, + "id": "coords", + "lastOnly": true, + "lat": 33.967326, + "layer": "Layer 1", + "lon": -7.409163, + "zoom": 15 + } + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "refId": "A" + } + ], + "title": "Train Pose", + "type": "geomap" + }, + { + "datasource": { + "type": "influxdb", + "uid": "ae5p8itirhszka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "displayName": "Motor current", + "mappings": [], + "max": 15, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "#6ED0E0", + "value": 7 + }, + { + "color": "#EAB839", + "value": 12 + }, + { + "color": "red", + "value": 15 + } + ] + }, + "unit": "amp" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 1, + "options": { + "minVizHeight": 75, + "minVizWidth": 75, + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "sizing": "auto" + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "datasource": { + "type": "influxdb", + "uid": "ae5p8itirhszka" + }, + "hide": false, + "query": "SELECT \"Motor_current\" FROM \"cleaned_sensor_data\"\n\n", + "rawQuery": true, + "refId": "B", + "resultFormat": "time_series" + }, + { + "datasource": { + "type": "influxdb", + "uid": "ae5p8itirhszka" + }, + "hide": false, + "refId": "A" + } + ], + "title": "Motor current", + "type": "gauge" + }, + { + "datasource": { + "type": "influxdb", + "uid": "ae5p8itirhszka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "#EAB839", + "value": 3 + }, + { + "color": "red", + "value": 5 + } + ] + }, + "unit": "pressurebar" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 16 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "/.*/", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "query": "SELECT \"DV_pressure\" FROM \"cleaned_sensor_data\"\n\n\n\n", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series" + } + ], + "title": "DV Pressure", + "type": "stat" + }, + { + "datasource": { + "type": "influxdb", + "uid": "ae5p8itirhszka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "max": 11, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "#EAB839", + "value": 8 + }, + { + "color": "red", + "value": 11 + } + ] + }, + "unit": "pressurebar" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 6, + "options": { + "minVizHeight": 75, + "minVizWidth": 75, + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "sizing": "auto" + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "query": "SELECT \"Reservoirs\" FROM \"cleaned_sensor_data\"\n\n", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series" + } + ], + "title": "Reservoirs", + "type": "gauge" + }, + { + "datasource": { + "uid": "ae5p8itirhszka" + }, + "description": "if LPS = 1 the pressure has droped", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 1 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 24 + }, + "id": 7, + "options": { + "displayMode": "basic", + "legend": { + "calcs": [ + "last" + ], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "maxVizHeight": 300, + "minVizHeight": 16, + "minVizWidth": 8, + "namePlacement": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "/^cleaned_sensor_data\\.LPS$/", + "limit": 5, + "values": true + }, + "showUnfilled": true, + "sizing": "auto", + "valueMode": "color" + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "query": "SELECT \"LPS\" FROM \"cleaned_sensor_data\"", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series" + } + ], + "title": "LPS", + "type": "bargauge" + }, + { + "datasource": { + "type": "influxdb", + "uid": "ae5p8itirhszka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "max": 11, + "min": -0.04, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "#EAB839", + "value": 10 + }, + { + "color": "red", + "value": 11 + } + ] + }, + "unit": "pressurebar" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 24 + }, + "id": 5, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "query": "SELECT \"DV_pressure\" FROM \"cleaned_sensor_data\"\n\n", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series" + } + ], + "title": "DV Pressure", + "type": "stat" + }, + { + "datasource": { + "type": "influxdb", + "uid": "ae5p8itirhszka" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": true, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 74, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 2, + "scaleDistribution": { + "type": "linear" + }, + "thresholdsStyle": { + "mode": "dashed" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "#EAB839", + "value": 4 + }, + { + "color": "red", + "value": 7 + } + ] + }, + "unit": "pressurebar" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 32 + }, + "id": 4, + "options": { + "barRadius": 0, + "barWidth": 0.97, + "fullHighlight": false, + "groupWidth": 0.7, + "legend": { + "calcs": [ + "last" + ], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "auto", + "stacking": "none", + "tooltip": { + "mode": "single", + "sort": "none" + }, + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "query": "SELECT \"TP3\" FROM \"cleaned_sensor_data\"\n\n", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series" + } + ], + "title": "Pneumatic panel TP3", + "type": "barchart" + } + ], + "preload": false, + "refresh": "5s", + "schemaVersion": 40, + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "2024-12-03T11:22:22.852Z", + "to": "2024-12-03T13:21:49.147Z" + }, + "timepicker": {}, + "timezone": "browser", + "title": "saad", + "uid": "de5pdb2dcavwge", + "version": 62, + "weekStart": "" +} diff --git a/machine_learning/data_preprocessing/test.py b/machine_learning/data_preprocessing/test.py new file mode 100644 index 0000000..b12188d --- /dev/null +++ b/machine_learning/data_preprocessing/test.py @@ -0,0 +1,56 @@ +import joblib +import pandas as pd + +# Load the saved model and scaler +model_regression = joblib.load('/home/ubuntu/railGuard/machine_learning/data_preprocessing/logistic_regression_model.pkl') +scaler = joblib.load('/home/ubuntu/railGuard/machine_learning/data_preprocessing/scaler.pkl') + +# Define a function for prediction +def predict_new_data(new_data): + # Ensure the new data is a DataFrame with the correct columns + training_feature_names = [ + 'timestamp', 'TP2', 'TP3', 'H1', 'DV_pressure', 'Reservoirs', + 'Oil_temperature', 'Motor_current', 'COMP', 'DV_eletric', 'Towers', + 'MPG', 'LPS', 'Pressure_switch', 'Oil_level', 'Caudal_impulses' + ] + new_df = pd.DataFrame(new_data)[training_feature_names] + + # Scale the new data using the pre-fitted scaler + new_scaled = scaler.transform(new_df) + + # Predict the class and probability + y_pred = model_regression.predict(new_scaled) + y_pred_prob = model_regression.predict_proba(new_scaled)[:, 1] + + return y_pred, y_pred_prob + +# Example new data +new_sample = { + 'timestamp': [150], # Replace with realistic timestamp + 'TP2': [-0.018], + 'TP3': [8.248], + 'H1': [8.238], + 'DV_pressure': [-0.024], + 'Reservoirs': [8.248], + 'Oil_temperature': [49.450], + 'Motor_current': [0.0400], + 'COMP': [1.0], + 'DV_eletric': [0.0], + 'Towers': [1.0], + 'MPG': [1.0], + 'LPS': [0.0], + 'Pressure_switch': [1.0], + 'Oil_level': [1.0], + 'Caudal_impulses': [1.0] +} + +# Predict for the new sample +class_labels = {0: "Not Failure", 1: "Failure"} + +y_pred, y_pred_prob = predict_new_data(new_sample) + +y_pred_label = [class_labels[label] for label in y_pred] + +print(f"Prediction (Class): {y_pred_label}") +print(f"Probability of Failure: {y_pred_prob}") + diff --git a/src/sensors_simulator/sensors_simulator/__pycache__/full_ros2_kafka_bridge.cpython-310.pyc b/src/sensors_simulator/sensors_simulator/__pycache__/full_ros2_kafka_bridge.cpython-310.pyc index 9ef2585..e9b561e 100644 Binary files a/src/sensors_simulator/sensors_simulator/__pycache__/full_ros2_kafka_bridge.cpython-310.pyc and b/src/sensors_simulator/sensors_simulator/__pycache__/full_ros2_kafka_bridge.cpython-310.pyc differ diff --git a/src/sensors_simulator/sensors_simulator/__pycache__/full_sensor_publisher.cpython-310.pyc b/src/sensors_simulator/sensors_simulator/__pycache__/full_sensor_publisher.cpython-310.pyc index 0a4891a..04c3c7a 100644 Binary files a/src/sensors_simulator/sensors_simulator/__pycache__/full_sensor_publisher.cpython-310.pyc and b/src/sensors_simulator/sensors_simulator/__pycache__/full_sensor_publisher.cpython-310.pyc differ diff --git a/src/sensors_simulator/sensors_simulator/full_ros2_kafka_bridge copy.py b/src/sensors_simulator/sensors_simulator/full_ros2_kafka_bridge copy.py new file mode 100644 index 0000000..2b3ad10 --- /dev/null +++ b/src/sensors_simulator/sensors_simulator/full_ros2_kafka_bridge copy.py @@ -0,0 +1,104 @@ +import rclpy +from rclpy.node import Node +from std_msgs.msg import Float32, Bool +from kafka import KafkaProducer +import json +import time + + +class FullSensorROS2KafkaBridge(Node): + def __init__(self): + super().__init__('full_ros2_kafka_bridge') + + # Initialize Kafka producer + self.kafka_producer = KafkaProducer( + bootstrap_servers='localhost:9092', # Replace with your Kafka broker address + value_serializer=lambda v: json.dumps(v).encode('utf-8') + ) + + # Initialize variables for all sensors + self.sensors = { + "pressure": None, + "oil_temperature": None, + "motor_current": None, + "air_temperature": None, + "humidity": None, + "vibration": None, + "air_pressure": None, + "valve_intake": None, + "valve_outlet": None, + "compressor_status": None, + "filter_status": None, + "safety_switch": None, + "overload_protection": None, + "emergency_stop": None, + "door_sensor": None + } + + # Topic-to-sensor mapping for analog sensors + self.analog_topics = { + 'pressure': 'pressure', + 'oil_temperature': 'oil_temperature', + 'motor_current': 'motor_current', + 'air_temperature': 'air_temperature', + 'humidity': 'humidity', + 'vibration': 'vibration', + 'air_pressure': 'air_pressure' + } + + # Topic-to-sensor mapping for digital sensors + self.digital_topics = { + 'valve_intake': 'valve_intake', + 'valve_outlet': 'valve_outlet', + 'compressor_status': 'compressor_status', + 'filter_status': 'filter_status', + 'safety_switch': 'safety_switch', + 'overload_protection': 'overload_protection', + 'emergency_stop': 'emergency_stop', + 'door_sensor': 'door_sensor' + } + + # Subscriptions for analog sensors + for topic, sensor in self.analog_topics.items(): + self.create_subscription(Float32, topic, lambda msg, s=sensor: self.analog_callback(msg, s), 10) + + # Subscriptions for digital sensors + for topic, sensor in self.digital_topics.items(): + self.create_subscription(Bool, topic, lambda msg, s=sensor: self.digital_callback(msg, s), 10) + + # Timer to send combined data to Kafka + self.timer = self.create_timer(1.0, self.send_combined_data) + + def analog_callback(self, msg, sensor_name): + self.sensors[sensor_name] = msg.data + self.get_logger().info(f"Updated {sensor_name}: {msg.data}") + + def digital_callback(self, msg, sensor_name): + self.sensors[sensor_name] = msg.data + self.get_logger().info(f"Updated {sensor_name}: {msg.data}") + + def send_combined_data(self): + # Combine all sensor data and send to Kafka + combined_data = { + "timestamp": time.time(), + **self.sensors + } + self.kafka_producer.send('fulldata', combined_data) + self.get_logger().info(f"Sent combined data to Kafka: {combined_data}") + + +def main(args=None): + rclpy.init(args=args) + node = FullSensorROS2KafkaBridge() + try: + rclpy.spin(node) + except KeyboardInterrupt: + node.get_logger().info("Shutting down node...") + finally: + node.destroy_node() + if rclpy.ok(): # Ensure shutdown is only called if rclpy is still active + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/src/sensors_simulator/sensors_simulator/full_ros2_kafka_bridge.py b/src/sensors_simulator/sensors_simulator/full_ros2_kafka_bridge.py index 2b3ad10..964dc57 100644 --- a/src/sensors_simulator/sensors_simulator/full_ros2_kafka_bridge.py +++ b/src/sensors_simulator/sensors_simulator/full_ros2_kafka_bridge.py @@ -1,6 +1,6 @@ import rclpy from rclpy.node import Node -from std_msgs.msg import Float32, Bool +from std_msgs.msg import Float32, Bool , Int32 from kafka import KafkaProducer import json import time @@ -11,51 +11,56 @@ def __init__(self): super().__init__('full_ros2_kafka_bridge') # Initialize Kafka producer - self.kafka_producer = KafkaProducer( - bootstrap_servers='localhost:9092', # Replace with your Kafka broker address - value_serializer=lambda v: json.dumps(v).encode('utf-8') - ) + try: + self.kafka_producer = KafkaProducer( + bootstrap_servers='localhost:9092', # Replace with your Kafka broker address + value_serializer=lambda v: json.dumps(v).encode('utf-8') + ) + self.get_logger().info("Connected to Kafka broker successfully.") + except Exception as e: + self.get_logger().error(f"Failed to connect to Kafka broker: {e}") + raise e # Initialize variables for all sensors self.sensors = { - "pressure": None, - "oil_temperature": None, - "motor_current": None, - "air_temperature": None, - "humidity": None, - "vibration": None, - "air_pressure": None, - "valve_intake": None, - "valve_outlet": None, - "compressor_status": None, - "filter_status": None, - "safety_switch": None, - "overload_protection": None, - "emergency_stop": None, - "door_sensor": None + "TP2": None, + "TP3": None, + "H1": None, + "DV_pressure": None, + "Reservoirs": None, + "Oil_temperature": None, + "Motor_current": None, + "COMP": None, + "DV_eletric": None, + "Towers": None, + "MPG": None, + "LPS": None, + "Pressure_switch": None, + "Oil_level": None, + "Caudal_impulses": None } # Topic-to-sensor mapping for analog sensors self.analog_topics = { - 'pressure': 'pressure', - 'oil_temperature': 'oil_temperature', - 'motor_current': 'motor_current', - 'air_temperature': 'air_temperature', - 'humidity': 'humidity', - 'vibration': 'vibration', - 'air_pressure': 'air_pressure' + 'TP2': 'TP2', + 'TP3': 'TP3', + 'H1': 'H1', + 'DV_pressure': 'DV_pressure', + 'Reservoirs': 'Reservoirs', + 'Oil_temperature': 'Oil_temperature', + 'Motor_current': 'Motor_current' } # Topic-to-sensor mapping for digital sensors self.digital_topics = { - 'valve_intake': 'valve_intake', - 'valve_outlet': 'valve_outlet', - 'compressor_status': 'compressor_status', - 'filter_status': 'filter_status', - 'safety_switch': 'safety_switch', - 'overload_protection': 'overload_protection', - 'emergency_stop': 'emergency_stop', - 'door_sensor': 'door_sensor' + 'COMP': 'COMP', + 'DV_eletric': 'DV_eletric', + 'Towers': 'Towers', + 'MPG': 'MPG', + 'LPS': 'LPS', + 'Pressure_switch': 'Pressure_switch', + 'Oil_level': 'Oil_level', + 'Caudal_impulses': 'Caudal_impulses' } # Subscriptions for analog sensors @@ -64,7 +69,7 @@ def __init__(self): # Subscriptions for digital sensors for topic, sensor in self.digital_topics.items(): - self.create_subscription(Bool, topic, lambda msg, s=sensor: self.digital_callback(msg, s), 10) + self.create_subscription(Int32, topic, lambda msg, s=sensor: self.digital_callback(msg, s), 10) # Timer to send combined data to Kafka self.timer = self.create_timer(1.0, self.send_combined_data) @@ -83,8 +88,17 @@ def send_combined_data(self): "timestamp": time.time(), **self.sensors } - self.kafka_producer.send('fulldata', combined_data) - self.get_logger().info(f"Sent combined data to Kafka: {combined_data}") + + # Check for None values to ensure completeness + missing_sensors = [key for key, value in self.sensors.items() if value is None] + if missing_sensors: + self.get_logger().warning(f"Missing sensor data for: {missing_sensors}") + + try: + self.kafka_producer.send('fulldata', combined_data) + self.get_logger().info(f"Sent combined data to Kafka: {combined_data}") + except Exception as e: + self.get_logger().error(f"Failed to send data to Kafka: {e}") def main(args=None): diff --git a/src/sensors_simulator/sensors_simulator/full_sensor_publisher.py b/src/sensors_simulator/sensors_simulator/full_sensor_publisher.py index 4436659..dfa0281 100644 --- a/src/sensors_simulator/sensors_simulator/full_sensor_publisher.py +++ b/src/sensors_simulator/sensors_simulator/full_sensor_publisher.py @@ -1,6 +1,6 @@ import random import time -from std_msgs.msg import Float32, Bool +from std_msgs.msg import Float32, Bool , Int32 import rclpy from rclpy.node import Node @@ -8,71 +8,99 @@ class FullSensorPublisher(Node): def __init__(self): super().__init__('full_sensor_publisher') - # Analog sensor publishers (7) - self.pressure_pub = self.create_publisher(Float32, 'pressure', 10) - self.oil_temp_pub = self.create_publisher(Float32, 'oil_temperature', 10) - self.motor_current_pub = self.create_publisher(Float32, 'motor_current', 10) - self.air_temp_pub = self.create_publisher(Float32, 'air_temperature', 10) - self.humidity_pub = self.create_publisher(Float32, 'humidity', 10) - self.vibration_pub = self.create_publisher(Float32, 'vibration', 10) - self.air_pressure_pub = self.create_publisher(Float32, 'air_pressure', 10) + # Analog sensor publishers (7) renamed to match dataset features + self.tp2_pub = self.create_publisher(Float32, 'TP2', 10) + self.tp3_pub = self.create_publisher(Float32, 'TP3', 10) + self.h1_pub = self.create_publisher(Float32, 'H1', 10) + self.dv_pressure_pub = self.create_publisher(Float32, 'DV_pressure', 10) + self.reservoirs_pub = self.create_publisher(Float32, 'Reservoirs', 10) + self.oil_temperature_pub = self.create_publisher(Float32, 'Oil_temperature', 10) + self.motor_current_pub = self.create_publisher(Float32, 'Motor_current', 10) - # Digital sensor publishers (8) - self.valve_intake_pub = self.create_publisher(Bool, 'valve_intake', 10) - self.valve_outlet_pub = self.create_publisher(Bool, 'valve_outlet', 10) - self.compressor_status_pub = self.create_publisher(Bool, 'compressor_status', 10) - self.filter_status_pub = self.create_publisher(Bool, 'filter_status', 10) - self.safety_switch_pub = self.create_publisher(Bool, 'safety_switch', 10) - self.overload_protection_pub = self.create_publisher(Bool, 'overload_protection', 10) - self.emergency_stop_pub = self.create_publisher(Bool, 'emergency_stop', 10) - self.door_sensor_pub = self.create_publisher(Bool, 'door_sensor', 10) + # Digital sensor publishers (8) renamed to match dataset features + self.comp_pub = self.create_publisher(Int32, 'COMP', 10) + self.dv_eletric_pub = self.create_publisher(Int32, 'DV_eletric', 10) + self.towers_pub = self.create_publisher(Int32, 'Towers', 10) + self.mpg_pub = self.create_publisher(Int32, 'MPG', 10) + self.lps_pub = self.create_publisher(Int32, 'LPS', 10) + self.pressure_switch_pub = self.create_publisher(Int32, 'Pressure_switch', 10) + self.oil_level_pub = self.create_publisher(Int32, 'Oil_level', 10) + self.caudal_impulses_pub = self.create_publisher(Int32, 'Caudal_impulses', 10) # Timer for publishing data self.timer = self.create_timer(1.0, self.publish_data) # Publish at 1 Hz - def inject_fault(self, value, min_val, max_val, fault_probability=0.2): - """Inject a fault with the given probability.""" - if random.random() < fault_probability: # Lower probability for faults - return random.uniform(min_val - 10, min_val - 1) # Faulty value + def inject_fault(self, value, min_val, max_val, fault_probability=0.1): + """ + Inject a fault with the given probability by generating out-of-range values. + """ + if random.random() < fault_probability: # Introduce fault with the specified probability + # Generate an out-of-range value + if random.choice([True, False]): # Randomly decide whether it's below or above the range + return random.uniform(min_val - 10, min_val - 1) # Below range + else: + return random.uniform(max_val + 1, max_val + 10) # Above range return value def publish_data(self): - # Generate and publish analog sensor data with fault injection - pressure = self.inject_fault(random.uniform(1.0, 10.0), 1.0, 10.0) - oil_temp = self.inject_fault(random.uniform(30.0, 90.0), 30.0, 90.0) - motor_current = self.inject_fault(random.uniform(0.5, 5.0), 0.5, 5.0) - air_temp = self.inject_fault(random.uniform(15.0, 40.0), 15.0, 40.0) - humidity = self.inject_fault(random.uniform(30.0, 90.0), 30.0, 90.0) - vibration = self.inject_fault(random.uniform(0.1, 2.0), 0.1, 2.0) - air_pressure = self.inject_fault(random.uniform(1.0, 10.0), 1.0, 10.0) - self.pressure_pub.publish(Float32(data=pressure)) - self.oil_temp_pub.publish(Float32(data=oil_temp)) + # Min and max ranges for each sensor + ranges = { + "TP2": (-0.032, 10.676), + "TP3": (0.73, 10.302), + "H1": (-0.036, 10.288), + "DV_pressure": (-0.032, 9.844), + "Reservoirs": (0.712, 10.3), + "Oil_temperature": (15.4, 89.05), + "Motor_current": (0.02, 9.295) + } + + def generate_value(min_val, max_val, fault_probability=0.1): + """Generate a value within range, occasionally injecting faults.""" + if random.random() < fault_probability: # Inject fault + # Generate an out-of-range value + if random.choice([True, False]): # Below or above the range + return random.uniform(min_val - 10, min_val - 1) + else: + return random.uniform(max_val + 1, max_val + 10) + return random.uniform(min_val, max_val) # Within range + + # Generate values for each sensor + tp2 = generate_value(*ranges["TP2"]) + tp3 = generate_value(*ranges["TP3"]) + h1 = generate_value(*ranges["H1"]) + dv_pressure = generate_value(*ranges["DV_pressure"]) + reservoirs = generate_value(*ranges["Reservoirs"]) + oil_temperature = generate_value(*ranges["Oil_temperature"]) + motor_current = generate_value(*ranges["Motor_current"]) + + self.tp2_pub.publish(Float32(data=tp2)) + self.tp3_pub.publish(Float32(data=tp3)) + self.h1_pub.publish(Float32(data=h1)) + self.dv_pressure_pub.publish(Float32(data=dv_pressure)) + self.reservoirs_pub.publish(Float32(data=reservoirs)) + self.oil_temperature_pub.publish(Float32(data=oil_temperature)) self.motor_current_pub.publish(Float32(data=motor_current)) - self.air_temp_pub.publish(Float32(data=air_temp)) - self.humidity_pub.publish(Float32(data=humidity)) - self.vibration_pub.publish(Float32(data=vibration)) - self.air_pressure_pub.publish(Float32(data=air_pressure)) - # Generate and publish digital sensor data - self.valve_intake_pub.publish(Bool(data=random.choice([True, False]))) - self.valve_outlet_pub.publish(Bool(data=random.choice([True, False]))) - self.compressor_status_pub.publish(Bool(data=random.choice([True, False]))) - self.filter_status_pub.publish(Bool(data=random.choice([True, False]))) - self.safety_switch_pub.publish(Bool(data=random.choice([True, False]))) - self.overload_protection_pub.publish(Bool(data=random.choice([True, False]))) - self.emergency_stop_pub.publish(Bool(data=random.choice([True, False]))) - self.door_sensor_pub.publish(Bool(data=random.choice([True, False]))) + # Generate and publish digital sensor data (0 for False, 1 for True) + self.comp_pub.publish(Int32(data=int(random.choice([0, 1])))) + self.dv_eletric_pub.publish(Int32(data=int(random.choice([0, 1])))) + self.towers_pub.publish(Int32(data=int(random.choice([0, 1])))) + self.mpg_pub.publish(Int32(data=int(random.choice([0, 1])))) + self.lps_pub.publish(Int32(data=int(random.choice([0, 1])))) + self.pressure_switch_pub.publish(Int32(data=int(random.choice([0, 1])))) + self.oil_level_pub.publish(Int32(data=int(random.choice([0, 1])))) + self.caudal_impulses_pub.publish(Int32(data=int(random.choice([0, 1])))) # Log published data with fault status fault_injected = any([ - pressure < 1.0 or pressure > 10.0, - oil_temp < 30.0 or oil_temp > 90.0, - motor_current < 0.5 or motor_current > 5.0, - air_temp < 15.0 or air_temp > 40.0, - humidity < 30.0 or humidity > 90.0, - vibration < 0.1 or vibration > 2.0, - air_pressure < 1.0 or air_pressure > 10.0, + not (ranges["TP2"][0] <= tp2 <= ranges["TP2"][1]), + not (ranges["TP3"][0] <= tp3 <= ranges["TP3"][1]), + not (ranges["H1"][0] <= h1 <= ranges["H1"][1]), + not (ranges["DV_pressure"][0] <= dv_pressure <= ranges["DV_pressure"][1]), + not (ranges["Reservoirs"][0] <= reservoirs <= ranges["Reservoirs"][1]), + not (ranges["Oil_temperature"][0] <= oil_temperature <= ranges["Oil_temperature"][1]), + not (ranges["Motor_current"][0] <= motor_current <= ranges["Motor_current"][1]), ]) self.get_logger().info(f"Published sensor data. Fault injected: {fault_injected}") @@ -83,4 +111,4 @@ def main(args=None): rclpy.shutdown() if __name__ == '__main__': - main() + main() \ No newline at end of file diff --git a/src/sensors_simulator/sensors_simulator/full_sensor_publisher_copy.py b/src/sensors_simulator/sensors_simulator/full_sensor_publisher_copy.py new file mode 100644 index 0000000..c5ad59d --- /dev/null +++ b/src/sensors_simulator/sensors_simulator/full_sensor_publisher_copy.py @@ -0,0 +1,86 @@ +import random +import time +from std_msgs.msg import Float32, Bool +import rclpy +from rclpy.node import Node + +class FullSensorPublisher(Node): + def __init__(self): + super().__init__('full_sensor_publisher_copy') + + # Analog sensor publishers (7) + self.pressure_pub = self.create_publisher(Float32, 'pressure', 10) + self.oil_temp_pub = self.create_publisher(Float32, 'oil_temperature', 10) + self.motor_current_pub = self.create_publisher(Float32, 'motor_current', 10) + self.air_temp_pub = self.create_publisher(Float32, 'air_temperature', 10) + self.humidity_pub = self.create_publisher(Float32, 'humidity', 10) + self.vibration_pub = self.create_publisher(Float32, 'vibration', 10) + self.air_pressure_pub = self.create_publisher(Float32, 'air_pressure', 10) + + # Digital sensor publishers (8) + self.valve_intake_pub = self.create_publisher(Bool, 'valve_intake', 10) + self.valve_outlet_pub = self.create_publisher(Bool, 'valve_outlet', 10) + self.compressor_status_pub = self.create_publisher(Bool, 'compressor_status', 10) + self.filter_status_pub = self.create_publisher(Bool, 'filter_status', 10) + self.safety_switch_pub = self.create_publisher(Bool, 'safety_switch', 10) + self.overload_protection_pub = self.create_publisher(Bool, 'overload_protection', 10) + self.emergency_stop_pub = self.create_publisher(Bool, 'emergency_stop', 10) + self.door_sensor_pub = self.create_publisher(Bool, 'door_sensor', 10) + + # Timer for publishing data + self.timer = self.create_timer(1.0, self.publish_data) # Publish at 1 Hz + + def inject_fault(self, value, min_val, max_val, fault_probability=0.2): + """Inject a fault with the given probability.""" + if random.random() < fault_probability: # Lower probability for faults + return random.uniform(min_val - 10, min_val - 1) # Faulty value + return value + + def publish_data(self): + # Generate and publish analog sensor data with fault injection + pressure = self.inject_fault(random.uniform(1.0, 10.0), 1.0, 10.0) + oil_temp = self.inject_fault(random.uniform(30.0, 90.0), 30.0, 90.0) + motor_current = self.inject_fault(random.uniform(0.5, 5.0), 0.5, 5.0) + air_temp = self.inject_fault(random.uniform(15.0, 40.0), 15.0, 40.0) + humidity = self.inject_fault(random.uniform(30.0, 90.0), 30.0, 90.0) + vibration = self.inject_fault(random.uniform(0.1, 2.0), 0.1, 2.0) + air_pressure = self.inject_fault(random.uniform(1.0, 10.0), 1.0, 10.0) + + self.pressure_pub.publish(Float32(data=pressure)) + self.oil_temp_pub.publish(Float32(data=oil_temp)) + self.motor_current_pub.publish(Float32(data=motor_current)) + self.air_temp_pub.publish(Float32(data=air_temp)) + self.humidity_pub.publish(Float32(data=humidity)) + self.vibration_pub.publish(Float32(data=vibration)) + self.air_pressure_pub.publish(Float32(data=air_pressure)) + + # Generate and publish digital sensor data + self.valve_intake_pub.publish(Bool(data=random.choice([True, False]))) + self.valve_outlet_pub.publish(Bool(data=random.choice([True, False]))) + self.compressor_status_pub.publish(Bool(data=random.choice([True, False]))) + self.filter_status_pub.publish(Bool(data=random.choice([True, False]))) + self.safety_switch_pub.publish(Bool(data=random.choice([True, False]))) + self.overload_protection_pub.publish(Bool(data=random.choice([True, False]))) + self.emergency_stop_pub.publish(Bool(data=random.choice([True, False]))) + self.door_sensor_pub.publish(Bool(data=random.choice([True, False]))) + + # Log published data with fault status + fault_injected = any([ + pressure < 1.0 or pressure > 10.0, + oil_temp < 30.0 or oil_temp > 90.0, + motor_current < 0.5 or motor_current > 5.0, + air_temp < 15.0 or air_temp > 40.0, + humidity < 30.0 or humidity > 90.0, + vibration < 0.1 or vibration > 2.0, + air_pressure < 1.0 or air_pressure > 10.0, + ]) + self.get_logger().info(f"Published sensor data. Fault injected: {fault_injected}") + +def main(args=None): + rclpy.init(args=args) + node = FullSensorPublisher() + rclpy.spin(node) + rclpy.shutdown() + +if __name__ == '__main__': + main() diff --git a/visualization/dashbord.py b/visualization/dashbord.py index e69de29..ce88593 100644 --- a/visualization/dashbord.py +++ b/visualization/dashbord.py @@ -0,0 +1,53 @@ +from influxdb_client import InfluxDBClient, Point, WriteOptions +from datetime import datetime, timezone +import json + +# InfluxDB configuration (for InfluxDB 2.x) +INFLUXDB_URL = "http://localhost:8086" +INFLUXDB_TOKEN = "your_influxdb_token" # Replace with your token +INFLUXDB_ORG = "your_org" # Replace with your org name +INFLUXDB_BUCKET = "smart_sensors" # Replace with your InfluxDB bucket name + +# Create InfluxDB client +influx_client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) +write_api = influx_client.write_api(write_options=WriteOptions(batch_size=1)) + +def write_to_influxdb(row): + """ + Write cleaned data from Spark to InfluxDB. + :param row: Row of cleaned data from Spark DataFrame. + """ + try: + # Validate that timestamp is not None + if row["timestamp"] is None: + raise ValueError("Invalid timestamp: None") + + # Ensure timestamp is in the correct format + timestamp = datetime.fromtimestamp(row["timestamp"], timezone.utc).isoformat() + + # Construct data in the InfluxDB 2.x format + data = [ + { + "measurement": "cleaned_sensor_data", + "tags": { + "source": "spark" + }, + "fields": { + "pressure": row["pressure"], + "oil_temperature": row["oil_temperature"], + "motor_current": row["motor_current"], + "air_temperature": row["air_temperature"], + "humidity": row["humidity"], + "vibration": row["vibration"], + "air_pressure": row["air_pressure"] + }, + "time": timestamp + } + ] + + # Write data to InfluxDB + write_api.write(bucket=INFLUXDB_BUCKET, record=data) + print(f"Data written to InfluxDB: {data}") + except Exception as e: + print(f"Failed to write to InfluxDB: {e}") +