-
Notifications
You must be signed in to change notification settings - Fork 0
/
LogScanner.py
181 lines (151 loc) · 8.18 KB
/
LogScanner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import concurrent.futures
import json
import os
import re
import threading
from collections import defaultdict
from typing import Dict

import matplotlib.pyplot as plt
from colorama import init, Fore, Style
from tqdm import tqdm
# Initialise colorama. autoreset=True is colorama's option to reset the
# colour/style automatically after every print call.
init(autoreset=True)
# Print the red ASCII-art banner and the cyan author credit. This is a
# top-level statement, so it runs whenever the module is loaded (including
# on import), before main() is called.
print(f"""{Fore.RED}
██╗ ██████╗ ██████╗ ███████╗ ██████╗ █████╗ ███╗ ██╗███╗ ██╗███████╗██████╗
██║ ██╔═══██╗██╔════╝ ██╔════╝██╔════╝██╔══██╗████╗ ██║████╗ ██║██╔════╝██╔══██╗
██║ ██║ ██║██║ ███╗███████╗██║ ███████║██╔██╗ ██║██╔██╗ ██║█████╗ ██████╔╝
██║ ██║ ██║██║ ██║╚════██║██║ ██╔══██║██║╚██╗██║██║╚██╗██║██╔══╝ ██╔══██╗
███████╗╚██████╔╝╚██████╔╝███████║╚██████╗██║ ██║██║ ╚████║██║ ╚████║███████╗██║ ██║
╚══════╝ ╚═════╝ ╚═════╝ ╚══════╝ ╚═════╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝ ╚═══╝╚══════╝╚═╝ ╚═╝
{Fore.CYAN}By Lolemploi5{Style.RESET_ALL}
""" + Style.RESET_ALL)
class LogScanner:
    """Scan log files for configured regex patterns and report the matches.

    Patterns are loaded from a JSON config file (``{"patterns": {name: regex}}``)
    and suggestions from a JSON solutions file
    (``{pattern_name: {substring: solution}}``). Each pattern name is treated
    as a severity level (e.g. ``ERROR``) when counting, colouring and plotting.

    Attributes:
        log_directory: Directory passed by the caller (stored, not read here).
        patterns: Pattern name -> compiled regular expression.
        severity_levels: Pattern name -> total number of matching lines.
        solutions: Parsed content of the solutions file.
        lines_by_severity: Pattern name -> list of stripped matching lines.
        suggestions: Pattern name -> list of unique ``(error_type, solution)``.
        files_stats: File base name -> {pattern name -> match count}.
    """

    # Bar colour used by plot_statistics for each known severity name; mirrors
    # the terminal colours returned by _get_severity_color.
    _PLOT_COLORS = {
        'ERROR': 'red',
        'WARNING': 'yellow',
        'INFO': 'green',
        'DEBUG': 'cyan',
    }

    def __init__(self, log_directory: str, config_file: str, solutions_file: str):
        """Create the scanner and load its patterns and solutions.

        Args:
            log_directory: Directory containing the log files.
            config_file: Path to the JSON file holding the ``patterns`` mapping.
            solutions_file: Path to the JSON file holding the solutions mapping.
        """
        self.log_directory = log_directory
        self.patterns = {}
        self.severity_levels = defaultdict(int)
        self.solutions = {}
        self.lines_by_severity = defaultdict(list)
        self.suggestions = defaultdict(list)
        self.files_stats = defaultdict(lambda: defaultdict(int))
        # scan_logs processes files on several threads; this lock serialises
        # the read-modify-write updates of the shared counters above.
        self._lock = threading.Lock()
        self.load_config(config_file)
        self.load_solutions(solutions_file)

    def load_config(self, config_file: str):
        """Load and compile every entry of the config file's ``patterns`` map.

        Any failure (unreadable file, invalid JSON, invalid regex) is printed
        in red and not raised; patterns registered before the failure are kept.
        """
        try:
            with open(config_file, 'r') as file:
                config = json.load(file)
            for name, pattern in config.get("patterns", {}).items():
                self.add_pattern(name, pattern)
        except Exception as e:
            print(f"{Fore.RED}Error loading configuration file {config_file}: {e}{Style.RESET_ALL}")

    def load_solutions(self, solutions_file: str):
        """Load the solutions mapping from ``solutions_file``.

        Any failure is printed in red and not raised; ``self.solutions`` then
        keeps its previous value.
        """
        try:
            with open(solutions_file, 'r') as file:
                self.solutions = json.load(file)
        except Exception as e:
            print(f"{Fore.RED}Error loading solutions file {solutions_file}: {e}{Style.RESET_ALL}")

    def add_pattern(self, name: str, pattern: str):
        """Compile ``pattern`` and register it under ``name``.

        Raises:
            re.error: If ``pattern`` is not a valid regular expression.
        """
        self.patterns[name] = re.compile(pattern)

    def scan_logs(self, files):
        """Process every path in ``files`` on a thread pool with a progress bar.

        Args:
            files: Sized sequence of log file paths (``len(files)`` is used
                for the progress bar total).
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # list(...) drains the lazy map so every file is actually processed.
            list(tqdm(executor.map(self._process_file, files), total=len(files), desc="Scanning files"))

    def _process_file(self, file_path: str):
        """Read one log file and feed each of its lines to ``_process_line``.

        Errors are printed in red and swallowed so that one bad file does not
        abort the scan of the remaining files.
        """
        file_name = os.path.basename(file_path)
        try:
            # errors='replace' keeps the scan going when a log contains bytes
            # that are invalid in the default encoding, instead of discarding
            # the whole file on the first undecodable byte.
            with open(file_path, 'r', errors='replace') as file:
                lines = file.readlines()
            for line in tqdm(lines, desc=f"Processing {file_name}", unit="line"):
                self._process_line(line, file_name)
        except Exception as e:
            print(f"{Fore.RED}Error reading {file_path}: {e}{Style.RESET_ALL}")

    def _process_line(self, line: str, file_name: str):
        """Record ``line`` under every pattern it matches.

        Args:
            line: Raw line from the log file.
            file_name: Base name of the file, used as the per-file stats key.
        """
        # Regex matching only reads self.patterns, so it runs outside the lock.
        matched = [name for name, pattern in self.patterns.items() if pattern.search(line)]
        if not matched:
            return
        stripped = line.strip()
        # ``+= 1`` on a shared dict entry is not atomic across threads; take
        # the lock so concurrent workers cannot lose updates.
        with self._lock:
            for name in matched:
                self.severity_levels[name] += 1
                self.lines_by_severity[name].append(stripped)
                self.files_stats[file_name][name] += 1
                self._suggest_solution(name, line)

    def _suggest_solution(self, severity: str, line: str):
        """Record the known solutions whose error type occurs in ``line``.

        Each ``(error_type, solution)`` pair is stored once per severity, so a
        log that repeats the same error many times produces one suggestion.
        This method does not take the lock itself; ``_process_line`` calls it
        while already holding it.
        """
        for error_type, solutions in self.solutions.get(severity, {}).items():
            if error_type not in line:
                continue
            entry = (error_type, solutions)
            # .get avoids creating an empty defaultdict entry (which would
            # print an empty "Suggestions for ..." header later).
            if entry not in self.suggestions.get(severity, ()):
                self.suggestions[severity].append(entry)

    def generate_report(self) -> Dict[str, int]:
        """Return a plain-dict copy of the per-severity match counts."""
        return dict(self.severity_levels)

    def print_report(self):
        """Print the per-severity counts, then the suggestions and statistics."""
        report = self.generate_report()
        for severity, count in report.items():
            color = self._get_severity_color(severity)
            print(f"{color}{severity}: {count} occurrences{Style.RESET_ALL}")
        self.print_suggestions()
        self.print_statistics()

    def print_suggestions(self):
        """Print every recorded suggestion, grouped by severity."""
        for severity, suggestions in self.suggestions.items():
            print(f"\n{self._get_severity_color(severity)}Suggestions for {severity}:{Style.RESET_ALL}")
            for error_type, solution in suggestions:
                print(f"{Fore.CYAN}Error: {error_type}{Style.RESET_ALL}")
                print(f"{Fore.GREEN}Solution: {solution}{Style.RESET_ALL}")

    def print_statistics(self):
        """Print the per-file match counts, then show the bar chart."""
        print("\nStatistics by file:")
        for file_name, stats in self.files_stats.items():
            print(f"\n{Fore.BLUE}{file_name}{Style.RESET_ALL}")
            for severity, count in stats.items():
                color = self._get_severity_color(severity)
                # Spelling fixed ("occurences") to agree with print_report.
                print(f"{color}{severity}: {count} occurrences{Style.RESET_ALL}")
        self.plot_statistics()

    def plot_statistics(self):
        """Show a bar chart of match counts per severity.

        Does nothing when no line matched, so no empty window is opened.
        """
        severities = list(self.severity_levels.keys())
        if not severities:
            return
        counts = [self.severity_levels[name] for name in severities]
        # Colour each bar by its severity name, not by its position:
        # severities appear in first-match order, which varies between runs.
        colors = [self._PLOT_COLORS.get(name, 'gray') for name in severities]
        plt.bar(severities, counts, color=colors)
        plt.xlabel('Severity')
        plt.ylabel('Count')
        plt.title('Log Severity Distribution')
        plt.show()

    def _get_severity_color(self, severity: str) -> str:
        """Return the colorama foreground colour for ``severity`` (white if unknown)."""
        color_map = {
            'ERROR': Fore.RED,
            'WARNING': Fore.YELLOW,
            'INFO': Fore.GREEN,
            'DEBUG': Fore.CYAN
        }
        return color_map.get(severity, Fore.WHITE)

    def display_severity_lines(self, severity: str):
        """Print every recorded line for ``severity``, or a notice if there are none."""
        if severity in self.lines_by_severity:
            print(f"\n{self._get_severity_color(severity)}Lines with {severity}:{Style.RESET_ALL}")
            for line in self.lines_by_severity[severity]:
                print(line)
        else:
            print(f"{Fore.RED}No lines found for severity level {severity}.{Style.RESET_ALL}")
def main():
    """Interactively choose log files from ``logscan`` and scan them.

    Checks that the log directory and both config files exist, lists the
    ``.log`` files, asks which ones to scan (a number, a comma-separated list
    of numbers, or ``all``), prints the report, then asks for a severity level
    and prints the lines recorded for it. Every failure is reported in red and
    ends the function without raising.
    """
    log_directory = 'logscan'
    config_file = 'config/config.json'
    solutions_file = 'config/error_solutions.json'

    if not os.path.exists(log_directory):
        print(f"{Fore.RED}The directory '{log_directory}' does not exist.{Style.RESET_ALL}")
        return
    if not os.path.exists(config_file):
        print(f"{Fore.RED}Configuration file '{config_file}' not found.{Style.RESET_ALL}")
        return
    if not os.path.exists(solutions_file):
        print(f"{Fore.RED}Solutions file '{solutions_file}' not found.{Style.RESET_ALL}")
        return

    # os.listdir returns entries in arbitrary order; sort so the menu numbers
    # are stable from one run to the next.
    files = sorted(f for f in os.listdir(log_directory) if f.endswith('.log'))
    if not files:
        print(f"{Fore.RED}No log files found in '{log_directory}'.{Style.RESET_ALL}")
        return

    print(f"{Fore.GREEN}Available log files:{Style.RESET_ALL}")
    for i, file in enumerate(files):
        print(f"{Fore.CYAN}{i + 1}. {file} {Style.RESET_ALL}")

    choice = input("Enter the number of the file to scan, 'all' to scan all files, or a comma-separated list of numbers to scan multiple files: ").strip()
    # Accept "ALL", " all ", etc. rather than only the exact string 'all'.
    if choice.lower() == 'all':
        selected_files = [os.path.join(log_directory, file) for file in files]
    else:
        try:
            selected_indices = [int(i) for i in choice.split(',')]
        except ValueError:
            print(f"{Fore.RED}Invalid input.{Style.RESET_ALL}")
            return
        # Keep in-range numbers only; dict.fromkeys drops repeats (e.g. "1,1")
        # while preserving order, so no file is scanned and counted twice.
        valid_indices = dict.fromkeys(i for i in selected_indices if 1 <= i <= len(files))
        selected_files = [os.path.join(log_directory, files[i - 1]) for i in valid_indices]
        if not selected_files:
            # Every number was out of range: report it instead of silently
            # scanning nothing and printing an empty report.
            print(f"{Fore.RED}Invalid input.{Style.RESET_ALL}")
            return

    scanner = LogScanner(log_directory, config_file, solutions_file)
    scanner.scan_logs(selected_files)
    scanner.print_report()

    severity_choice = input("Enter the severity level to display lines (e.g., ERROR, WARNING, INFO, DEBUG): ").strip()
    scanner.display_severity_lines(severity_choice.upper())
# Script entry point: start the interactive scanner only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()