Source code for bbarchivist.hashutils

#!/usr/bin/env python3
"""This module is used to generate file hashes/checksums."""

import concurrent.futures  # parallelization
import hashlib  # all other hashes
import hmac  # escreens is a hmac, news at 11
import os  # path work
import zlib  # crc32/adler32

from bbarchivist import bbconstants  # premade stuff
from bbarchivist import exceptions  # exceptions
from bbarchivist import iniconfig  # config parsing
from bbarchivist import utilities  # cores

__author__ = "Thurask"
__license__ = "WTFPL v2"
__copyright__ = "2015-2018 Thurask"


[docs]def zlib_hash(filepath, method, blocksize=16 * 1024 * 1024): """ Return zlib-based (i.e. CRC32/Adler32) checksum of a file. :param filepath: File you wish to verify. :type filepath: str :param method: "crc32" or "adler32". :type method: str :param blocksize: How much of file to read at once. Default is 16MB. :type blocksize: int """ hashfunc, seed = zlib_handler(method) with open(filepath, 'rb') as file: for chunk in iter(lambda: file.read(blocksize), b''): seed = hashfunc(chunk, seed) final = format(seed & 0xFFFFFFFF, "08x") return final
[docs]def zlib_handler(method): """ Prepare hash method and seed depending on CRC32/Adler32. :param method: "crc32" or "adler32". :type method: str """ hashfunc = zlib.crc32 if method == "crc32" else zlib.adler32 seed = 0 if method == "crc32" else 1 return hashfunc, seed
[docs]def hashlib_hash(filepath, engine, blocksize=16 * 1024 * 1024): """ Return MD5/SHA-1/SHA-2/SHA-3 hash of a file. :param filepath: File you wish to verify. :type filepath: str :param engine: Hash object to update with file contents. :type engine: _hashlib.HASH :param blocksize: How much of file to read at once. Default is 16MB. :type blocksize: int """ hashfunc_reader(filepath, engine, blocksize) return engine.hexdigest()
[docs]def hashfunc_reader(filepath, engine, blocksize=16 * 1024 * 1024): """ Generate hash from file contents. :param filepath: File you wish to verify. :type filepath: str :param engine: Hash object to update with file contents. :type engine: _hashlib.HASH :param blocksize: How much of file to read at once. Default is 16MB. :type blocksize: int """ with open(filepath, 'rb') as file: while True: data = file.read(blocksize) if not data: break engine.update(data)
[docs]def ssl_hash(filepath, method, blocksize=16 * 1024 * 1024): """ Return SSL-library dependent hash of a file. :param filepath: File you wish to verify. :type filepath: str :param method: Method to use: algorithms in hashlib that are not guaranteed. :type method: str :param blocksize: How much of file to read at once. Default is 16MB. :type blocksize: int """ try: engine = hashlib.new(method) hashfunc_reader(filepath, engine, blocksize) return engine.hexdigest() except ValueError as exc: msg = "{0} HASH FAILED".format(method.upper()) exceptions.handle_exception(exc, msg, None)
[docs]def calculate_escreens(pin, app, uptime, duration=30): """ Calculate key for the Engineering Screens based on input. :param pin: PIN to check. 8 character hexadecimal, lowercase. :type pin: str :param app: App version. 10.x.y.zzzz. :type app: str :param uptime: Uptime in ms. :type uptime: str :param duration: 1, 3, 6, 15, 30 (days). :type duration: str """ #: Somehow, values for lifetimes for escreens. lifetimes = { 1: "", 3: "Hello my baby, hello my honey, hello my rag time gal", 7: "He was a boy, and she was a girl, can I make it any more obvious?", 15: "So am I, still waiting, for this world to stop hating?", 30: "I love myself today, not like yesterday. I'm cool, I'm calm, I'm gonna be okay" } #: Escreens magic HMAC secret. ehmac = 'Up the time stream without a TARDIS' duration = int(duration) if duration not in [1, 3, 6, 15, 30]: duration = 1 data = pin.lower() + app + str(uptime) + lifetimes[duration] newhmac = hmac.new(ehmac.encode(), data.encode(), digestmod=hashlib.sha1) key = newhmac.hexdigest()[:8] return key.upper()
[docs]def get_hashfunc(hashtype): """ Get genericized hash function from hash type. :param hashtype: Hash type. :type hashtype: str """ hashfuncs = {"adler32": zlib_hash, "crc32": zlib_hash, "md4": ssl_hash, "sha0": ssl_hash, "ripemd160": ssl_hash, "whirlpool": ssl_hash, "md5": hashlib_hash, "sha1": hashlib_hash, "sha224": hashlib_hash, "sha256": hashlib_hash, "sha384": hashlib_hash, "sha512": hashlib_hash, "sha3224": hashlib_hash, "sha3256": hashlib_hash, "sha3384": hashlib_hash, "sha3512": hashlib_hash} return hashfuncs[hashtype]
[docs]def get_engine(hashtype): """ Get hashlib engine from hash type. :param hashtype: Hash type. :type hashtype: str """ hashengines = {"md5": hashlib.md5(), "sha1": hashlib.sha1(), "sha224": hashlib.sha224(), "sha256": hashlib.sha256(), "sha384": hashlib.sha384(), "sha512": hashlib.sha512()} if utilities.new_enough(6): hashengines.update({"sha3224": hashlib.sha3_224(), "sha3256": hashlib.sha3_256(), "sha3384": hashlib.sha3_384(), "sha3512": hashlib.sha3_512()}) return hashengines[hashtype]
[docs]def hash_get(filename, hashfunc, hashtype, workingdir, blocksize=16777216): """ Generate and pretty format the hash result for a file. :param filename: File to hash. :type filename: str :param hashfunc: Hash function to use. :type hashfunc: function :param hashtype: Hash type. :type hashtype: str :param workingdir: Working directory. :type workingdir: str :param blocksize: Block size. Default is 16MB. :type blocksize: int """ if hashfunc == hashlib_hash: method = get_engine(hashtype) else: method = hashtype result = hashfunc(os.path.join(workingdir, filename), method, blocksize) return "{0} {1}\n".format(result.upper(), os.path.basename(filename))
[docs]def base_hash(hashtype, source, workingdir, block, target, kwargs=None): """ Generic hash function; get hash, write to file. :param hashtype: Hash type. :type hashtype: str :param source: File to be hashed; foobar.ext :type source: str :param workingdir: Path containing files you wish to verify. :type workingdir: str :param block: Blocksize, in bytes. :type block: int :param target: File to write to. :type target: file :param kwargs: Values. Refer to `:func:verifier_config_loader`. :type kwargs: dict """ if kwargs[hashtype]: hash_generic = [hashtype.upper()] hashfunc = get_hashfunc(hashtype) hashtype2 = "sha" if hashtype == "sha0" else hashtype hash_generic.append(hash_get(source, hashfunc, hashtype2, workingdir, block)) target.write("\n".join(hash_generic))
[docs]def hash_writer(source, dest, workingdir, kwargs=None): """ Write per-file hashes. :param source: File to be hashed; foobar.ext :type source: str :param dest: Destination file; foobar.ext.cksum :type dest: str :param workingdir: Path containing files you wish to verify. :type workingdir: str :param kwargs: Values. Refer to `:func:verifier_config_loader`. :type kwargs: dict """ block = int(kwargs['blocksize']) with open(dest, 'w') as target: base_hash("adler32", source, workingdir, block, target, kwargs) base_hash("crc32", source, workingdir, block, target, kwargs) base_hash("md4", source, workingdir, block, target, kwargs) base_hash("md5", source, workingdir, block, target, kwargs) base_hash("sha0", source, workingdir, block, target, kwargs) base_hash("sha1", source, workingdir, block, target, kwargs) base_hash("sha224", source, workingdir, block, target, kwargs) base_hash("sha256", source, workingdir, block, target, kwargs) base_hash("sha384", source, workingdir, block, target, kwargs) base_hash("sha512", source, workingdir, block, target, kwargs) base_hash("ripemd160", source, workingdir, block, target, kwargs) base_hash("whirlpool", source, workingdir, block, target, kwargs) if utilities.new_enough(6): base_hash("sha3224", source, workingdir, block, target, kwargs) base_hash("sha3256", source, workingdir, block, target, kwargs) base_hash("sha3384", source, workingdir, block, target, kwargs) base_hash("sha3512", source, workingdir, block, target, kwargs)
[docs]def filefilter(file, workingdir, extras=()): """ Check if file in folder is a folder, or if it's got a forbidden extension. :param file: File to be hashed. :type file: str :param workingdir: Path containing files you wish to verify. :type workingdir: str :param extras: Tuple of extra extensions. :type extras: tuple """ return not (os.path.isdir(os.path.join(workingdir, file)) or file.endswith(bbconstants.SUPPS + extras))
[docs]def prep_verifier(ldir, selective=False): """ Prepare files for verifier function. :param ldir: Path containing files you wish to verify. :type ldir: str :param selective: Filtering filenames/extensions. Default is false. :type selective: bool """ exts = (".txt",) if selective else () fxs = [os.path.join(ldir, afx) for afx in os.listdir(ldir) if filefilter(afx, ldir, exts)] return fxs
[docs]def verifier(ldir, kwargs=None, selective=False): """ For all files in a directory, perform various hash/checksum functions. Take dict to define hashes, write output to a/individual .cksum file(s). :param ldir: Path containing files you wish to verify. :type ldir: str :param kwargs: Values. Refer to `:func:verifier_config_loader`. :type kwargs: dict :param selective: Filtering filenames/extensions. Default is false. :type selective: bool """ kwargs = verifier_config_loader() if kwargs is None else kwargs fxs = prep_verifier(ldir, selective) with concurrent.futures.ThreadPoolExecutor(max_workers=utilities.cpu_workers(fxs)) as xec: for file in fxs: verifier_individual(xec, ldir, file, kwargs)
[docs]def verifier_individual(xec, ldir, file, kwargs): """ Individually verify files through a ThreadPoolExecutor. :param xec: ThreadPoolExecutor instance. :type xec: concurrent.futures.ThreadPoolExecutor :param ldir: Path containing files you wish to verify. :type ldir: str :param file: Filename. :type file: str :param kwargs: Values. Refer to `:func:verifier_config_loader`. :type kwargs: dict """ print("HASHING:", os.path.basename(file)) basename = file + ".cksum" targetname = os.path.join(ldir, basename) try: xec.submit(hash_writer, file, targetname, ldir, kwargs) except Exception as exc: exceptions.handle_exception(exc)
[docs]def verifier_config_loader(homepath=None): """ Read a ConfigParser file to get hash preferences. :param homepath: Folder containing ini file. Default is user directory. :type homepath: str """ ini = iniconfig.generic_loader("hashmodes", homepath) results = {} results['crc32'] = bool(ini.getboolean('crc32', fallback=False)) results['adler32'] = bool(ini.getboolean('adler32', fallback=False)) results['sha0'] = bool(ini.getboolean('sha0', fallback=False)) results['sha1'] = bool(ini.getboolean('sha1', fallback=True)) results['sha224'] = bool(ini.getboolean('sha224', fallback=False)) results['sha256'] = bool(ini.getboolean('sha256', fallback=True)) results['sha384'] = bool(ini.getboolean('sha384', fallback=False)) results['sha512'] = bool(ini.getboolean('sha512', fallback=False)) results['md5'] = bool(ini.getboolean('md5', fallback=True)) results['md4'] = bool(ini.getboolean('md4', fallback=False)) results['ripemd160'] = bool(ini.getboolean('ripemd160', fallback=False)) results['whirlpool'] = bool(ini.getboolean('whirlpool', fallback=False)) results['blocksize'] = int(ini.getint('blocksize', fallback=16777216)) results['sha3224'] = bool(ini.getboolean('sha3224', fallback=False)) results['sha3256'] = bool(ini.getboolean('sha3256', fallback=False)) results['sha3384'] = bool(ini.getboolean('sha3384', fallback=False)) results['sha3512'] = bool(ini.getboolean('sha3512', fallback=False)) return results
[docs]def verifier_config_writer(resultdict=None, homepath=None): """ Write a ConfigParser file to store hash preferences. :param resultdict: Dictionary of results: {method, bool} :type resultdict: dict({str, bool}) :param homepath: Folder containing ini file. Default is user directory. :type homepath: str """ if resultdict is None: resultdict = verifier_config_loader() results = {method: str(flag).lower() for method, flag in resultdict.items()} iniconfig.generic_writer("hashmodes", results, homepath)