source: trunk/lib/nanownlib/storage.py @ 7

Last change on this file since 7 was 7, checked in by tim, 9 years ago

.

File size: 5.8 KB
Line 
1#-*- mode: Python;-*-
2
3import sys
4import os
5import uuid
6import threading
7import sqlite3
8
9def _newid():
10    return uuid.uuid4().hex
11
12
13class db(threading.local):
14    conn = None
15    cursor = None
16    _population_sizes = None
17   
18    def __init__(self, path):
19        exists = os.path.exists(path)
20        self.conn = sqlite3.connect(path)
21        self.conn.execute("PRAGMA foreign_keys = ON;")
22        self.conn.row_factory = sqlite3.Row
23        self._population_sizes = {}
24       
25        if not exists:
26            self.conn.execute(
27                """CREATE TABLE meta (id BLOB PRIMARY KEY,
28                                      tcpts_mean REAL,
29                                      tcpts_stddev REAL,
30                                      tcpts_slopes TEXT)
31                """)
32
33            self.conn.execute(
34                """CREATE TABLE probes (id BLOB PRIMARY KEY,
35                                        sample INTEGER,
36                                        test_case TEXT,
37                                        type TEXT,
38                                        tc_order INTEGER,
39                                        time_of_day INTEGER,
40                                        local_port INTEGER,
41                                        reported INTEGER,
42                                        userspace_rtt INTEGER,
43                                        UNIQUE (sample, test_case))
44                """)
45
46            self.conn.execute(
47                """CREATE TABLE packets (id BLOB PRIMARY KEY,
48                                         probe_id REFERENCES probes(id) ON DELETE CASCADE,
49                                         sent INTEGER,
50                                         observed INTEGER,
51                                         tsval INTEGER,
52                                         payload_len INTEGER,
53                                         tcpseq INTEGER,
54                                         tcpack INTEGER)
55                """)
56
57            self.conn.execute(
58                """CREATE TABLE analysis (id BLOB PRIMARY KEY,
59                                          probe_id UNIQUE REFERENCES probes(id) ON DELETE CASCADE,
60                                          suspect TEXT,
61                                          packet_rtt INTEGER,
62                                          tsval_rtt INTEGER)
63                """)
64
65            self.conn.execute(
66                """CREATE TABLE trim_analysis (id BLOB PRIMARY KEY,
67                                               probe_id REFERENCES probes(id) ON DELETE CASCADE,
68                                               suspect TEXT,
69                                               packet_rtt INTEGER,
70                                               tsval_rtt INTEGER,
71                                               sent_trimmed INTEGER,
72                                               rcvd_trimmed INTEGER)
73                """)
74
75            self.conn.execute(
76                """CREATE TABLE classifier_results (id BLOB PRIMARY KEY,
77                                                    algorithm TEXT,
78                                                    params TEXT,
79                                                    sample_size INTEGER,
80                                                    num_trials INTEGER,
81                                                    trial_type TEXT,
82                                                    false_positives REAL,
83                                                    false_negatives REAL)
84                """)
85
86    def __del__(self):
87        if self.conn:
88            self.conn.commit()
89            self.conn.close()
90
91    def populationSize(self, probe_type):
92        if probe_type in self._population_sizes:
93            return self._population_sizes[probe_type]
94
95        try:
96            cursor = self.conn.cursor()
97            cursor.execute("SELECT max(c) FROM (SELECT count(sample) c FROM probes WHERE type=? GROUP BY test_case)", (probe_type,))
98            self._population_sizes[probe_type] = cursor.fetchone()[0]
99            return self._population_sizes[probe_type]
100        except Exception as e:
101            print(e)
102            return 0
103       
104    def _insert(self, table, row):
105        rid = _newid()
106        keys = row.keys()
107        columns = ','.join(keys)
108        placeholders = ':'+', :'.join(keys)
109        query = "INSERT INTO %s (id,%s) VALUES ('%s',%s)" % (table, columns, rid, placeholders)
110        #print(row)
111        self.conn.execute(query, row)
112        return rid
113
114    def addMeta(self, meta):
115        ret_val = self._insert('meta', meta)
116        self.conn.commit()
117        return ret_val
118   
119    def addProbes(self, p):
120        return [self._insert('probes', row) for row in p]
121
122    def addPackets(self, pkts, window_size):
123        query = ("INSERT INTO packets (id,probe_id,sent,observed,tsval,payload_len,tcpseq,tcpack)"
124                 " VALUES(randomblob(16),"
125                 "(SELECT id FROM probes WHERE local_port=:local_port AND :observed>time_of_day"
126                 " AND :observed<time_of_day+userspace_rtt+%d" 
127                 " ORDER BY time_of_day ASC LIMIT 1),"
128                 ":sent,:observed,:tsval,:payload_len,:tcpseq,:tcpack)") % window_size
129        self.conn.execute("PRAGMA foreign_keys = OFF;")
130        self.conn.execute("CREATE INDEX IF NOT EXISTS probes_port ON probes (local_port)")
131        cursor = self.conn.cursor()
132        #print(query, list(pkts)[0:3])
133        cursor.executemany(query, pkts)
134        self.conn.commit()
135        self.conn.execute("PRAGMA foreign_keys = ON;")
136
137    def addAnalyses(self, analyses):
138        return [self._insert('analysis', row) for row in analyses]
139
140    def addTrimAnalyses(self, analyses):
141        return [self._insert('trim_analysis', row) for row in analyses]
142
143    def addClassifierResults(self, results):
144        ret_val = self._insert('classifier_results', results)
145        self.conn.commit()
146        return ret_val
Note: See TracBrowser for help on using the repository browser.