#
# This Class handles the binary parsing of a jpg file
#
#
# 0xFF,0xD8 - Start of Image (0xFFD8FF to be used as JPG file marker)
# 0xFF,0xEn - App Exif data
# 0xFF,0xDB - DQT - Define Quantization Table
# 0xFF,0xDD - DRI - Define Restart Interval
# 0xFF,0xFE - Comments
# 0xFF,0xC0 - SOF0 - Start of Frame
# 0xFF,0xC2 - SOF2 - Start of Frame
# 0xFF,0xC4 - DHT - Define Huffman Tables
# 0xFF,0xDA - SOS - Start of Scan
# 0xFF,0xDn - RST - Restart (n=0..7)
# 0xFF,0xD9 - EOI - End of Image
#
# high,low - length
#
# https://en.wikipedia.org/wiki/JPEG

import re
import struct
import logging
from libs.jpg_fp import JpgFingerprint
from libs.jpg_picseal import JpgPicSeal


class JpgBin:
    """Scan a JPG file chunk by chunk and record where each segment lives.

    The file is read in BUF_CHUNK_SIZE pieces.  Every time a marker is found,
    the *previous* marker's file offset and length are handed to the
    JpgFingerprint instance, either as image data or as metadata.

    Buffer invariant used throughout: the file handle's position is always the
    file offset of the END of ``data_buf``, so the absolute offset of the parse
    position is ``fh.tell() - (data_len - data_idx)``.
    """

    BUF_CHUNK_SIZE = 8192

    soi_marker = b'\xff\xd8'
    eof_marker = b'\xff\xd9'

    def __init__(self):
        # current chunk, parse index inside it, and its length
        self.data_buf = None
        self.data_idx = 0
        self.data_len = 0
        self.fh = None
        # cleared once the file has no more parsable bytes
        self.continue_process = True

        self.metadata_h = {}
        self.img_bin_h = {}

        # description of the most recently seen marker; it is flushed to
        # jpg_fp when the next marker is found
        self.prev_fpos = 0
        self.prev_mhex = 0xdead
        self.prev_mstr = "DUH!"
        self.prev_imgData = False

        self.jpg_fp = JpgFingerprint()
        self.picseal = None

    #
    # check for JPG file type marker
    # add the marker to the processing variables to be used later
    #
    def __isJPG(self):
        """Return True when the buffer starts with the 0xFFD8FF JPG signature.

        On success the parse index is placed right after the SOI marker and
        prev_imgData is set to None so the SOI itself is never recorded.
        """
        # a buffer shorter than the 3-byte signature cannot be a JPG
        if (len(self.data_buf) < 3):
            return False
        (m1, m2) = struct.unpack_from('>HB', self.data_buf, 0)
        if (0xffd8 == m1 and 0xff == m2):
            # set these variables to be used in the marker insertion loop
            self.data_idx = 2
            self.prev_fpos = 0
            self.prev_imgData = None
            return True
        return False

    def processFile(self, file_h):
        """Parse every marker of the binary file object `file_h`.

        `file_h` must support read(), seek() and tell().
        Returns True when the file carries the JPG signature and was scanned,
        False for an empty file or a non-JPG file.
        """
        self.fh = file_h
        self.__getMoreBytes(True)
        if (not self.data_buf):
            return False
        if (not self.__isJPG()):
            return False
        # make sure a full 2-byte word is buffered behind the SOI marker
        self.__getMoreBytes()
        while (self.continue_process):
            self.findAllMarkers()
            self.__getMoreBytes()
        return True

    def genHash(self, hash_h):
        """Feed every recorded image-data segment to `hash_h`; return its digest.

        `hash_h` needs update() and digest() methods.  processFile() must have
        run first, because the segments are re-read from the stored file handle.
        """
        self.fh.seek(0)
        for marker in self.jpg_fp.markers_img:
            # only seek when the segments are not back to back
            if (marker.fpos != self.fh.tell()):
                self.fh.seek(marker.fpos)
            hash_h.update(self.fh.read(marker.len))
        return hash_h.digest()

    def findAllMarkers(self):
        """Inspect the 2-byte word at the parse index and dispatch on it.

        A recognised marker is handed to its marker* handler, which advances
        the parse index; anything else moves the index forward by one byte.
        Does nothing when fewer than two bytes are buffered.
        """
        if ((self.data_len - self.data_idx) < 2):
            return

        (word_b,) = struct.unpack_from('>H', self.data_buf, self.data_idx)
        hex_str = word_b.to_bytes(2, 'big').hex()

        # RST 0xD(n) (n==0..7)
        if (0xffd0 <= word_b <= 0xffd7):
            logging.info("[ {} : RST ]".format(hex_str))
            self.markerRST(word_b)
        # Comments section
        elif (0xfffe == word_b):
            logging.info("[ {} : Comment ]".format(hex_str))
            self.markerComment(word_b)
        # DQT
        elif (0xffdb == word_b):
            logging.info("[ {} : DQT ]".format(hex_str))
            self.markerDQT(word_b)
        # DRI
        elif (0xffdd == word_b):
            logging.info("[ {} : DRI ]".format(hex_str))
            self.markerDRI(word_b)
        # SOF0 - Start of Frame 0
        elif (0xffc0 == word_b):
            logging.info("[ {} : SOF0 ]".format(hex_str))
            self.markerSOF0(word_b)
        # SOF2 - Start of Frame 2
        elif (0xffc2 == word_b):
            logging.info("[ {} : SOF2 ]".format(hex_str))
            self.markerSOF2(word_b)
        # DHT
        elif (0xffc4 == word_b):
            logging.info("[ {} : DHT ]".format(hex_str))
            self.markerDHT(word_b)
        # SOS
        elif (0xffda == word_b):
            logging.info("[ {} : SOS ]".format(hex_str))
            self.markerSOS(word_b)
        # EOI
        elif (0xffd9 == word_b):
            logging.info("[ {} : EOI ]".format(hex_str))
            self.markerEOI(word_b)
        # APP 0xE(n) - App Exif Data
        elif (0xffe0 <= word_b <= 0xffef):
            logging.info("[ {} : App Data ]".format(hex_str))
            self.markerAppData(word_b)
        else:
            self.data_idx += 1

    #
    # Image Metadata, Exif
    #

    def markerAppData(self, marker_hex):
        """Record an APPn segment as metadata and step over it.

        An APP15 (0xFFEF) segment is first offered to the PicSeal parser; when
        that parser consumes it, no further skipping is needed.
        """
        self.__addPrevMarkerData(marker_hex, "APP ", False)
        if (0xffef == marker_hex and self.__processPicSeal()):
            return
        (rec_len, prev_buf) = self.__calcSeekBytes()
        logging.info("length=={}".format(str(rec_len)))

    def markerComment(self, marker_hex):
        """Record a COM segment as metadata and step over it."""
        self.__addPrevMarkerData(marker_hex, "COM ", False)
        self.__calcSeekBytes()

    #
    # Image Data
    #

    def markerSOS(self, marker_hex):
        """Record SOS as image data; only the marker itself is stepped over."""
        self.__addPrevMarkerData(marker_hex, "SOS ")
        self.data_idx += 2

    def markerRST(self, marker_hex):
        """Record an RSTn marker as image data and step over the marker."""
        self.__addPrevMarkerData(marker_hex, "RST ")
        self.data_idx += 2

    def markerDQT(self, marker_hex):
        """Record a DQT segment as image data and step over it."""
        self.__addPrevMarkerData(marker_hex, "DQT ")
        self.__calcSeekBytes()

    def markerDRI(self, marker_hex):
        """Record a DRI segment as image data; step over marker and length."""
        self.__addPrevMarkerData(marker_hex, "DRI ")
        self.data_idx += 4

    def markerSOF0(self, marker_hex):
        """Record a SOF0 segment as image data and step over it."""
        self.__addPrevMarkerData(marker_hex, "SOF0")
        self.__calcSeekBytes()

    def markerSOF2(self, marker_hex):
        """Record a SOF2 segment as image data and step over it."""
        self.__addPrevMarkerData(marker_hex, "SOF2")
        self.__calcSeekBytes()

    def markerDHT(self, marker_hex):
        """Record a DHT segment as image data and step over it."""
        self.__addPrevMarkerData(marker_hex, "DHT ")
        self.__calcSeekBytes()

    #
    # end of file marker is never added to the marker array list (same as start of image marker)
    #
    def markerEOI(self, marker_hex):
        """Flush the segment that precedes EOI and step over the marker."""
        self.__addPrevMarkerData(marker_hex, "DUMMY VALUE ")
        self.data_idx += 2

    # private helper function
    def __addPrevMarkerData(self, mhex, mstr, imgData=True):
        """Store the previous marker's span, then remember the current marker.

        The previous segment's length is only known once the next marker is
        found, which is why records are written one marker late.  `imgData`
        selects whether the CURRENT marker will later be stored as image data
        (True) or metadata (False).
        """
        fpos = self.fh.tell()
        # absolute file offset of the marker that was just found
        cur_fpos = (fpos - (self.data_len - self.data_idx))
        rec_len = cur_fpos - self.prev_fpos

        # prev_imgData is None right after the SOI, which is never recorded
        if (self.prev_imgData is not None):
            if (self.prev_imgData):
                self.jpg_fp.addImgData(self.prev_mhex, self.prev_fpos, rec_len, self.prev_mstr)
            else:
                self.jpg_fp.addImgMetadata(self.prev_mhex, self.prev_fpos, rec_len, self.prev_mstr)

        self.prev_mhex = mhex
        self.prev_mstr = mstr
        self.prev_fpos = cur_fpos
        self.prev_imgData = imgData

    def __getMoreBytes(self, force_bytes=False):
        """Refill the buffer when fewer than two unparsed bytes remain.

        With `force_bytes` the next chunk is read from the current file
        position unconditionally.  Otherwise the file is first repositioned to
        the parse position, so a byte left at the end of the old chunk is
        re-read and a marker straddling two chunks is still seen.
        Parsing stops once a refill yields fewer than two bytes, because no
        marker fits in them.
        """
        if (not force_bytes):
            if (self.data_idx < (self.data_len - 1)):
                return
            leftover = self.data_len - self.data_idx
            if (leftover):
                # positive: keep the unread tail byte; negative: the index ran
                # past the chunk, so skip the same distance in the file
                self.fh.seek(-leftover, 1)

        self.data_buf = self.fh.read(self.BUF_CHUNK_SIZE)
        self.data_idx = 0
        self.data_len = len(self.data_buf)
        if (self.data_len < 2):
            self.continue_process = False
        logging.debug("DATA: len=={}".format(self.data_len))

    def __ensureBytes(self, count):
        """Make sure `count` bytes are buffered at the parse index.

        When they are not, the buffer is re-read starting at the parse
        position (the index becomes 0).  Returns False when the file ends
        before `count` bytes are available.
        """
        avail = self.data_len - self.data_idx
        if (avail >= count):
            return True
        if (avail):
            self.fh.seek(-avail, 1)
        self.__getMoreBytes(True)
        return self.data_len >= count

    def __readRecordLength(self):
        """Step over the 2-byte marker and return the segment length field.

        The parse index is left ON the length field.  Returns None when the
        file is truncated before the length field.
        """
        self.data_idx += 2
        if (not self.__ensureBytes(2)):
            return None
        (rec_len,) = struct.unpack_from('>H', self.data_buf, self.data_idx)
        return rec_len

    #
    # move the index 2 bytes, then read the 2 bytes to get the length
    #
    def __calcSeekBytes(self):
        """Skip the segment at the parse index.

        Returns (rec_len, prev_buf): the segment length as stored in the file
        and, when the segment ran past the buffered chunk, the part of it that
        was buffered (None otherwise).  A truncated file yields (0, None).
        """
        prev_buf = None
        rec_len = self.__readRecordLength()
        if (rec_len is None):
            return 0, prev_buf

        remain_bytes = self.data_len - self.data_idx
        if (rec_len >= remain_bytes):
            # segment continues beyond this chunk: skip the rest in the file
            prev_buf = self.data_buf[self.data_idx:]
            self.__seekBytes(rec_len - remain_bytes)
        else:
            self.data_idx += rec_len

        return rec_len, prev_buf

    def __seekBytes(self, num_bytes):
        """Skip `num_bytes` in the file, load the next chunk, return the seek position."""
        pos = self.fh.seek(num_bytes, 1)
        self.__getMoreBytes(True)
        logging.debug("SEEK: seek=={}, cur_loc=={}".format(num_bytes, pos))
        return pos

    def __getBuf(self):
        """Return the bytes of the segment at the parse index and step over it.

        The returned buffer starts at the 2-byte length field and is rec_len
        bytes long (shorter if the file is truncated).  Returns None when the
        length field itself cannot be read.
        """
        rec_len = self.__readRecordLength()
        if (rec_len is None):
            return None

        remain_bytes = self.data_len - self.data_idx
        if (rec_len < remain_bytes):
            start = self.data_idx
            self.data_idx += rec_len
            return self.data_buf[start:self.data_idx]

        # the segment runs past this chunk: read (not skip) its tail from the
        # file, then continue parsing with the chunk that follows the segment
        head = self.data_buf[self.data_idx:]
        tail = self.fh.read(rec_len - remain_bytes)
        self.__getMoreBytes(True)
        return head + tail

    #
    # [app_rec_header:2|len:2|picseal_header:7]
    #
    def __processPicSeal(self):
        """Try to parse the APP segment at the parse index as a PicSeal record.

        Returns True when the segment carried the PicSeal marker and was
        consumed (handed to JpgPicSeal.deserialize), False when the caller
        still has to skip the segment.
        """
        rec_hdr = 4
        ps_hdr_size = rec_hdr + len(JpgPicSeal.picseal_marker)

        # the whole header must be buffered before it can be compared
        if (not self.__ensureBytes(ps_hdr_size)):
            return False

        self.picseal = JpgPicSeal()
        hdr_start = self.data_idx + rec_hdr
        hdr_end = self.data_idx + ps_hdr_size
        if (not self.picseal.isPicSeal(self.data_buf[hdr_start:hdr_end])):
            return False

        # deserialize()'s result is not inspected here
        self.picseal.deserialize(self.__getBuf())
        return True

    def printMarkerImg(self):
        """Return jpg_fp.printImgMarkers() for the recorded image-data markers."""
        return self.jpg_fp.printImgMarkers()

    def printMarkerMeta(self):
        """Return jpg_fp.printMDMarkers() for the recorded metadata markers."""
        return self.jpg_fp.printMDMarkers()

    def findMarker(self, marker):
        """Placeholder; not implemented."""
        pass

    def __repr__(self):
        return repr(self.jpg_fp)