LZW Compression & Decompression
Abstract: I've been coding for years but have never tried to implement these things on my own, so I decided to do it.
Never having implemented any compression algorithm by hand feels very wrong to me, so I'll start by doing that.
LZ78
The LZ78 algorithm goes as follows:

- There is a dictionary `D` containing found patterns & their encodings.
- There is a variable `P` containing the "prefix". In the beginning, the prefix is empty.
- Repeat the following as long as there are characters in the input stream:
    - Let variable `C` point to the most current character in the input stream.
    - Check if the string `P + C` is in `D`.
        - If yes, then let `P` be `P + C` (adding `C` to the prefix).
        - If no:
            - Output the encoding of `P` to the output stream.
            - Output `C` as well.
            - Add the string `P + C` to the dictionary.
            - Let `P` be empty.
- If `P` still contains characters, output the corresponding encoding.
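The steps above can be sketched directly in Python. This is just an illustration, not the code I'll actually use later: the `lz78_compress` name and the `(code, char)` output format are made up here, with code 0 standing for the empty prefix.

```python
def lz78_compress(bstr):
    # D maps a found pattern to its code; the empty prefix is code 0,
    # so new entries get codes 1, 2, 3, ...
    d = {b'': 0}
    p = b''          # the prefix P
    out = []
    for c in bstr:
        cc = bytes([c])
        if p + cc in d:              # P + C already known: grow the prefix
            p = p + cc
        else:                        # mismatch: emit (code of P, C), learn P + C
            out.append((d[p], cc))
            d[p + cc] = len(d)
            p = b''
    if p:                            # leftover prefix at end of input
        out.append((d[p], b''))
    return out
```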
LZW
LZW is LZ78 but with a twist:
- The result only contains codes; it does not output characters directly. This is possible because:
- The dictionary is initialized with every character in the character set, and:
- When a mismatch happens:
    - The algorithm does not output `C`, and `P` will be set to `C` instead of being emptied.
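Before getting to the trie-based implementation, the twist is easiest to see with a plain dict keyed by byte strings. This is a simplified toy version (the `lzw_compress_simple` name is mine, and it ignores the code-width limit discussed later):

```python
def lzw_compress_simple(bstr):
    # the dictionary starts out containing every single byte
    d = {bytes([x]): x for x in range(256)}
    p = b''
    out = []
    for c in bstr:
        cc = bytes([c])
        if p + cc in d:
            p = p + cc
        else:
            out.append(d[p])     # only codes are emitted, never raw bytes
            d[p + cc] = len(d)   # learn the new pattern
            p = cc               # the twist: P restarts at C, not at empty
    if p:
        out.append(d[p])
    return out
```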
The code
```python
# because indexing a bytestring in python gives you int & it's like wtf
# have to do some special maneuvering instead
def _chr(x):
    return bytes([x])


def lzw_compress(bstr):
    # type TrieNode = (Code, Map Byte TrieNode)
    # type Trie = Map Byte TrieNode
    d = {x: (x, {}) for x in range(0, 256)}
    d_counter = 256

    def _gensym():
        nonlocal d_counter
        res = d_counter
        # maximum code 12-bit
        # wikipedia says that the original LZW paper encodes 8-bit into
        # 12-bit codes.
        if res >= 4095:
            return None
        d_counter += 1
        return res

    res = []
    p = b''

    def _chkpc(pc):
        # walk the trie along pc; returns (found, code of the longest
        # match, the trie node where the walk stopped)
        nonlocal d
        d_pointer = d
        res = True
        code = -1
        for c in pc:
            if d_pointer.get(c):
                code = d_pointer[c][0]
                d_pointer = d_pointer[c][1]
            else:
                res = False
                break
        return res, code, d_pointer

    for i, c in enumerate(bstr):
        cc = _chr(c)
        chkres, pcode, d_parent = _chkpc(p + cc)
        if chkres:
            p = p + cc
        else:
            res.append(pcode)
            sym = _gensym()
            if sym is not None:
                d_parent[c] = (sym, {})
            p = cc
    if p:  # the prefix may still contain characters at the end
        chkres, pcode, d_parent = _chkpc(p)
        res.append(pcode)
    return res, d


def lzw_decompress(code_list):
    d = {x: _chr(x) for x in range(0, 256)}
    d_counter = 256

    def _gensym():
        nonlocal d_counter
        res = d_counter
        d_counter += 1
        return res

    if not code_list:
        return b''
    cw = code_list[0]
    res = [d[cw]]
    pw = cw
    for i, cw in enumerate(code_list[1:]):
        if d.get(cw):
            res.append(d[cw])
            p = d[pw]
            c = _chr(d[cw][0])
            d[_gensym()] = p + c
        else:
            # the code isn't in the dictionary yet: it can only be the
            # previous string plus its own first character
            p = d[pw]
            c = _chr(d[pw][0])
            res.append(p + c)
            d[_gensym()] = p + c
        pw = cw
    return b''.join(res)
```
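One subtlety worth calling out: the decoder can receive a code one step before it has added that code itself, because the compressor is one step ahead. For example, compressing `b'aaa'` yields the codes `[97, 256]`, and `256` is not yet in the decoder's dictionary when it arrives. Such a code is always "previous string plus its own first byte", which is what the `else` branch handles. Here is the rule in isolation, as a dict-based sketch (the `lzw_decompress_simple` name is mine, and there's no code-width limit):

```python
def lzw_decompress_simple(codes):
    d = {x: bytes([x]) for x in range(256)}
    prev = d[codes[0]]
    out = [prev]
    for cw in codes[1:]:
        if cw in d:
            cur = d[cw]
        else:
            # unknown code: it must be prev + prev's own first byte
            cur = prev + prev[:1]
        out.append(cur)
        d[len(d)] = prev + cur[:1]   # learn what the compressor learned
        prev = cur
    return b''.join(out)
```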
Some explanations
- This part:
```python
# type TrieNode = (Code, Map Byte TrieNode)
# type Trie = Map Byte TrieNode
d = {x: (x, {}) for x in range(0, 256)}
d_counter = 256

def _gensym():
    nonlocal d_counter
    res = d_counter
    # maximum code 12-bit
    # wikipedia says that the original LZW paper encodes 8-bit into
    # 12-bit codes.
    if res >= 4095:
        return None
    d_counter += 1
    return res

def _chkpc(pc):
    nonlocal d
    d_pointer = d
    res = True
    code = -1
    for c in pc:
        if d_pointer.get(c):
            code = d_pointer[c][0]
            d_pointer = d_pointer[c][1]
        else:
            res = False
            break
    return res, code, d_pointer
```
...is a trie used as the dictionary. If you're doing this in other languages (e.g. C), you might want to use another data structure (e.g. a skip list) instead. But why `4095`? (With 12 bits you can have 4096 different codes, which is one more.) You'll know soon.
- This part:
```python
sym = _gensym()
if sym is not None:
    d_parent[c] = (sym, {})
```
...means we stop adding new strings to the dictionary once the capacity (4095 usable codes for 12-bit code values) is full.
The test
At first I wrote a script to randomly generate test cases & measure how well the algorithm works...
```python
import random

import lzw_demo


def _gentest():
    x = []
    lenx = random.randint(4096, 16384)
    for i in range(lenx):
        x.append(chr(random.randint(0, 255)).encode('latin-1'))
    return b''.join(x)


def _test(n):
    failed_test = []
    for _ in range(n):
        x = _gentest()
        a, _ = lzw_demo.lzw_compress(x)
        xx = lzw_demo.lzw_decompress(a)
        if x != xx:
            failed_test.append((x, a, xx))
    return failed_test


def _agg(n):
    ratio_list = []
    for _ in range(n):
        x = _gentest()
        a, _ = lzw_demo.lzw_compress(x)
        ratio_list.append((len(a) * 12 / 8) / len(x))
    return ratio_list
```
...but I quickly found out it was a mistake to write the generator function like that. LZW (like pretty much all compression algorithms) assumes that repeated patterns exist in the input (so that it can shorten them), & a totally random byte stream almost certainly won't compress well.
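You can see the same effect with any compressor, e.g. zlib from the standard library (the exact sizes depend on the data and the zlib version, so treat the numbers as illustrative):

```python
import os
import zlib

random_data = os.urandom(4096)           # high-entropy input
repetitive_data = b'hinonarade ' * 373   # ~4 KB of one repeated pattern

# Random bytes barely shrink (zlib's framing can even make them slightly
# bigger), while the repetitive input collapses to a tiny fraction.
print(len(zlib.compress(random_data)))      # stays close to 4096
print(len(zlib.compress(repetitive_data)))  # a few dozen bytes
```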
I used a lorem ipsum generator instead. On randomly generated texts the algorithm works pretty well; it gets roughly a 60% reduction in size.
The homemade lorem ipsum generator
I really don't enjoy manually using online tools, so I decided to write one myself. I was too lazy to find an English dictionary ready for processing, so I wrote a function that generates random V/CV syllables instead.
```python
def _not_jpn_syllable_gen():
    c = b'0mnpbtdkgszhrjwh'
    v = b'ieaou'
    return bytes([c[random.randrange(0, len(c))],
                  v[random.randrange(0, len(v))]]).replace(b'0', b'')
```
...and built the rest of the generator upon this function. The magic numbers are basically me roughly estimating common Japanese texts.
```python
def _not_jpn_word_gen():
    syll_cnt = random.randrange(1, 5)
    # more spaces = more chance of not generating an ending consonant
    v = b'mnpbtdkgszhrjwh '
    res = []
    for _ in range(syll_cnt):
        res.append(_not_jpn_syllable_gen())
    return b''.join(res) + bytes([v[random.randrange(0, len(v))]]).replace(b' ', b'')


def _not_jpn_sentence_gen():
    sentence = []
    for _ in range(random.randint(1, 4)):
        subsentence = []
        for _ in range(random.randint(1, 15)):
            subsentence.append(_not_jpn_word_gen())
        sentence.append(b' '.join(subsentence))
    return b', '.join(sentence)


def _not_jpn_paragraph_gen():
    res = []
    for _ in range(random.randint(1, 10)):
        res.append(_not_jpn_sentence_gen())
    return b'. '.join(res) + b'.'
```
Test it out a bit...
```python
>>> _not_jpn_paragraph_gen()
b'gohesu jusepa hukasese ni kas to wop beb rapiduwo sosome miru more, buhijiho wizano domagoja dea soudume rudoguwo me noezome pomajaza ei, kewisij, zod zirijuzi mehunaho besokis guoju ruas keposej pohege. a zomusaj zu pigegi zuromi jinuehi mitiwo hibarana pu, bubuse tirebep siji jen woki jajapehu sodisudi hemedoji jiwezi hipezi ta, zozo be, nadajehes gatujose bepu pihepu na. kuwuzori meose. bugewaw jo zipozu mu wosuhe bagig hugige wuje wodizeuh worago runure hor wisop su. hozotesa dukujo peinu gojeza dipe medi zadodihi botipiwo.'
>>> _not_jpn_paragraph_gen()
b'suribi wijumo hepededu hoso zomo zasitakak utihuza moju sujibap bujonihi sadi nibob pigopumin wunu ruobena.'
>>> _not_jpn_paragraph_gen()
b'ri tomowoi a tututimi jenoo penu gib jamuhi nawesiza niwima gabeno, ti tede rahine dahunag bonao aze hupota niheku tek, saoto kiho ha gepazima, zise zegawona hohisuo.'
>>> _not_jpn_paragraph_gen()
b'tuka benegos pa go, gajesok huhejis zukiheka sego. ke. redak tujuwe, jo jura du pejo bipadato, hoh ho zosuge genose tuhu direba hibuki rubedi nununabo sazira zukabu ga pemuka. he geho. kodi wakonomu hegi uporu nimazi wite, wemojedi wonanoma odobe goh za worupas gusuhopi hosozute ro tahamu kehesu, howiriju ejejitop jipijeteg zuho aaguru jikotes mawuwu gosupa zawu bugada risiha dubiru giipa gi nat, bupu hez ja mogaka bahone hihenehe da gutazomi. tudehum. hatoa, naga ju segi bebuheri runibuh zuboma mosin joh wejaza tanosow rohenoze benewa suriwik zujamu semu, witogohu doso neha gepajotuh jizesib degeku huje titazopez. gojusado pi hojuhowu uraba wog mipu nonomonir, horoh winamabo sohu posihaj rigagu du gaiha meejanu azikodi marih hetita, jogonigi gakowehoj ga asozi dozije gupepa rodoposu rinajad zajemu kude wasuj. paw momiwiji wana pi winiwue ta hegekuog wugibure ros pi juga.'
```
Oh hell, that definitely does not look like Japanese at all, but for what I need it for, it's good enough.
The test, pt. 2
Now that we have our new generator function, let's plug it in:
```python
def _gentest():
    x = []
    lenx = random.randint(10, 30)
    for i in range(lenx):
        x.append(_not_jpn_paragraph_gen())
    return b'\n\n'.join(x)
```
```python
>>> sum(_agg(1024)) / 1024
0.4271855249051684
```
Roughly a 57% reduction in size, slightly less than with lipsum.com. It's probably due to how the texts are generated: normal texts have more repeated patterns.
Wrapping it up (into a CLI tool)
Remember the `4095` above? This is where it's useful:
- Each code takes 12 bits to store, which is one and a half bytes.
- Half a byte is (somewhat) tricky to handle; it's easier to pack 2 codes together into 3 bytes.
- Imagine a situation where we have exactly one code left for packing; we need something to fill in the blank.
- If we had allowed `4095` as a valid code value, there would be situations where all the valid code values are used and we have exactly one code left. We wouldn't be able to find anything within the 12-bit range to fill the blank, because any value would be mistaken for a valid encoding of data that does not exist; by disallowing `4095` we can be sure that it's a padding code.
```
byte   7  6  5  4  3  2  1  0   7  6  5  4  3  2  1  0   7  6  5  4  3  2  1  0
code  11 10 09 08 07 06 05 04  03 02 01 00 11 10 09 08  07 06 05 04 03 02 01 00
part   A  A  A  A  A  A  A  A   A  A  A  A  B  B  B  B   B  B  B  B  B  B  B  B
```
The code for packing & de-packing:
```python
def _pack(code_list):
    # pad to an even number of codes with the reserved value 4095
    if len(code_list) % 2:
        code_list.append(4095)
    i = 0
    lenx = len(code_list)
    res = []
    while i < lenx:
        code1 = code_list[i]
        code2 = code_list[i+1]
        byte1 = (code1 >> 4)
        byte2 = ((code1 & 0xf) << 4) | (code2 >> 8)
        byte3 = (code2 & 0xff)
        res.append(byte1)
        res.append(byte2)
        res.append(byte3)
        i += 2
    return bytes(res)


def _depack(bstr):
    res = []
    i = 0
    lenx = len(bstr)
    while i < lenx:
        piece = bstr[i:i+3]
        code1 = (piece[0] << 4) | (piece[1] >> 4)
        code2 = ((piece[1] & 0xf) << 8) | piece[2]
        res.append(code1)
        res.append(code2)
        i += 3
    return res
```
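As a quick sanity check, here is the shift arithmetic for a single pair of codes (the values `0xABC` and `0x123` are arbitrary examples), matching the table above:

```python
code1, code2 = 0xABC, 0x123   # two arbitrary 12-bit codes

byte1 = code1 >> 4                            # top 8 bits of code1  -> 0xAB
byte2 = ((code1 & 0xF) << 4) | (code2 >> 8)   # low 4 + top 4 bits   -> 0xC1
byte3 = code2 & 0xFF                          # low 8 bits of code2  -> 0x23
assert bytes([byte1, byte2, byte3]) == b'\xAB\xC1\x23'

# ...and undoing it recovers both codes:
assert (byte1 << 4) | (byte2 >> 4) == code1
assert ((byte2 & 0xF) << 8) | byte3 == code2
```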
The rest is simple:
```python
import sys

if len(sys.argv) < 3:
    print('Usage: lzw_demo [compress|decompress] [filename]')
elif sys.argv[1] == 'compress':
    with open(sys.argv[2], 'rb') as f:
        data = f.read()
    res, _ = lzw_compress(data)
    with open(f'{sys.argv[2]}.lzw', 'wb') as f:
        f.write(_pack(res))
elif sys.argv[1] == 'decompress':
    with open(sys.argv[2], 'rb') as f:
        data = f.read()
    unpacked = _depack(data)
    # drop the padding code, if any
    if unpacked and unpacked[-1] == 4095:
        unpacked.pop()
    res = lzw_decompress(unpacked)
    with open(f'{sys.argv[2]}.lzw_dec', 'wb') as f:
        f.write(res)
```