-
Notifications
You must be signed in to change notification settings - Fork 1
/
Windows.py
167 lines (139 loc) · 6.04 KB
/
Windows.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import ctypes
import os
import win32com.client
import time
from concurrent.futures import ThreadPoolExecutor
from colorama import init, Fore, Back, Style
import pyfiglet
# Initialize colorama
init(autoreset=True)
def list_physical_disks():
    """Return ``(DeviceID, Model)`` tuples for every Win32_DiskDrive instance.

    The disks are enumerated through a WMI query against the local
    ``root\\cimv2`` namespace; an empty list is returned when the query
    yields nothing.
    """
    locator = win32com.client.Dispatch("WbemScripting.SWbemLocator")
    cimv2 = locator.ConnectServer(".", "root\\cimv2")
    drives = cimv2.ExecQuery("SELECT DeviceID, Model FROM Win32_DiskDrive")
    return [(drive.DeviceID, drive.Model) for drive in drives]
def get_disk_size(disk):
    """Return the size in bytes that WMI reports for a physical disk.

    Args:
        disk: The ``DeviceID`` of the drive, as returned by a
            ``Win32_DiskDrive`` query.

    Returns:
        The drive size as an ``int``, or ``0`` when no drive with that
        ``DeviceID`` exists or WMI reports no size for it.
    """
    wmi = win32com.client.Dispatch("WbemScripting.SWbemLocator")
    service = wmi.ConnectServer(".", "root\\cimv2")
    for d in service.ExecQuery("SELECT Size, DeviceID FROM Win32_DiskDrive"):
        if d.DeviceID == disk:
            # Size may come back empty (None); int(None) would raise
            # TypeError, so report "unknown" (0) instead, which callers
            # already handle.
            return int(d.Size or 0)
    return 0
# Win32 constants used by the raw-disk reader.
_GENERIC_READ = 0x80000000
_FILE_SHARE_READ = 0x00000001
_FILE_SHARE_WRITE = 0x00000002
_OPEN_EXISTING = 0x00000003
_FILE_BEGIN = 0
# CreateFileW signals failure with INVALID_HANDLE_VALUE, i.e. (HANDLE)-1.
_INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
_DEVICE_PREFIX = "\\\\.\\"


def _get_kernel32():
    """Return a private kernel32 binding with explicit function prototypes.

    A dedicated ``WinDLL`` instance is used (instead of the shared
    ``ctypes.windll.kernel32``) so the prototypes set here cannot affect
    other code, and so the Win32 last-error value is captured per call.
    The binding is created once and cached on the function object.
    """
    lib = getattr(_get_kernel32, "_lib", None)
    if lib is None:
        lib = ctypes.WinDLL("kernel32", use_last_error=True)

        lib.CreateFileW.argtypes = [
            ctypes.c_wchar_p,  # lpFileName
            ctypes.c_ulong,    # dwDesiredAccess
            ctypes.c_ulong,    # dwShareMode
            ctypes.c_void_p,   # lpSecurityAttributes
            ctypes.c_ulong,    # dwCreationDisposition
            ctypes.c_ulong,    # dwFlagsAndAttributes
            ctypes.c_void_p,   # hTemplateFile
        ]
        # Pointer-sized return type: the default c_int would truncate
        # 64-bit handles and turn INVALID_HANDLE_VALUE into -1.
        lib.CreateFileW.restype = ctypes.c_void_p

        lib.SetFilePointerEx.argtypes = [
            ctypes.c_void_p,                    # hFile
            ctypes.c_longlong,                  # liDistanceToMove (64-bit)
            ctypes.POINTER(ctypes.c_longlong),  # lpNewFilePointer (optional)
            ctypes.c_ulong,                     # dwMoveMethod
        ]
        lib.SetFilePointerEx.restype = ctypes.c_int

        lib.ReadFile.argtypes = [
            ctypes.c_void_p,                 # hFile
            ctypes.c_void_p,                 # lpBuffer
            ctypes.c_ulong,                  # nNumberOfBytesToRead
            ctypes.POINTER(ctypes.c_ulong),  # lpNumberOfBytesRead
            ctypes.c_void_p,                 # lpOverlapped
        ]
        lib.ReadFile.restype = ctypes.c_int

        lib.CloseHandle.argtypes = [ctypes.c_void_p]
        lib.CloseHandle.restype = ctypes.c_int

        _get_kernel32._lib = lib
    return lib


