This repository has been archived by the owner on Dec 4, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlz77.py
488 lines (415 loc) · 20 KB
/
lz77.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
import ctypes
import os
from collections import defaultdict
from typing import Generator, List, MutableMapping, Optional, Set, Tuple
from typing_extensions import Final
#from .. import package_root
# Attempt to use the faster C++ libraries if they're available
try:
clib = None
clib_path = os.path.join(package_root, "protocol")
files = [f for f in os.listdir(clib_path) if f.startswith("lz77cpp") and f.endswith(".so")]
if len(files) > 0:
clib = ctypes.cdll.LoadLibrary(os.path.join(clib_path, files[0]))
clib.decompress.argtypes = (ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p, ctypes.c_int)
clib.decompress.restype = ctypes.c_int
clib.compress.argtypes = (ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p, ctypes.c_int)
clib.compress.restype = ctypes.c_int
except Exception:
clib = None
class LzException(Exception):
"""
An exception thrown when we encounter an error with Lz77 encoding/decoding.
"""
class Lz77Decompress:
"""
A class that can decompress an Lz77 stream of data. Notably, this is a different
variant to the Lz77 found in firebeat executables and BIOS. This is used for
over-the-wire compression of XML data, as well as compression inside a decent
amount of file formats found in various Konami games.
"""
RING_LENGTH: Final[int] = 0x1000
FLAG_COPY: Final[int] = 1
FLAG_BACKREF: Final[int] = 0
def __init__(self, data: bytes, backref: Optional[int] = None) -> None:
"""
Initialize the object.
Parameters:
data - Binary blob representing the data to be decompressed.
"""
self.eof: bool = False
self.data: bytes = data
self.read_pos: int = 0
self.left: int = len(self.data)
self.flags: int = 1
self.write_pos: int = 0
self.pending_copy_amount: int = 0
self.pending_copy_pos: int = 0
self.pending_copy_max: int = 0
self.ringlength: int = backref or self.RING_LENGTH
self.ring: bytes = b'\x00' * self.ringlength
def _ring_read(self, copy_pos: int, copy_len: int) -> Generator[bytes, None, None]:
"""
Read the next bytes from the backref ring at the current copy position.
Returns:
a generator which yields bytes.
"""
while copy_len > 0:
if copy_pos + copy_len > self.ringlength:
# Copy first chunk, let subsequent loop handle the next chunks
amount = self.ringlength - copy_pos
else:
# Copy the whole thing out, we have enough space to do so
amount = copy_len
ret = self.ring[copy_pos:(copy_pos + amount)]
self._ring_write(ret)
yield ret
copy_pos = (copy_pos + amount) % self.ringlength
copy_len -= amount
def _ring_write(self, bytedata: bytes) -> None:
"""
Write bytes into the backref ring.
Parameters:
byte - A byte to be written at the current write offset
"""
while True:
amount = len(bytedata)
if amount == 0:
return
if amount > (self.ringlength - self.write_pos):
amount = self.ringlength - self.write_pos
self.ring = self.ring[:self.write_pos] + bytedata[:amount] + self.ring[(self.write_pos + amount):]
bytedata = bytedata[amount:]
self.write_pos = (self.write_pos + amount) % self.ringlength
def decompress_bytes(self) -> Generator[bytes, None, None]:
"""
Yield the next byte from the decompressed output. If we are
in a backref copy, read the next byte from the backref. If
we aren't, read the next flag to see if we should decode
a byte directly or if we should start another backref read.
If we don't have any flags, exit reporting EOF (None). If
we hit the end of the stream, stop yielding to signify EOF.
In all cases, whatever byte we read should be added back to
the backref buffer.
Returns:
a generator that yields bytes.
"""
while not self.eof:
if self.pending_copy_amount > 0:
# We had a backref that would have copied more data than we had available
# in the ringbuffer (because every read, even a backref, adds to the
# ringbuffer). So, since we read that last time and wrote it to the backbuffer
# we are safe to read again.
amount = min(self.pending_copy_amount, self.pending_copy_max)
yield from self._ring_read(self.pending_copy_pos, amount)
# We read this many bytes and are about to write them to the ringbuffer,
# so bookkeep that.
self.pending_copy_amount -= amount
self.pending_copy_max = amount
else:
if self.flags == 1:
# Load the next byte for processing
if self.left == 0:
# We have nothing left to read, so skip the
# ringbuffer write below and exit early.
return
else:
self.flags = 0x100 | self.data[self.read_pos]
self.read_pos += 1
self.left -= 1
# Shift the lowest bit out to be retrieved as a flag
flag = self.flags & 1
self.flags >>= 1
if flag == self.FLAG_COPY:
# Figure out how much to pull at once
amount = 1
while self.flags != 1 and (self.flags & 1) == self.FLAG_COPY:
# We would do a copy next time, so pop that flag and just add to our read amount
self.flags >>= 1
amount += 1
# Grab chunk right out of the data source
b = self.data[self.read_pos:(self.read_pos + amount)]
self._ring_write(b)
yield b
self.read_pos += amount
self.left -= amount
elif flag == self.FLAG_BACKREF:
yield from self._read_backref()
else:
raise Exception("Logic error!")
def _read_backref(self) -> Generator[bytes, None, None]:
"""
Read a backref chunk. Grab the copy length and copy position
from the first two bytes and then read the first byte from
the backref. Sets up variables such that Lz77Decompress.__read()
can finish copying out of the backref on subsequent calls. Should
only be called by Lz77Decompress.__read(). If we discover the end
of stream, we don't generate any bytes and instead set the eof
flag which terminates the main decompression loop above.
Returns:
a generator that yields bytes.
"""
if self.left == 0:
self.eof = True
return
if self.left == 1:
raise LzException('Unexpected EOF mid-backref')
hi = self.data[self.read_pos]
lo = self.data[self.read_pos + 1]
self.read_pos += 2
self.left -= 2
copy_len = lo & 0xF
copy_pos = (hi << 4) | (lo >> 4)
if copy_pos > 0:
copy_len += 3
if copy_len > copy_pos:
# Remember what we have to do left, and the safe
# amount to copy next time (which is our length,
# since we are about to write that many butes to
# the ringbuffer right after reading them).
self.pending_copy_amount = copy_len - copy_pos
self.pending_copy_pos = self.write_pos
self.pending_copy_max = copy_pos
# Only copy the available bytes
copy_len = copy_pos
copy_pos = (self.write_pos - copy_pos)
while copy_pos < 0:
copy_pos += self.ringlength
copy_pos = copy_pos % self.ringlength
yield from self._ring_read(copy_pos, copy_len)
else:
self.eof = True
return
class Lz77Compress:
"""
A class that can compress arbitrary binary data using the Lz77 protocol.
Note that this does support overlapped backtracks, so for instance the
string "abcabcabc" will be compressed properly (see unit tests for examples).
Great care has been taken in optimizing this and then we further optimize
by using Cython to build, netting us another 40% speed-up. This is important
because for any given packet we are decompressing and compressing at least
once, and if we use a proxy to direct traffic, possibly a second time.
"""
RING_LENGTH: Final[int] = 0x1000
LOOSE_COMPRESS_THRESHOLD: Final[int] = 1024 * 512
FLAG_COPY: Final[int] = 1
FLAG_BACKREF: Final[int] = 0
def __init__(self, data: bytes, backref: Optional[int] = None) -> None:
"""
Initialize the object.
Parameters:
data - Binary blob representing the data to be decompressed.
"""
self.data: bytes = data
self.read_pos: int = 0
self.left: int = len(self.data)
self.eof: bool = False
self.bytes_written: int = 0
self.ringlength: int = backref or self.RING_LENGTH
self.locations: MutableMapping[int, Set[int]] = defaultdict(set)
self.starts: MutableMapping[bytes, Set[int]] = defaultdict(set)
self.last_start: Tuple[int, int, int] = (0, 0, 0)
if len(data) > self.LOOSE_COMPRESS_THRESHOLD:
self._ring_write = self._ring_write_starts_only
else:
self._ring_write = self._ring_write_both
def _ring_write_starts_only(self, bytedata: bytes) -> None:
"""
Write bytes into the backref ring.
Parameters:
byte - A byte to be written at the current write offset
"""
for byte in bytedata:
# Update the start locations hashmap if we're past the beginning
self.last_start = (self.last_start[1], self.last_start[2], byte)
if self.bytes_written >= 2:
self.starts[bytes(self.last_start)].add(self.bytes_written - 2)
# Keep track of the fact that we wrote this byte.
self.bytes_written += 1
def _ring_write_both(self, bytedata: bytes) -> None:
"""
Write bytes into the backref ring.
Parameters:
byte - A byte to be written at the current write offset
"""
for byte in bytedata:
# Update the start locations hashmap if we're past the beginning
self.last_start = (self.last_start[1], self.last_start[2], byte)
if self.bytes_written >= 2:
self.starts[bytes(self.last_start)].add(self.bytes_written - 2)
# Update the rest of the location hashmaps
self.locations[byte].add(self.bytes_written)
# Keep track of the fact that we wrote this byte.
self.bytes_written += 1
def compress_bytes(self) -> Generator[bytes, None, None]:
"""
Given the current stream, go through and assemble the next flag byte
followed by the next chunk of compressed data.
"""
while not self.eof:
if self.left == 0:
# Output a dummy flag and an end of stream marker.
self.eof = True
yield b"\x00\x00\x00"
else:
# Need to assemble and return the next chunk, which is a flag
# byte and then 8 instructions.
flags = 0x0
flagpos = -1
data: List[bytes] = [b""] * 8
for _ in range(8):
# Track what flag we're generating data for
flagpos += 1
if self.left == 0:
# Output the end of stream marker, set EOF since we've succeeded
# in outputting all flags.
flags |= self.FLAG_BACKREF << flagpos
data[flagpos] = b"\x00\x00"
self.eof = True
break
elif self.left < 3 or self.bytes_written < 3:
# We either don't have enough data written to backref, or we
# don't have enough data in the stream that could be made into
# a backref.
flags |= self.FLAG_COPY << flagpos
chunk = self.data[self.read_pos:(self.read_pos + 1)]
data[flagpos] = chunk
self._ring_write(chunk)
self.read_pos += 1
self.left -= 1
continue
# Figure out the maximum backref we can attempt to find
backref_amount = min(self.left, 18)
# Iterate over all spots where the first byte equals, and is in range.
earliest = max(0, self.bytes_written - (self.ringlength - 1))
index = self.data[self.read_pos:(self.read_pos + 3)]
updated_backref_locations: Set[int] = set(
absolute_pos for absolute_pos in self.starts[index]
if absolute_pos >= earliest
)
self.starts[index] = updated_backref_locations
possible_backref_locations: List[int] = list(updated_backref_locations)
# Output the data as a copy if we couldn't find a backref
if not possible_backref_locations:
flags |= self.FLAG_COPY << flagpos
chunk = self.data[self.read_pos:(self.read_pos + 1)]
data[flagpos] = chunk
self._ring_write(chunk)
self.read_pos += 1
self.left -= 1
continue
# Now, find the longest actual backref of our possibilities. We know
# we're going to write at least these three bytes, so append it to the
# output buffer.
start_write_size = self.bytes_written
self._ring_write(index)
copy_amount = 3
while copy_amount < backref_amount:
# First, let's see if we have any 3-wide chunks to consume.
index = self.data[(self.read_pos + copy_amount):(self.read_pos + copy_amount + 3)]
locations = self.starts[index]
new_backref_locations: List[int] = [
absolute_pos for absolute_pos in possible_backref_locations
if absolute_pos + copy_amount in locations
]
if new_backref_locations:
# Mark that we're copying an extra byte from the backref.
self._ring_write(index)
copy_amount += 3
possible_backref_locations = new_backref_locations
else:
# Check our existing locations to figure out if we still have
# longest prefixes of 1 or 2 left.
while copy_amount < backref_amount:
locations = self.locations[self.data[self.read_pos + copy_amount]]
new_backref_locations = [
absolute_pos for absolute_pos in possible_backref_locations
if absolute_pos + copy_amount in locations
]
# If we have no longest prefixes, that means that any of the
# previous prefixes are good enough.
if not new_backref_locations:
break
# Mark that we're copying an extra byte from the backref.
self._ring_write(self.data[(self.read_pos + copy_amount):(self.read_pos + copy_amount + 1)])
copy_amount += 1
possible_backref_locations = new_backref_locations
# Either we looped above in the inner while, or we didn't. Either way,
# we don't have any more to check since there were no more 3-long backref
# locations.
break
# Now that we have a list of candidates, arbitrarily pick the
# first one as our candidate and output it.
absolute_pos = possible_backref_locations[0]
backref_pos = start_write_size - absolute_pos
lo = (copy_amount - 3) & 0xF | ((backref_pos & 0xF) << 4)
hi = (backref_pos >> 4) & 0xFF
flags |= self.FLAG_BACKREF << flagpos
data[flagpos] = bytes([hi, lo])
self.read_pos += copy_amount
self.left -= copy_amount
yield bytes([flags]) + b"".join(data)
class Lz77:
"""
A wrapper class encapsulating Lz77 encoding and decoding.
"""
def __init__(self, backref: Optional[int] = None) -> None:
"""
Initialize the object.
"""
self.backref = backref
def decompress(self, data: bytes) -> bytes:
"""
Given a binary blob, return a new binary blob representing the decompressed data.
Parameters:
data - Lz77-compressed binary data
Returns:
Raw binary data.
"""
if clib is not None:
# Given a maximum backref length of 18, if we had a file that was
# only backrefs of maximum size. We would get a compression of around
# (18 * 8) / (2 * 8 + 1), or 8.47. So, allocate 9 times in the output
# buffer.
outbuf = b"\0" * (len(data) * 9)
result = clib.decompress(data, len(data), outbuf, len(outbuf))
if result >= 0:
return outbuf[:result]
elif result == -1:
raise LzException("Not enough room in output buffer!")
elif result == -2:
raise LzException("Unexpected EOF during decompression!")
elif result == -3:
raise LzException("Not enough room to write output byte!")
else:
raise LzException("Unknown exception in C++ code!")
else:
lz = Lz77Decompress(data, backref=self.backref)
return b''.join(lz.decompress_bytes())
def compress(self, data: bytes) -> bytes:
"""
Given a binary blob, return a new binary blob representing the compressed data.
Parameters:
data - Raw binary data.
Returns:
L7zz-compressed binary data.
"""
if clib is not None:
# Given a worst case scenario where we end up copying every byte to
# the output, compression would actually inflate the file by 9/8 size.
# Leave enough room for a trailing EOF reference.
outbuf = b"\0" * (int((len(data) * 9) / 8) + 3)
result = clib.compress(data, len(data), outbuf, len(outbuf))
if result >= 0:
return outbuf[:result]
elif result == -1:
raise LzException("Not enough room in output buffer!")
elif result == -2:
raise LzException("Unexpected lack of backref during compression!")
elif result == -3:
raise LzException("Not enough room to write output byte!")
else:
raise LzException("Unknown exception in C++ code!")
else:
lz = Lz77Compress(data, backref=self.backref)
return b''.join(lz.compress_bytes())