Skip to content

Commit

Permalink
Setting up Main for Command Line Version
Browse files Browse the repository at this point in the history
  • Loading branch information
Kiran Jojare committed Jul 19, 2024
1 parent f25e394 commit 2741588
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 25 deletions.
12 changes: 5 additions & 7 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import cantools
import sys
import os
import matplotlib.pyplot as plt
import argparse

Expand Down Expand Up @@ -98,7 +99,7 @@ def plot_signals(parsed_data, signal_name, start_time, end_time):
print(f"No data found for signal: {signal_name}")
return

plt.figure(figsize=(30, 15)) # Adjust the size as needed
plt.figure(figsize=(15, 5)) # Adjust the size as needed
plt.plot(timestamps, values)
plt.xlabel('Time (s)')
plt.ylabel('Value')
Expand All @@ -120,12 +121,9 @@ def plot_signals(parsed_data, signal_name, start_time, end_time):

args = parser.parse_args()

if args.mode == "test":
dbc_file = "C:\\Github\\CAN-Log-Parser\\test\\data\\test.dbc"
log_file = "C:\\Github\\CAN-Log-Parser\\test\\data\\test_log.txt"
elif args.mode == "main":
dbc_file = "C:\\Github\\CAN-Log-Parser\\data\\1200G_CAN-DBC_v01.01.00.dbc"
log_file = "C:\\Github\\CAN-Log-Parser\\data\\DMW_Message_Timeout_CAN_Log.txt"
script_dir = os.path.dirname(os.path.abspath(__file__))
dbc_file = os.path.join(script_dir, "..", "data", "test.dbc")
log_file = os.path.join(script_dir, "..", "data", "test_log.txt")

db = parse_dbc(dbc_file)
if db:
Expand Down
96 changes: 78 additions & 18 deletions test/test_signal_plotting.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,43 @@
"""
/**
* @file main.py
* @brief Script to parse CAN DBC files and log files, and plot specific CAN signal data over time.
*
* This script is designed to parse a DBC file to understand the structure of CAN messages and a log file
* to extract actual CAN message data. It then plots the specified signal data over time.
*
* The script uses the `cantools` library to parse the DBC file and understand the CAN message structure.
* The log file is parsed to extract individual CAN messages and their signals, which are then plotted using `matplotlib`.
*
*
* The script is designed to work with relative paths, making it easy to clone the repository and run the script without modifying paths.
*
* Author: Kiran Jojare
*
* Usage:
* python main.py <test|main> <signal_name> [start_time] [end_time]
*
* Arguments:
* mode: Mode to run the script in ('test' or 'main')
* signal: Signal name to plot
* start: (Optional) Start time for the plot
* end: (Optional) End time for the plot
*/
"""

import cantools
import os
import sys
import matplotlib.pyplot as plt
import argparse

def parse_dbc(file_path):
"""
@brief Parse the DBC file to get CAN message definitions.
@param file_path Path to the DBC file.
@return Parsed DBC database object.
"""
try:
db = cantools.database.load_file(file_path)
print(f"Successfully parsed DBC file: {file_path}")
Expand All @@ -13,21 +47,28 @@ def parse_dbc(file_path):
return None

def parse_log(db, log_file_path):
"""
@brief Parse the log file to extract CAN messages.
@param db Parsed DBC database object.
@param log_file_path Path to the log file.
@return List of parsed CAN messages.
"""
try:
parsed_data = []
with open(log_file_path, 'r') as log_file:
current_message = None
for line in log_file:
# print(f"Reading line: {line.strip()}") # Commented for clean output
# print(f"Reading line: {line.strip()}") # Debug print
if line.startswith("CAN"):
if current_message:
process_message(db, current_message, parsed_data)
current_message = [line.strip()]
# print("Detected start of a new message") # Commented for clean output
# print("Detected start of a new message") # Debug print
elif line.strip().startswith("->"):
# print("Detected start of line with ->")
# print("Detected start of line with ->") # Debug print
current_message.append(line.strip())
# print(f"Appending signal line: {line.strip()}") # Commented for clean output
# print(f"Appending signal line: {line.strip()}") # Debug print
if current_message:
process_message(db, current_message, parsed_data)
print_parsed_data(parsed_data)
Expand All @@ -37,21 +78,26 @@ def parse_log(db, log_file_path):
return None