def read_physical_disk(disk, block_size, offset):
    """Read one block from a physical disk at an absolute byte offset.

    The device is opened, positioned, read and closed on every call.

    Args:
        disk: Device name, either bare (``PHYSICALDRIVE0``) or already
            carrying the ``\\\\.\\`` device prefix.
        block_size: Number of bytes to request.
        offset: Absolute byte offset from the start of the device; passed
            to the OS as a full 64-bit value.

    Returns:
        ``(data, length)`` where ``data`` holds the bytes actually read, or
        ``(None, 0)`` when the read fails or returns no data.

    Raises:
        OSError: If the device cannot be opened or the file pointer cannot
            be moved. The Win32 error is attached as ``__cause__``.
    """
    kernel32 = _get_kernel32()

    # Avoid doubling the device prefix when the caller already supplies it.
    if disk.startswith(_DEVICE_PREFIX):
        device_path = disk
    else:
        device_path = _DEVICE_PREFIX + disk

    handle = kernel32.CreateFileW(
        device_path,
        _GENERIC_READ,
        _FILE_SHARE_READ | _FILE_SHARE_WRITE,
        None,
        _OPEN_EXISTING,
        0,
        None,
    )
    # A c_void_p restype maps a NULL handle to None; treat it as failure too.
    if handle is None or handle == _INVALID_HANDLE_VALUE:
        win_error = ctypes.WinError(ctypes.get_last_error())
        raise OSError(f"Failed to open disk {disk}") from win_error

    try:
        # SetFilePointerEx takes the whole 64-bit distance by value; the
        # resulting position is not needed, so lpNewFilePointer is NULL.
        if not kernel32.SetFilePointerEx(handle, offset, None, _FILE_BEGIN):
            win_error = ctypes.WinError(ctypes.get_last_error())
            raise OSError(
                f"Failed to set file pointer for disk {disk}"
            ) from win_error

        read_buffer = ctypes.create_string_buffer(block_size)
        bytes_read = ctypes.c_ulong(0)
        success = kernel32.ReadFile(
            handle,
            read_buffer,
            block_size,
            ctypes.byref(bytes_read),
            None,
        )
    finally:
        # Always release the handle, including on the error paths above.
        kernel32.CloseHandle(handle)

    if not success or bytes_read.value == 0:
        return None, 0
    return read_buffer.raw[:bytes_read.value], bytes_read.value
def format_speed(bytes_per_second):
    """Format a transfer rate as a human-readable string.

    The value is scaled by factors of 1024 through B/s, KB/s, MB/s and
    GB/s. Rates too large for the last unit stay expressed in GB/s rather
    than falling back to a wrong unit.

    Args:
        bytes_per_second: Transfer rate in bytes per second.

    Returns:
        The rate with two decimals and its unit, e.g. ``"1.50 MB/s"``.
    """
    units = ("B/s", "KB/s", "MB/s", "GB/s")
    speed = bytes_per_second
    index = 0
    # Stop scaling at the last unit so the label always matches the value.
    while speed >= 1024 and index < len(units) - 1:
        speed /= 1024
        index += 1
    return f"{speed:.2f} {units[index]}"
def format_time(seconds):
    """Render a duration given in seconds as an ``HH:MM:SS`` string.

    Each field is truncated to an integer and zero-padded to at least two
    digits; the hours field grows beyond two digits when needed.
    """
    hours = seconds // 3600
    leftover = seconds % 3600
    minutes = leftover // 60
    secs = leftover % 60
    return "{:02}:{:02}:{:02}".format(int(hours), int(minutes), int(secs))
def _prompt_disk_index(disk_count):
    """Ask for a 1-based disk number; return the 0-based index or None if invalid."""
    answer = input(Fore.YELLOW + "Enter the number of the disk: ")
    try:
        index = int(answer) - 1
    except ValueError:
        # Non-numeric input is reported as an invalid choice, not a crash.
        return None
    return index if 0 <= index < disk_count else None


def _image_file_name(disk_model):
    """Build a ``<model>.img`` file name that is valid on Windows file systems."""
    # Spaces plus the characters Windows forbids in file names become "_".
    unsafe = str.maketrans({ch: "_" for ch in ' /\\:*?"<>|'})
    return f"{(disk_model or 'disk').translate(unsafe)}.img"


