source: trunk/lib/nanownlib/storage.py @ 8

Last change on this file since 8 was 8, checked in by tim, 9 years ago

.

File size: 7.4 KB
Line 
1#-*- mode: Python;-*-
2
3import sys
4import os
5import uuid
6import threading
7import sqlite3
8
9import numpy
10
11def _newid():
12    return uuid.uuid4().hex
13
14
15class db(threading.local):
16    conn = None
17    cursor = None
18    _population_sizes = None
19    _population_cache = None
20   
21    def __init__(self, path):
22        exists = os.path.exists(path)
23        self.conn = sqlite3.connect(path)
24        self.conn.execute("PRAGMA foreign_keys = ON;")
25        self.conn.row_factory = sqlite3.Row
26        self._population_sizes = {}
27        self._population_cache = {}
28       
29        if not exists:
30            self.conn.execute(
31                """CREATE TABLE meta (id BLOB PRIMARY KEY,
32                                      tcpts_mean REAL,
33                                      tcpts_stddev REAL,
34                                      tcpts_slopes TEXT)
35                """)
36
37            self.conn.execute(
38                """CREATE TABLE probes (id BLOB PRIMARY KEY,
39                                        sample INTEGER,
40                                        test_case TEXT,
41                                        type TEXT,
42                                        tc_order INTEGER,
43                                        time_of_day INTEGER,
44                                        local_port INTEGER,
45                                        reported INTEGER,
46                                        userspace_rtt INTEGER,
47                                        UNIQUE (sample, test_case))
48                """)
49
50            self.conn.execute(
51                """CREATE TABLE packets (id BLOB PRIMARY KEY,
52                                         probe_id REFERENCES probes(id) ON DELETE CASCADE,
53                                         sent INTEGER,
54                                         observed INTEGER,
55                                         tsval INTEGER,
56                                         payload_len INTEGER,
57                                         tcpseq INTEGER,
58                                         tcpack INTEGER)
59                """)
60
61            self.conn.execute(
62                """CREATE TABLE analysis (id BLOB PRIMARY KEY,
63                                          probe_id UNIQUE REFERENCES probes(id) ON DELETE CASCADE,
64                                          suspect TEXT,
65                                          packet_rtt INTEGER,
66                                          tsval_rtt INTEGER)
67                """)
68
69            self.conn.execute(
70                """CREATE TABLE trim_analysis (id BLOB PRIMARY KEY,
71                                               probe_id REFERENCES probes(id) ON DELETE CASCADE,
72                                               suspect TEXT,
73                                               packet_rtt INTEGER,
74                                               tsval_rtt INTEGER,
75                                               sent_trimmed INTEGER,
76                                               rcvd_trimmed INTEGER)
77                """)
78
79            self.conn.execute(
80                """CREATE TABLE classifier_results (id BLOB PRIMARY KEY,
81                                                    algorithm TEXT,
82                                                    params TEXT,
83                                                    sample_size INTEGER,
84                                                    num_trials INTEGER,
85                                                    trial_type TEXT,
86                                                    false_positives REAL,
87                                                    false_negatives REAL)
88                """)
89
90    def __del__(self):
91        if self.conn:
92            self.conn.commit()
93            self.conn.close()
94
95   
96    def populationSize(self, probe_type):
97        if probe_type in self._population_sizes:
98            return self._population_sizes[probe_type]
99
100        try:
101            cursor = self.conn.cursor()
102            cursor.execute("SELECT max(c) FROM (SELECT count(sample) c FROM probes WHERE type=? GROUP BY test_case)", (probe_type,))
103            self._population_sizes[probe_type] = cursor.fetchone()[0]
104            return self._population_sizes[probe_type]
105        except Exception as e:
106            print(e)
107            return 0
108
109
110    def subseries(self, probe_type, unusual_case, size=None, offset=None, field='packet_rtt'):
111        if (probe_type,unusual_case,field) not in self._population_cache:
112            query="""
113            SELECT %(field)s AS unusual_case,
114                   (SELECT avg(%(field)s) FROM probes,analysis
115                    WHERE analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND probes.type=:probe_type AND sample=u.sample) AS other_cases
116            FROM   (SELECT probes.sample,%(field)s FROM probes,analysis
117                    WHERE analysis.probe_id=probes.id AND probes.test_case =:unusual_case AND probes.type=:probe_type) u
118            """ % {"field":field}
119   
120            params = {"probe_type":probe_type, "unusual_case":unusual_case}
121            cursor = self.conn.cursor()
122            cursor.execute(query, params)
123            self._population_cache[(probe_type,unusual_case,field)] = [dict(row) for row in cursor.fetchall()]
124
125        population = self._population_cache[(probe_type,unusual_case,field)]
126
127        if size == None or size > len(population):
128            size = len(population)
129        if offset == None or offset >= len(population) or offset < 0:
130            offset = numpy.random.random_integers(0,len(population)-1)
131
132        ret_val = population[offset:offset+size]
133        if len(ret_val) < size:
134            ret_val += population[0:size-len(ret_val)]
135       
136        return ret_val
137
138   
139    def clearCache(self):
140        self._population_cache = {}
141
142       
143    def _insert(self, table, row):
144        rid = _newid()
145        keys = row.keys()
146        columns = ','.join(keys)
147        placeholders = ':'+', :'.join(keys)
148        query = "INSERT INTO %s (id,%s) VALUES ('%s',%s)" % (table, columns, rid, placeholders)
149        #print(row)
150        self.conn.execute(query, row)
151        return rid
152
153    def addMeta(self, meta):
154        ret_val = self._insert('meta', meta)
155        self.conn.commit()
156        return ret_val
157   
158    def addProbes(self, p):
159        return [self._insert('probes', row) for row in p]
160
161    def addPackets(self, pkts, window_size):
162        query = ("INSERT INTO packets (id,probe_id,sent,observed,tsval,payload_len,tcpseq,tcpack)"
163                 " VALUES(randomblob(16),"
164                 "(SELECT id FROM probes WHERE local_port=:local_port AND :observed>time_of_day"
165                 " AND :observed<time_of_day+userspace_rtt+%d" 
166                 " ORDER BY time_of_day ASC LIMIT 1),"
167                 ":sent,:observed,:tsval,:payload_len,:tcpseq,:tcpack)") % window_size
168        self.conn.execute("PRAGMA foreign_keys = OFF;")
169        self.conn.execute("CREATE INDEX IF NOT EXISTS probes_port ON probes (local_port)")
170        cursor = self.conn.cursor()
171        #print(query, list(pkts)[0:3])
172        cursor.executemany(query, pkts)
173        self.conn.commit()
174        self.conn.execute("PRAGMA foreign_keys = ON;")
175
176    def addAnalyses(self, analyses):
177        return [self._insert('analysis', row) for row in analyses]
178
179    def addTrimAnalyses(self, analyses):
180        return [self._insert('trim_analysis', row) for row in analyses]
181
182    def addClassifierResults(self, results):
183        ret_val = self._insert('classifier_results', results)
184        self.conn.commit()
185        return ret_val
Note: See TracBrowser for help on using the repository browser.