#
# dbfp_pub/libs/fingerprint.py
#
import re
import json
import sqlite3
import hashlib
import time
from libs import toolbox
delimiter = "|"
#
# Database Schema
# The SQLite database schema is stored in page 1 of the database (root page).
# The sqlite_master table contains one row for each table, index, view, and trigger
# (collectively "objects") in the database schema.
# CREATE TABLE sqlite_master(
# type text,
# name text,
# tbl_name text,
# rootpage integer,
# sql text
# );
#
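# A sqlite_master row looks like this (values illustrative):
#   ('table', 'contacts', 'contacts', 2, 'CREATE TABLE contacts(...)')
#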
class DBSchema:
    """
    This class represents a complete database schema.
    Helper functions:
      - write the database schema out as a "fingerprint" (json)
      - compare a scanned database schema against a fingerprint loaded from file
    """
    sqlmaster = "SELECT name, sql FROM sqlite_master WHERE type='table'"
    scanner_ver = ".85"
    format_ver = ".90"

    def __init__(self):
        self.conn = None
        self.cur = None
        self.dbName = ''
        self.tableNames = []
        self.tables = {}
        self.tablesJson = {}
        self.app_name = ""
        self.app_ver = ""
        self.notes = ""
        # self.jsonData = None
        return
    def scanDBFile(self, filein):
        # try to open the sqlite file
        try:
            (self.conn, self.cur) = self.__openDB(filein)
        except Exception, e:
            print e
            return -2
        # extract the file name from the path+filename
        try:
            self.dbName = toolbox.ToolBox.parseFilenameIncExt(filein)
        except:
            self.dbName = filein
        # read the database schema
        try:
            self.__readDatabase()
        except Exception, e:
            print e
            return -3
        return 1
    # import a fingerprint from a json file
    def importJson(self, filejson):
        self.tablesJson = self.__importJsonDBSchema(filejson)

    # import a fingerprint from a json file and compare it to the scanned schema
    def compareDB(self, filejson):
        self.tablesJson = self.__importJsonDBSchema(filejson)
        result = self.__DBSchemaCompare()
        print "[ Percentage == {:.2%} ]".format(result)
        return result
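    # The returned value is the fraction of mismatched fields/properties:
    # 0.0 means the scanned schema matched the fingerprint exactly, while
    # e.g. 0.05 means 5% of the compared items differed.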
    # import a fingerprint from a json file
    def __importJsonDBSchema(self, file_json):
        tables = {}
        try:
            with open(file_json, "r") as fh:
                jsonData = json.load(fh)
            dbmt = jsonData['db-metadata']
            tb = jsonData['tables']
            for table_name in tb.keys():
                print "[[ Table <" + table_name + "> imported ]]"
                newTable = TableDefinition()
                newTable.importTable(table_name, dbmt[table_name], tb[table_name])
                tables[table_name] = newTable
        except Exception, e:
            print "ERROR: problem loading json file: " + file_json
            print e
        return tables
    def __DBSchemaCompare(self):
        # the json database schema definition is what our tool is expecting...
        # ...so we use it as the baseline
        # look for each table; if it exists, compare each field
        # else, add to unknown tables... or do a fuzzy compare (look at number of fields, field names)
        diff_total = 0
        all_total = 0
        for tableName in self.tablesJson.keys():
            table = self.tables.get(tableName)
            print "[[ Comparing Table: " + tableName + " ]]"
            if table:
                # identical create-string hashes mean the tables match exactly
                if not (self.tablesJson[tableName].hash() == table.hash()):
                    (total, diff_num) = self.__CompareTable(self.tablesJson[tableName], table)
                    all_total += total
                    diff_total += diff_num
            else:
                self.__FuzzyTable()
        percentage = 0
        if diff_total > 0:
            percentage = float(diff_total) / all_total
        return percentage
    #
    # Compare the Table Definitions.
    # Compare Table 1 (json table) to Table 2
    #
    def __CompareTable(self, tb1, tb2):
        fields_total_count = 0
        fields_diff_count = 0
        prop_total_count = 0
        prop_error_count = 0
        totals = 0
        diff_total = 0
        fields1 = tb1.fields
        fields2 = tb2.fields
        for field in fields1.keys():
            field1 = fields1[field]
            fields_total_count += 1
            if field in fields2:
                field2 = fields2[field]
                # compare the properties (datatype, notnull, default, ...) of the two fields
                for prop in field1.keys():
                    prop_total_count += 1
                    if prop not in field2:
                        prop_error_count += 1
            else:
                fields_diff_count += 1
        if prop_error_count == 0 and fields_diff_count == 0:
            print "100% compatible"
        else:
            totals = prop_total_count + fields_total_count
            diff_total = prop_error_count + fields_diff_count
            print "Table difference found: " + str(diff_total)
        return (totals, diff_total)
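    # Worked example (hypothetical counts): a baseline table with 10 fields,
    # one of them missing from the scanned table, and one property mismatch
    # among the remaining 9 fields' 27 properties, yields
    # totals = 27 + 10 = 37 and diff_total = 1 + 1 = 2.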
    # look at un-identified tables and try to match fields by their properties
    def __FuzzyTable(self):
        return
    def __openDB(self, filein):
        conn = sqlite3.connect(filein)
        cur = conn.cursor()
        return (conn, cur)

    # read a sqlite database by parsing its CREATE TABLE strings
    # sqlmaster = "SELECT name, sql FROM sqlite_master WHERE type='table'"
    def __readDatabase(self):
        for row in self.cur.execute(self.sqlmaster):
            newTable = TableDefinition()
            newTable.loadTable(row[0], row[1])
            self.tableNames.append(newTable.name())
            self.tables[newTable.name()] = newTable
        return
    def debugFingerprint(self):
        if self.tables:
            myDict = self.tables
        elif self.tablesJson:
            myDict = self.tablesJson
        else:
            return
        for key in myDict.keys():
            print "[[ TABLE: <" + key + "> ]]"
            myDict[key].toJSON()
    def writeFingerprint(self, filehandle):
        ahash = {}   # top-level fingerprint dictionary
        thash = {}   # table name -> parsed field definitions
        dmhash = {}  # table name -> raw CREATE TABLE sql
        shash = {}   # table name -> md5 of the CREATE TABLE sql
        mhash = {}   # file metadata
        ahash['_file-metadata'] = mhash
        ahash['db-metadata'] = dmhash
        ahash['db-metadata-hashes'] = shash
        ahash['tables'] = thash
        try:
            timestr = time.strftime('%Y-%m-%d_%H%M%S', time.localtime(time.time()))
        except:
            timestr = ""
        mhash['scan-date'] = timestr
        mhash['format-ver'] = self.format_ver
        mhash['scanner-ver'] = self.scanner_ver
        mhash['scanner-name'] = 'dbfp'
        mhash['db-name'] = self.dbName
        mhash['app-name'] = self.app_name
        mhash['app-ver'] = self.app_ver
        mhash['notes'] = self.notes
        # tables
        for table in self.tables.keys():
            thash[table] = self.tables[table].fields
            dmhash[table] = self.tables[table].SQLstr()
            shash[table] = self.tables[table].sqlStrHash
        json.dump(ahash, filehandle, sort_keys=True, indent=4)
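    # The resulting fingerprint file has this shape (abridged):
    #   {
    #       "_file-metadata": { "scan-date": "...", "format-ver": ".90", ... },
    #       "db-metadata": { "<table>": "CREATE TABLE ...", ... },
    #       "db-metadata-hashes": { "<table>": "<md5 hex>", ... },
    #       "tables": { "<table>": { "<field>": { "datatype": "...", ... } } }
    #   }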
    #
    def setAppName(self, name):
        self.app_name = name

    #
    def setAppVer(self, version):
        self.app_ver = version

    #
    def setNotes(self, notes):
        self.notes = notes

    #
    def getErrorString(self, errorCode):
        retval = "ERROR: unknown error code: " + str(errorCode)
        if errorCode == -2:
            retval = "ERROR: problem opening file, or not a sqlite database"
        elif errorCode == -3:
            retval = "ERROR: problem reading database"
        return retval
#
#
#
class TableDefinition:
    """
    This class represents the definition of a database table.
    """
    tableschemaregex = r'\((.*)\)'

    #
    def __init__(self):
        self.tableName = ""
        self.sqlStr = ""
        self.sqlStrHash = ""
        self.fields = {}
        self.primarykeyFlag = False
        self.uniqueFlag = False
    #
    def loadTable(self, tableName, sqlStr):
        self.tableName = tableName
        self.sqlStr = sqlStr
        print "[[ TABLE: <{}> loading ]]".format(tableName)
        # hash the sql create string for quicker fingerprint matching
        try:
            m = hashlib.md5()
            m.update(self.sqlStr)
            self.sqlStrHash = m.hexdigest()
        except:
            print 'WARN: problem hashing sql string: "{}"'.format(self.sqlStr)
        # parse the create string into a structured hash table
        results = re.search(self.tableschemaregex, sqlStr)
        if results:
            colstr = results.group(1)
            columns = colstr.split(',')
            for col in columns:
                newField = self.__parseCreateStr(col.strip())
                if newField:
                    # key the field by its name; the name entry is then redundant
                    self.fields[newField['name']] = newField
                    del newField['name']
        else:
            print "WARN: [[ TABLE: <{}> failed to parse ]]".format(tableName)
    #
    def importTable(self, tbName, sqlStr, fields):
        self.tableName = tbName
        self.sqlStr = sqlStr
        self.fields = fields
        # hash the create string (as loadTable does) so imported tables can
        # also be matched by hash
        try:
            m = hashlib.md5()
            m.update(self.sqlStr)
            self.sqlStrHash = m.hexdigest()
        except:
            print 'WARN: problem hashing sql string: "{}"'.format(self.sqlStr)

    # Table Definition examples:
    #
    # CREATE TABLE contacts (_id INTEGER PRIMARY KEY AUTOINCREMENT,name_raw_contact_id INTEGER REFERENCES raw_contacts(_id),
    # photo_id INTEGER REFERENCES data(_id),photo_file_id INTEGER REFERENCES photo_files(_id),
    # custom_ringtone TEXT,send_to_voicemail INTEGER NOT NULL DEFAULT 0,
    # times_contacted INTEGER NOT NULL DEFAULT 0,last_time_contacted INTEGER,
    # starred INTEGER NOT NULL DEFAULT 0,pinned INTEGER NOT NULL DEFAULT 2147483647,
    # has_phone_number INTEGER NOT NULL DEFAULT 0,lookup TEXT,
    # status_update_id INTEGER REFERENCES data(_id),contact_last_updated_timestamp INTEGER)
    #
    # CREATE TABLE sent_files_v2 (uid INTEGER, phone TEXT, sphone TEXT, deleted INTEGER,
    # PRIMARY KEY (uid, phone))
    def __parseCreateStr(self, sqltext):
        try:
            newField = {}
            # photo_id INTEGER REFERENCES data(_id)
            # name_raw_contact_id INTEGER REFERENCES raw_contacts(_id)
            results = re.match(r'[`"\']*(\w+)[`"\']*\s+(\w+)\s+REFERENCES\s+(\S+)', sqltext)
            if results:
                newField['name'] = results.group(1)
                newField['datatype'] = results.group(2)
                newField['references'] = 1
                newField['referencesdata'] = results.group(3)
                return newField
            # pinned INTEGER NOT NULL DEFAULT 2147483647
            # send_to_voicemail INTEGER NOT NULL DEFAULT 0
            results = re.match(r'[`"\']*(\w+)[`"\']*\s+(\w+)\s+NOT NULL\s+DEFAULT\s+(\w+)', sqltext)
            if results:
                newField['name'] = results.group(1)
                newField['datatype'] = results.group(2)
                newField['notnull'] = True
                newField['default'] = results.group(3)
                return newField
            # pinned INTEGER DEFAULT 2147483647
            # send_to_voicemail INTEGER DEFAULT 0
            results = re.match(r'[`"\']*(\w+)[`"\']*\s+(\w+)\s+DEFAULT\s+(\w+)', sqltext)
            if results:
                newField['name'] = results.group(1)
                newField['datatype'] = results.group(2)
                newField['default'] = results.group(3)
                return newField
            # _id INTEGER PRIMARY KEY AUTOINCREMENT
            results = re.match(r'[`"\']*(\w+)[`"\']*\s+(\w+)\s+PRIMARY KEY\s+AUTOINCREMENT', sqltext)
            if results:
                newField['name'] = results.group(1)
                newField['datatype'] = results.group(2)
                newField['primarykey'] = True
                newField['autoincrement'] = True
                return newField
            # _id INTEGER PRIMARY KEY
            results = re.match(r'[`"\']*(\w+)[`"\']*\s+(\w+)\s+PRIMARY KEY', sqltext)
            if results:
                newField['name'] = results.group(1)
                newField['datatype'] = results.group(2)
                newField['primarykey'] = True
                return newField
            # FileID INTEGER NOT NULL
            results = re.match(r'[`"\']*(\w+)[`"\']*\s+(\w+)\s+NOT NULL', sqltext)
            if results:
                newField['name'] = results.group(1)
                newField['datatype'] = results.group(2)
                newField['notnull'] = True
                return newField
            # PRIMARY KEY (field_name,
            results = re.match(r'PRIMARY KEY \([`"\']*(\w+)[`"\']*\,?', sqltext)
            if results:
                field = self.fields[results.group(1)]
                field['primarykey'] = True
                self.primarykeyFlag = True
                return False
            # UNIQUE(field_name,
            results = re.match(r'UNIQUE\([`"\']*(\w+)[`"\']*\,?', sqltext)
            if results:
                field = self.fields[results.group(1)]
                field['unique'] = True
                self.uniqueFlag = True
                return False
            # custom_ringtone TEXT
            results = re.match(r'[`"\']*(\w+)[`"\']*\s+(\w+)', sqltext)
            if results:
                newField['name'] = results.group(1)
                newField['datatype'] = results.group(2)
                return newField
            # field_name)  -- continuation of a multi-column PRIMARY KEY/UNIQUE clause
            results = re.match(r'[`"\']*(\w+)[`"\']*(\)?)', sqltext)
            if results:
                field = self.fields[results.group(1)]
                if self.primarykeyFlag:
                    field['primarykey'] = True
                    if results.group(2) == ')':
                        # closing paren: this was the last field of the clause
                        self.primarykeyFlag = False
                elif self.uniqueFlag:
                    field['unique'] = True
                    if results.group(2) == ')':
                        self.uniqueFlag = False
                return False
            print 'WARN: field definition not recognized: "{}"'.format(sqltext)
        except Exception, e:
            print 'WARN: problem parsing sql create text: "{}"'.format(sqltext)
            print e
        return None
    #
    def getFields(self):
        # named getFields so it does not collide with the self.fields attribute
        return self.fields

    #
    def toJSON(self):
        print json.dumps(self.fields)

    #
    def toFile(self, filehandle):
        json.dump(self.fields, filehandle, sort_keys=True, indent=4)

    #
    def __str__(self):
        return json.dumps(self.fields)
    #
    def name(self):
        return self.tableName

    #
    def setSQLstr(self, sqlstr):
        self.sqlStr = sqlstr

    #
    def SQLstr(self):
        return self.sqlStr

    #
    def hash(self):
        return self.sqlStrHash
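#
# Minimal self-test sketch (assumes an existing sqlite database and a
# previously written fingerprint; the file names below are hypothetical):
#
if __name__ == '__main__':
    schema = DBSchema()
    rc = schema.scanDBFile('contacts2.db')
    if rc < 0:
        print schema.getErrorString(rc)
    else:
        # compare the scanned schema against the stored fingerprint;
        # compareDB prints the difference percentage itself
        schema.compareDB('contacts2.json')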