init checkin

This commit is contained in:
JohnE 2015-06-06 21:16:43 -07:00
commit bd7301eae5
4 changed files with 287 additions and 0 deletions

44
.gitignore vendored Normal file
View File

@ -0,0 +1,44 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/

0
libs/__init__.py Executable file
View File

210
libs/fingerprint.py Normal file
View File

@ -0,0 +1,210 @@
#
#
#
import re
import json
import sqlite3
delimeter = "|"
def scanDB(filein):
# try to open sqlite file
try:
(conn, cur) = __openDB(filein)
except Exception, e:
return -2
# read database schema
try:
dbSchema = DBSchema()
dbSchema.readDatabase(cur)
except Exception, e:
print e
return -3
return 1
def __openDB(filein):
conn = sqlite3.connect(filein)
cur = conn.cursor()
return (conn, cur)
def writeFingerprint():
return
def printDB():
return
def getErrorString(errorCode):
retval = "ERROR: unknown error code: " + str(errorCode)
if (errorCode == -2):
retval = "ERROR: problem opening file, or not sqlite database"
elif (errorCode == -3):
retval = "ERROR: problem reading database"
return retval
#
# Database Schema
#
class DBSchema:
"""
This class represents a complete database schema
"""
sqlmaster = "SELECT name, sql FROM sqlite_master WHERE type='table'"
def __init__(self):
self.tableNames = []
self.tables = {}
return
def readDatabase(self, cur):
for row in cur.execute(self.sqlmaster):
newTable = TableDefinition()
newTable.loadTable(row[0], row[1])
self.tableNames.append(newTable.name())
self.tables[newTable.name()] = newTable
return
#
#
#
class TableDefinition:
"""
This class represents the definition of database table
"""
tableschemaregex = r'\((.*)\)'
def __init__(self):
self.tableName = ""
self.sqlStr = ""
self.fields = {}
# self.pkeys = []
return
def loadTable(self, tableName, sqlStr):
self.tableName = tableName
self.sqlStr = sqlStr
results = re.search(self.tableschemaregex, sqlStr)
if results:
colstr = results.group(1)
print "[[ TABLE: <" + tableName + "> ]]"
print "FIELDS: " + colstr
columns = colstr.split(',')
for col in columns:
newField = self.__parseCreateStr(col.strip())
if newField:
self.fields[newField['name']] = newField
return
# Table Definition
#
# CREATE TABLE contacts (_id INTEGER PRIMARY KEY AUTOINCREMENT,name_raw_contact_id INTEGER REFERENCES raw_contacts(_id),
# photo_id INTEGER REFERENCES data(_id),photo_file_id INTEGER REFERENCES photo_files(_id),
# custom_ringtone TEXT,send_to_voicemail INTEGER NOT NULL DEFAULT 0,
# times_contacted INTEGER NOT NULL DEFAULT 0,last_time_contacted INTEGER,
# starred INTEGER NOT NULL DEFAULT 0,pinned INTEGER NOT NULL DEFAULT 2147483647,
# has_phone_number INTEGER NOT NULL DEFAULT 0,lookup TEXT,
# status_update_id INTEGER REFERENCES data(_id),contact_last_updated_timestamp INTEGER)
#
# CREATE TABLE sent_files_v2 (uid INTEGER, phone TEXT, sphone TEXT, deleted INTEGER,
# PRIMARY KEY (uid, phone)
def __parseCreateStr(self, sqltext):
try:
newField = {}
# photo_id INTEGER REFERENCES data(_id)
# name_raw_contact_id INTEGER REFERENCES raw_contacts(_id)
results = re.match(r'(\w+)\s+(\w+)\s+REFERENCESS\s+(\W+)', sqltext)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
newField['references'] = 1
newField['referencesdata'] = results.group(3)
return newField
# pinned INTEGER NOT NULL DEFAULT 2147483647
# send_to_voicemail INTEGER NOT NULL DEFAULT 0
results = re.match(r'(\w+)\s+(\w+)\s+NOT NULL\s+DEFAULT\s+(\w+)', sqltext)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
newField['notnull'] = True
newField['default'] = results.group(3)
return newField
# _id INTEGER PRIMARY KEY AUTOINCREMENT
results = re.match(r'(\w+)\s+(\w+)\s+PRIMARY KEY\s+AUTOINCREMENT', sqltext)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
newField['primarykey'] = True
newField['autoincrement'] = True
return newField
# FileID INTEGER NOT NULL
results = re.match(r'(\w+)\s+(\w+)\s+NOT NULL', sqltext)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
newField['notnull'] = True
return newField
# custom_ringtone TEXT
results = re.match(r'(\w+)\s+(\w+)', sqltext)
if results:
newField['name'] = results.group(1)
newField['datatype'] = results.group(2)
return newField
# PRIMARY KEY (field_name,
results = re.match(r'PRIMARY KEY \((\w+)\,', sqltext)
if results:
field = self.fields[results.group(1)]
field['primarykey'] = True
return False
# field_name)
results = re.match(r'(\w+)\)', sqltext)
if results:
field = self.fields[results.group(1)]
field['primarykey'] = True
return False
print 'INFO: field definition not recognized: "' + sqltext + '"'
# photo_id INTEGER REFERENCES data(_id)
# results = re.match(r'', sqltext)
# if results:
# newField['name'] = results.group(1)
# newField['datatype'] = results.group(2)
# return newField
except Exception, e:
return None
return None
def toJSON(self):
def __str__(self):
global delimeter
retstr = ""
return retstr
def name(self):
return self.tableName
class TableField:
def __init__(self):
return

33
main.py Normal file
View File

@ -0,0 +1,33 @@
#
#
#
import argparse
import time
from libs import fingerprint
def main():
(filein, verbose) = parseArgs()
retVal = fingerprint.scanDB(filein)
if (retVal > 0):
fingerprint.writeFingerprint()
else:
print fingerprint.getErrorString(retVal)
def parseArgs():
verbose = False
timestr = time.strftime('%Y-%m-%d_%H%M%S', time.localtime(time.time()))
#outfile = "telegram-data_" + timestr
parser = argparse.ArgumentParser(description='Fingerprint a sqlite database based on its schema')
parser.add_argument('-f', '--file', required=True)
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()
if (args.verbose):
verbose = args.verbose
return (args.file, verbose)
if __name__ == "__main__":
main()