#
# JPEG marker reference
#
#   0xFF,0xD8 - SOI  - Start of Image (0xFFD8FF to be used as JPG file marker)
#   0xFF,0xEn - APPn - App / Exif data
#   0xFF,0xDB - DQT  - Define Quantization Table
#   0xFF,0xDD - DRI  - Define Restart Interval
#   0xFF,0xFE - COM  - Comments
#   0xFF,0xC0 - SOF0 - Start of Frame
#   0xFF,0xC2 - SOF2 - Start of Frame
#   0xFF,0xC4 - DHT  - Define Huffman Tables
#   0xFF,0xDA - SOS  - Start of Scan
#   0xFF,0xDn - RST  - Restart (n=0..7)
#   0xFF,0xD9 - EOI  - End of Image
#
# A segment marker is followed by a two-byte length (high, low); the length
# counts its own two bytes but not the marker.
#
# https://en.wikipedia.org/wiki/JPEG
import re
import struct
import logging


class JpgBin:
    """Chunked reader that walks the marker segments of a JPEG stream.

    The file is read ``BUF_CHUNK_SIZE`` bytes at a time into ``data_buf``;
    ``data_idx`` is the read cursor inside that buffer.  The file handle must
    support ``read(n)`` and relative ``seek(n, 1)``.

    Two entry points are provided:

    * ``processFile`` scans the whole stream and logs every marker it sees.
    * ``genHash`` feeds everything from the first SOS marker to the end of
      the stream into a caller-supplied hash object, so bytes in the segments
      that precede SOS (APPn, comments, ...) do not influence the digest.
    """

    BUF_CHUNK_SIZE = 2048

    # Class-level defaults; the working values are (re)set per instance.
    data_buf = None
    data_idx = 0
    data_len = 0
    fh = None
    hh = None
    markers = {
        'SOS': 0xffda,
        'EOI': 0xffd9,
    }
    continue_process = True
    metadata_h = {}
    img_bin_h = {}

    # marker word -> (label used in the log line, name of the handler method)
    _SEGMENT_HANDLERS = {
        0xfffe: ('Comment', 'markerComment'),
        0xffdb: ('DQT', 'markerDQT'),
        0xffdd: ('DRI', 'markerDRI'),
        0xffc0: ('SOF0', 'markerSOF0'),
        0xffc2: ('SOF2', 'markerSOF2'),
        0xffc4: ('DHT', 'markerDHT'),
        0xffda: ('SOS', 'markerSOS'),
        0xffd9: ('EOI', 'markerEOI'),
    }

    def __init__(self):
        # Give every instance its own dicts instead of sharing the mutable
        # class-level ones.
        self.metadata_h = {}
        self.img_bin_h = {}

    def _reset(self):
        """Clear the buffer state so the instance can process another file."""
        self.data_buf = None
        self.data_idx = 0
        self.data_len = 0
        self.continue_process = True

    def __isJPG(self):
        """Return True if the buffer starts with the 0xFFD8FF signature.

        On success the cursor is left just past SOI, on the 0xFF byte of the
        first segment marker.
        """
        if not self.data_buf or len(self.data_buf) < 3:
            return False
        (soi, lead) = struct.unpack('>HB', self.data_buf[0:3])
        if soi == 0xffd8 and lead == 0xff:
            self.data_idx = 2
            return True
        return False

    def processFile(self, file_h):
        """Scan *file_h* from its current position to EOF, logging markers.

        Returns True when the data starts with the JPEG signature and was
        scanned, False when the stream is empty or is not a JPEG.
        """
        self.fh = file_h
        self._reset()
        self.getMoreBytes(True)
        if not self.data_buf or not self.__isJPG():
            return False
        while self.continue_process:
            self.findAllMarker()
            self.getMoreBytes()
        return True

    def genHash(self, file_h, hash_h):
        """Hash the stream from the first SOS marker to EOF.

        *hash_h* is any object with an ``update(bytes)`` method (for example
        a ``hashlib`` hash).  It is returned, updated in place; it is left
        untouched when the stream is empty, is not a JPEG, or has no SOS
        marker.
        """
        self.hh = hash_h
        self.fh = file_h
        self._reset()
        self.getMoreBytes(True)
        if (self.data_buf and self.__isJPG()
                and self.findMarker(self.markers['SOS'])):
            self.genImgHash()
        return self.hh

    def genImgHash(self):
        """Feed everything from the cursor to EOF into ``self.hh``."""
        self.hh.update(self.data_buf[self.data_idx:])
        while self.continue_process:
            # Forced read: the cursor does not move while hashing, so an
            # unforced call would never fetch the next chunk.
            self.getMoreBytes(True)
            self.hh.update(self.data_buf)

    def getMoreBytes(self, force_bytes=False):
        """Refill the buffer when fewer than two unread bytes remain.

        With ``force_bytes`` the buffer is discarded and replaced
        unconditionally (used for the first read and after a seek).
        Otherwise any unread trailing byte is carried over in front of the
        new data, so a marker or length field split across two reads is not
        lost.  ``continue_process`` is cleared once the file is exhausted.
        """
        if not force_bytes and self.data_idx < self.data_len - 1:
            return
        pending = b''
        if not force_bytes and self.data_buf:
            pending = self.data_buf[self.data_idx:]
        # Keep reading until a full two-byte word is available or EOF.
        while True:
            chunk = self.fh.read(self.BUF_CHUNK_SIZE)
            pending += chunk
            if not chunk or len(pending) >= 2:
                break
        self.data_buf = pending
        self.data_idx = 0
        self.data_len = len(pending)
        if not chunk:
            self.continue_process = False
        logging.debug("DATA: len=={}".format(self.data_len))

    def calcSeekBytes(self):
        """Skip the segment whose marker is at the cursor.

        Steps over the two marker bytes, reads the two-byte big-endian
        segment length and moves past the segment, seeking in the file when
        the segment extends beyond the current buffer.
        """
        self.data_idx += 2
        if self.data_len - self.data_idx < 2:
            self.getMoreBytes()
            if self.data_len - self.data_idx < 2:
                # Truncated stream: no length field left to read.
                self.data_idx = self.data_len
                return
        (rec_len,) = struct.unpack_from('>H', self.data_buf, self.data_idx)
        remain_bytes = self.data_len - self.data_idx
        if rec_len >= remain_bytes:
            # The file position is at the end of the buffer, so only the
            # part of the segment that lies outside the buffer is skipped.
            self.seekBytes(rec_len - remain_bytes)
        else:
            self.data_idx += rec_len

    def seekBytes(self, num_bytes):
        """Seek *num_bytes* forward in the file and reload the buffer.

        Returns the value returned by the file's ``seek``.
        """
        pos = self.fh.seek(num_bytes, 1)
        logging.debug("SEEK: seek=={}, cur_loc=={}".format(num_bytes, pos))
        self.getMoreBytes(True)
        return pos

    def findMarker(self, marker):
        """Advance until the two-byte word at the cursor equals *marker*.

        Everything before the match is consumed through ``findAllMarker``,
        so known segments are skipped by their length and marker-like bytes
        inside their payload are not matched.  Returns True with the cursor
        on the marker, or False when EOF is reached first.
        """
        while self.continue_process:
            self.getMoreBytes()
            if self.data_len - self.data_idx < 2:
                break
            (word_b,) = struct.unpack_from('>H', self.data_buf, self.data_idx)
            if word_b == marker:
                return True
            self.findAllMarker()
        return False

    def findAllMarker(self):
        """Classify the two-byte word at the cursor and dispatch its handler.

        Known markers are logged and skipped by their handler; any other
        word advances the cursor by a single byte.
        """
        if self.data_len - self.data_idx < 2:
            self.getMoreBytes()
            if self.data_len - self.data_idx < 2:
                return
        (word_b,) = struct.unpack_from('>H', self.data_buf, self.data_idx)
        hex_str = '{:04x}'.format(word_b)

        # RST 0xD(n) (n==0..7)
        if 0xffd0 <= word_b <= 0xffd7:
            logging.info("[ {} : RST ]".format(hex_str))
            self.markerRST()
        elif word_b in self._SEGMENT_HANDLERS:
            (label, handler) = self._SEGMENT_HANDLERS[word_b]
            logging.info("[ {} : {} ]".format(hex_str, label))
            getattr(self, handler)()
        # APP 0xE(n) - App Exif Data
        elif 0xffe0 <= word_b <= 0xffef:
            logging.info("[ {} : App Data ]".format(hex_str))
            self.markerAppData()
        else:
            self.data_idx += 1

    #
    # Image Metadata, Exif
    #
    def markerAppData(self):
        """Skip an APPn segment."""
        self.calcSeekBytes()

    def markerComment(self):
        """Skip a comment segment."""
        self.calcSeekBytes()

    #
    # Image Data
    #
    def markerSOS(self):
        """Skip the SOS header; the data that follows is scanned bytewise."""
        self.calcSeekBytes()

    def markerRST(self):
        """Step over an RSTn marker, which carries no length field."""
        self.data_idx += 2

    def markerDQT(self):
        """Skip a DQT segment."""
        self.calcSeekBytes()

    def markerDRI(self):
        """Skip a DRI segment, including its restart-interval payload."""
        self.calcSeekBytes()

    def markerSOF0(self):
        """Skip an SOF0 segment."""
        self.calcSeekBytes()

    def markerSOF2(self):
        """Skip an SOF2 segment."""
        self.calcSeekBytes()

    def markerDHT(self):
        """Skip a DHT segment."""
        self.calcSeekBytes()

    def markerEOI(self):
        """Step over the EOI marker; the scan itself is not stopped here."""
        self.data_idx += 2

    def __repr__(self):
        return "JpgBin(data_idx={}, data_len={}, continue_process={})".format(
            self.data_idx, self.data_len, self.continue_process)

    def findMarkers222(self):
        """Move the cursor to the next 0xFF byte in the current buffer.

        The cursor is placed at the end of the buffer when no 0xFF byte
        remains; nothing happens when the buffer is empty or unset.
        """
        if not self.data_buf:
            return
        hit = self.data_buf.find(b'\xff', self.data_idx)
        self.data_idx = hit if hit >= 0 else len(self.data_buf)