#!/usr/pkg/bin/python
"""ASCII text document compression scheme

a (possibly) new method of compression that leaves documents searchable
in their compressed state by tokenizing words"""
Copyright = """
docpack -- an ASCII text document compression scheme
Copyright (C) 2005 John Comeau

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""
errormessage = "Not all needed libraries found, upgrade or check path: "
try:
    True  # not defined in older Python releases
except:
    True, False = 1, 0
try:
    import sys, os, types, re, pwd
    sys.path.append(os.path.join(pwd.getpwuid(os.geteuid())[5], 'lib', 'python'))
    errormessage = errormessage + repr(sys.path)
    from com.jcomeau import gpl, jclicense
except:
    try:
        sys.stderr.write("%s\n" % errormessage)
    except:
        print errormessage
    raise

# get name this program was called as
self = os.path.split(sys.argv[0])[1]
command = os.path.splitext(self)[0]  # chop any suffix (extension)
# now get name we gave it when we wrote it
originalself = re.compile('[0-9A-Za-z]+').search(Copyright).group()

# globals and routines that should be in every program
# (yes, you could import them, but there are problems in that approach too)
def DebugPrint(*whatever):
    return False  # defined instead by pytest module, use that for debugging

def join(*args):
    "for pythons without str.join"
    string, array = args
    if type(array) == types.StringType:
        array = eval(array)
    if hasattr(str, 'join'):
        return string.join(array)
    else:
        joined = ''
        for index in range(0, len(array)):
            joined = joined + array[index]
            if index != (len(array) - 1):
                joined = joined + string
        return joined

def split(*args):
    "for pythons without str.split"
    string, string_to_split = args
    if not len(string):
        string = None
    if hasattr(str, 'split'):
        return string_to_split.split(string)
    else:
        return re.compile(re.escape(string)).split(string_to_split)

# other globals, specific to this program
from com.jcomeau.arcane import countdict
import struct
TOKENPATTERN = re.compile('[0-9A-Za-z]+')  # google-style i think (hope)
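# Worked example of the tokenizing split (illustrative values):
#   TOKENPATTERN.findall('Hello, world!\n') -> ['Hello', 'world']
#   TOKENPATTERN.split('Hello, world!\n')   -> ['', ', ', '!\n']
# Runs of repeated punctuation are later packed as a count followed by the
# character ('3 ' means three spaces), which is what the two patterns below
# match and unpack.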
# NOTE: keep the following two patterns consistent!
PACKPATTERN = re.compile('\d+.')  # packed punctuation
UNPACKPATTERN = re.compile('(\d+)(.)')  # grouped version of above
TOKENFILE = 'tokens.txt'
PTOKENFILE = 'ptokens.txt'
DATA = 'data'
PDATA = 'pdata'
UNICODE_TEST = u'\U00010000'  # for determining Unicode capability
BASECHAR = u' '  # compressed characters start at this

def capitalization(*args):
    """codify the capitalization scheme of an array of words

    a word longer than 36 characters with uneven capitalization will break this"""
    capdata, word = '', args[0]
    if word == word.upper():
        return 'C'
    elif word == word.lower():
        return ''
    else:
        capdata = 'c'
        for index in range(len(word)):
            if word[index] == word[index].upper():
                capdata = capdata + str(index) + ';'
        return capdata[0:-1]  # chop final ';'

def capitalize(*args):
    "return the capitalized token"
    token, capitalization = args
    #DebugPrint('token', token, 'capitalization', capitalization)
    if capitalization == 'C':
        return token.upper()
    elif capitalization == '':
        return token
    elif capitalization[0] == 'c':
        letters = map(None, token)
        capdata = map(int, split(';', capitalization[1:]))
        for index in capdata:
            try:
                letters[index] = letters[index].upper()
            except:
                #DebugPrint('index', index, 'letters', letters)
                raise
        return ''.join(letters)

def writekeys(*args):
    "write out keys of countdict starting from highest-frequency token"
    filehandle, tokendict = args
    if type(filehandle) == types.StringType:
        filehandle = open(filehandle, 'wb')
    if type(tokendict) == types.StringType and tokendict[0] == '{':
        tokendict = eval(tokendict)
    tokenlist = tokendict.keys()
    tokenlist.sort(lambda a, b: cmp(tokendict[b], tokendict[a]))
    for token in tokenlist:
        filehandle.write('%s\n' % (token))
    filehandle.close()

def extract_tokens(*args):
    if len(args) == 1 and type(args[0]) != types.StringType:
        args = args[0]
    filelist = args
    tokens, ptokens = countdict(), countdict()
    for filename in filelist:
        DebugPrint('extracting tokens from', filename)
        for line in open(filename).readlines():
            words = TOKENPATTERN.findall(line)
            punctuation = TOKENPATTERN.split(line)
            for index in range(len(words)):
                word, punct = words[index], punctuation[index]
                pword = punctuation_token(punct) + capitalization(word)
                tokens[word.lower()] = tokens[word.lower()] + 1
                ptokens[pword] = ptokens[pword] + 1
            # always one more punctuation token than words
            pword = punctuation_token(punctuation[-1])
            ptokens[pword] = ptokens[pword] + 1
    return tokens, ptokens

def punctuation_token(*args):
    "codify the punctuation according to a standard scheme"
    ptoken = args[0]
    if len(ptoken) == 0:
        return 'N'
    elif len(ptoken) == 1:
        return eolcode(ptoken)
    else:
        # compress punctuation by replacing multiple characters with number
        newptoken, current, count = '', '', 0
        for char in map(None, ptoken) + ['']:
            if char != current:
                if count > 0:
                    if count == 1:
                        newptoken = newptoken + eolcode(current)
                    else:
                        newptoken = newptoken + str(count) + eolcode(current)
                current, count = char, 1
            else:
                count = count + 1
        return newptoken

def eolcode(*args):
    "codify any end-of-line characters"
    token, newtoken = args[0], ''
    for index in range(0, len(token)):
        if token[index] in os.linesep:
            newtoken = newtoken + chr(ord(token[index]) + 0x60)
        else:
            newtoken = newtoken + token[index]
    return newtoken

def eoldecode(*args):
    "convert coded EOL characters back to their escape codes"
    coded = map(lambda c: chr(ord(c) + 0x60), os.linesep)
    decoded = map(None, os.linesep)
    token, newtoken = args[0], ''
    for index in range(0, len(token)):
        if token[index] in coded:
            newtoken = newtoken + decoded[coded.index(token[index])]
        else:
            newtoken = newtoken + token[index]
    return newtoken
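# Illustrative round trip, assuming a platform where os.linesep == '\n':
# eolcode() shifts each end-of-line byte up by 0x60 so it stays printable,
# and eoldecode() reverses it, e.g.
#   eolcode('!\n')  -> '!j'   (0x0a + 0x60 == 0x6a, 'j')
#   eoldecode('!j') -> '!\n'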
def detokenize(*args):
    char, pchar, tokens, ptokens = args
    #DebugPrint('detokenizing "%s" and "%s"' % (char, pchar))
    if len(char):
        try:
            token = tokens[safe_ord(char) - ord(BASECHAR)]
        except:
            sys.stderr.write('No such char %s (%d)\n' % (repr(char), safe_ord(char)))
            raise
    else:
        token = ''
    punctuation = ptokens[safe_ord(pchar) - ord(BASECHAR)]
    capspattern = re.compile('[Cc][0-9;]*$')
    match = capspattern.search(punctuation)
    if match:
        punctuation = punctuation[0:match.start()]
        capitalization = match.group()
    else:
        capitalization = ''
    punctuation = punctuation.replace('N', '')
    nonpacked = PACKPATTERN.split(punctuation)
    packed = UNPACKPATTERN.findall(punctuation)
    #DebugPrint('nonpacked:', nonpacked, 'packed:', packed)
    punctuation = ''
    for index in range(0, len(packed)):
        punctuation += nonpacked[index] + (int(packed[index][0]) * packed[index][1])
    punctuation += nonpacked[-1]
    return eoldecode(punctuation) + capitalize(token, capitalization)

def load_tokens(*args):
    destdir = args[0]
    tokenfile = os.path.join(destdir, TOKENFILE)
    ptokenfile = os.path.join(destdir, PTOKENFILE)
    DebugPrint('loading tokenfiles %s and %s' % (tokenfile, ptokenfile))
    try:
        tokens = map(str.strip, open(tokenfile).readlines())
        ptokens = map(lambda s: s[:-1], open(ptokenfile, 'rb').readlines())
    except:
        sys.stderr.write('Problem loading tokens from %s and %s\n' % (
            tokenfile, ptokenfile))
        raise
    return tokens, ptokens

def dpcat(*args):
    """docpack version of unix 'cat' utility

    Usage: dpcat TOKENDIR FILEPATH [FILEPATH...]

    first arg must be the location of the token files, all following args
    are the filenames to be displayed"""
    if len(args) < 2:
        sys.stderr.write(dpcat.__doc__ + '\n')
        sys.exit(1)
    else:
        srcdir, filelist = args[0], args[1:]
    tokens, ptokens = load_tokens(srcdir)
    datadir = os.path.join(srcdir, DATA)
    os.chdir(datadir)
    for path in relpath(datadir, filelist):
        DebugPrint('displaying path %s' % path)
        lines = docunpack_file(srcdir, path, tokens, ptokens)
        sys.stdout.writelines(lines)
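# Example invocation (hypothetical paths; assumes the script is invoked under
# its original name 'docpack', whose dispatcher at the bottom of this file maps
# the first command-line word to the function of the same name):
#   docpack dpcat /docs/.docpack /docs/.docpack/data/notes.txt
# writes the reconstructed text of notes.txt to stdout.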
def dpgrep(*args):
    """docpack version (very limited) of unix 'grep' utility

    Usage: dpgrep TOKENDIR PATTERN FILEPATH [FILEPATH...]

    first arg must be the location of the token files, second must be a word
    (token) for which to search (case-insensitive), and at least one file
    must be specified"""
    if len(args) < 3:
        sys.stderr.write(dpgrep.__doc__ + '\n')
        sys.exit(1)
    else:
        srcdir = args[0]
    tokens, ptokens = load_tokens(srcdir)
    stripdir = os.path.join(srcdir, DATA, '')
    os.chdir(stripdir)
    searchtoken = args[1]
    searchchar = compress_token(tokens, searchtoken)
    for path in args[2:]:
        #DebugPrint('searching for "%s" in %s' % (searchtoken, path))
        if path.startswith(stripdir):
            path = path.split(stripdir)[1]
        lines = map(lambda s: s.decode('utf-8'), open(path).readlines())
        if filter(lambda s: searchchar in s, lines):
            lines = docunpack_file(srcdir, path, tokens, ptokens)
            sys.stdout.writelines(map(lambda s: '%s: %s' % (path, s),
                filter(lambda s: searchtoken in s.lower(), lines)))

def usage(*args):
    errorlevel, text = args
    if errorlevel:
        sys.stderr.write(text)
        sys.exit(errorlevel)
    else:
        sys.stdout.write(text)
        sys.exit(0)

def docpack_files(*args):
    """pack ASCII text files into docpack format

    Usage: docpack_files SOURCE_DIRECTORY DESTINATION_DIRECTORY FILE [...]"""
    #DebugPrint('args', args)
    if len(args) == 1 and type(args[0]) == types.ListType:
        try:
            args = args[0][0], args[0][1], args[0][2:]
        except:
            raise; usage(1, docpack_files.__doc__)
    elif len(args) > 3:
        try:
            args = args[0], args[1], args[2:]
        except:
            usage(1, docpack_files.__doc__)
    elif len(args) == 3 and type(args[2]) == types.StringType:
        args = args[0], args[1], [args[2]]  # args is a tuple, so rebuild it
    try:
        srcdir, destdir, filelist = args
    except:
        usage(1, docpack_files.__doc__)
    if type(filelist) == types.StringType:
        filelist = [filelist]
    filelist = relpath(srcdir, filelist)
    tokens, ptokens = load_tokens(destdir)
    for filename in filelist:
        datafile, pdatafile = prepare_output(destdir, filename)
        if datafile:
            tokenlines, ptokenlines = docpack_file(filename)
            for index in range(0, len(tokenlines)):
                writecompressed(tokenlines[index], tokens, datafile)
                writecompressed(ptokenlines[index], ptokens, pdatafile)
            datafile.close(); pdatafile.close()

def docunpack_files(*args):
    srcdir, destdir, filelist = args[0], args[1], args[2]
    try:
        tokens, ptokens = args[3], args[4]
    except:
        tokens, ptokens = load_tokens(srcdir)
    os.chdir(os.path.join(srcdir, DATA))
    if type(filelist) == types.StringType:
        filelist = [filelist]
    for filename in filelist:
        lines = docunpack_file(srcdir, filename, tokens, ptokens)
        #DebugPrint('lines from docunpack_file', lines)
        textfile = os.path.join(destdir, filename)
        mkdirs(os.path.split(textfile)[0])
        output = open(textfile, 'wb')
        output.writelines(lines)
        output.close()

def mkdirs(*args):
    path = args[0]
    dirs = [path]
    while len(os.path.split(dirs[-1])[1]):
        dirs.append(os.path.split(dirs[-1])[0])
    dirs.reverse()
    for dir in dirs:
        if not os.path.exists(dir):
            os.mkdir(dir)

def docpack_file(*args):
    filename = args[0]
    DebugPrint('compressing file', filename)
    tokenlines, ptokenlines = [], []
    for line in open(filename).readlines():
        #DebugPrint('compressing line', line)
        words = TOKENPATTERN.findall(line)
        punctuation = TOKENPATTERN.split(line)
        tokenline, ptokenline = [], []
        for index in range(len(words)):
            word, punct = words[index], punctuation[index]
            pword = punctuation_token(punct) + capitalization(word)
            tokenline.append(word.lower())
            ptokenline.append(pword)
        # always one more punctuation token than words
        pword = punctuation_token(punctuation[-1])
        ptokenline.append(pword)
        tokenlines.append(tokenline)
        ptokenlines.append(ptokenline)
    return tokenlines, ptokenlines
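# Worked example (assuming os.linesep == '\n'): for the input line
# 'Hello, world!\n', docpack_file() produces the parallel lists
#   tokenline  = ['hello', 'world']
#   ptokenline = ['Nc0', ', ', '!j']
# 'N' marks empty leading punctuation, 'c0' records the capital at offset 0
# of 'Hello', and 'j' is the encoded newline; there is always one more
# punctuation token than there are words.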
def docunpack_file(*args):
    srcdir, filename = args[0], args[1]
    pdatadir = os.path.join(srcdir, PDATA)
    datadir = os.path.join(srcdir, DATA)
    DebugPrint('unpacking document %s' % filename)
    try:
        tokens, ptokens = args[2], args[3]
    except:
        tokens, ptokens = load_tokens(srcdir)
    DebugPrint('changing directory to %s' % datadir)
    os.chdir(os.path.join(datadir))
    pfilename = os.path.join(pdatadir, filename)
    tokenlines = map(lambda s: s[:-1].decode('utf-8'), open(filename).readlines())
    ptokenlines = map(lambda s: s[:-1].decode('utf-8'), open(pfilename).readlines())
    lines = []
    for index in range(0, len(tokenlines)):
        line = ''
        for char, pchar in map(None, unicode_array(tokenlines[index]) + [''],
                unicode_array(ptokenlines[index])):
            line = line + detokenize(char, pchar, tokens, ptokens)
        lines += [line]
    return lines

def unicode_array(*args):
    string = args[0]
    if len(UNICODE_TEST) == 1:
        # this Python can handle unicode > 0x10000
        return map(None, string)
    else:
        bytes = string.encode('utf-8')
        #DebugPrint('bytes', bytes)
        index, array = 0, []
        while index < len(bytes):
            if ord(bytes[index]) >= 0xf0:
                array.append(bytes[index:index + 4].decode('utf-8'))
                index += 4
            elif ord(bytes[index]) >= 0xe0:
                array.append(bytes[index:index + 3].decode('utf-8'))
                index += 3
            elif ord(bytes[index]) >= 0x80:
                array.append(bytes[index:index + 2].decode('utf-8'))
                index += 2
            else:
                array.append(bytes[index].decode('utf-8'))
                index += 1
        #DebugPrint('unicode_array', array)
        return array

def prepare_output(*args):
    "put together pathname of output and create directories if necessary"
    destdir, filename = args
    datafile = os.path.abspath(os.path.join(destdir, DATA, filename))
    pdatafile = os.path.abspath(os.path.join(destdir, PDATA, filename))
    mkdirs(os.path.split(datafile)[0])
    mkdirs(os.path.split(pdatafile)[0])
    if os.path.exists(pdatafile):
        return None, None
    else:
        return open(datafile, 'wb'), open(pdatafile, 'wb')

def safe_unichr(*args):
    charvalue = int(args[0])
    if charvalue >= 0xd800:
        charvalue += 0x800  # skip "ill-formed" characters
    try:
        char = unichr(charvalue)
    except ValueError:
        #DebugPrint('narrow-build Python simulating unichr for value %x' % charvalue)
        char = eval("u'\\U%08x'" % charvalue)
    #DebugPrint('char for', charvalue, 'is', char)
    return char

def safe_ord(*args):
    char = args[0]
    if len(char) == 1:
        index = ord(char)
    elif len(char.encode('utf-8')) != 4:
        raise ValueError, 'cannot convert unicode character "%s"' % repr(char)
    else:
        #DebugPrint('narrow-build Python decoding %s' % repr(char))
        b1, b2, b3, b4 = map(ord, char.encode('utf-8'))
        index = ((b1 & 0x7) << 18) | ((b2 & 0x3f) << 12) | \
            ((b3 & 0x3f) << 6) | ((b4 & 0x3f))
    #DebugPrint('safe_ord(%s) = %d' % (char, index))
    if index >= 0xd800:
        index = index - 0x800  # skip "ill-formed" unicode
    return index

def writecompressed(*args):
    line, tokenlist, outfile = args
    #DebugPrint('compressing %s to %s' % (line, outfile))
    outline = u''
    for token in line:
        try:
            outline = outline + compress_token(tokenlist, token)
            #DebugPrint('outline', outline)
        except:
            DebugPrint('token "%s" not in tokenlist' % token)
            raise
    #DebugPrint('writing', line, 'as', outline)
    outfile.write(('%s\n' % outline.encode('utf-8')))

def compress_token(*args):
    tokenlist, token = args
    return safe_unichr(tokenlist.index(token) + ord(BASECHAR))
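# Illustrative mapping (the actual characters depend on token frequency in the
# packed tree): if 'the' happens to be the most frequent token it sits at
# index 0 of tokens.txt, so compress_token(tokens, 'the') returns BASECHAR
# (u' ', 0x20); index 1 maps to u'!', and so on, with safe_unichr() skipping
# the ill-formed surrogate range starting at 0xd800.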
def packinit(*args):
    try:
        src = args[0]
    except:
        sys.stderr.write(
            'Usage: %s SOURCE_DIRECTORY [DESTINATION_DIRECTORY]\n' % command)
        sys.exit(1)
    try:
        dest = os.path.abspath(args[1])
    except:
        dest = os.path.abspath(os.path.join(src, '.docpack'))
    return src, dest

def unpackinit(*args):
    try:
        src = args[0]
    except:
        sys.stderr.write(
            'Usage: %s SOURCE_DIRECTORY [DESTINATION_DIRECTORY]\n' % command)
        sys.exit(1)
    try:
        dest = os.path.abspath(args[1])
    except:
        # no destination given: default to the directory containing the source tree
        dest = os.path.split(os.path.abspath(src))[0]
    return src, dest

def relpath(*args):
    "trim off front of path so we don't overwrite sources!"
    if len(args) == 2 and type(args[1]) != types.StringType:
        srcpath, filelist = args
    else:
        srcpath, filelist = args[0], args[1:]
    srcpath = os.path.abspath(srcpath)
    filelist = map(os.path.abspath, filelist)
    srcarray = os.path.split(srcpath)
    srcpath = os.path.join(srcarray[0], srcarray[1], '')
    filelist = map(lambda path: path.split(srcpath)[1], filelist)
    return filelist

def docpack(*args):
    "pack entire source tree into destination tree"
    src, dest = packinit(*args)
    sys.stderr.write('Writing compressed files to %s\n' % dest)
    os.chdir(src)
    filelist, tokenfiles_exist = [], False
    os.path.walk('.', getfiles, filelist)
    if not os.path.exists(dest):
        os.mkdir(dest)
    if not os.path.exists(os.path.join(dest, PTOKENFILE)):
        tokens, ptokens = extract_tokens(filelist)
        writekeys(os.path.join(dest, TOKENFILE), tokens)
        writekeys(os.path.join(dest, PTOKENFILE), ptokens)
        tokens, ptokens = {}, {}  # hopefully allows them to be garbage-collected
    docpack_files(src, dest, filelist)

def docunpack(*args):
    src, dest = unpackinit(*args)
    os.chdir(os.path.join(src, DATA))
    filelist = []
    os.path.walk('.', getfiles, filelist)
    try:
        os.mkdir(dest)
    except:
        sys.stderr.write(
            'Destination path %s already exists, try another or delete\n' % dest)
        sys.exit(1)
    tokens, ptokens = load_tokens(src)
    docunpack_files(src, dest, filelist, tokens, ptokens)

def getfiles(*args):
    filelist, dirname, fnames = args
    for fname in fnames:
        path = os.path.join(dirname, fname)
        if not os.path.isdir(path):
            filelist.append(path)

if __name__ == '__main__':
    # if this program was imported by another, the above test will fail,
    # and this following code won't be used...
    function = command; args = sys.argv[1:]  # default case
    if command == originalself:
        try:
            if len(args) and eval('type(%s) == types.FunctionType' % args[0]):
                function = sys.argv[1]; args = sys.argv[2:]
        except:
            pass
    if len(args) < 256:
        action = '%s%s' % (function, repr(tuple(args)))
    else:
        action = '%s(%s)' % (function, repr(args))
    print eval(action) or ''
else:
    # if you want something to be done on import, do it here; otherwise pass
    pass
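# Typical invocations (hypothetical paths), resolved by the dispatcher above
# when the script is invoked as 'docpack':
#   docpack /docs                                    pack /docs into /docs/.docpack
#   docpack docunpack /docs/.docpack /tmp/restored   unpack into /tmp/restored
#   docpack dpgrep /docs/.docpack compression FILE   grep packed FILE for 'compression'
# (the dpgrep search word must be one of the lower-case tokens in tokens.txt)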