def _copy_disk_to_image(selected_disk, disk_size, block_size, img_file,
                        max_pending=32):
    """Copy the disk into ``img_file`` block by block, printing progress.

    Reads run on a thread pool, but at most ``max_pending`` blocks are in
    flight at any time so memory use stays bounded. Every block is written
    at its own offset, so an unreadable block leaves a zero-filled gap
    instead of shifting all later data.

    Returns:
        The number of blocks that could not be read.
    """
    from collections import deque

    total_sectors = disk_size // 512
    offsets = iter(range(0, disk_size, block_size))
    pending = deque()  # (offset, future) pairs in submission order
    total_read = 0
    failed_blocks = 0
    start_time = time.time()

    with ThreadPoolExecutor() as executor:

        def submit_next():
            offset = next(offsets, None)
            if offset is not None:
                future = executor.submit(
                    read_physical_disk, selected_disk, block_size, offset
                )
                pending.append((offset, future))

        for _ in range(max_pending):
            submit_next()

        while pending:
            offset, future = pending.popleft()
            block, read_size = future.result()
            # Refill the window only after a result has been consumed.
            submit_next()
            if block is None:
                failed_blocks += 1
                continue

            img_file.seek(offset)
            img_file.write(block)
            total_read += read_size

            elapsed_time = time.time() - start_time
            if elapsed_time > 0:
                speed = total_read / elapsed_time
                progress = (total_read / disk_size) * 100
                # Clamp so a final over-read cannot yield a negative ETA.
                remaining_bytes = max(disk_size - total_read, 0)
                remaining_seconds = remaining_bytes / speed if speed > 0 else 0
                print(f"\r{Fore.CYAN}Progress: {progress:.2f}% | Speed: {format_speed(speed)} | "
                      f"Sectors: {total_read // 512}/{total_sectors} | "
                      f"ETA: {format_time(remaining_seconds)}", end='')

    # If trailing blocks were unreadable, pad the image up to the disk size.
    img_file.seek(0, os.SEEK_END)
    if img_file.tell() < disk_size:
        img_file.truncate(disk_size)
    return failed_blocks


def main():
    """Interactively copy a physical disk into an image file.

    Lists the physical disks, asks the user to choose one and a target
    directory, then writes ``<model>.img`` there while showing progress,
    speed and ETA. Invalid input and copy errors are reported on the
    console.
    """
    # ASCII art header
    header = pyfiglet.figlet_format("Disk Copy Tool")
    print(Fore.YELLOW + header)
    # Show credit line
    print(Fore.CYAN + "Credit: Development by DRC Lab/ Nguyen Vu Ha +84903408066 Ha Noi, Viet Nam")

    # List all physical disks
    disks = list_physical_disks()
    if not disks:
        print(Fore.RED + "No physical disks found.")
        return

    # Display the list of disks to the user
    print(Fore.CYAN + "\nSelect a physical disk to copy:")
    for idx, (device_id, model) in enumerate(disks):
        print(f"{Fore.GREEN}{idx + 1}: {device_id} ({model})")

    # Get the user's choice
    choice = _prompt_disk_index(len(disks))
    if choice is None:
        print(Fore.RED + "Invalid choice.")
        return
    selected_disk, disk_model = disks[choice]

    # Prompt for the directory path to save the image file
    directory_path = input(Fore.YELLOW + "Enter the directory path to save the image file: ")
    if not os.path.isdir(directory_path):
        print(Fore.RED + "The specified directory does not exist.")
        return

    # The image is named after the disk model
    save_file_path = os.path.join(directory_path, _image_file_name(disk_model))
    block_size = 256 * 512  # 256 sectors * 512 bytes per sector

    # Get the disk size
    disk_size = get_disk_size(selected_disk)
    if disk_size == 0:
        print(Fore.RED + "Unable to determine the disk size.")
        return

    # Copy the disk to the image file
    try:
        with open(save_file_path, 'wb') as img_file:
            failed_blocks = _copy_disk_to_image(
                selected_disk, disk_size, block_size, img_file
            )
        if failed_blocks:
            print(Fore.RED + f"\nWarning: {failed_blocks} block(s) could not be read "
                             f"and were left zero-filled in the image.", end='')
        print(Fore.GREEN + f"\nDisk {selected_disk} copied to {save_file_path} successfully.")
    except Exception as e:
        # Top-level boundary: report any copy failure instead of a traceback.
        print(Fore.RED + f"An error occurred: {e}")


if __name__ == "__main__":
    main()