dbfp_pub/libs/fingerprint.py

217 lines
5.9 KiB
Python

#
#
#
import re
import json
import sqlite3
# Delimiter character constant.  The name is a misspelling of "delimiter" but
# is kept because it is a public module-level name.  No code visible in this
# file reads its value -- verify external users before renaming or removing.
delimeter = "|"
#
# Database Schema
#
class DBSchema:
    """
    This class represents a complete database schema.

    The schema is read from the ``sqlite_master`` table of an SQLite file.
    Every table found there is stored as a ``TableDefinition`` in
    ``self.tables`` (keyed by table name); ``self.tableNames`` keeps the names
    in the order the rows were returned.
    """

    # Returns the name and the CREATE statement of every table in the file.
    sqlmaster = "SELECT name, sql FROM sqlite_master WHERE type='table'"

    def __init__(self):
        # conn / cur are only populated while scanDBFile() is running.
        self.conn = None
        self.cur = None
        self.tableNames = []
        self.tables = {}

    def scanDBFile(self, filein):
        """
        Open the SQLite file ``filein`` and load the definition of its tables.

        Errors are printed to stdout and reported through the return value
        instead of being raised.  The connection is always closed before
        returning, so no file handle is left open.

        Returns:
            1 on success, -2 when the file could not be opened, -3 when the
            schema could not be read.  Use getErrorString() to turn a
            negative code into a message.
        """
        # try to open sqlite file
        try:
            (self.conn, self.cur) = self.__openDB(filein)
        except Exception as e:
            print(e)
            return -2
        # read database schema
        try:
            self.__readDatabase()
        except Exception as e:
            print(e)
            return -3
        finally:
            # Release the connection on the success and the failure path.
            self.__closeDB()
        return 1

    def __openDB(self, filein):
        """Connect to ``filein`` and return a ``(connection, cursor)`` pair."""
        conn = sqlite3.connect(filein)
        return (conn, conn.cursor())

    def __closeDB(self):
        """Close the current connection, if any, and reset conn / cur."""
        conn = self.conn
        # Reset first so the attributes never point at a closed connection.
        self.conn = None
        self.cur = None
        if conn is not None:
            conn.close()

    def __readDatabase(self):
        """Build one TableDefinition per row returned by ``sqlmaster``."""
        for row in self.cur.execute(self.sqlmaster):
            newTable = TableDefinition()
            newTable.loadTable(row[0], row[1])
            self.tableNames.append(newTable.name())
            self.tables[newTable.name()] = newTable

    def writeFingerprint(self):
        """
        Print the fingerprint of every loaded table to stdout.

        For each table this prints a header line with the table name, the
        table's SQL string and the JSON form of its parsed fields.
        """
        for key in self.tables:
            print("[[ TABLE: <" + key + "> ]]")
            tableDef = self.tables[key]
            print(str(tableDef.SQLstr()))
            tableDef.toJSON()

    def getErrorString(self, errorCode):
        """Return the message for an error code returned by scanDBFile()."""
        if errorCode == -2:
            return "ERROR: problem opening file, or not sqlite database"
        if errorCode == -3:
            return "ERROR: problem reading database"
        return "ERROR: unknown error code: " + str(errorCode)
