# dbfp_pub/libs/fingerprint.py
#
# 389 lines
# 11 KiB
# Python

#
#
#
import re
import json
import sqlite3
# Module-level separator string.
# NOTE(review): the name is a misspelling of "delimiter"; it is left as-is
# because code outside this file may import it -- confirm before renaming.
delimeter = "|"
#
# Database Schema
#
class DBSchema:
    """
    This class represents a complete database schema.

    A schema is filled in one of two ways:

    * scanDBFile() opens a SQLite file and builds one TableDefinition per
      row returned by the ``sqlmaster`` query; the results are stored in
      ``self.tables`` (name -> TableDefinition) and ``self.tableNames``.
    * importJson() / compareDB() load a JSON fingerprint; the results are
      stored in ``self.tablesJson`` (name -> TableDefinition).

    The code uses only constructs that behave the same on Python 2.6+ and
    Python 3 (single-argument print calls, ``except ... as``, ``in``).
    """

    # Query returning (table name, CREATE statement) for every table.
    sqlmaster = "SELECT name, sql FROM sqlite_master WHERE type='table'"

    def __init__(self):
        self.conn = None        # sqlite3 connection, set by scanDBFile()
        self.cur = None         # cursor on self.conn
        self.dbName = ''        # path handed to scanDBFile()
        self.tableNames = []    # names of scanned tables, in scan order
        self.tables = {}        # scanned tables: name -> TableDefinition
        self.tablesJson = {}    # imported tables: name -> TableDefinition

    def scanDBFile(self, filein):
        """
        Open the SQLite file ``filein`` and read its table definitions.

        Returns 1 on success, -2 when the file cannot be opened and -3 when
        reading the schema fails. The exception is printed in both error
        cases; getErrorString() turns the code into a message.
        """
        # try to open sqlite file
        try:
            (self.conn, self.cur) = self.__openDB(filein)
        except Exception as e:
            print(e)
            return -2
        self.dbName = filein
        # read database schema
        try:
            self.__readDatabase()
        except Exception as e:
            print(e)
            return -3
        return 1

    def importJson(self, filejson):
        """Load the JSON fingerprint ``filejson`` into self.tablesJson."""
        self.tablesJson = self.__importDBSchema(filejson)

    def compareDB(self, filejson):
        """
        Load the JSON fingerprint ``filejson`` and compare it against the
        tables already scanned into self.tables.

        The comparison reports its findings on stdout; the returned value
        is whatever __DBSchemaCompare() returns, which is always None.
        """
        self.tablesJson = self.__importDBSchema(filejson)
        result = self.__DBSchemaCompare()
        return result

    def __importDBSchema(self, filein):
        """
        Read a JSON fingerprint and return {table name: TableDefinition}.

        The file must contain a 'tables' object (name -> fields) and a
        'db-metadata' object (name -> CREATE statement). Loading is
        best-effort: on any error a message and the exception are printed
        and the tables imported so far are returned.
        """
        tables = {}
        try:
            # "with" guarantees the handle is closed even if parsing fails.
            with open(filein, "r") as fh:
                jsonData = json.load(fh)
            dbmt = jsonData['db-metadata']
            tb = jsonData['tables']
            for key in tb:
                print("[[ Table <" + key + "> imported ]]")
                newTable = TableDefinition()
                newTable.importTable(key, dbmt[key], tb[key])
                tables[key] = newTable
        except Exception as e:
            print("ERROR: problem loading json file: " + filein)
            print(e)
        return tables

    def __DBSchemaCompare(self):
        """
        Compare every imported (JSON) table with the scanned table of the
        same name.

        The JSON schema definition is what the tools expect, so it is the
        baseline. A table that exists in both is compared field by field;
        a table missing from the scanned database is handed to the (still
        unimplemented) fuzzy matcher.
        """
        for tableName in self.tablesJson:
            # .get() instead of indexing: a missing table must reach the
            # fuzzy branch rather than abort the whole comparison.
            table = self.tables.get(tableName)
            print("[[ Comparing Table: " + tableName + " ]]")
            if table is not None:
                self.__CompareTable(self.tablesJson[tableName], table)
            else:
                self.__FuzzyTable()
        return

    #
    # Compare the Table Definitions.
    # Compare Table 1 (Json table) to Table 2
    #
    def __CompareTable(self, tb1, tb2):
        """
        Report how far ``tb2`` satisfies the baseline table ``tb1``.

        Each field of tb1 that is missing from tb2 counts as one error, and
        so does each property key of a tb1 field that is missing from the
        matching tb2 field. Only the presence of keys is checked, not their
        values. The outcome is printed and the error count is returned.
        """
        fieldsErrorCount = 0
        propErrorCount = 0
        fields2 = tb2.fields
        for fieldName, field1 in tb1.fields.items():
            field2 = fields2.get(fieldName)
            if field2 is None:
                fieldsErrorCount += 1
                continue
            for prop in field1:
                if prop not in field2:
                    propErrorCount += 1
        errors = propErrorCount + fieldsErrorCount
        if errors == 0:
            print("100% compatible")
        else:
            print("Table difference found: " + str(errors))
        return errors

    # look at un-identified tables and try to match fields by their properties
    def __FuzzyTable(self):
        """Placeholder for fuzzy matching of unknown tables; does nothing."""
        return

    def __openDB(self, filein):
        """Connect to ``filein`` and return the (connection, cursor) pair."""
        conn = sqlite3.connect(filein)
        cur = conn.cursor()
        return (conn, cur)

    def __readDatabase(self):
        """Build a TableDefinition for every row of the sqlmaster query."""
        for row in self.cur.execute(self.sqlmaster):
            newTable = TableDefinition()
            newTable.loadTable(row[0], row[1])
            self.tableNames.append(newTable.name())
            self.tables[newTable.name()] = newTable
        return

    def debugFingerprint(self):
        """
        Print every table as JSON, using the scanned tables when there are
        any and the imported tables otherwise. Does nothing if both are
        empty.
        """
        if self.tables:
            myDict = self.tables
        elif self.tablesJson:
            myDict = self.tablesJson
        else:
            return
        for key in myDict:
            print("[[ TABLE: <" + key + "> ]]")
            myDict[key].toJSON()

    def writeFingerprint(self, filehandle):
        """
        Write the scanned schema to ``filehandle`` as indented JSON with
        sorted keys.

        The document has four top-level objects: 'tables' (name -> fields),
        'db-metadata' (name -> CREATE statement), 'file-metadata' (scanner
        and format versions) and 'db-config' (the scanned file name).
        """
        thash = {}
        dmhash = {}
        # metadata
        mhash = {
            'scanner-name': 'dbfp',
            'scanner-ver': '0.50',
            'format-ver': '0.90',
        }
        # database configuration information
        # NOTE(review): 'dn-name' looks like a typo for 'db-name'; it is
        # kept because it is part of the emitted file format -- confirm
        # with the consumers before changing it.
        dhash = {'dn-name': self.dbName}
        # tables
        for key in self.tables:
            thash[key] = self.tables[key].fields
            dmhash[key] = self.tables[key].SQLstr()
        ahash = {
            'tables': thash,
            'file-metadata': mhash,
            'db-config': dhash,
            'db-metadata': dmhash,
        }
        json.dump(ahash, filehandle, sort_keys=True, indent=4)

    def getErrorString(self, errorCode):
        """Return the message for a scanDBFile() error code."""
        retval = "ERROR: unknown error code: " + str(errorCode)
        if (errorCode == -2):
            retval = "ERROR: problem opening file, or not sqlite database"
        elif (errorCode == -3):
            retval = "ERROR: problem reading database"
        return retval
