#-*- mode: Python;-*-

import sys
import os
import uuid
import random
import threading
import sqlite3

import numpy
# Don't trust numpy's seeding
numpy.random.seed(random.SystemRandom().randint(0,2**32-1))

def _newid():
    return uuid.uuid4().hex


class db(threading.local):
    # Subclassing threading.local gives every thread that touches this object
    # its own connection and caches (__init__ is re-run in each new thread).
    conn = None
    cursor = None
    _population_sizes = None
    _population_cache = None
    _offset_cache = None
    _cur_offsets = None

    def __init__(self, path):
        exists = os.path.exists(path)
        self.conn = sqlite3.connect(path)
        self.conn.execute("PRAGMA foreign_keys = ON;")
        self.conn.row_factory = sqlite3.Row
        self._population_sizes = {}
        self._population_cache = {}
        self._offset_cache = {}
        self._cur_offsets = {}

        # Create the schema on first use.  packets, analysis, and trim_analysis
        # all reference probes(id), so deleting a probe cascades to its children.
        if not exists:
            self.conn.execute(
                """CREATE TABLE meta (id BLOB PRIMARY KEY,
                                      tcpts_mean REAL,
                                      tcpts_stddev REAL,
                                      tcpts_slopes TEXT)
                """)

            self.conn.execute(
                """CREATE TABLE probes (id BLOB PRIMARY KEY,
                                        sample INTEGER,
                                        test_case TEXT,
                                        type TEXT,
                                        tc_order INTEGER,
                                        time_of_day INTEGER,
                                        local_port INTEGER,
                                        reported INTEGER,
                                        userspace_rtt INTEGER,
                                        UNIQUE (sample, test_case))
                """)

            self.conn.execute(
                """CREATE TABLE packets (id BLOB PRIMARY KEY,
                                         probe_id REFERENCES probes(id) ON DELETE CASCADE,
                                         sent INTEGER,
                                         observed INTEGER,
                                         tsval INTEGER,
                                         payload_len INTEGER,
                                         tcpseq INTEGER,
                                         tcpack INTEGER)
                """)

            self.conn.execute(
                """CREATE TABLE analysis (id BLOB PRIMARY KEY,
                                          probe_id UNIQUE REFERENCES probes(id) ON DELETE CASCADE,
                                          suspect TEXT,
                                          packet_rtt INTEGER,
                                          tsval_rtt INTEGER)
                """)

            self.conn.execute(
                """CREATE TABLE trim_analysis (id BLOB PRIMARY KEY,
                                               probe_id REFERENCES probes(id) ON DELETE CASCADE,
                                               suspect TEXT,
                                               packet_rtt INTEGER,
                                               tsval_rtt INTEGER,
                                               sent_trimmed INTEGER,
                                               rcvd_trimmed INTEGER)
                """)

            self.conn.execute(
                """CREATE TABLE classifier_results (id BLOB PRIMARY KEY,
                                                    classifier TEXT,
                                                    trial_type TEXT,
                                                    num_observations INTEGER,
                                                    num_trials INTEGER,
                                                    params TEXT,
                                                    false_positives REAL,
                                                    false_negatives REAL)
                """)

    def __del__(self):
        if self.conn:
            self.conn.commit()
            self.conn.close()


    def populationSize(self, probe_type):
        # Largest number of samples recorded for any single test_case of this
        # probe type; cached after the first lookup.
        if probe_type in self._population_sizes:
            return self._population_sizes[probe_type]

        try:
            cursor = self.conn.cursor()
            cursor.execute("SELECT max(c) FROM (SELECT count(sample) c FROM probes WHERE type=? GROUP BY test_case)", (probe_type,))
            self._population_sizes[probe_type] = cursor.fetchone()[0]
            return self._population_sizes[probe_type]
        except Exception as e:
            print(e)
            return 0


    def subseries(self, probe_type, unusual_case, size=None, offset=None):
        # Returns a list of dicts pairing each unusual_case sample with the
        # average of the other test cases in the same sample:
        #   unusual_packet/other_packet, unusual_tsval/other_tsval,
        #   unusual_reported/other_reported
        cache_key = (probe_type, unusual_case)
        if cache_key not in self._population_cache:
            query = """
            SELECT packet_rtt AS unusual_packet,
                   (SELECT avg(packet_rtt) FROM probes,analysis
                    WHERE analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND probes.type=:probe_type AND sample=u.sample) AS other_packet,

                   tsval_rtt AS unusual_tsval,
                   (SELECT avg(tsval_rtt) FROM probes,analysis
                    WHERE analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND probes.type=:probe_type AND sample=u.sample) AS other_tsval,

                   reported AS unusual_reported,
                   (SELECT avg(reported) FROM probes,analysis
                    WHERE analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND probes.type=:probe_type AND sample=u.sample) AS other_reported

            FROM (SELECT probes.sample,packet_rtt,tsval_rtt,reported FROM probes,analysis
                  WHERE analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type=:probe_type) u
            """

            params = {"probe_type": probe_type, "unusual_case": unusual_case}
            cursor = self.conn.cursor()
            cursor.execute(query, params)
            p = [dict(row) for row in cursor.fetchall()]
            self._population_cache[cache_key] = p
            # random_integers() is deprecated and len(p)/5 is a float under
            # Python 3; randint() draws the same range of starting offsets.
            # max(1, ...) keeps at least one offset for tiny populations.
            self._offset_cache[cache_key] = tuple(numpy.random.randint(0, len(p), max(1, len(p)//5)))
            self._cur_offsets[cache_key] = 0

        population = self._population_cache[cache_key]

        if size is None or size > len(population):
            size = len(population)
        if offset is None or offset >= len(population) or offset < 0:
            offset = self._offset_cache[cache_key][self._cur_offsets[cache_key]]
            self._cur_offsets[cache_key] = (offset + 1) % len(self._offset_cache[cache_key])

        try:
            offset = int(offset)
            size = int(size)
        except Exception as e:
            print(e, offset, size)
            return None

        # Wrap around the end of the population so the slice is always `size` long.
        ret_val = population[offset:offset+size]
        if len(ret_val) < size:
            ret_val += population[0:size-len(ret_val)]

        return ret_val
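
    # Illustrative call (argument values hypothetical): for a database holding
    # probes of type 'train' where 'long' is the unusual test case,
    #   d.subseries('train', 'long', size=1000)
    # returns roughly 1000 of the paired-sample dicts described above, starting
    # at one of the pre-drawn random offsets and wrapping as needed.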


    def resetOffsets(self):
        for k in self._cur_offsets.keys():
            self._cur_offsets[k] = 0


    def clearCache(self):
        self._population_cache = {}
        self._offset_cache = {}
        self._cur_offsets = {}


    def _insert(self, table, row):
        # Insert `row` (a dict of column:value) with a fresh random id.  The
        # table name, column names, and id are interpolated into the SQL; the
        # values go through sqlite3's named-parameter binding.
        rid = _newid()
        keys = row.keys()
        columns = ','.join(keys)
        placeholders = ':'+', :'.join(keys)
        query = "INSERT INTO %s (id,%s) VALUES ('%s',%s)" % (table, columns, rid, placeholders)
        #print(row)
        self.conn.execute(query, row)
        return rid

    def addMeta(self, meta):
        ret_val = self._insert('meta', meta)
        self.conn.commit()
        return ret_val

    def addProbes(self, p):
        return [self._insert('probes', row) for row in p]

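    # Each packet dict passed to addPackets() is expected to provide the keys
    # named in the query below (local_port, sent, observed, tsval, payload_len,
    # tcpseq, tcpack).  A packet is attached to the probe on the same local
    # port whose time_of_day .. time_of_day+userspace_rtt+window_size window
    # contains the observation time.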
    def addPackets(self, pkts, window_size):
        query = ("INSERT INTO packets (id,probe_id,sent,observed,tsval,payload_len,tcpseq,tcpack)"
                 " VALUES(randomblob(16),"
                 "(SELECT id FROM probes WHERE local_port=:local_port AND :observed>time_of_day"
                 " AND :observed<time_of_day+userspace_rtt+%d"
                 " ORDER BY time_of_day ASC LIMIT 1),"
                 ":sent,:observed,:tsval,:payload_len,:tcpseq,:tcpack)") % window_size
        # Foreign-key enforcement is suspended for the duration of the bulk
        # insert and restored afterwards.
        self.conn.execute("PRAGMA foreign_keys = OFF;")
        self.conn.execute("CREATE INDEX IF NOT EXISTS probes_port ON probes (local_port)")
        cursor = self.conn.cursor()
        #print(query, list(pkts)[0:3])
        cursor.executemany(query, pkts)
        self.conn.commit()
        self.conn.execute("PRAGMA foreign_keys = ON;")

    def addAnalyses(self, analyses):
        return [self._insert('analysis', row) for row in analyses]

    def addTrimAnalyses(self, analyses):
        return [self._insert('trim_analysis', row) for row in analyses]

    def addClassifierResult(self, results):
        ret_val = self._insert('classifier_results', results)
        self.conn.commit()
        return ret_val

    def fetchClassifierResult(self, classifier, trial_type, num_observations, params=None):
        # Return the stored result with the lowest combined error rate, or
        # None if nothing matches.
        query = """
          SELECT * FROM classifier_results
            WHERE classifier=:classifier
                  AND trial_type=:trial_type
                  AND num_observations=:num_observations"""
        if params is not None:
            query += """
                  AND params=:params"""
        query += """
            ORDER BY false_positives+false_negatives
            LIMIT 1
        """

        qparams = {'classifier': classifier, 'trial_type': trial_type,
                   'num_observations': num_observations, 'params': params}
        cursor = self.conn.cursor()
        cursor.execute(query, qparams)
        ret_val = cursor.fetchone()
        if ret_val is not None:
            ret_val = dict(ret_val)
        return ret_val

    def deleteClassifierResults(self, classifier, trial_type, num_observations=None):
        params = {"classifier": classifier, "trial_type": trial_type, "num_observations": num_observations}
        query = """
          DELETE FROM classifier_results
            WHERE classifier=:classifier AND trial_type=:trial_type
        """
        if num_observations is not None:
            query += " AND num_observations=:num_observations"

        self.conn.execute(query, params)
        self.conn.commit()
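

# A minimal usage sketch (illustrative only, not part of the original module).
# The probe, packet, and analysis field values below are hypothetical and are
# meant only to show the expected dict shapes and call order.
if __name__ == "__main__":
    import tempfile
    path = os.path.join(tempfile.mkdtemp(), "example.db")
    d = db(path)
    # One probe; addProbes returns the generated row ids.
    probe_ids = d.addProbes([{"sample": 0, "test_case": "example_case", "type": "train",
                              "tc_order": 0, "time_of_day": 1000, "local_port": 40000,
                              "reported": 150, "userspace_rtt": 200}])
    # A packet observed inside that probe's time window gets attached to it.
    d.addPackets([{"local_port": 40000, "sent": 1, "observed": 1100, "tsval": 12345,
                   "payload_len": 64, "tcpseq": 1, "tcpack": 1}], window_size=100)
    # Analysis rows link back to the probe via its id.
    d.addAnalyses([{"probe_id": probe_ids[0], "suspect": "",
                    "packet_rtt": 120, "tsval_rtt": 110}])
    print(d.populationSize("train"))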