#
# Table Definition
#
class TableDefinition:
    """
    This class represents the definition of a database table.

    The definition is parsed from a ``CREATE TABLE`` statement: the text
    between the outermost parentheses is split on commas and every fragment is
    matched against a small set of column / constraint patterns.  Parsing is
    regex based and best effort; a fragment that matches no pattern is
    reported on stdout and skipped.  Because the split is on every comma,
    definitions that contain commas inside parentheses are broken into
    separate fragments.

    Examples of the statements this is meant to handle:

      CREATE TABLE contacts (_id INTEGER PRIMARY KEY AUTOINCREMENT,
        name_raw_contact_id INTEGER REFERENCES raw_contacts(_id),
        photo_id INTEGER REFERENCES data(_id),custom_ringtone TEXT,
        send_to_voicemail INTEGER NOT NULL DEFAULT 0,
        pinned INTEGER NOT NULL DEFAULT 2147483647,lookup TEXT)

      CREATE TABLE sent_files_v2 (uid INTEGER, phone TEXT, sphone TEXT,
        deleted INTEGER, PRIMARY KEY (uid, phone))
    """

    # Captures everything between the first "(" and the last ")".
    tableschemaregex = r'\((.*)\)'

    # photo_id INTEGER REFERENCES data(_id)
    _RE_REFERENCES = re.compile(
        r'(\w+)\s+(\w+)\s+REFERENCES\s+(\w+(?:\s*\(\s*\w+\s*\))?)')
    # send_to_voicemail INTEGER NOT NULL DEFAULT 0
    _RE_NOTNULL_DEFAULT = re.compile(
        r'(\w+)\s+(\w+)\s+NOT NULL\s+DEFAULT\s+(\w+)')
    # _id INTEGER PRIMARY KEY [AUTOINCREMENT]
    _RE_PRIMARYKEY_COLUMN = re.compile(
        r'(\w+)\s+(\w+)\s+PRIMARY KEY(\s+AUTOINCREMENT)?')
    # FileID INTEGER NOT NULL
    _RE_NOTNULL = re.compile(r'(\w+)\s+(\w+)\s+NOT NULL')
    # PRIMARY KEY (field_name,     <- first piece of a table-level constraint
    _RE_PRIMARYKEY_FIRST = re.compile(r'PRIMARY KEY\s*\(\s*(\w+)')
    # custom_ringtone TEXT
    _RE_PLAIN_COLUMN = re.compile(r'(\w+)\s+(\w+)')
    # field_name)                  <- last piece of a table-level constraint
    _RE_PRIMARYKEY_LAST = re.compile(r',?\s*(\w+)\s*\)')

    def __init__(self):
        self.tableName = ""
        self.sqlStr = ""
        # Maps column name -> dict describing the column.
        self.fields = {}

    def loadTable(self, tableName, sqlStr):
        """
        Store the table name / SQL text and parse the column definitions.

        Parsed columns are added to ``self.fields``.  When the SQL text
        contains a parenthesised column list, a header line with the table
        name is printed to stdout before the columns are parsed.
        """
        self.tableName = tableName
        self.sqlStr = sqlStr
        # DOTALL: the column list of a CREATE statement may span several
        # lines; without it "." stops at the first newline and such tables
        # would end up with no fields.
        results = re.search(self.tableschemaregex, sqlStr, re.DOTALL)
        if not results:
            return
        print("[[ TABLE: <" + tableName + "> ]]")
        for col in results.group(1).split(','):
            newField = self.__parseCreateStr(col.strip())
            if newField:
                self.fields[newField['name']] = newField

    def __markPrimaryKey(self, fieldName):
        """Flag an already parsed column as primary key; ignore unknown names."""
        field = self.fields.get(fieldName)
        if field is not None:
            field['primarykey'] = True

    def __parseCreateStr(self, sqltext):
        """
        Parse one comma-separated fragment of a column list.

        Returns:
            a dict describing the column when the fragment defines one;
            False when the fragment is a piece of a table-level PRIMARY KEY
            constraint (the referenced column in ``self.fields`` is flagged
            instead); None when the fragment is not recognized.
        """
        # The order of the checks matters: the specific patterns must be
        # tried before the generic "name datatype" pattern.
        results = self._RE_REFERENCES.match(sqltext)
        if results:
            return {
                'name': results.group(1),
                'datatype': results.group(2),
                'references': 1,
                'referencesdata': results.group(3),
            }

        results = self._RE_NOTNULL_DEFAULT.match(sqltext)
        if results:
            return {
                'name': results.group(1),
                'datatype': results.group(2),
                'notnull': True,
                'default': results.group(3),
            }

        results = self._RE_PRIMARYKEY_COLUMN.match(sqltext)
        if results:
            newField = {
                'name': results.group(1),
                'datatype': results.group(2),
                'primarykey': True,
            }
            if results.group(3):
                newField['autoincrement'] = True
            return newField

        results = self._RE_NOTNULL.match(sqltext)
        if results:
            return {
                'name': results.group(1),
                'datatype': results.group(2),
                'notnull': True,
            }

        # Must come before the generic pattern, which would otherwise read
        # "PRIMARY KEY" as a column named PRIMARY of type KEY.
        results = self._RE_PRIMARYKEY_FIRST.match(sqltext)
        if results:
            self.__markPrimaryKey(results.group(1))
            return False

        results = self._RE_PLAIN_COLUMN.match(sqltext)
        if results:
            return {
                'name': results.group(1),
                'datatype': results.group(2),
            }

        results = self._RE_PRIMARYKEY_LAST.match(sqltext)
        if results:
            self.__markPrimaryKey(results.group(1))
            return False

        print('INFO: field definition not recognized: "' + sqltext + '"')
        return None

    def toJSON(self):
        """Print the parsed fields as a JSON document to stdout."""
        print(json.dumps(self.fields))

    def __str__(self):
        """Return the parsed fields as a JSON document."""
        return json.dumps(self.fields)

    def name(self):
        """Return the table name passed to loadTable()."""
        return self.tableName

    def SQLstr(self):
        """Return the SQL text passed to loadTable()."""
        return self.sqlStr