#
#
#
class TableDefinition:
    """
    This class represents the definition of database table.

    ``self.fields`` maps a column name to a dict of properties: always
    'name' and 'datatype', plus whichever of 'notnull', 'default',
    'primarykey', 'autoincrement', 'unique', 'references' and
    'referencesdata' were recognized in the CREATE statement.

    Parsing is deliberately simple: the text between the first "(" and the
    last ")" of the CREATE statement is split on commas and every piece is
    matched against a fixed list of patterns. Pieces that match none of
    them are reported on stdout and skipped.
    """

    # Everything between the first "(" and the last ")".
    tableschemaregex = r'\((.*)\)'
    # An identifier with optional surrounding quote characters; group 1 is
    # the bare name.
    _identifier = r'(?:[`|\"|\'])*(\w+)(?:[`|\"|\'])*'
    # An identifier followed by a one-word datatype (groups 1 and 2).
    _column = _identifier + r'\s+(\w+)'

    def __init__(self):
        self.tableName = ""
        self.sqlStr = ""
        self.fields = {}
        # True while the pieces of an unclosed "PRIMARY KEY (a, b, ..."
        # or "UNIQUE(a, b, ..." list are still being consumed.
        self.primarykeyFlag = False
        self.uniqueFlag = False

    def loadTable(self, tableName, sqlStr):
        """
        Record the table name and CREATE statement and parse the column
        definitions of ``sqlStr`` into self.fields.
        """
        self.tableName = tableName
        self.sqlStr = sqlStr
        # A previous load may have ended inside an unclosed constraint list.
        self.primarykeyFlag = False
        self.uniqueFlag = False
        # DOTALL: CREATE statements are often stored spanning several
        # lines, and "." must be able to cross those line breaks.
        results = re.search(self.tableschemaregex, sqlStr, re.DOTALL)
        if results:
            colstr = results.group(1)
            print("[[ TABLE: <" + tableName + "> ]]")
            for col in colstr.split(','):
                newField = self.__parseCreateStr(col.strip())
                if newField:
                    self.fields[newField['name']] = newField

    def importTable(self, tbName, sqlStr, fields):
        """Adopt an already parsed table (name, CREATE statement, fields)."""
        self.tableName = tbName
        self.sqlStr = sqlStr
        self.fields = fields

    # Table Definition
    #
    # CREATE TABLE contacts (_id INTEGER PRIMARY KEY AUTOINCREMENT,name_raw_contact_id INTEGER REFERENCES raw_contacts(_id),
    #    photo_id INTEGER REFERENCES data(_id),photo_file_id INTEGER REFERENCES photo_files(_id),
    #    custom_ringtone TEXT,send_to_voicemail INTEGER NOT NULL DEFAULT 0,
    #    times_contacted INTEGER NOT NULL DEFAULT 0,last_time_contacted INTEGER,
    #    starred INTEGER NOT NULL DEFAULT 0,pinned INTEGER NOT NULL DEFAULT 2147483647,
    #    has_phone_number INTEGER NOT NULL DEFAULT 0,lookup TEXT,
    #    status_update_id INTEGER REFERENCES data(_id),contact_last_updated_timestamp INTEGER)
    #
    # CREATE TABLE sent_files_v2 (uid INTEGER, phone TEXT, sphone TEXT, deleted INTEGER,
    #    PRIMARY KEY (uid, phone))
    def __parseCreateStr(self, sqltext):
        """
        Parse one comma-separated piece of a CREATE TABLE column list.

        Returns a field dict for a column definition, False for a piece
        that belongs to a table-level PRIMARY KEY / UNIQUE constraint (the
        affected fields are updated in place) and None for a piece that is
        not recognized. The patterns are tried from most to least specific,
        so their order matters.
        """
        # Inside an unclosed "PRIMARY KEY (a, b, ..." / "UNIQUE(a, b, ..."
        # list every piece is another member of that list.
        if self.primarykeyFlag or self.uniqueFlag:
            return self.__continueConstraint(sqltext)

        column = self._column

        # photo_id INTEGER REFERENCES data(_id)
        # name_raw_contact_id INTEGER REFERENCES raw_contacts(_id)
        results = re.match(column + r'\s+REFERENCES\s+(\w+)', sqltext)
        if results:
            newField = self.__newField(results)
            newField['references'] = 1
            newField['referencesdata'] = results.group(3)
            return newField

        # pinned INTEGER NOT NULL DEFAULT 2147483647
        # send_to_voicemail INTEGER NOT NULL DEFAULT 0
        results = re.match(column + r'\s+NOT NULL\s+DEFAULT\s+(\w+)', sqltext)
        if results:
            newField = self.__newField(results)
            newField['notnull'] = True
            newField['default'] = results.group(3)
            return newField

        # pinned INTEGER DEFAULT 2147483647
        # send_to_voicemail INTEGER DEFAULT 0
        results = re.match(column + r'\s+DEFAULT\s+(\w+)', sqltext)
        if results:
            # No 'notnull' here: the column only declares a default.
            newField = self.__newField(results)
            newField['default'] = results.group(3)
            return newField

        # _id INTEGER PRIMARY KEY AUTOINCREMENT
        results = re.match(column + r'\s+PRIMARY KEY\s+AUTOINCREMENT', sqltext)
        if results:
            newField = self.__newField(results)
            newField['primarykey'] = True
            newField['autoincrement'] = True
            return newField

        # _id INTEGER PRIMARY KEY
        results = re.match(column + r'\s+PRIMARY KEY', sqltext)
        if results:
            newField = self.__newField(results)
            newField['primarykey'] = True
            return newField

        # FileID INTEGER NOT NULL
        results = re.match(column + r'\s+NOT NULL', sqltext)
        if results:
            newField = self.__newField(results)
            newField['notnull'] = True
            return newField

        # PRIMARY KEY (field_name,
        results = re.match(r'PRIMARY KEY\s*\(' + self._identifier + r'\,?',
                           sqltext)
        if results:
            self.__markField(results.group(1), 'primarykey')
            # Expect further members only if the list is still open.
            self.primarykeyFlag = ')' not in sqltext
            return False

        # UNIQUE(field_name,
        results = re.match(r'UNIQUE\s*\(' + self._identifier + r'\,?', sqltext)
        if results:
            self.__markField(results.group(1), 'unique')
            self.uniqueFlag = ')' not in sqltext
            return False

        # custom_ringtone TEXT
        results = re.match(column, sqltext)
        if results:
            return self.__newField(results)

        print('INFO: field definition not recognized: "' + sqltext + '"')
        return None

    def __newField(self, results):
        """Start a field dict from groups 1 (name) and 2 (datatype)."""
        return {'name': results.group(1), 'datatype': results.group(2)}

    def __markField(self, fieldName, prop):
        """Set ``prop`` to True on a known field; ignore unknown names."""
        field = self.fields.get(fieldName)
        if field is not None:
            field[prop] = True

    def __continueConstraint(self, sqltext):
        """
        Handle a further member ("field_name" or "field_name)") of an open
        PRIMARY KEY / UNIQUE list. The list is finished once a piece
        contains the closing parenthesis. Always returns False because no
        new field is defined.
        """
        results = re.match(self._identifier, sqltext)
        if results:
            prop = 'primarykey' if self.primarykeyFlag else 'unique'
            self.__markField(results.group(1), prop)
        if ')' in sqltext:
            self.primarykeyFlag = False
            self.uniqueFlag = False
        return False

    # NOTE: on any instance created through __init__ this method is hidden
    # by the instance attribute of the same name, so obj.fields is the dict
    # itself. It is kept only so the class namespace stays unchanged.
    def fields(self):
        return self.fields

    def toJSON(self):
        """Print the fields as a JSON string."""
        print(json.dumps(self.fields))

    def toFile(self, filehandle):
        """Write the fields to ``filehandle`` as indented, key-sorted JSON."""
        json.dump(self.fields, filehandle, sort_keys=True, indent=4)

    def __str__(self):
        """Return the fields as a JSON string."""
        return json.dumps(self.fields)

    def name(self):
        """Return the table name."""
        return self.tableName

    def setSQLstr(self, str):
        """Store ``str`` as the CREATE statement and return the stored value."""
        # The parameter name shadows the builtin but is part of the
        # existing signature, so it is left unchanged.
        self.sqlStr = str
        return self.sqlStr

    def SQLstr(self):
        """Return the CREATE statement of the table."""
        return self.sqlStr