def process_message(db, message_lines, parsed_data):
"""
@brief Process a single CAN message from the log file.
@param db Parsed DBC database object.
@param message_lines List of lines representing a single CAN message.
@param parsed_data List to append the parsed message data.
"""
try:
# print(f"Processing message lines: {message_lines}") # Commented for clean output
# print(f"Processing message lines: {message_lines}") # Debug print
main_line = message_lines[0].split()
# print(f"Main line parts: {main_line}") # Commented for clean output
# print(f"Main line parts: {main_line}") # Debug print
can_id = main_line[2]
# if can_id in ["00000000", "0000040A"]:
# return
ecu_name, message_name = main_line[6].split('.')
timestamp = main_line[7]
direction = main_line[8]

signals = []

for signal_line in message_lines[1:]:
# print("Signal Line ---- ", signal_line)
# print("Signal Line ---- ", signal_line) # Debug print
parts = signal_line.split()
signal_name = parts[1]
signal_value = parts[2]
Expand All @@ -66,18 +112,30 @@ def process_message(db, message_lines, parsed_data):
})

except Exception as e:
print(f"Error processing message: {e}, for CAN ID (Error Frames): {can_id}")
print(f"Error processing message: {e}, for CAN ID: {can_id}")
return

def print_parsed_data(parsed_data):
"""
@brief Print parsed data for debugging purposes.
@param parsed_data List of parsed CAN messages.
"""
for data in parsed_data:
#print(f"Message: {data['message_name']}, CAN ID: {data['can_id']}, Timestamp: {data['timestamp']}, Direction: {data['direction']}")
for signal in data['signals']:
signal_name, signal_value, signal_timestamp = signal
#print(f" Signal: {signal_name}, Value: {signal_value}, Timestamp: {signal_timestamp}")


def plot_signals(parsed_data, signal_name, start_time, end_time):
"""
@brief Plot the signal data over time.
@param parsed_data List of parsed CAN messages.
@param signal_name Name of the signal to plot.
@param start_time Start time for the plot.
@param end_time End time for the plot.
"""
timestamps = []
values = []
for data in parsed_data:
Expand Down Expand Up @@ -120,26 +178,28 @@ def plot_signals(parsed_data, signal_name, start_time, end_time):

args = parser.parse_args()

if args.mode == "test":
dbc_file = "C:\\Github\\CAN-Log-Parser\\test\\data\\test.dbc"
log_file = "C:\\Github\\CAN-Log-Parser\\test\\data\\test_log.txt"
elif args.mode == "main":
dbc_file = "C:\\Github\\CAN-Log-Parser\\data\\1200G_CAN-DBC_v01.01.00.dbc"
log_file = "C:\\Github\\CAN-Log-Parser\\data\\DMW_Message_Timeout_CAN_Log.txt"
# Determine the paths to the DBC and log files based on the mode
script_dir = os.path.dirname(os.path.abspath(__file__))
dbc_file = os.path.join(script_dir, "..", "data", "test.dbc")
log_file = os.path.join(script_dir, "..", "data", "test_log.txt")

# Parse the DBC file
db = parse_dbc(dbc_file)
if db:
# Parse the log file
parsed_data = parse_log(db, log_file)
if parsed_data:
# Determine start and end times for the plot if not provided
if args.start is None or args.end is None:
all_timestamps = [float(data['timestamp']) / 1000 for data in parsed_data for signal in data['signals']]
start_time = min(all_timestamps) if args.start is None else args.start
end_time = max(all_timestamps) if args.end is None else args.end
else:
start_time = args.start
end_time = args.end
# Plot the signals
plot_signals(parsed_data, args.signal, start_time, end_time)
else:
print("Parsed data is empty or None.")
else:
print("DBC parsing failed.")
print("DBC parsing failed.")

0 comments on commit 2741588

Please sign in to comment.