LZW Compression & Decompression

Abstract: I've been coding for years but never tried to implement these things on my own, so I decided to do it.


Never having implemented any compression algorithm by hand feels very wrong to me, so I'll start by doing exactly that.

LZ78

The LZ78 algorithm goes as follows: keep a dictionary of phrases, initially containing only the empty phrase. Scan the input, and at each step find the longest prefix of the remaining input that is already in the dictionary, emit the pair (index of that prefix, the next byte), then add the prefix extended by that byte to the dictionary. The decoder rebuilds exactly the same dictionary from the pairs, so the dictionary itself never has to be stored.
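
A minimal LZ78 sketch of my own (separate from the LZW code below; lz78_compress is just an illustrative name), to make the (index, next byte) idea concrete:

def lz78_compress(bstr):
    # phrase -> index; index 0 is the empty phrase
    d = {b'': 0}
    res = []
    p = b''
    for c in bstr:
        cc = bytes([c])
        if p + cc in d:
            # keep extending the current phrase
            p = p + cc
        else:
            # emit (index of longest known prefix, byte that ended the match)
            # and remember the extended phrase for later
            res.append((d[p], cc))
            d[p + cc] = len(d)
            p = b''
    if p:
        # leftover phrase at the end of the input
        res.append((d[p], b''))
    return res

Running it on a toy input:

>>> lz78_compress(b'abab')
[(0, b'a'), (0, b'b'), (1, b'b')]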

LZW

LZW is LZ78 but with a twist: the dictionary is pre-seeded with every single byte (codes 0 through 255), so the output is just a stream of dictionary codes with no literal byte attached to each one. Whenever the current phrase p extended by the next byte c is not in the dictionary, you emit the code for p, add p + c as a new entry, and continue matching from c.

The code

# because indexing a bytestring in python gives you int & it's like wtf
# have to do some special maneuvering instead
def _chr(x):
    return bytes([x])

def lzw_compress(bstr):
    # type TrieNode = (Code, Map Byte TrieNode)
    # type Trie = Map Byte TrieNode
    d = {x: (x, {}) for x in range(0, 256)}
    d_counter = 256
    def _gensym():
        nonlocal d_counter
        res = d_counter
        # codes are capped at 12 bits; wikipedia says the original LZW paper
        # packs 8-bit symbols into 12-bit codes. 4095 itself is reserved as a
        # padding sentinel for the packer further down, so stop at 4094.
        if res >= 4095:
            return None
        d_counter += 1
        return res
    res = []
    p = b''
    def _chkpc(pc):
        # walk the trie along the bytes of pc; returns (did the whole phrase
        # match?, code of the longest matched prefix, node where the walk stopped)
        nonlocal d
        d_pointer = d
        res = True
        code = -1
        for c in pc:
            if c in d_pointer:
                code = d_pointer[c][0]
                d_pointer = d_pointer[c][1]
            else:
                res = False
                break
        return res, code, d_pointer

    for i, c in enumerate(bstr):
        cc = _chr(c)
        chkres, pcode, d_parent = _chkpc(p + cc)
        if chkres:
            p = p + cc
        else:
            res.append(pcode)
            sym = _gensym()
            if sym is not None:
                d_parent[c] = (sym, {})
            p = cc
        
    # flush whatever phrase is still pending (p is only empty for empty input)
    if p:
        chkres, pcode, d_parent = _chkpc(p)
        res.append(pcode)
    return res, d

def lzw_decompress(code_list):
    d = {x: _chr(x) for x in range(0, 256)}
    d_counter = 256
    def _gensym():
        nonlocal d_counter
        res = d_counter
        d_counter += 1
        return res
    if not code_list:
        return b''
    cw = code_list[0]
    res = [d[cw]]
    pw = cw
    p = b''
    for i, cw in enumerate(code_list[1:]):
        if cw in d:
            res.append(d[cw])
            p = d[pw]
            c = _chr(d[cw][0])
            d[_gensym()] = p+c
        else:
            # cw isn't in the dictionary yet: the encoder emitted this code one
            # step before the decoder could define it, which only happens when
            # the new phrase is the previous phrase plus its own first byte
            p = d[pw]
            c = _chr(d[pw][0])
            res.append(p+c)
            d[_gensym()] = p+c
        pw = cw
    return b''.join(res)
        

Some explanations

The compressor keeps its dictionary as a trie of nested dicts (that's what the TrieNode comment at the top describes): checking whether p plus the next byte is a known phrase is just a walk down the trie one byte at a time, and the node where the walk stops is exactly where the new entry has to be attached. The decompressor only ever looks phrases up by code, so a flat dict from code to bytes is enough on that side. Once the dictionary is full (code 4094), the compressor simply stops adding entries; the decompressor keeps generating codes it will never be asked for, which is harmless because the compressor never emits them. The only genuinely tricky part is the else branch in lzw_decompress: the encoder can emit a code exactly one step before the decoder has defined it, and in that case the missing phrase is necessarily the previous phrase plus its own first byte.
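
A quick sanity check in the REPL (I traced the numbers by hand, so treat them as illustrative); the input b'abababab' is tiny but already hits that special case, because code 258 gets used before the decoder defines it:

>>> codes, _ = lzw_compress(b'abababab')
>>> codes
[97, 98, 256, 258, 98]
>>> lzw_decompress(codes)
b'abababab'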

The test

At first I wrote a script to randomly generate test cases & measure how well the algorithm works...

import random
import lzw_demo

def _gentest():
    x = []
    lenx = random.randint(4096, 16384)
    for i in range(lenx):
        x.append(chr(random.randint(0, 255)).encode('latin-1'))
    return b''.join(x)


def _test(n):
    failed_test = []
    for _ in range(n):
        x = _gentest()
        a, _ = lzw_demo.lzw_compress(x)
        xx = lzw_demo.lzw_decompress(a)
        if x != xx:
            failed_test.append((x, a, xx))
    return failed_test


def _agg(n):
    ratio_list = []
    for _ in range(n):
        x = _gentest()
        a, _ = lzw_demo.lzw_compress(x)
        # each 12-bit code takes 1.5 bytes once packed, hence the 12 / 8
        ratio_list.append((len(a) * 12 / 8) / len(x))
    return ratio_list
            

...but I quickly found out that I was stupid to write the generator function like that. LZW (like pretty much every compression algorithm) assumes that repeated patterns exist in the input (so that it can shorten them) & a totally random byte stream almost certainly won't compress well.

I used a lorem ipsum generator instead. On that kind of generated filler text the algorithm works pretty well: roughly a 60% reduction in size.

The homemade lorem ipsum generator

I really don't enjoy manually using online tools, so I decided to write a generator myself. I was too lazy to find an English dictionary ready for processing, so I wrote a function that generates random V/CV syllables instead.


def _not_jpn_syllable_gen():
    # '0' acts as a "no consonant" placeholder; it gets stripped by the
    # .replace() below, so roughly 1 in 16 syllables is a bare vowel
    c = b'0mnpbtdkgszhrjwh'
    v = b'ieaou'
    return bytes([c[random.randrange(0, len(c))], v[random.randrange(0, len(v))]]).replace(b'0', b'')
        

...and built the rest of the generator on top of this function. The magic numbers are basically my rough estimate of what common Japanese text looks like.

def _not_jpn_word_gen():
    syll_cnt = random.randrange(1, 5)
    # more spaces = more chance of not generating an ending consonant
    v = b'mnpbtdkgszhrjwh                                                   '
    res = []
    for _ in range(syll_cnt):
        res.append(_not_jpn_syllable_gen())
    return b''.join(res) + bytes([v[random.randrange(0, len(v))]]).replace(b' ', b'')

def _not_jpn_sentence_gen():
    sentence = []
    for _ in range(random.randint(1, 4)):
        subsentence = []
        for _ in range(random.randint(1, 15)):
            subsentence.append(_not_jpn_word_gen())
        sentence.append(b' '.join(subsentence))
    return b', '.join(sentence)

def _not_jpn_paragraph_gen():
    res = []
    for _ in range(random.randint(1, 10)):
        res.append(_not_jpn_sentence_gen())
    return b'. '.join(res) + b'.'
    

Test it out a bit...

>>> _not_jpn_paragraph_gen()
b'gohesu jusepa hukasese ni kas to wop beb rapiduwo sosome miru more, buhijiho wizano domagoja dea soudume
rudoguwo me noezome pomajaza ei, kewisij, zod zirijuzi mehunaho besokis guoju ruas keposej pohege. a zomusaj
zu pigegi zuromi jinuehi mitiwo hibarana pu, bubuse tirebep siji jen woki jajapehu sodisudi hemedoji jiwezi
hipezi ta, zozo be, nadajehes gatujose bepu pihepu na. kuwuzori meose. bugewaw jo zipozu mu wosuhe bagig
hugige wuje wodizeuh worago runure hor wisop su. hozotesa dukujo peinu gojeza dipe medi zadodihi botipiwo.'
>>> _not_jpn_paragraph_gen()
b'suribi wijumo hepededu hoso zomo zasitakak utihuza moju sujibap bujonihi sadi nibob pigopumin wunu ruobena.'
>>> _not_jpn_paragraph_gen()
b'ri tomowoi a tututimi jenoo penu gib jamuhi nawesiza niwima gabeno, ti tede rahine dahunag bonao aze hupota
niheku tek, saoto kiho ha gepazima, zise zegawona hohisuo.'
>>> _not_jpn_paragraph_gen()
b'tuka benegos pa go, gajesok huhejis zukiheka sego. ke. redak tujuwe, jo jura du pejo bipadato, hoh ho
zosuge genose tuhu direba hibuki rubedi nununabo sazira zukabu ga pemuka. he geho. kodi wakonomu hegi uporu
nimazi wite, wemojedi wonanoma odobe goh za worupas gusuhopi hosozute ro tahamu kehesu, howiriju ejejitop
jipijeteg zuho aaguru jikotes mawuwu gosupa zawu bugada risiha dubiru giipa gi nat, bupu hez ja mogaka bahone
hihenehe da gutazomi. tudehum. hatoa, naga ju segi bebuheri runibuh zuboma mosin joh wejaza tanosow rohenoze
benewa suriwik zujamu semu, witogohu doso neha gepajotuh jizesib degeku huje titazopez. gojusado pi
hojuhowu uraba wog mipu nonomonir, horoh winamabo sohu posihaj rigagu du gaiha meejanu azikodi marih hetita,
jogonigi gakowehoj ga asozi dozije gupepa rodoposu rinajad zajemu kude wasuj. paw momiwiji wana pi winiwue
ta hegekuog wugibure ros pi juga.'
        

Oh hell, that definitely does not look like Japanese at all, but for what I need it for it's good enough.

The test, pt. 2

Now that we have our new generator function, let's plug it in:

def _gentest():
    x = []
    lenx = random.randint(10, 30)
    for i in range(lenx):
        x.append(_not_jpn_paragraph_gen())
    return b'\n\n'.join(x)
        
>>> sum(_agg(1024)) / 1024
0.4271855249051684
        

Roughly a 57% reduction in size, slightly less than with text from lipsum.com. It's probably due to how the texts are generated: real text has more repeated patterns.

Wrapping it up (into a CLI tool)

Remember the 4095 above? This is where it becomes useful: I decided to pack the codes like this:
byte  7  6  5  4  3  2  1  0  7  6  5  4  3  2  1  0  7  6  5  4  3  2  1  0
code 11 10 09 08 07 06 05 04 03 02 01 00 11 10 09 08 07 06 05 04 03 02 01 00
part  A  A  A  A  A  A  A  A  A  A  A  A  B  B  B  B  B  B  B  B  B  B  B  B
        

The code for packing & de-packing:


def _pack(code_list):
    if len(code_list) % 2:
        # pad with the 4095 sentinel so codes always come in pairs
        # (note this mutates the caller's list)
        code_list.append(4095)
    i = 0
    lenx = len(code_list)
    res = []
    while i < lenx:
        code1 = code_list[i]
        code2 = code_list[i+1]
        byte1 = (code1 >> 4)
        byte2 = ((code1 & 0xf) << 4) | (code2 >> 8)
        byte3 = (code2 & 0xff)
        res.append(byte1)
        res.append(byte2)
        res.append(byte3)
        i += 2
    return bytes(res)

def _depack(bstr):
    res = []
    i = 0
    lenx = len(bstr)
    while i < lenx:
        piece = bstr[i:i+3]
        code1 = (piece[0] << 4) | (piece[1] >> 4)
        code2 = ((piece[1] & 0xf) << 8) | piece[2]
        res.append(code1)
        res.append(code2)
        i += 3
    return res        
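
A quick round-trip check in the REPL (hex just to make the nibbles easy to see):

>>> _pack([0xabc, 0x123]).hex()
'abc123'
>>> _depack(bytes.fromhex('abc123'))
[2748, 291]

2748 and 291 are 0xabc and 0x123, so both 12-bit halves come back intact.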
        

The rest is simple:


import sys

if len(sys.argv) < 3:
    print('Usage: lzw_demo [compress|decompress] [filename]')
elif sys.argv[1] == 'compress':
    with open(sys.argv[2], 'rb') as f:
        data = f.read()
    res, _ = lzw_compress(data)
    with open(f'{sys.argv[2]}.lzw', 'wb') as f:
        f.write(_pack(res))
elif sys.argv[1] == 'decompress':
    with open(sys.argv[2], 'rb') as f:
        data = f.read()
    unpacked = _depack(data)
    # drop the padding sentinel if the packer added one
    if unpacked and unpacked[-1] == 4095:
        unpacked.pop()
    res = lzw_decompress(unpacked)
    with open(f'{sys.argv[2]}.lzw_dec', 'wb') as f:
        f.write(res)
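
A full round trip from the shell then looks something like this (lzw_demo.py is the file name I'm assuming from the import in the test script; notes.txt is just a placeholder):

python lzw_demo.py compress notes.txt
python lzw_demo.py decompress notes.txt.lzw

The first command writes notes.txt.lzw, the second writes notes.txt.lzw.lzw_dec, which should come out byte-for-byte identical to the original file.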
        

