source: trunk/lib/nanownlib/storage.py @ 24

Last change on this file since 24 was 22, checked in by tim, 9 years ago

Fixed a bug in processPackets where trim parameters weren't passed through

Reorganized analyzeProbes to work as a O(max(S,R)) algorithm, rather than O(S*R)

Sped up addTrimAnalyses by using a new _insertMany primative

File size: 11.7 KB
Line 
1#-*- mode: Python;-*-
2
3import sys
4import os
5import uuid
6import random
7import threading
8import sqlite3
9try:
10    import numpy
11except:
12    sys.stderr.write('ERROR: Could not import numpy module.  Ensure it is installed.\n')
13    sys.stderr.write('       Under Debian, the package name is "python3-numpy"\n.')
14    sys.exit(1)
15
16# Don't trust numpy's seeding
17numpy.random.seed(random.SystemRandom().randint(0,2**32-1))
18
19def _newid():
20    return uuid.uuid4().hex
21
22
23class db(threading.local):
24    conn = None
25    cursor = None
26    _population_sizes = None
27    _population_cache = None
28    _offset_cache = None
29    _cur_offsets = None
30   
31    def __init__(self, path):
32        exists = os.path.exists(path)
33        self.conn = sqlite3.connect(path)
34        self.conn.execute("PRAGMA foreign_keys = ON;")
35        self.conn.row_factory = sqlite3.Row
36        self._population_sizes = {}
37        self._population_cache = {}
38        self._offset_cache = {}
39        self._cur_offsets = {}
40       
41        if not exists:
42            self.conn.execute(
43                """CREATE TABLE meta (id BLOB PRIMARY KEY,
44                                      tcpts_mean REAL,
45                                      tcpts_stddev REAL,
46                                      tcpts_slopes TEXT,
47                                      unusual_case TEXT,
48                                      greater INTEGER)
49                """)
50
51            self.conn.execute(
52                """CREATE TABLE probes (id BLOB PRIMARY KEY,
53                                        sample INTEGER,
54                                        test_case TEXT,
55                                        type TEXT,
56                                        tc_order INTEGER,
57                                        time_of_day INTEGER,
58                                        local_port INTEGER,
59                                        reported INTEGER,
60                                        userspace_rtt INTEGER,
61                                        UNIQUE (sample, test_case))
62                """)
63
64            self.conn.execute(
65                """CREATE TABLE packets (id BLOB PRIMARY KEY,
66                                         probe_id REFERENCES probes(id) ON DELETE CASCADE,
67                                         sent INTEGER,
68                                         observed INTEGER,
69                                         tsval INTEGER,
70                                         payload_len INTEGER,
71                                         tcpseq INTEGER,
72                                         tcpack INTEGER)
73                """)
74
75            self.conn.execute(
76                """CREATE TABLE analysis (id BLOB PRIMARY KEY,
77                                          probe_id UNIQUE REFERENCES probes(id) ON DELETE CASCADE,
78                                          suspect TEXT,
79                                          packet_rtt INTEGER,
80                                          tsval_rtt INTEGER)
81                """)
82
83            self.conn.execute(
84                """CREATE TABLE trim_analysis (id BLOB PRIMARY KEY,
85                                               probe_id REFERENCES probes(id) ON DELETE CASCADE,
86                                               suspect TEXT,
87                                               packet_rtt INTEGER,
88                                               tsval_rtt INTEGER,
89                                               sent_trimmed INTEGER,
90                                               rcvd_trimmed INTEGER)
91                """)
92
93            self.conn.execute(
94                """CREATE TABLE classifier_results (id BLOB PRIMARY KEY,
95                                                    classifier TEXT,
96                                                    trial_type TEXT,
97                                                    num_observations INTEGER,
98                                                    num_trials INTEGER,
99                                                    params TEXT,
100                                                    false_positives REAL,
101                                                    false_negatives REAL)
102                """)
103
104    def __del__(self):
105        if self.conn:
106            self.conn.commit()
107            self.conn.close()
108
109   
110    def populationSize(self, probe_type):
111        if probe_type in self._population_sizes:
112            return self._population_sizes[probe_type]
113
114        try:
115            cursor = self.conn.cursor()
116            cursor.execute("SELECT max(c) FROM (SELECT count(sample) c FROM probes WHERE type=? GROUP BY test_case)", (probe_type,))
117            self._population_sizes[probe_type] = cursor.fetchone()[0]
118            return self._population_sizes[probe_type]
119        except Exception as e:
120            print(e)
121            return 0
122
123
124    def subseries(self, probe_type, unusual_case, size=None, offset=None):
125        cache_key = (probe_type,unusual_case)
126        if cache_key not in self._population_cache:
127            query="""
128            SELECT packet_rtt AS unusual_packet,
129                   (SELECT avg(packet_rtt) FROM probes,analysis
130                    WHERE analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND probes.type=:probe_type AND sample=u.sample) AS other_packet,
131
132                   tsval_rtt AS unusual_tsval,
133                   (SELECT avg(tsval_rtt) FROM probes,analysis
134                    WHERE analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND probes.type=:probe_type AND sample=u.sample) AS other_tsval,
135
136                   reported AS unusual_reported,
137                   (SELECT avg(reported) FROM probes,analysis
138                    WHERE analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND probes.type=:probe_type AND sample=u.sample) AS other_reported
139
140            FROM   (SELECT probes.sample,packet_rtt,tsval_rtt,reported FROM probes,analysis
141                    WHERE analysis.probe_id=probes.id AND probes.test_case =:unusual_case AND probes.type=:probe_type) u
142            """
143   
144            params = {"probe_type":probe_type, "unusual_case":unusual_case}
145            cursor = self.conn.cursor()
146            cursor.execute(query, params)
147            p = [dict(row) for row in cursor.fetchall()]
148            self._population_cache[cache_key] = p
149            self._offset_cache[cache_key] = tuple(numpy.random.random_integers(0,len(p)-1, len(p)/5))
150            self._cur_offsets[cache_key] = 0
151
152        population = self._population_cache[cache_key]
153
154        if size == None or size > len(population):
155            size = len(population)
156        if offset == None or offset >= len(population) or offset < 0:
157            offset = self._offset_cache[cache_key][self._cur_offsets[cache_key]]
158            self._cur_offsets[cache_key] = (offset + 1) % len(self._offset_cache[cache_key])
159       
160        try:
161            offset = int(offset)
162            size = int(size)
163        except Exception as e:
164            print(e, offset, size)
165            return None
166       
167        ret_val = population[offset:offset+size]
168        if len(ret_val) < size:
169            ret_val += population[0:size-len(ret_val)]
170       
171        return ret_val
172   
173   
174    def resetOffsets(self):
175        for k in self._cur_offsets.keys():
176            self._cur_offsets[k] = 0
177
178           
179    def clearCache(self):
180        self._population_cache = {}
181        self._offset_cache = {}
182        self._cur_offsets = {}
183
184       
185    def _insert(self, table, row):
186        rid = _newid()
187        keys = row.keys()
188        columns = ','.join(keys)
189        placeholders = ':'+', :'.join(keys)
190        query = "INSERT INTO %s (id,%s) VALUES ('%s',%s)" % (table, columns, rid, placeholders)
191        #print(query,row)
192        self.conn.execute(query, row)
193        return rid
194
195    def _insertMany(self, table, rows):
196        if len(rows) < 1:
197            return
198       
199        keys = rows[0].keys()
200        columns = ','.join(keys)
201        placeholders = ':'+', :'.join(keys)
202        query = "INSERT INTO %s (id,%s) VALUES (hex(randomblob(16)),%s)" % (table, columns, placeholders)
203        #print(query,row)
204        self.conn.executemany(query, rows)
205   
206    def addMeta(self, meta):
207        ret_val = self._insert('meta', meta)
208        self.conn.commit()
209        return ret_val
210   
211    def addProbes(self, p):
212        return [self._insert('probes', row) for row in p]
213
214    def addPackets(self, pkts, window_size):
215        query = ("INSERT INTO packets (id,probe_id,sent,observed,tsval,payload_len,tcpseq,tcpack)"
216                 " VALUES(hex(randomblob(16)),"
217                 "(SELECT id FROM probes WHERE local_port=:local_port AND :observed>time_of_day"
218                 " AND :observed<time_of_day+userspace_rtt+%d" 
219                 " ORDER BY time_of_day ASC LIMIT 1),"
220                 ":sent,:observed,:tsval,:payload_len,:tcpseq,:tcpack)") % window_size
221        self.conn.execute("PRAGMA foreign_keys = OFF;")
222        self.conn.execute("CREATE INDEX IF NOT EXISTS probes_port ON probes (local_port)")
223        cursor = self.conn.cursor()
224        #print(query, list(pkts)[0:3])
225        cursor.executemany(query, pkts)
226        self.conn.commit()
227        self.conn.execute("PRAGMA foreign_keys = ON;")
228
229    def addAnalyses(self, analyses):
230        return [self._insert('analysis', row) for row in analyses]
231
232    def addTrimAnalyses(self, analyses):
233        self._insertMany('trim_analysis', analyses)
234
235    def addClassifierResult(self, results):
236        ret_val = self._insert('classifier_results', results)
237        self.conn.commit()
238        return ret_val
239
240    def fetchClassifierResult(self, classifier, trial_type, num_observations, params=None):
241        query = """
242          SELECT * FROM classifier_results
243            WHERE classifier=:classifier
244                  AND trial_type=:trial_type
245                  AND num_observations=:num_observations"""
246        if params != None:
247            query += """
248                  AND params=:params"""
249        query += """
250            ORDER BY false_positives+false_negatives
251            LIMIT 1
252        """
253
254        qparams = {'classifier':classifier, 'trial_type':trial_type,
255                   'num_observations':num_observations,'params':params}
256        cursor = self.conn.cursor()
257        cursor.execute(query, qparams)
258        ret_val = cursor.fetchone()
259        if ret_val != None:
260            ret_val = dict(ret_val)
261        return ret_val
262   
263    def deleteClassifierResults(self, classifier, trial_type, num_observations=None):
264        params = {"classifier":classifier,"trial_type":trial_type,"num_observations":num_observations}
265        query = """
266          DELETE FROM classifier_results
267          WHERE classifier=:classifier AND trial_type=:trial_type
268        """
269        if num_observations != None:
270            query += " AND num_observations=:num_observations"
271       
272        self.conn.execute(query, params)
273        self.conn.commit()
274   
275    def setUnusualCase(self, unusual_case, greater):
276        query = """SELECT * FROM meta LIMIT 1"""
277        cursor = self.conn.cursor()
278        cursor.execute(query)
279        row = cursor.fetchone()
280        if row == None:
281            params = {"id":_newid()}
282        else:
283            params = dict(row)
284
285        params["unusual_case"]=unusual_case
286        params["greater"]=greater
287       
288        keys = params.keys()
289        columns = ','.join(keys)
290        placeholders = ':'+', :'.join(keys)
291       
292        query = """INSERT OR REPLACE INTO meta (%s) VALUES (%s)""" % (columns, placeholders)
293        cursor.execute(query, params)
294       
295       
296    def getUnusualCase(self):
297        query = """SELECT unusual_case,greater FROM meta LIMIT 1"""
298        cursor = self.conn.cursor()
299        cursor.execute(query)
300        row = cursor.fetchone()
301        if row == None or row[0] == None or row[1] == None:
302            return None
303        else:
304            return tuple(row)
Note: See TracBrowser for help on using the repository browser.