# Source: picseal_pub/libs/jpg_bin.py (257 lines, 6.6 KiB, Python)

#
# This Class handles the binary parsing of a jpg file
#
#
# 0xFF,0xD8 - Start of Image (0xFFD8FF to be used as JPG file marker)
# 0xFF,0xEn - App Exif data
# 0xFF,0xDB - DQT - Define Quantization Table
# 0xFF,0xDD - DRI - Define Restart Interval
# 0xFF,0xFE - Comments
# 0xFF,0xC0 - SOF0 - Start of Frame
# 0xFF,0xC2 - SOF2 - Start of Frame
# 0xFF,0xC4 - DHT - Define Huffman Tables
# 0xFF,0xDA - SOS - Start of Scan
# 0xFF,0xDn - RST - Restart (n=0..7)
# 0xFF,0xD9 - EOI - End of Image
#
# high,low - segment length: the two bytes after a marker, high byte first (big-endian)
#
# https://en.wikipedia.org/wiki/JPEG
import re
import struct
import logging
from libs.jpg_fp import JpgFingerprint
#
class JpgBin:
    """Scan a JPG byte stream and record where each marker segment lives.

    The file is read in chunks of ``BUF_CHUNK_SIZE`` bytes. Every time a
    marker is recognised, the segment that started at the *previous* marker
    is closed (its length is the distance between the two markers) and
    handed to ``self.jpg_fp`` through ``addImgData`` or ``addImgMetadata``.
    """

    # Number of bytes requested from the file handle per read.
    BUF_CHUNK_SIZE = 2048

    # Fixed two-byte markers -> (label used in the log line, handler name).
    # RST (0xFFD0..0xFFD7) and APP (0xFFE0..0xFFEF) are ranges and are
    # matched separately in findAllMarkers().
    _MARKER_HANDLERS = {
        0xfffe: ("Comment", "markerComment"),
        0xffdb: ("DQT", "markerDQT"),
        0xffdd: ("DRI", "markerDRI"),
        0xffc0: ("SOF0", "markerSOF0"),
        0xffc2: ("SOF2", "markerSOF2"),
        0xffc4: ("DHT", "markerDHT"),
        0xffda: ("SOS", "markerSOS"),
        0xffd9: ("EOI", "markerEOI"),
    }

    def __init__(self):
        # Current read buffer, the scan position inside it and its length.
        # Invariant used for position math: the buffer always holds the
        # bytes that immediately precede self.fh.tell().
        self.data_buf = None
        self.data_idx = 0
        self.data_len = 0
        self.fh = None
        # self.hh = None
        # Cleared once the file handle has no more usable data.
        self.continue_process = True
        self.metadata_h = {}
        self.img_bin_h = {}
        # Description of the most recently seen marker; its segment is
        # recorded when the next marker is found.
        self.prev_fpos = 0
        self.prev_mhex = 0xdead
        self.prev_mstr = "DUH!"
        self.prev_imgData = False
        self.jpg_fp = JpgFingerprint()

    #
    # check for JPG file type marker
    # add the marker to the processing variables to be used later
    #
    def __isJPG(self):
        """Return True when the buffer starts with the bytes FF D8 FF.

        On success the SOI marker becomes the pending "previous" marker and
        the scan position is moved just past it. A buffer shorter than
        three bytes is reported as not-a-JPG instead of raising
        ``struct.error``.
        """
        if self.data_buf is None or len(self.data_buf) < 3:
            return False
        (soi, next_byte) = struct.unpack_from('>HB', self.data_buf, 0)
        if soi != 0xffd8 or next_byte != 0xff:
            return False
        self.data_idx = 2
        self.prev_mhex = 0xffd8
        self.prev_mstr = "SOI "
        self.prev_fpos = 0
        self.prev_imgData = False
        return True

    #
    def processFile(self, file_h):
        """Scan ``file_h`` (a seekable binary file object) for markers.

        Returns False when the file is empty or does not start with the
        JPG signature, True once the whole file has been scanned.
        """
        self.fh = file_h
        self.__getMoreBytes(True)
        if not self.data_buf:
            return False
        if not self.__isJPG():
            return False
        while self.continue_process:
            self.findAllMarkers()
            self.__getMoreBytes()
        return True

    #
    def genHash(self, hash_h):
        """Feed every segment listed in ``jpg_fp.markers_img`` to ``hash_h``.

        ``hash_h`` must offer ``update(bytes)`` and ``digest()``. For each
        entry, ``len`` bytes are read from file offset ``fpos``. Returns
        ``hash_h.digest()``.
        """
        self.fh.seek(0)
        for marker in self.jpg_fp.markers_img:
            # Only seek when the segments are not contiguous.
            if self.fh.tell() != marker.fpos:
                self.fh.seek(marker.fpos)
            hash_h.update(self.fh.read(marker.len))
        return hash_h.digest()

    #
    def findAllMarkers(self):
        """Examine the two bytes at the scan position and dispatch on them.

        A recognised marker is logged and passed to its ``marker*`` handler;
        anything else advances the scan position by a single byte. When
        fewer than two bytes are buffered nothing happens, so the caller can
        refill the buffer first.
        """
        if self.data_len - self.data_idx < 2:
            return
        (word_b,) = struct.unpack_from('>H', self.data_buf, self.data_idx)
        if 0xffd0 <= word_b <= 0xffd7:
            # RST 0xD(n) (n==0..7)
            entry = ("RST", "markerRST")
        elif (word_b & 0xfff0) == 0xffe0:
            # APP 0xE(n) - App Exif Data
            entry = ("App Data", "markerAppData")
        else:
            entry = self._MARKER_HANDLERS.get(word_b)
        if entry is None:
            self.data_idx += 1
            return
        (label, handler_name) = entry
        logging.info("[ {} : {} ]".format('{:04x}'.format(word_b), label))
        getattr(self, handler_name)(word_b)

    #
    # Image Metadata, Exif
    #
    #
    def markerAppData(self, marker_hex):
        """Record an APP segment as metadata and skip over its payload."""
        self.__addPrevMarkerData(marker_hex, "APP ", False)
        self.__calcSeekBytes()

    #
    def markerComment(self, marker_hex):
        """Record a comment segment as metadata and skip over its payload."""
        self.__addPrevMarkerData(marker_hex, "COM ", False)
        self.__calcSeekBytes()

    #
    # Image Data
    #
    #
    def markerSOS(self, marker_hex):
        """Record an SOS marker; the bytes after it are scanned one by one."""
        self.__addPrevMarkerData(marker_hex, "SOS ")

    #
    def markerRST(self, marker_hex):
        """Record an RST marker; the bytes after it are scanned one by one."""
        self.__addPrevMarkerData(marker_hex, "RST ")

    #
    def markerDQT(self, marker_hex):
        """Record a DQT segment as image data and skip over its payload."""
        # The marker position has to be captured before the payload is
        # skipped, otherwise the segment would be recorded at its own end.
        self.__addPrevMarkerData(marker_hex, "DQT ")
        self.__calcSeekBytes()

    #
    def markerDRI(self, marker_hex):
        """Record a DRI marker; the bytes after it are scanned one by one."""
        self.__addPrevMarkerData(marker_hex, "DRI ")

    #
    def markerSOF0(self, marker_hex):
        """Record an SOF0 segment as image data and skip over its payload."""
        self.__addPrevMarkerData(marker_hex, "SOF0")
        self.__calcSeekBytes()

    #
    def markerSOF2(self, marker_hex):
        """Record an SOF2 segment as image data and skip over its payload."""
        self.__addPrevMarkerData(marker_hex, "SOF2")
        self.__calcSeekBytes()

    #
    def markerDHT(self, marker_hex):
        """Record a DHT segment as image data and skip over its payload."""
        self.__addPrevMarkerData(marker_hex, "DHT ")
        self.__calcSeekBytes()

    #
    def markerEOI(self, marker_hex):
        """Record the EOI marker and flush it immediately."""
        self.__addPrevMarkerData(marker_hex, "EOI ")
        # A segment is only recorded when its successor is seen, so a dummy
        # successor is registered to flush EOI itself. No real marker was
        # consumed for it, hence no extra advance.
        self.__addPrevMarkerData(marker_hex, "JUST A DUMMY VALUE", advance=0)

    # private helper function
    def __addPrevMarkerData(self, mhex, mstr, imgData=True, advance=2):
        """Close the pending segment and make ``mhex`` the pending marker.

        The pending segment runs from ``prev_fpos`` to the file offset of
        the current scan position. It is reported through ``addImgData``
        when it was flagged as image data, otherwise ``addImgMetadata``.

        mhex    -- marker value that starts the new pending segment
        mstr    -- label stored with the new pending segment
        imgData -- whether the new segment counts as image data
        advance -- bytes to move the scan position afterwards (2 == marker)
        """
        # The buffer ends at fh.tell(), so the unread remainder of the
        # buffer is subtracted to get the offset of the scan position.
        cur_fpos = self.fh.tell() - (self.data_len - self.data_idx)
        rec_len = cur_fpos - self.prev_fpos
        if self.prev_imgData:
            self.jpg_fp.addImgData(self.prev_mhex, self.prev_fpos, rec_len, self.prev_mstr)
        else:
            self.jpg_fp.addImgMetadata(self.prev_mhex, self.prev_fpos, rec_len, self.prev_mstr)
        self.prev_mhex = mhex
        self.prev_mstr = mstr
        self.prev_fpos = cur_fpos
        self.prev_imgData = imgData
        self.data_idx += advance

    #
    def __getMoreBytes(self, force_bytes=False):
        """Refill the buffer when fewer than two unread bytes remain.

        With ``force_bytes`` the buffer is replaced outright (used after a
        seek). Otherwise an unread trailing byte is kept in front of the new
        chunk so that a marker split across two reads is still found.
        ``continue_process`` is cleared when fewer than two bytes are
        available afterwards, because no further marker can be read.
        """
        unread = self.data_len - self.data_idx
        if not force_bytes and unread >= 2:
            return
        if force_bytes or unread <= 0 or self.data_buf is None:
            carry = b""
        else:
            carry = self.data_buf[self.data_idx:]
        self.data_buf = carry + self.fh.read(self.BUF_CHUNK_SIZE)
        self.data_idx = 0
        self.data_len = len(self.data_buf)
        if self.data_len < 2:
            self.continue_process = False
        logging.debug("DATA: len=={}".format(self.data_len))

    #
    # read the 2 length bytes that follow the marker, then skip the segment
    #
    def __calcSeekBytes(self):
        """Skip the segment whose length field sits at the scan position.

        The marker itself must already have been stepped over (which
        ``__addPrevMarkerData`` does). The length is a big-endian 16-bit
        value; the skip starts at the length field. When the segment ends
        beyond the buffer, the file handle is moved forward and the buffer
        is reloaded. Returns the length read, or 0 when the file ends
        before the length field is complete.
        """
        if self.data_len - self.data_idx < 2:
            # Length field is cut off by the buffer edge: pull in more data.
            self.__getMoreBytes()
            if self.data_len - self.data_idx < 2:
                self.continue_process = False
                return 0
        (rec_len,) = struct.unpack_from('>H', self.data_buf, self.data_idx)
        remain_bytes = self.data_len - self.data_idx
        if rec_len >= remain_bytes:
            self.__seekBytes(rec_len - remain_bytes)
        else:
            self.data_idx += rec_len
        return rec_len

    #
    def __seekBytes(self, num_bytes):
        """Move the file handle forward ``num_bytes`` and reload the buffer.

        Returns the file position reported by ``seek``.
        """
        pos = self.fh.seek(num_bytes, 1)
        self.__getMoreBytes(True)
        logging.debug("SEEK: seek=={}, cur_loc=={}".format(num_bytes, pos))
        return pos

    #
    def printMarkerImg(self):
        """Return whatever ``jpg_fp.printImgMarkers()`` returns."""
        return self.jpg_fp.printImgMarkers()

    #
    def printMarkerMeta(self):
        """Return whatever ``jpg_fp.printMDMarkers()`` returns."""
        return self.jpg_fp.printMDMarkers()

    #
    def findMarker(self, marker):
        """Placeholder; not implemented."""
        pass

    #
    def __repr__(self):
        return repr(self.jpg_fp)