274 lines
6.8 KiB
Python
274 lines
6.8 KiB
Python
#
# JPEG marker reference (each marker is two bytes: 0xFF followed by a code)
#
# 0xFF,0xD8 - SOI  - Start of Image (0xFFD8FF is used as the JPG file marker)
# 0xFF,0xEn - APPn - Application data, e.g. Exif (n=0..F)
# 0xFF,0xDB - DQT  - Define Quantization Table
# 0xFF,0xDD - DRI  - Define Restart Interval
# 0xFF,0xFE - COM  - Comments
# 0xFF,0xC0 - SOF0 - Start of Frame 0
# 0xFF,0xC2 - SOF2 - Start of Frame 2
# 0xFF,0xC4 - DHT  - Define Huffman Tables
# 0xFF,0xDA - SOS  - Start of Scan
# 0xFF,0xDn - RST  - Restart (n=0..7)
# 0xFF,0xD9 - EOI  - End of Image
#
# Segment length is stored right after the marker as two bytes: high, low
#
# https://en.wikipedia.org/wiki/JPEG
import re
|
|
import struct
|
|
import logging
|
|
from libs.jpg_fp import JpgFingerprint
|
|
|
|
#
|
|
class JpgBin:
    """Chunked scanner that walks a JPEG byte stream marker by marker.

    The file is read ``BUF_CHUNK_SIZE`` bytes at a time into ``data_buf``;
    ``data_idx`` is the cursor inside that buffer and ``data_len`` its size.
    Every recognised marker is reported to ``self.jpg_fp`` (a
    ``JpgFingerprint``) through ``addImgMetadata`` / ``addImgData``.
    """

    BUF_CHUNK_SIZE = 2048

    # Marker values looked up by name.  SOS is 0xFFDA (0xFFD9 is EOI).
    markers = {
        'SOS': 0xffda,
    }

    # Exact-match markers -> (label used in the log line, handler method name).
    # RSTn (0xFFD0-0xFFD7) and APPn (0xFFE0-0xFFEF) are ranges and are
    # matched separately in findAllMarkers().
    _MARKER_DISPATCH = {
        0xfffe: ("Comment", "markerComment"),
        0xffdb: ("DQT", "markerDQT"),
        0xffdd: ("DRI", "markerDRI"),
        0xffc0: ("SOF0", "markerSOF0"),
        0xffc2: ("SOF2", "markerSOF2"),
        0xffc4: ("DHT", "markerDHT"),
        0xffda: ("SOS", "markerSOS"),
        0xffd9: ("EOI", "markerEOI"),
    }

    def __init__(self):
        """Set up an empty read buffer and a fresh fingerprint collector."""
        self.data_buf = None
        self.data_idx = 0
        self.data_len = 0
        self.fh = None
        self.hh = None

        # Cleared once the underlying file has no more bytes to offer.
        self.continue_process = True

        self.metadata_h = {}
        self.img_bin_h = {}

        # State of the most recent SOS/RST/EOI marker; its length is only
        # known once the following marker is found (see __addImgData).
        self.prev_fpos = 0
        self.prev_mhex = 0xdead
        self.prev_mstr = "DUH!"

        self.jpg_fp = JpgFingerprint()

    #
    def __isJPG(self):
        """Return True when the buffer starts with the 0xFFD8FF signature.

        On success the cursor is moved just past the SOI marker.  A buffer
        shorter than three bytes cannot hold the signature and yields False
        instead of raising ``struct.error``.
        """
        if len(self.data_buf) < 3:
            return False
        (m1, m2) = struct.unpack_from('>HB', self.data_buf, 0)
        if 0xffd8 == m1 and 0xff == m2:
            self.data_idx = 2
            return True
        return False

    #
    def processFile(self, file_h):
        """Scan ``file_h`` (a binary file object) and record every marker.

        Returns True when the stream carried the JPEG signature and was
        scanned to the end, False for an empty or non-JPEG stream.
        """
        self.fh = file_h
        self.getMoreBytes(True)
        if not self.data_buf:
            return False
        if not self.__isJPG():
            return False

        while self.continue_process:
            self.findAllMarkers()
            self.getMoreBytes()

        return True

    #
    def findAllMarkers(self):
        """Inspect the two bytes at the cursor and dispatch a known marker.

        When the bytes are not a recognised marker the cursor advances by a
        single byte so the caller can retry at the next offset.
        """
        if not self._ensureBytes(2):
            # Fewer than two bytes left in the whole stream: nothing to parse.
            self.continue_process = False
            return

        (word_b,) = struct.unpack_from('>H', self.data_buf, self.data_idx)

        if 0xffd0 <= word_b <= 0xffd7:
            # RST 0xD(n) (n==0..7)
            label, handler = "RST", self.markerRST
        elif 0xffe0 <= word_b <= 0xffef:
            # APP 0xE(n) - App Exif Data
            label, handler = "App Data", self.markerAppData
        elif word_b in self._MARKER_DISPATCH:
            label, method_name = self._MARKER_DISPATCH[word_b]
            handler = getattr(self, method_name)
        else:
            self.data_idx += 1
            return

        hex_str = word_b.to_bytes(2, 'big').hex()
        logging.info("[ {} : {} ]".format(hex_str, label))
        handler(word_b)

    # private helper function
    def _recordSegment(self, marker_hex, label, metadata=False):
        """Skip a length-prefixed segment and report it to the fingerprint.

        NOTE(review): the recorded position is ``self.fh.tell()``, i.e. the
        file offset of the end of the buffered data, not the marker's own
        offset.  Kept as in the original -- confirm what JpgFingerprint
        expects before changing it.
        """
        fpos = self.fh.tell()
        rec_len = self.calcSeekBytes()
        if metadata:
            self.jpg_fp.addImgMetadata(marker_hex, fpos, rec_len, label)
        else:
            self.jpg_fp.addImgData(marker_hex, fpos, rec_len, label)

    #
    # Image Metadata, Exif
    #
    def markerAppData(self, marker_hex):
        """Record an APPn segment as image metadata."""
        self._recordSegment(marker_hex, "APP ", metadata=True)

    #
    def markerComment(self, marker_hex):
        """Record a COM (comment) segment as image metadata."""
        self._recordSegment(marker_hex, "COM ", metadata=True)

    #
    # Image Data
    #
    def markerSOS(self, marker_hex):
        """Register a Start-of-Scan marker; its length is resolved later."""
        self.__addImgData(marker_hex, "SOS ")

    #
    def markerRST(self, marker_hex):
        """Register a restart marker; its length is resolved later."""
        self.__addImgData(marker_hex, "RST ")

    #
    def markerDQT(self, marker_hex):
        """Record a quantization-table segment as image data."""
        self._recordSegment(marker_hex, "DQT ")

    #
    def markerDRI(self, marker_hex):
        """Record a restart-interval segment as image data.

        The segment is skipped via its length field like every other
        segment, so the two interval bytes are never scanned as a potential
        marker.  The reported length stays the fixed value 4.
        """
        fpos = self.fh.tell()
        self.calcSeekBytes()
        self.jpg_fp.addImgData(marker_hex, fpos, 4, "DRI ")

    #
    def markerSOF0(self, marker_hex):
        """Record a Start-of-Frame-0 segment as image data."""
        self._recordSegment(marker_hex, "SOF0")

    #
    def markerSOF2(self, marker_hex):
        """Record a Start-of-Frame-2 segment as image data."""
        self._recordSegment(marker_hex, "SOF2")

    def markerDHT(self, marker_hex):
        """Record a Huffman-table segment as image data."""
        self._recordSegment(marker_hex, "DHT ")

    #
    def markerEOI(self, marker_hex):
        """Close the pending SOS/RST record and report End of Image."""
        fpos = self.fh.tell()
        self.__addImgData(marker_hex, "EOI ")
        self.jpg_fp.addImgData(marker_hex, fpos, 2, "EOI ")

    # private helper function
    def __addImgData(self, mhex, mstr):
        """Emit the previous SOS/RST/EOI record and remember the current one.

        These markers carry no usable length of their own here, so a record
        is only emitted once the next such marker fixes where it ended.
        """
        fpos = self.fh.tell()
        # File offset of the cursor: the handle sits at the end of the buffer.
        cur_fpos = fpos - (self.data_len - self.data_idx)
        if self.prev_fpos > 0:
            rec_len = cur_fpos - self.prev_fpos
            self.jpg_fp.addImgData(self.prev_mhex, self.prev_fpos, rec_len, self.prev_mstr)

        self.prev_mhex = mhex
        self.prev_mstr = mstr
        self.prev_fpos = cur_fpos
        self.data_idx += 2

    #
    def genHash(self, file_h, hash_h):
        """Feed everything from the SOS marker to end of file into ``hash_h``.

        ``file_h`` is a binary file object and ``hash_h`` any object with an
        ``update(bytes)`` method (e.g. from ``hashlib``).  Returns ``hash_h``.
        Nothing is hashed when no SOS marker is found or when this instance
        has already consumed a stream (``continue_process`` is False).
        """
        self.hh = hash_h
        if file_h is not None:
            self.fh = file_h

        while self.continue_process:
            if self.findMarker(self.markers['SOS']):
                self.genImgHash()
            self.getMoreBytes()

        return self.hh

    #
    def genImgHash(self):
        """Hash the rest of the buffer and then every remaining chunk."""
        self.hh.update(self.data_buf[self.data_idx:])
        while self.continue_process:
            # Force a fresh chunk each round; a conditional refill would
            # re-hash the same buffer forever.
            self.getMoreBytes(True)
            self.hh.update(self.data_buf)

    #
    def getMoreBytes(self, force_bytes=False):
        """Refill the buffer when it is (nearly) exhausted or when forced.

        A forced refill discards the current buffer.  An unforced refill
        happens when at most one unread byte is left and carries that byte
        over, so a marker split across two chunks is still seen.
        ``continue_process`` is cleared once the file yields no more bytes.
        """
        if not force_bytes and self.data_idx < (self.data_len - 1):
            return

        carry = b''
        if not force_bytes and self.data_buf:
            carry = self.data_buf[self.data_idx:self.data_len]

        fresh = self.fh.read(self.BUF_CHUNK_SIZE)
        self.data_buf = carry + fresh
        self.data_idx = 0
        self.data_len = len(self.data_buf)
        if not fresh:
            self.continue_process = False
        logging.debug("DATA: len=={}".format(self.data_len))

    # private helper function
    def _ensureBytes(self, count):
        """Make sure ``count`` unread bytes are buffered; return success.

        Reads one more chunk when needed, keeping any unread tail, so the
        relation "file position == end of buffer" stays valid.
        """
        if self.data_len - self.data_idx >= count:
            return True
        tail = b''
        if self.data_buf:
            tail = self.data_buf[self.data_idx:self.data_len]
        self.data_buf = tail + self.fh.read(self.BUF_CHUNK_SIZE)
        self.data_idx = 0
        self.data_len = len(self.data_buf)
        return self.data_len >= count

    #
    def calcSeekBytes(self):
        """Step over the marker, read the segment length and skip the segment.

        Returns the length read from the stream (it counts the two length
        bytes themselves), or 0 when the stream ends before the length field.
        """
        self.data_idx += 2
        if not self._ensureBytes(2):
            # Truncated file: the length field itself is missing.
            self.data_idx = self.data_len
            self.continue_process = False
            return 0

        (rec_len,) = struct.unpack_from('>H', self.data_buf, self.data_idx)
        remain_bytes = self.data_len - self.data_idx
        if rec_len >= remain_bytes:
            # Segment runs past the buffer: skip the rest directly in the file.
            self.seekBytes(rec_len - remain_bytes)
        else:
            self.data_idx += rec_len
        return rec_len

    #
    def seekBytes(self, num_bytes):
        """Seek ``num_bytes`` forward in the file, reload the buffer and
        return the new file position."""
        pos = self.fh.seek(num_bytes, 1)
        logging.debug("SEEK: seek=={}, cur_loc=={}".format(num_bytes, pos))
        self.getMoreBytes(True)
        return pos

    #
    def printMarkerImg(self):
        """Delegate to ``JpgFingerprint.printImgMarkers``."""
        return self.jpg_fp.printImgMarkers()

    #
    def printMarkerMeta(self):
        """Delegate to ``JpgFingerprint.printMDMarkers``."""
        return self.jpg_fp.printMDMarkers()

    #
    def findMarker(self, marker):
        """Look for the two-byte ``marker`` in the unread part of the buffer.

        On a hit the cursor is placed on the marker and True is returned.
        On a miss the cursor moves to the last buffered byte (so the next
        refill keeps it, in case the marker straddles two chunks) and False
        is returned.
        """
        if not self.data_buf:
            return False
        hit = self.data_buf.find(marker.to_bytes(2, 'big'), self.data_idx)
        if hit >= 0:
            self.data_idx = hit
            return True
        self.data_idx = max(self.data_idx, self.data_len - 1)
        return False

    #
    def __repr__(self):
        return repr(self.jpg_fp)

    #
    def findMarkers222(self):
        """Advance the cursor to the next 0xFF byte in the buffer.

        When no 0xFF byte follows, the cursor ends at the end of the buffer.
        """
        last_idx = len(self.data_buf)
        hit = self.data_buf.find(b'\xff', self.data_idx)
        if hit >= 0:
            self.data_idx = hit
        else:
            self.data_idx = max(self.data_idx, last_idx)
|
|
|