#
# dbfp_pub/libs/fingerprint.py
#
import os
import re
import json
import sqlite3
import hashlib
import time
import logging
from libs.toolbox import ToolBox
from libs.exceptions import FingerprintWrite, FingerprintMD5, FingerprintReadNoData
delimeter = "|"
#
# Database Schema
# The SQLite database schema is stored in page 1 of the database (root page).
# The sqlite_master table contains one row for each table, index, view, and trigger
# (collectively "objects") in the database schema.
# CREATE TABLE sqlite_master(
# type text,
# name text,
# tbl_name text,
# rootpage integer,
# sql text
# );
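#
# Example row returned by the schema query used below (hypothetical values):
#   (u'contacts', u'CREATE TABLE contacts (_id INTEGER PRIMARY KEY, lookup TEXT)')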
#
class FingerprintDB:
"""
This class represents a complete database schema
Helper functions:
Writing of the database schema as a "fingerprint"
Comparing of a database schema (fingerprint loaded from file)
"""
sqlmaster = "SELECT name, sql FROM sqlite_master WHERE type='table'"
# version of the scanner used to create the fingerprint
scanner_ver = "1.00b"
    # version of the json file format; this number is embedded in the json fingerprint
    # file and can be used to determine what was supported when the file was written
format_ver = "0.92"
#
def __init__(self):
self.conn = None
self.cur = None
self.table_names = []
self.tables = {}
self.db_hash = None
self.table_hashes = None
self.filein = ""
self.init = False
# db file details
self.file_details = {}
self.file_details['db-name'] = ""
self.file_details['app-name'] = ""
self.file_details['app-ver'] = ""
self.file_details['notes'] = ""
self.file_details['scan-date'] = ""
#
    def scanDBFile(self, filein):
        """ read the database and populate its schema into the class """
        (self.conn, self.cur) = self.__openDB(filein)
try:
# extract file name from path+filename
self.file_details['db-name'] = os.path.basename(filein)
except Exception as ex:
            logging.warning(ex)
self.file_details['db-name'] = filein
        # read the database schema and parse it
        self.__readDatabase()
        # concatenate all the table create statements, then md5 them
        self.__createMD5DB()
        # create an index of table hashes
self.table_hashes = {}
for key in self.tables.keys():
self.table_hashes[key] = self.tables[key].hash()
# flag is used to determine if the class has data
self.init = True
self.filein = filein
#
    def writeFingerprint(self):
        if (not self.init):
            return
        # build the output filename first so the error message below can
        # always reference it
        filename = ToolBox.getTimestampFilename(self.filein)
        try:
            fh = open(filename, "w")
            try:
                self.__writeFingerprint(fh)
            finally:
                fh.close()
        except Exception as ex:
            logging.error(ex)
            raise FingerprintWrite("Problem writing the fingerprint to a file, file=={}".format(filename))
        return filename
#
def writeFingerprintFile(self, filename):
if (not self.init):
return
try:
fh = open(filename, "w")
try:
self.__writeFingerprint(fh)
finally:
fh.close()
        except Exception as ex:
            logging.error(ex)
            raise FingerprintWrite("Problem writing the fingerprint to a file, file=={}".format(filename))
    #
def importJson(self, file_json):
""" import fingerprint from a json file """
self.__importJsonDBSchema(file_json)
#
def compareDB(self, filejson):
""" return the percentage of the match between two fingerprints """
if (not self.init):
return
        fp = FingerprintDB()
fp.__importJsonDBSchema(filejson)
result = self.__DBSchemaCompare(fp)
return result
#
def getMD5DB(self):
        return self.db_hash
    #
    def getMD5Tables(self):
return self.table_hashes
#
    def __importJsonDBSchema(self, file_json):
        """ import fingerprint from a json file """
        self.__init__()
        tables = {}
        try:
            fh = open(file_json, "r")
            try:
                jsonData = json.load(fh)
            finally:
                fh.close()
            tb = jsonData['tables']
            dbmt = jsonData['db-metadata']
            dbht = jsonData['db-metadata-hashes']
            dbmd5 = jsonData['db-metadata-md5']
            metadata = jsonData['_file-details']
            for table_name in tb.keys():
                logging.debug("[[ Table <" + table_name + "> imported ]]")
                newTable = TableSchema()
                newTable.importTable(table_name, tb[table_name], dbmt[table_name], dbht[table_name])
                tables[table_name] = newTable
            self.tables = tables
            self.db_hash = dbmd5
            self.table_hashes = dbht
            self.file_details = metadata
            self.init = True
        # on any failure the instance stays uninitialized (self.init == False)
        except Exception as ex:
            logging.error("ERROR: problem loading json file: \n{}\n{}".format(file_json, ex))
#
    def __DBMD5Compare(self):
        # TODO: quick whole-database comparison using only the db-level MD5
        pass
def __DBSchemaCompare(self, fp):
        # the json database schema definition is what our tool expects,
        # so we use it as the baseline:
        #   for each table in the baseline, if it exists here, compare each field
        #   else count it against the score (a fuzzy compare that looks at the
        #   number of fields and field names is a TODO, see __FuzzyTable)
diff_num = 0
diff_total = 0 # total number of different properties (from within a table)
all_total = 0 # total number of properties (from the entire database comparison)
for tableName in fp.tables.keys():
try:
table = self.tables[tableName]
if (table):
logging.info("__DBMD5Compare:: hash1=={}, hash2=={}".format(fp.tables[tableName].hash(), table.hash()))
if not (fp.tables[tableName].hash() == table.hash()):
(total, diff_num) = self.__CompareTable(fp.tables[tableName], table)
all_total += total
diff_total += diff_num
else:
all_total += 10 # increment the total tables compared
logging.info("__DBMD5Compare:: tableName=={} IDENTICAL".format(tableName))
# table found in only one database (fingerprint)
except KeyError as ex:
                # TODO: weight by the number of fields in the missing table
                # instead of the fixed weight of 10 used below
                logging.info("__DBSchemaCompare:: tableName=={} NOT FOUND".format(tableName))
                diff_total += 10 # increment the total of different properties
                all_total += 10 # increment the total tables compared
self.__FuzzyTable() # TODO: try to detect table name changes, look for same properties
logging.info("__DBMD5Compare:: all_total=={}, diff_total=={}".format(all_total, diff_total))
if (diff_total > 0):
if (diff_total == all_total):
percentage = 0
else:
percentage = 100 * float(all_total-diff_total) / float(all_total)
else:
percentage = 100
return percentage
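    #
    # Worked example of the score above (hypothetical counts): with
    # all_total == 30 compared properties and diff_total == 5 differences,
    # the match is 100 * (30 - 5) / 30 == 83.33; identical schemas score
    # 100 and fully disjoint schemas score 0.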
#
# Compare the Table Definitions.
# Compare Table 1 (Json table) to Table 2
#
def __CompareTable(self, tb1, tb2):
fields_total_count = 0
fields_diff_count = 0
prop_total_count = 0
prop_error_count = 0
totals = 0
diff_total = 0
fields1 = tb1.fields
fields2 = tb2.fields
for field in fields1.keys():
field1 = fields1[field]
fields_total_count += 1
            if (fields2.has_key(field)):
                field2 = fields2[field]
                for properties in field1.keys():
                    prop_total_count += 1
                    if not field2.has_key(properties):
                        prop_error_count += 1
            else:
                # field exists only in table 1
                fields_diff_count += 1
totals = prop_total_count + fields_total_count
diff_total = prop_error_count + fields_diff_count
logging.info("__CompareTable:: prop_total_count=={}, fields_total_count=={}, totals=={}".format(prop_total_count, fields_total_count, totals))
logging.info("__CompareTable:: prop_error_count=={}, fields_diff_count=={}, diff_total=={}".format(prop_error_count, fields_diff_count, diff_total))
return (totals, diff_total)
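    #
    # Example (hypothetical tables): if table 1 has 5 fields, one of which
    # is missing from table 2, then fields_total_count == 5 and
    # fields_diff_count == 1; the four matched fields contribute their
    # per-property counts, and (totals, diff_total) sums both levels.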
# look at un-identified tables and try to match fields by their properties
def __FuzzyTable(self):
return
#
def __openDB(self, filein):
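        # note: sqlite3.connect() succeeds even when the file is not a
        # sqlite database (or does not exist); errors only surface when
        # the first query runs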
conn = sqlite3.connect(filein)
cur = conn.cursor()
return (conn, cur)
# read a sqlite database by parsing the create table strings
# sqlmaster = "SELECT name, sql FROM sqlite_master WHERE type='table'"
def __readDatabase(self):
flag = False
rows = self.cur.execute(self.sqlmaster)
for row in rows:
flag = True
newTable = TableSchema()
newTable.loadTable(row[0], row[1])
self.table_names.append(newTable.name())
self.tables[newTable.name()] = newTable
if (not flag):
raise FingerprintReadNoData("No data, possible zero byte file")
#
    def debugFingerprint(self):
        if (not self.tables):
            return
        #
        for key in self.tables.keys():
            print "[[ TABLE: <" + key + "> ]]"
            self.tables[key].toJSON()
#
def __writeFingerprint(self, filehandle):
ahash = {}
thash = {}
dmhash = {}
shash = {}
mhash = {}
ahash['_file-details'] = mhash
ahash['db-metadata'] = dmhash
ahash['db-metadata-hashes'] = shash
ahash['db-metadata-md5'] = None
ahash['tables'] = thash
try:
timestr = time.strftime('%Y-%m-%d_%H%M%S', time.localtime(time.time()))
        except Exception:
            timestr = ""
mhash['scan-date'] = timestr
mhash['format-ver'] = self.format_ver
mhash['scanner-ver'] = self.scanner_ver
mhash['scanner-name'] = 'dbfp'
mhash['db-name'] = self.file_details['db-name']
mhash['app-name'] = self.file_details['app-name']
mhash['app-ver'] = self.file_details['app-ver']
mhash['notes'] = self.file_details['notes']
# tables
tables = self.tables.keys()
for table in tables:
thash[table] = self.tables[table].fields
dmhash[table] = self.tables[table].SQLstr()
shash[table] = self.tables[table].sqlStrHash
ahash['db-metadata-md5'] = self.db_hash
json.dump(ahash, filehandle, sort_keys=True, indent=4)
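    #
    # Example fingerprint layout produced above (abridged, hypothetical values):
    #   {
    #       "_file-details":      {"db-name": "contacts.db", "format-ver": "0.92", ...},
    #       "db-metadata":        {"contacts": "CREATE TABLE contacts (...)"},
    #       "db-metadata-hashes": {"contacts": "9e107d9d372bb6826bd81d3542a419d6"},
    #       "db-metadata-md5":    "e4d909c290d0fb1ca068ffaddf22cbd0",
    #       "tables":             {"contacts": {"_id": {"datatype": "INTEGER"}}}
    #   }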
#
    def __createMD5DB(self):
        concat_str = ""
        try:
            # concatenate the per-table hashes in sorted key order so the
            # database hash is stable no matter how sqlite_master orders tables
            for key in sorted(self.tables.keys()):
                concat_str += self.tables[key].hash()
            m = hashlib.md5()
            m.update(concat_str)
            self.db_hash = m.hexdigest()
        except Exception as ex:
            logging.error(ex)
            raise FingerprintMD5("Problem creating a MD5 sum")
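    #
    # Example (hypothetical hashes): if the sorted per-table hashes are
    # "aaa..." and "bbb...", the database hash is
    # hashlib.md5("aaa..." + "bbb...").hexdigest(); sorting first keeps
    # the result stable across runs.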
#
def setAppName(self, name):
self.file_details['app-name'] = name
#
def setAppVer(self, version):
self.file_details['app-ver'] = version
#
def setNotes(self, notes):
self.file_details['notes'] = notes
#
def getErrorString(self, errorCode):
retval = "ERROR: unknown error code: " + str(errorCode)
if (errorCode == -2):
retval = "ERROR: problem opening file, or not sqlite database"
elif (errorCode == -3):
retval = "ERROR: problem reading database"
return retval
#
#
#
class TableSchema:
"""
This class represents the definition of database table
"""
tableschemaregex = r'\((.*)\)'
#
def __init__(self):
self.tableName = ""
self.sqlStr = ""
self.sqlStrHash = ""
self.fields = {}
self.primarykeyFlag = False
self.uniqueFlag = False
#
def loadTable(self, tableName, sqlStr):
self.tableName = tableName
self.sqlStr = sqlStr
logging.info("[[ TABLE: <{}> ] processing...]".format(tableName))
# hash the sql create string for quicker fingerprint matching
try:
m = hashlib.md5()
m.update(self.sqlStr)
self.sqlStrHash = m.hexdigest()
        except Exception:
            logging.warning('WARN: problem hashing sql string: "{}"'.format(self.sqlStr))
        # parse the create string into a structured hash table; DOTALL lets
        # the column list span multiple lines, as it often does in sqlite_master
        results = re.search(self.tableschemaregex, sqlStr, re.DOTALL)
        if results:
            colstr = results.group(1)
            # note: a plain comma split relies on the PRIMARY KEY/UNIQUE
            # patterns below to re-assemble multi-column constraints
            columns = colstr.split(',')
for col in columns:
newField = self.__parseCreateStr(col.strip())
if newField:
self.fields[newField['name']] = newField
del newField['name']
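    #
    # Example: loadTable("t", "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
    # leaves self.fields as:
    #   {"id": {"datatype": "INTEGER", "primarykey": True},
    #    "name": {"datatype": "TEXT"}}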
#
def importTable(self, tbName, fields, sqlStr, hashStr):
self.tableName = tbName
self.sqlStr = sqlStr
self.fields = fields
self.sqlStrHash = hashStr
# Table Definition
#
# CREATE TABLE contacts (_id INTEGER PRIMARY KEY AUTOINCREMENT,name_raw_contact_id INTEGER REFERENCES raw_contacts(_id),
# photo_id INTEGER REFERENCES data(_id),photo_file_id INTEGER REFERENCES photo_files(_id),
# custom_ringtone TEXT,send_to_voicemail INTEGER NOT NULL DEFAULT 0,
# times_contacted INTEGER NOT NULL DEFAULT 0,last_time_contacted INTEGER,
# starred INTEGER NOT NULL DEFAULT 0,pinned INTEGER NOT NULL DEFAULT 2147483647,
# has_phone_number INTEGER NOT NULL DEFAULT 0,lookup TEXT,
# status_update_id INTEGER REFERENCES data(_id),contact_last_updated_timestamp INTEGER)
#
# CREATE TABLE sent_files_v2 (uid INTEGER, phone TEXT, sphone TEXT, deleted INTEGER,
    # PRIMARY KEY (uid, phone))
def __parseCreateStr(self, sqltext):
try:
newField = {}
# use for debug purposes
# print "sqltext=={}".format(sqltext)
# raw_contact_id INTEGER REFERENCES raw_contacts(_id) NOT NULL
            results = re.match(r'(?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*\s+(\w+)\s+REFERENCES\s+(.*)\s+NOT\s+NULL', sqltext, re.IGNORECASE)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
newField['references'] = True
newField['referencesdata'] = results.group(3)
newField['notnull'] = True
return newField
# photo_id INTEGER REFERENCES data(_id)
# name_raw_contact_id INTEGER REFERENCES raw_contacts(_id)
results = re.match(r'(?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*\s+(\w+)\s+REFERENCES\s+(.*)', sqltext, re.IGNORECASE)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
newField['references'] = True
newField['referencesdata'] = results.group(3)
return newField
# pinned INTEGER NOT NULL DEFAULT 2147483647
# send_to_voicemail INTEGER NOT NULL DEFAULT 0
results = re.match(r'(?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*\s+(\w+)\s+NOT\s+NULL\s+DEFAULT\s+(\w+)', sqltext, re.IGNORECASE)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
newField['notnull'] = True
newField['default'] = results.group(3)
return newField
# pinned INTEGER DEFAULT 2147483647
# send_to_voicemail INTEGER DEFAULT 0
results = re.match(r'(?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*\s+(\w+)\s+DEFAULT\s+(\w+)', sqltext, re.IGNORECASE)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
newField['default'] = results.group(3)
return newField
# _id INTEGER PRIMARY KEY AUTOINCREMENT
results = re.match(r'(?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*\s+(\w+)\s+PRIMARY\s+KEY\s+AUTOINCREMENT', sqltext, re.IGNORECASE)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
newField['primarykey'] = True
newField['autoincrement'] = True
return newField
# _id INTEGER PRIMARY KEY
results = re.match(r'(?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*\s+(\w+)\s+PRIMARY\s+KEY', sqltext, re.IGNORECASE)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
newField['primarykey'] = True
return newField
# FileID INTEGER NOT NULL
results = re.match(r'(?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*\s+(\w+)\s+NOT\s+NULL', sqltext, re.IGNORECASE)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
newField['notnull'] = True
return newField
# PRIMARY KEY (field_name,
            results = re.match(r'PRIMARY\s+KEY\s*\((?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*\,?', sqltext, re.IGNORECASE)
if results:
field = self.fields[results.group(1)]
field['primarykey'] = True
self.primarykeyFlag = True
return False
# UNIQUE (field_name,
results = re.match(r'UNIQUE\s*\((?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*\,?', sqltext, re.IGNORECASE)
if results:
field = self.fields[results.group(1)]
field['unique'] = True
                self.uniqueFlag = True
return False
# custom_ringtone TEXT
results = re.match(r'(?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*\s+(\w+)', sqltext, re.IGNORECASE)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
return newField
# field_name)
results = re.match(r'(?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*\)', sqltext, re.IGNORECASE)
if results:
field = self.fields[results.group(1)]
if (self.primarykeyFlag):
if (field):
field['primarykey'] = True
self.primarykeyFlag = False
elif (self.uniqueFlag):
if (field):
field['unique'] = True
self.uniqueFlag = False
return False
# field_name
results = re.match(r'^(?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*$', sqltext, re.IGNORECASE)
if results:
if (self.primarykeyFlag):
field = self.fields[results.group(1)]
field['primarykey'] = True
elif (self.uniqueFlag):
field = self.fields[results.group(1)]
field['unique'] = True
                else:
                    # bare field name with no declared type: record it and
                    # assume INTEGER as its datatype
                    newField['name'] = results.group(1)
                    newField['datatype'] = "INTEGER"
                    return newField
                return False
            logging.warning('WARN: field definition not recognized: "{}"'.format(sqltext))
        except Exception as e:
            logging.warning('WARN: problem parsing sql create text: "{}"'.format(sqltext))
            logging.warning('Exception: \n{}'.format(e))
            return None
        return None
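    #
    # Illustrative results of the patterns above:
    #   "pinned INTEGER NOT NULL DEFAULT 2147483647" ->
    #       {'name': 'pinned', 'datatype': 'INTEGER', 'notnull': True, 'default': '2147483647'}
    #   "PRIMARY KEY (uid, phone)" arrives split on the comma: the
    #   "PRIMARY KEY (uid" piece tags 'uid' and sets primarykeyFlag, and
    #   the trailing "phone)" piece tags 'phone' and clears the flag.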
#
    def getFields(self):
        # note: this accessor is named getFields because the instance
        # attribute self.fields would otherwise shadow a fields() method
        return self.fields
#
def toJSON(self):
print json.dumps(self.fields)
#
def toFile(self, filehandle):
json.dump(self.fields, filehandle, sort_keys=True, indent=4)
#
    def __str__(self):
        return json.dumps(self.fields)
#
def name(self):
return self.tableName
#
    def setSQLstr(self, sqlstr):
        self.sqlStr = sqlstr
#
def SQLstr(self):
return self.sqlStr
#
def hash(self):
return self.sqlStrHash
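#
# Minimal command-line sketch (an assumption, not part of the original
# module: it presumes the script runs from the project root so the "libs"
# imports above resolve, and "sample.db" is a hypothetical path):
#
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    fp = FingerprintDB()
    fp.scanDBFile("sample.db")          # parse the schema
    print(fp.getMD5DB())                # whole-database md5
    print(fp.writeFingerprint())        # timestamped json fingerprint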