-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetadata_generator.py
165 lines (131 loc) · 4.57 KB
/
metadata_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""Metadata generator module."""
import argparse
import hashlib
import json
import os
from datetime import datetime
from mcap.reader import make_reader
# Directory scanned for MCAP files when the -p command-line option is omitted.
DEFAULT_PATH = '/recorded_datasets/edinburgh'
def read_mcap_file(mcap_file_path):
    """
    Read an MCAP file and extract metadata.

    Parameters:
    mcap_file_path (str): The path to the MCAP file.

    Returns:
    dict: A dictionary with three keys:
        'duration': span between the earliest and latest message log
            time, rounded to whole seconds and suffixed with 's'
            (zero when the file contains no messages).
        'topics': mapping of topic name to number of messages.
        'types': mapping of topic name to the schema name of the first
            message seen on that topic (None for a schemaless channel).
    """
    topic_message_counts = {}
    topic_message_types = {}
    start_time = None
    end_time = None

    with open(mcap_file_path, 'rb') as f:
        reader = make_reader(f)
        for schema, channel, message in reader.iter_messages():
            log_time = message.log_time
            # Track the minimum and maximum explicitly so the duration
            # does not depend on messages being yielded chronologically.
            if start_time is None or log_time < start_time:
                start_time = log_time
            if end_time is None or log_time > end_time:
                end_time = log_time

            topic = channel.topic
            if topic not in topic_message_counts:
                topic_message_counts[topic] = 0
                # A channel may carry no schema, in which case the reader
                # yields None here; record that instead of crashing.
                topic_message_types[topic] = (
                    schema.name if schema is not None else None
                )  # Store the message type
            topic_message_counts[topic] += 1

    if start_time is None:
        # No messages at all: there is no time span to measure.
        duration_seconds = 0.0
    else:
        duration = end_time - start_time
        duration_seconds = duration / 1e9  # Convert nanoseconds to seconds

    result = {
        # The space in the format spec emits a leading space for
        # non-negative values; kept so existing output is unchanged.
        'duration': f'{duration_seconds: .0f}s',
        'topics': topic_message_counts,
        'types': topic_message_types,
    }
    return result
def get_file_size(file_path):
    """
    Report how many bytes a file occupies.

    Parameters:
    file_path (str): The path to the file.

    Returns:
    int: The size of the file in bytes.
    """
    file_stats = os.stat(file_path)
    return file_stats.st_size
def get_file_hash(file_path):
    """
    Compute the MD5 digest of a file's contents.

    Parameters:
    file_path (str): The path to the file.

    Returns:
    str: The MD5 hash of the file as a hexadecimal string.
    """
    digest = hashlib.md5()
    with open(file_path, 'rb') as stream:
        # Feed the file in 8192-byte blocks; read() returns b'' at EOF,
        # which is the sentinel that stops the iteration.
        for block in iter(lambda: stream.read(8192), b''):
            digest.update(block)
    return digest.hexdigest()
def generate_metadata(file_path, root_dir):
    """
    Build the metadata record for a single MCAP file.

    Parameters:
    file_path (str): The path to the MCAP file.
    root_dir (str): The directory the returned path is made relative to.

    Returns:
    tuple: (relative_path, metadata) where relative_path is file_path
        expressed relative to root_dir and metadata is the dictionary
        describing the file.
    """
    relative_path = os.path.relpath(file_path, root_dir)
    mcap_info = read_mcap_file(file_path)

    file_name = os.path.basename(file_path)
    # The identifier is the file name with its final extension removed.
    identifier, _extension = os.path.splitext(file_name)

    date_format = '%Y-%m-%d'
    size_in_bytes = get_file_size(file_path)
    content_hash = get_file_hash(file_path)
    # 'issued' is the date this record is generated; 'modified' is taken
    # from the file's modification timestamp.
    issued_on = datetime.now().strftime(date_format)
    modified_at = datetime.fromtimestamp(os.path.getmtime(file_path))
    modified_on = modified_at.strftime(date_format)

    metadata = {
        'name': file_name,
        'resource:identifier': identifier,
        'resource:description': 'Rosbag MCAP log file',
        'resource:format': 'MCAP',
        'resource:licence': 'cc-by-4.0',
        'resource:size': size_in_bytes,
        'resource:hash': content_hash,
        'resource:issued': issued_on,
        'resource:modified': modified_on,
        'duration': mcap_info['duration'],
        'topics': mcap_info['topics'],
        'types': mcap_info['types'],  # Include the message types
    }
    return relative_path, metadata
def create_resources_json(directory):
    """
    Write resources.json describing every MCAP file under a directory.

    The directory tree is walked recursively. Each file whose name ends
    with '.mcap' gets an entry keyed by its path relative to directory,
    stored alongside the dataset-level fields in the same top-level
    object. The result is written to <directory>/resources.json.

    Parameters:
    directory (str): The directory to search for MCAP files.
    """
    resources = {
        'name': 'dataset',
        'resource:identifier': 'Autonomous driving dataset',
        'resource:description': 'description of the dataset',
        'resource:licence': 'cc-by-4.0',
        'resource:format': 'MCAP',
    }

    for current_dir, _subdirs, file_names in os.walk(directory):
        for file_name in file_names:
            if not file_name.endswith('.mcap'):
                continue
            mcap_path = os.path.join(current_dir, file_name)
            entry_key, entry = generate_metadata(mcap_path, directory)
            resources[entry_key] = entry

    output_path = os.path.join(directory, 'resources.json')
    with open(output_path, 'w') as json_file:
        json.dump(resources, json_file, indent=4)
def main():
    """Read the command-line options and write the metadata file."""
    cli = argparse.ArgumentParser(
        description='Generate metadata for Rosbag MCAP files.'
    )
    # Single option: the directory to scan, falling back to DEFAULT_PATH.
    path_option = {
        'type': str,
        'default': DEFAULT_PATH,
        'help': 'Path to the directory containing MCAP files',
    }
    cli.add_argument('-p', **path_option)

    options = cli.parse_args()
    create_resources_json(options.p)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()