source: trunk/lib/nanownlib/storage.py @ 11

Last change on this file since 11 was 11, checked in by tim, 9 years ago

.

File size: 10.0 KB
Line 
1#-*- mode: Python;-*-
2
3import sys
4import os
5import uuid
6import random
7import threading
8import sqlite3
9
10import numpy
11# Don't trust numpy's seeding
12numpy.random.seed(random.SystemRandom().randint(0,2**32-1))
13
14def _newid():
15    return uuid.uuid4().hex
16
17
18class db(threading.local):
19    conn = None
20    cursor = None
21    _population_sizes = None
22    _population_cache = None
23    _offset_cache = None
24    _cur_offsets = None
25   
26    def __init__(self, path):
27        exists = os.path.exists(path)
28        self.conn = sqlite3.connect(path)
29        self.conn.execute("PRAGMA foreign_keys = ON;")
30        self.conn.row_factory = sqlite3.Row
31        self._population_sizes = {}
32        self._population_cache = {}
33        self._offset_cache = {}
34        self._cur_offsets = {}
35       
36        if not exists:
37            self.conn.execute(
38                """CREATE TABLE meta (id BLOB PRIMARY KEY,
39                                      tcpts_mean REAL,
40                                      tcpts_stddev REAL,
41                                      tcpts_slopes TEXT)
42                """)
43
44            self.conn.execute(
45                """CREATE TABLE probes (id BLOB PRIMARY KEY,
46                                        sample INTEGER,
47                                        test_case TEXT,
48                                        type TEXT,
49                                        tc_order INTEGER,
50                                        time_of_day INTEGER,
51                                        local_port INTEGER,
52                                        reported INTEGER,
53                                        userspace_rtt INTEGER,
54                                        UNIQUE (sample, test_case))
55                """)
56
57            self.conn.execute(
58                """CREATE TABLE packets (id BLOB PRIMARY KEY,
59                                         probe_id REFERENCES probes(id) ON DELETE CASCADE,
60                                         sent INTEGER,
61                                         observed INTEGER,
62                                         tsval INTEGER,
63                                         payload_len INTEGER,
64                                         tcpseq INTEGER,
65                                         tcpack INTEGER)
66                """)
67
68            self.conn.execute(
69                """CREATE TABLE analysis (id BLOB PRIMARY KEY,
70                                          probe_id UNIQUE REFERENCES probes(id) ON DELETE CASCADE,
71                                          suspect TEXT,
72                                          packet_rtt INTEGER,
73                                          tsval_rtt INTEGER)
74                """)
75
76            self.conn.execute(
77                """CREATE TABLE trim_analysis (id BLOB PRIMARY KEY,
78                                               probe_id REFERENCES probes(id) ON DELETE CASCADE,
79                                               suspect TEXT,
80                                               packet_rtt INTEGER,
81                                               tsval_rtt INTEGER,
82                                               sent_trimmed INTEGER,
83                                               rcvd_trimmed INTEGER)
84                """)
85
86            self.conn.execute(
87                """CREATE TABLE classifier_results (id BLOB PRIMARY KEY,
88                                                    classifier TEXT,
89                                                    trial_type TEXT,
90                                                    num_observations INTEGER,
91                                                    num_trials INTEGER,
92                                                    params TEXT,
93                                                    false_positives REAL,
94                                                    false_negatives REAL)
95                """)
96
97    def __del__(self):
98        if self.conn:
99            self.conn.commit()
100            self.conn.close()
101
102   
103    def populationSize(self, probe_type):
104        if probe_type in self._population_sizes:
105            return self._population_sizes[probe_type]
106
107        try:
108            cursor = self.conn.cursor()
109            cursor.execute("SELECT max(c) FROM (SELECT count(sample) c FROM probes WHERE type=? GROUP BY test_case)", (probe_type,))
110            self._population_sizes[probe_type] = cursor.fetchone()[0]
111            return self._population_sizes[probe_type]
112        except Exception as e:
113            print(e)
114            return 0
115
116
117    def subseries(self, probe_type, unusual_case, size=None, offset=None):
118        cache_key = (probe_type,unusual_case)
119        if cache_key not in self._population_cache:
120            query="""
121            SELECT packet_rtt AS unusual_packet,
122                   (SELECT avg(packet_rtt) FROM probes,analysis
123                    WHERE analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND probes.type=:probe_type AND sample=u.sample) AS other_packet,
124
125                   tsval_rtt AS unusual_tsval,
126                   (SELECT avg(tsval_rtt) FROM probes,analysis
127                    WHERE analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND probes.type=:probe_type AND sample=u.sample) AS other_tsval,
128
129                   reported AS unusual_reported,
130                   (SELECT avg(reported) FROM probes,analysis
131                    WHERE analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND probes.type=:probe_type AND sample=u.sample) AS other_reported
132
133            FROM   (SELECT probes.sample,packet_rtt,tsval_rtt,reported FROM probes,analysis
134                    WHERE analysis.probe_id=probes.id AND probes.test_case =:unusual_case AND probes.type=:probe_type) u
135            """
136   
137            params = {"probe_type":probe_type, "unusual_case":unusual_case}
138            cursor = self.conn.cursor()
139            cursor.execute(query, params)
140            p = [dict(row) for row in cursor.fetchall()]
141            self._population_cache[cache_key] = p
142            self._offset_cache[cache_key] = tuple(numpy.random.random_integers(0,len(p)-1, len(p)/5))
143            self._cur_offsets[cache_key] = 0
144
145        population = self._population_cache[cache_key]
146
147        if size == None or size > len(population):
148            size = len(population)
149        if offset == None or offset >= len(population) or offset < 0:
150            offset = self._offset_cache[cache_key][self._cur_offsets[cache_key]]
151            self._cur_offsets[cache_key] = (offset + 1) % len(self._offset_cache[cache_key])
152       
153        try:
154            offset = int(offset)
155            size = int(size)
156        except Exception as e:
157            print(e, offset, size)
158            return None
159       
160        ret_val = population[offset:offset+size]
161        if len(ret_val) < size:
162            ret_val += population[0:size-len(ret_val)]
163       
164        return ret_val
165   
166   
167    def resetOffsets(self):
168        for k in self._cur_offsets.keys():
169            self._cur_offsets[k] = 0
170
171           
172    def clearCache(self):
173        self._population_cache = {}
174        self._offset_cache = {}
175        self._cur_offsets = {}
176
177       
178    def _insert(self, table, row):
179        rid = _newid()
180        keys = row.keys()
181        columns = ','.join(keys)
182        placeholders = ':'+', :'.join(keys)
183        query = "INSERT INTO %s (id,%s) VALUES ('%s',%s)" % (table, columns, rid, placeholders)
184        #print(row)
185        self.conn.execute(query, row)
186        return rid
187
188    def addMeta(self, meta):
189        ret_val = self._insert('meta', meta)
190        self.conn.commit()
191        return ret_val
192   
193    def addProbes(self, p):
194        return [self._insert('probes', row) for row in p]
195
196    def addPackets(self, pkts, window_size):
197        query = ("INSERT INTO packets (id,probe_id,sent,observed,tsval,payload_len,tcpseq,tcpack)"
198                 " VALUES(randomblob(16),"
199                 "(SELECT id FROM probes WHERE local_port=:local_port AND :observed>time_of_day"
200                 " AND :observed<time_of_day+userspace_rtt+%d" 
201                 " ORDER BY time_of_day ASC LIMIT 1),"
202                 ":sent,:observed,:tsval,:payload_len,:tcpseq,:tcpack)") % window_size
203        self.conn.execute("PRAGMA foreign_keys = OFF;")
204        self.conn.execute("CREATE INDEX IF NOT EXISTS probes_port ON probes (local_port)")
205        cursor = self.conn.cursor()
206        #print(query, list(pkts)[0:3])
207        cursor.executemany(query, pkts)
208        self.conn.commit()
209        self.conn.execute("PRAGMA foreign_keys = ON;")
210
211    def addAnalyses(self, analyses):
212        return [self._insert('analysis', row) for row in analyses]
213
214    def addTrimAnalyses(self, analyses):
215        return [self._insert('trim_analysis', row) for row in analyses]
216
217    def addClassifierResult(self, results):
218        ret_val = self._insert('classifier_results', results)
219        self.conn.commit()
220        return ret_val
221
222    def fetchClassifierResult(self, classifier, trial_type, num_observations, params=None):
223        query = """
224          SELECT * FROM classifier_results
225            WHERE classifier=:classifier
226                  AND trial_type=:trial_type
227                  AND num_observations=:num_observations"""
228        if params != None:
229            query += """
230                  AND params=:params"""
231        query += """
232            ORDER BY false_positives+false_negatives
233            LIMIT 1
234        """
235
236        qparams = {'classifier':classifier, 'trial_type':trial_type,
237                   'num_observations':num_observations,'params':params}
238        cursor = self.conn.cursor()
239        cursor.execute(query, qparams)
240        ret_val = cursor.fetchone()
241        if ret_val != None:
242            ret_val = dict(ret_val)
243        return ret_val
244   
245    def deleteClassifierResults(self, classifier, trial_type, num_observations=None):
246        params = {"classifier":classifier,"trial_type":trial_type,"num_observations":num_observations}
247        query = """
248          DELETE FROM classifier_results
249          WHERE classifier=:classifier AND trial_type=:trial_type
250        """
251        if num_observations != None:
252            query += " AND num_observations=:num_observations"
253       
254        self.conn.execute(query, params)
255        self.conn.commit()
256       
Note: See TracBrowser for help on using the repository browser.