FIN: parsing all markers, saved to fingerprint class
This commit is contained in:
parent
9b9df9482e
commit
ed28d01437
249
libs/jpg_bin.py
249
libs/jpg_bin.py
|
@ -20,29 +20,34 @@
|
|||
import re
|
||||
import struct
|
||||
import logging
|
||||
from libs.jpg_fp import JpgFingerprint
|
||||
|
||||
#
|
||||
class JpgBin:
|
||||
|
||||
BUF_CHUNK_SIZE = 2048
|
||||
data_buf = None
|
||||
data_idx = 0
|
||||
data_len = 0
|
||||
fh = None
|
||||
hh = None
|
||||
|
||||
markers = {
|
||||
'SOS': 0xffd9
|
||||
}
|
||||
|
||||
continue_process = True
|
||||
|
||||
metadata_h = {}
|
||||
img_bin_h = {}
|
||||
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
self.data_buf = None
|
||||
self.data_idx = 0
|
||||
self.data_len = 0
|
||||
self.fh = None
|
||||
self.hh = None
|
||||
|
||||
self.continue_process = True
|
||||
|
||||
self.metadata_h = {}
|
||||
self.img_bin_h = {}
|
||||
|
||||
self.prev_fpos = 0
|
||||
self.prev_mhex = 0xdead
|
||||
self.prev_mstr = "DUH!"
|
||||
|
||||
self.jpg_fp = JpgFingerprint()
|
||||
|
||||
#
|
||||
def __isJPG(self):
|
||||
|
@ -67,6 +72,131 @@ class JpgBin:
|
|||
return True
|
||||
return False
|
||||
|
||||
#
|
||||
def findAllMarkers(self):
|
||||
(word_b,) = struct.unpack('>H', self.data_buf[self.data_idx:self.data_idx+2])
|
||||
hex_str = word_b.to_bytes(2, 'big').hex()
|
||||
# RST 0xD(n) (n==0..7)
|
||||
if (0xffd0 == word_b or 0xffd1 == word_b or 0xffd2 == word_b or 0xffd3 == word_b or 0xffd4 == word_b or 0xffd5 == word_b or 0xffd6 == word_b or 0xffd7 == word_b):
|
||||
logging.info("[ {} : RST ]".format(hex_str))
|
||||
self.markerRST(word_b)
|
||||
# Comments section
|
||||
elif (0xfffe == word_b):
|
||||
logging.info("[ {} : Comment ]".format(hex_str))
|
||||
self.markerComment(word_b)
|
||||
# DQT
|
||||
elif (0xffdb == word_b):
|
||||
logging.info("[ {} : DQT ]".format(hex_str))
|
||||
self.markerDQT(word_b)
|
||||
# DRI
|
||||
elif (0xffdd == word_b):
|
||||
logging.info("[ {} : DRI ]".format(hex_str))
|
||||
self.markerDRI(word_b)
|
||||
# SOF0 - Start of Frame 0
|
||||
elif (0xffc0 == word_b):
|
||||
logging.info("[ {} : SOF0 ]".format(hex_str))
|
||||
self.markerSOF0(word_b)
|
||||
# SOF2 - Start of Frame 2
|
||||
elif (0xffc2 == word_b):
|
||||
logging.info("[ {} : SOF2 ]".format(hex_str))
|
||||
self.markerSOF2(word_b)
|
||||
# DHT
|
||||
elif (0xffC4 == word_b):
|
||||
logging.info("[ {} : DHT ]".format(hex_str))
|
||||
self.markerDHT(word_b)
|
||||
# SOS
|
||||
elif (0xffda == word_b):
|
||||
logging.info("[ {} : SOS ]".format(hex_str))
|
||||
self.markerSOS(word_b)
|
||||
# EOI
|
||||
elif (0xffd9 == word_b):
|
||||
logging.info("[ {} : EOI ]".format(hex_str))
|
||||
self.markerEOI(word_b)
|
||||
# APP 0xE(n) - App Exif Data
|
||||
# struct.pack(">H", intt).hex()
|
||||
elif ( re.search(r'ffe.', hex_str ) ):
|
||||
logging.info("[ {} : App Data ]".format(hex_str))
|
||||
self.markerAppData(word_b)
|
||||
else:
|
||||
self.data_idx += 1
|
||||
|
||||
#
|
||||
# Image Metadata, Exif
|
||||
#
|
||||
#
|
||||
def markerAppData(self, marker_hex):
|
||||
fpos = self.fh.tell()
|
||||
rec_len = self.calcSeekBytes()
|
||||
self.jpg_fp.addImgMetadata(marker_hex, fpos, rec_len, "APP ")
|
||||
|
||||
#
|
||||
def markerComment(self, marker_hex):
|
||||
fpos = self.fh.tell()
|
||||
rec_len = self.calcSeekBytes()
|
||||
self.jpg_fp.addImgMetadata(marker_hex, fpos, rec_len, "COM ")
|
||||
|
||||
#
|
||||
# Image Data
|
||||
#
|
||||
#
|
||||
def markerSOS(self, marker_hex):
|
||||
# rec_len = self.calcSeekBytes()
|
||||
self.__addImgData(marker_hex, "SOS ")
|
||||
|
||||
|
||||
#
|
||||
def markerRST(self, marker_hex):
|
||||
self.__addImgData(marker_hex, "RST ")
|
||||
|
||||
|
||||
def markerDQT(self, marker_hex):
|
||||
fpos = self.fh.tell()
|
||||
rec_len = self.calcSeekBytes()
|
||||
self.jpg_fp.addImgData(marker_hex, fpos, rec_len, "DQT ")
|
||||
|
||||
#
|
||||
def markerDRI(self, marker_hex):
|
||||
fpos = self.fh.tell()
|
||||
self.data_idx += 4
|
||||
self.jpg_fp.addImgData(marker_hex, fpos, 4, "DRI ")
|
||||
|
||||
#
|
||||
def markerSOF0(self, marker_hex):
|
||||
fpos = self.fh.tell()
|
||||
rec_len = self.calcSeekBytes()
|
||||
self.jpg_fp.addImgData(marker_hex, fpos, rec_len, "SOF0")
|
||||
|
||||
#
|
||||
def markerSOF2(self, marker_hex):
|
||||
fpos = self.fh.tell()
|
||||
rec_len = self.calcSeekBytes()
|
||||
self.jpg_fp.addImgData(marker_hex, fpos, rec_len, "SOF2")
|
||||
|
||||
def markerDHT(self, marker_hex):
|
||||
fpos = self.fh.tell()
|
||||
rec_len = self.calcSeekBytes()
|
||||
self.jpg_fp.addImgData(marker_hex, fpos, rec_len, "DHT ")
|
||||
|
||||
#
|
||||
def markerEOI(self, marker_hex):
|
||||
fpos = self.fh.tell()
|
||||
self.__addImgData(marker_hex, "EOI ")
|
||||
self.jpg_fp.addImgData(marker_hex, fpos, 2, "EOI ")
|
||||
|
||||
|
||||
# private helper function
|
||||
def __addImgData(self, mhex, mstr):
|
||||
fpos = self.fh.tell()
|
||||
cur_fpos = (fpos - (self.data_len - self.data_idx))
|
||||
if (self.prev_fpos > 0):
|
||||
rec_len = cur_fpos - self.prev_fpos
|
||||
self.jpg_fp.addImgMetadata(self.prev_mhex, self.prev_fpos, rec_len, self.prev_mstr)
|
||||
|
||||
self.prev_mhex = mhex
|
||||
self.prev_mstr = mstr
|
||||
self.prev_fpos = cur_fpos
|
||||
self.data_idx += 2
|
||||
|
||||
#
|
||||
def genHash(self, file_h, hash_h):
|
||||
self.hh = hash_h
|
||||
|
@ -106,6 +236,7 @@ class JpgBin:
|
|||
self.seekBytes(rec_diff)
|
||||
else:
|
||||
self.data_idx += rec_len
|
||||
return rec_len
|
||||
|
||||
#
|
||||
def seekBytes(self, num_bytes):
|
||||
|
@ -116,103 +247,11 @@ class JpgBin:
|
|||
|
||||
#
|
||||
def findMarker(self, marker):
|
||||
|
||||
pass
|
||||
|
||||
#
|
||||
def findAllMarkers(self):
|
||||
(word_b,) = struct.unpack('>H', self.data_buf[self.data_idx:self.data_idx+2])
|
||||
hex_str = word_b.to_bytes(2, 'big').hex()
|
||||
# RST 0xD(n) (n==0..7)
|
||||
if (0xffd0 == word_b or 0xffd1 == word_b or 0xffd2 == word_b or 0xffd3 == word_b or 0xffd4 == word_b or 0xffd5 == word_b or 0xffd6 == word_b or 0xffd7 == word_b):
|
||||
logging.info("[ {} : RST ]".format(hex_str))
|
||||
self.markerRST()
|
||||
# Comments section
|
||||
elif (0xfffe == word_b):
|
||||
logging.info("[ {} : Comment ]".format(hex_str))
|
||||
self.markerComment()
|
||||
# DQT
|
||||
elif (0xffdb == word_b):
|
||||
logging.info("[ {} : DQT ]".format(hex_str))
|
||||
self.markerDQT()
|
||||
# DRI
|
||||
elif (0xffdd == word_b):
|
||||
logging.info("[ {} : DRI ]".format(hex_str))
|
||||
self.markerDRI()
|
||||
# SOF0 - Start of Frame 0
|
||||
elif (0xffc0 == word_b):
|
||||
logging.info("[ {} : SOF0 ]".format(hex_str))
|
||||
self.markerSOF0()
|
||||
# SOF2 - Start of Frame 2
|
||||
elif (0xffc2 == word_b):
|
||||
logging.info("[ {} : SOF2 ]".format(hex_str))
|
||||
self.markerSOF2()
|
||||
# DHT
|
||||
elif (0xffC4 == word_b):
|
||||
logging.info("[ {} : DHT ]".format(hex_str))
|
||||
self.markerDHT()
|
||||
# SOS
|
||||
elif (0xffda == word_b):
|
||||
logging.info("[ {} : SOS ]".format(hex_str))
|
||||
self.markerSOS()
|
||||
# EOI
|
||||
elif (0xffd9 == word_b):
|
||||
logging.info("[ {} : EOI ]".format(hex_str))
|
||||
self.markerEOI()
|
||||
# APP 0xE(n) - App Exif Data
|
||||
# struct.pack(">H", intt).hex()
|
||||
elif ( re.search(r'ffe.', hex_str ) ):
|
||||
logging.info("[ {} : App Data ]".format(hex_str))
|
||||
self.markerAppData()
|
||||
else:
|
||||
self.data_idx += 1
|
||||
|
||||
|
||||
#
|
||||
# Image Metadata, Exif
|
||||
#
|
||||
#
|
||||
def markerAppData(self):
|
||||
self.calcSeekBytes()
|
||||
#
|
||||
def markerComment(self):
|
||||
self.calcSeekBytes()
|
||||
|
||||
#
|
||||
# Image Data
|
||||
#
|
||||
#
|
||||
def markerSOS(self):
|
||||
self.calcSeekBytes()
|
||||
|
||||
def markerRST(self):
|
||||
self.data_idx += 2
|
||||
|
||||
def markerDQT(self):
|
||||
self.calcSeekBytes()
|
||||
|
||||
#
|
||||
def markerDRI(self):
|
||||
self.data_idx += 4
|
||||
|
||||
#
|
||||
def markerSOF0(self):
|
||||
self.calcSeekBytes()
|
||||
|
||||
#
|
||||
def markerSOF2(self):
|
||||
self.calcSeekBytes()
|
||||
|
||||
def markerDHT(self):
|
||||
self.calcSeekBytes()
|
||||
|
||||
def markerEOI(self):
|
||||
self.data_idx += 2
|
||||
# self.continue_process = False
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
pass
|
||||
return repr(self.jpg_fp)
|
||||
|
||||
|
||||
def findMarkers222(self):
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
#
|
||||
#
|
||||
#
|
||||
class JpgFingerprint:
|
||||
|
||||
def __init__(self):
|
||||
self.markers_a = []
|
||||
self.markers_img = []
|
||||
self.markers_meta = []
|
||||
self.markers_h = {}
|
||||
|
||||
|
||||
def addImgMetadata(self, mhex, mpos, mlen, mstr):
|
||||
marker = JpgMarker(mhex, mpos, mlen, mstr)
|
||||
self.markers_a.append(marker)
|
||||
self.markers_meta.append(marker)
|
||||
|
||||
|
||||
def addImgData(self, mhex, mpos, mlen, mstr):
|
||||
marker = JpgMarker(mhex, mpos, mlen, mstr)
|
||||
self.markers_a.append(marker)
|
||||
self.markers_img.append(marker)
|
||||
|
||||
def printImgMarkers(self):
|
||||
pass
|
||||
|
||||
|
||||
def printMDMarkers(self):
|
||||
pass
|
||||
|
||||
#
|
||||
def __repr__(self):
|
||||
str = ""
|
||||
total = 0
|
||||
for marker in self.markers_a:
|
||||
str += repr(marker) + "\n"
|
||||
total += marker.marker_size
|
||||
str += "[TOT ] bytes=={}".format(total)
|
||||
str += "\n"
|
||||
return str
|
||||
|
||||
|
||||
#
|
||||
class JpgMarker:
|
||||
"""
|
||||
Marker Data Type
|
||||
"""
|
||||
def __init__(self, mhex, fpos, mlen, mstr):
|
||||
self.marker_hex = mhex
|
||||
self.marker_hexstr = self.marker_hex.to_bytes(2, 'big').hex()
|
||||
self.marker_filepos = fpos
|
||||
self.marker_size = mlen
|
||||
self.marker_cat = mstr
|
||||
|
||||
|
||||
def __repr__(self):
|
||||
return "[{}] {} {}-len {}-fpos".format(self.marker_cat, self.marker_hexstr, self.marker_size, self.marker_filepos)
|
||||
|
||||
|
||||
|
|
@ -1,22 +1,22 @@
|
|||
#
|
||||
#
|
||||
#
|
||||
import logging
|
||||
from PIL import Image
|
||||
from libs.jpg_bin import JpgBin
|
||||
|
||||
|
||||
class JpgTools:
|
||||
|
||||
fh = None
|
||||
|
||||
def __init__(self):
|
||||
self.fh = None
|
||||
pass
|
||||
|
||||
#
|
||||
# process a jpg file, read only
|
||||
#
|
||||
def getJpgBin(self, fname):
|
||||
self.processFile(fname)
|
||||
return self.processFile(fname)
|
||||
|
||||
#
|
||||
# process a jpg file, create new jpg with crypto keys
|
||||
|
@ -25,15 +25,15 @@ class JpgTools:
|
|||
self.fh = open(fname, "rb")
|
||||
self.jpg = JpgBin()
|
||||
retval = self.jpg.processFile(self.fh)
|
||||
|
||||
print("processFile()=={}".format(retval))
|
||||
logging.info("processFile()=={}".format(retval))
|
||||
|
||||
#
|
||||
def processFile(self, fname):
|
||||
self.fh = open(fname, "rb")
|
||||
self.jpg = JpgBin()
|
||||
retval = self.jpg.processFile(self.fh)
|
||||
print("processFile()=={}".format(retval))
|
||||
logging.info("processFile()=={}".format(retval))
|
||||
return self.jpg
|
||||
|
||||
#
|
||||
def process_OLD(self, fname):
|
||||
|
|
|
@ -20,8 +20,8 @@ def main():
|
|||
# export signature & public key to a new image file
|
||||
def processImage(image_fn):
|
||||
jpg = JpgTools()
|
||||
img_bin = jpg.getJpgBin(image_fn)
|
||||
|
||||
jpg_bin = jpg.getJpgBin(image_fn)
|
||||
print( str(jpg_bin) )
|
||||
# sig = Signature()
|
||||
# sig.genSig(img_bin)
|
||||
|
||||
|
|
Loading…
Reference in New Issue