source: trunk/lib/nanownlib/__init__.py @ 24

Last change on this file since 24 was 22, checked in by tim, 9 years ago

Fixed a bug in processPackets where trim parameters weren't passed through

Reorganized analyzeProbes to work as an O(max(S,R)) algorithm, rather than O(S*R)

Sped up addTrimAnalyses by using a new _insertMany primitive

File size: 16.7 KB
RevLine 
[4]1#!/usr/bin/env python3
2#-*- mode: Python;-*-
3
4import sys
5import time
[10]6import traceback
[4]7import socket
8import datetime
9import http.client
10import subprocess
[20]11import tempfile
[4]12import json
13import gzip
14import statistics
[16]15
# requests is a hard dependency; fail early with an installation hint.
# Only ImportError is handled so unrelated failures are not masked.
try:
    import requests
except ImportError:
    sys.stderr.write('ERROR: Could not import requests module.  Ensure it is installed.\n')
    sys.stderr.write('       Under Debian, the package name is "python3-requests".\n')
    sys.exit(1)
22
23from .stats import *
24
25
def getLocalIP(remote_host, remote_port):
    """Return the local IP address used to reach a remote TCP endpoint.

    Opens a TCP connection to (remote_host, remote_port), reads the address
    of the local end of that connection, and closes the connection again.
    Connection errors from socket.create_connection propagate to the caller.
    """
    # The context manager closes the socket even if getsockname() raises.
    with socket.create_connection((remote_host, remote_port)) as connection:
        return connection.getsockname()[0]
32
33
def getIfaceForIP(ip):
    """Return the name of the local interface that carries IPv4 address ip.

    Returns None when no interface reports that address.  If the netifaces
    module cannot be imported, an error is written to stderr and the process
    exits with status 1.
    """
    # Imported lazily so the rest of the module works without netifaces.
    try:
        import netifaces
    except ImportError:
        sys.stderr.write('ERROR: Could not import netifaces module.  Ensure it is installed.\n')
        sys.stderr.write('       Try: pip3 install netifaces\n')
        sys.exit(1)

    for iface in netifaces.interfaces():
        # Interfaces without an AF_INET entry simply contribute no addresses.
        addrs = netifaces.ifaddresses(iface).get(netifaces.AF_INET, None)
        if not addrs:
            continue
        for a in addrs:
            if a.get('addr', None) == ip:
                return iface

    return None
48
49
class snifferProcess(object):
    """Manage a nanown-listen subprocess and the spool file it writes to."""

    # Local address/interface used to reach the target (set in __init__).
    my_ip = None
    my_iface = None
    # Remote endpoint whose traffic is being observed.
    target_ip = None
    target_port = None
    # Popen handle of the running sniffer, or None when not running.
    _proc = None
    # NamedTemporaryFile handed to the sniffer as its output file.
    _spool = None

    def __init__(self, target_ip, target_port):
        """Resolve the local IP and interface used to reach the target."""
        self.target_ip = target_ip
        self.target_port = target_port
        self.my_ip = getLocalIP(target_ip, target_port)
        self.my_iface = getIfaceForIP(self.my_ip)
        print(self.my_ip, self.my_iface)

    def start(self):
        """Create a fresh spool file and launch nanown-listen via chrt."""
        self._spool = tempfile.NamedTemporaryFile('w+t')
        self._proc = subprocess.Popen(['chrt', '-r', '99', 'nanown-listen',
                                       self.my_iface, self.my_ip,
                                       self.target_ip, "%d" % self.target_port,
                                       self._spool.name, '0'])
        # Brief pause after launching, before the caller proceeds.
        time.sleep(0.25)

    def openPacketLog(self):
        """Return a new read-only text handle on the spool file."""
        return open(self._spool.name, 'rt')

    def stop(self):
        """Terminate the sniffer, escalating to kill if it does not exit.

        Does nothing when no process is running.
        """
        proc = self._proc
        if proc is None:
            return

        proc.terminate()
        try:
            proc.wait(2)
        except subprocess.TimeoutExpired:
            # Popen.wait(timeout) raises rather than returning when the
            # child is still alive, so the escalation has to live here.
            proc.kill()
            proc.wait(1)
        self._proc = None

    def is_running(self):
        """Return True while a started sniffer process has not exited."""
        return self._proc is not None and self._proc.poll() is None

    def __del__(self):
        self.stop()
[4]90
[20]91           
def startSniffer(target_ip, target_port, output_file):
    """Launch nanown-listen via chrt and return its Popen handle.

    The local IP and interface are looked up from the route to the target;
    output_file is passed to the sniffer as its output path.
    """
    local_ip = getLocalIP(target_ip, target_port)
    local_iface = getIfaceForIP(local_ip)
    command = [
        'chrt', '-r', '99', 'nanown-listen',
        local_iface, local_ip,
        target_ip, "%d" % target_port,
        output_file, '0',
    ]
    return subprocess.Popen(command)
97
def stopSniffer(sniffer):
    """Terminate a sniffer process, escalating to kill if it does not exit.

    sniffer is a subprocess.Popen handle (as returned by startSniffer).
    subprocess.TimeoutExpired propagates if the process survives the kill
    for more than one second.
    """
    sniffer.terminate()
    try:
        sniffer.wait(2)
    except subprocess.TimeoutExpired:
        # Popen.wait(timeout) raises when the child is still alive, so the
        # kill fallback must be in the exception handler to be reachable.
        sniffer.kill()
        sniffer.wait(1)
104
105
106# Monkey patching that instruments the HTTPResponse to collect connection source port info
class MonitoredHTTPResponse(http.client.HTTPResponse):
    """HTTPResponse that records the local address of its socket."""

    # Value of sock.getsockname() captured at construction time.
    local_address = None

    def __init__(self, sock, *args, **kwargs):
        # Record the local end of the connection before the base class
        # takes over the socket.
        self.local_address = sock.getsockname()
        super().__init__(sock, *args, **kwargs)
114           
# Install MonitoredHTTPResponse as the response class of the urllib3
# HTTPConnection that is reachable through requests.packages.
requests.packages.urllib3.connection.HTTPConnection.response_class = MonitoredHTTPResponse
116
117
def removeDuplicatePackets(packets):
    """Collapse packets sharing (sent, tcpseq, tcpack, payload_len).

    For each such key the packet with the earliest 'observed' value is kept.
    Whenever a later-iterated duplicate replaces an already stored packet,
    a flag is recorded: 's' for sent packets (sent == 1) and 'r' for
    received packets (sent == 0).

    Returns (suspect, packets): the concatenated flags and a view of the
    retained packets.
    """
    # XXX: Need to review this deduplication algorithm and make sure it is correct
    direction_flag = {1: 's', 0: 'r'}
    flags = []
    earliest = {}
    for pkt in packets:
        ident = (pkt['sent'], pkt['tcpseq'], pkt['tcpack'], pkt['payload_len'])
        if ident not in earliest:
            earliest[ident] = pkt
            continue

        flag = direction_flag.get(pkt['sent'])
        if flag is not None and earliest[ident]['observed'] > pkt['observed']:
            # This duplicate was observed before the stored one; keep it.
            earliest[ident] = pkt
            flags.append(flag)

    return ''.join(flags), earliest.values()
141
142
def analyzePackets(packets, timestamp_precision, trim_sent=0, trim_rcvd=0):
    """Derive round-trip measurements for one probe's packets.

    Packets are de-duplicated, then only those with a payload are used.
    The reference sent packet is the trim_sent-th sent packet (by observed
    time); the reference received packet is the one trim_rcvd places before
    the last received packet.

    Flags appended to the 'suspect' string here:
      'd' - a trim offset ran past the available packets
      'R' - received packets are ordered differently by time than by tcpseq
      'N' - a computed RTT is negative

    Returns (analysis, num_sent, num_rcvd) where analysis is a dict with
    keys packet_rtt, tsval_rtt, suspect, sent_trimmed and rcvd_trimmed.
    Raises IndexError when there are no sent or no received payload packets.
    """
    suspect, packets = removeDuplicatePackets(packets)

    def by_time(pkt):
        return (pkt['observed'], pkt['tcpseq'])

    def by_seq(pkt):
        return (pkt['tcpseq'], pkt['observed'])

    sent = sorted((p for p in packets if p['sent'] == 1 and p['payload_len'] > 0),
                  key=by_time)
    rcvd_payload = [p for p in packets if p['sent'] == 0 and p['payload_len'] > 0]
    rcvd = sorted(rcvd_payload, key=by_time)
    rcvd_alt = sorted(rcvd_payload, key=by_seq)

    # Pick the reference sent packet; fall back to the last one if the trim
    # offset is beyond what was captured.
    if trim_sent >= len(sent):
        suspect += 'd'  # dropped packet?
        last_sent = sent[-1]
    else:
        last_sent = sent[trim_sent]

    # Pick the reference received packet; fall back to the first one.
    r_off = len(rcvd) - trim_rcvd - 1
    if r_off < 0:
        suspect += 'd'  # dropped packet?
        r_off = 0
    last_rcvd = rcvd[r_off]
    if last_rcvd != rcvd_alt[r_off]:
        suspect += 'R'  # reordered received packets

    # Among received packets whose tcpack does not exceed their own
    # payload_len plus last_sent's tcpseq, take the smallest
    # (tcpack, observed).
    last_sent_ack = None
    try:
        candidates = ((p['tcpack'], p['observed'], p) for p in packets
                      if p['sent'] == 0
                      and p['payload_len'] + last_sent['tcpseq'] >= p['tcpack'])
        last_sent_ack = min(candidates)[2]
    except Exception:
        sys.stderr.write("WARN: Could not find last_sent_ack.\n")

    packet_rtt = last_rcvd['observed'] - last_sent['observed']

    tsval_rtt = None
    if timestamp_precision is not None and last_sent_ack is not None:
        tsval_delta = last_rcvd['tsval'] - last_sent_ack['tsval']
        tsval_rtt = int(round(tsval_delta * timestamp_precision))

    if packet_rtt < 0 or (tsval_rtt is not None and tsval_rtt < 0):
        suspect += 'N'

    analysis = {'packet_rtt': packet_rtt,
                'tsval_rtt': tsval_rtt,
                'suspect': suspect,
                'sent_trimmed': trim_sent,
                'rcvd_trimmed': trim_rcvd}
    return analysis, len(sent), len(rcvd)
188
189
[13]190# septasummary and mad for each dist of differences
def evaluateTrim(db, unusual_case, strim, rtrim):
    """Summarize RTT differences between the unusual case and the rest.

    For every 'train'/'test' probe of unusual_case analyzed with the given
    (strim, rtrim), the query computes its packet_rtt minus the average
    packet_rtt of the other test cases' probes in the same sample with the
    same trim.

    Returns (septasummary(differences), mad(differences)).
    """
    cursor = db.conn.cursor()
    # An earlier variant of this query (previously assigned here and then
    # immediately overwritten) additionally excluded samples that had any
    # analysis flagged 'R'; it never ran, so it has been dropped.
    query="""
      SELECT packet_rtt-(SELECT avg(packet_rtt) FROM probes,trim_analysis
                         WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.s AND probes.type in ('train','test'))
      FROM (SELECT probes.sample s,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test')) u
    """
    #TODO: check for "N" in suspect field and return a flag

    params = {"strim":strim,"rtrim":rtrim,"unusual_case":unusual_case}
    cursor.execute(query, params)
    differences = [row[0] for row in cursor]

    return septasummary(differences),mad(differences)
[4]210
211
212
def analyzeProbes(db, trim=None, recompute=False):
    """Analyze stored probe packets and populate the analysis table.

    db        -- database wrapper exposing .conn and .addTrimAnalyses()
    trim      -- optional (sent_trim, rcvd_trim) pair; when None the trim
                 parameters are searched for automatically
    recompute -- when true, existing rows in analysis are deleted first so
                 every probe is analyzed again

    trim_analysis is always cleared and rebuilt.  Only packets whose
    probe_id is not already present in analysis are loaded.

    Returns the number of probes whose packets were loaded.  Raises
    statistics.StatisticsError when trim is None and no probe could be
    analyzed (there is nothing to base a trim selection on).
    """
    db.conn.execute("CREATE INDEX IF NOT EXISTS packets_probe ON packets (probe_id)")
    db.conn.commit()

    pcursor = db.conn.cursor()
    pcursor.execute("SELECT tcpts_mean FROM meta")
    meta_row = pcursor.fetchone()
    # No meta row means no timestamp precision is available.
    timestamp_precision = meta_row[0] if meta_row is not None else None

    pcursor.execute("DELETE FROM trim_analysis")
    db.conn.commit()
    if recompute:
        pcursor.execute("DELETE FROM analysis")
        db.conn.commit()

    def loadPackets(db):
        """Return [(probe_id, [packet dict, ...]), ...] for pending probes."""
        cursor = db.conn.cursor()
        cursor.execute("SELECT * FROM packets WHERE probe_id NOT IN (SELECT probe_id FROM analysis) ORDER BY probe_id")

        ret_val = []
        probe_id = None
        entry = []
        # Rows arrive ordered by probe_id, so a change of id closes a group.
        for p in cursor:
            if entry and p['probe_id'] != probe_id:
                ret_val.append((probe_id, entry))
                entry = []
            probe_id = p['probe_id']
            entry.append(dict(p))
        # Flush the final group; with no rows at all there is nothing to add
        # (previously a bogus (None, []) entry was emitted here).
        if entry:
            ret_val.append((probe_id, entry))
        return ret_val

    def processPackets(packet_cache, strim, rtrim):
        """Analyze every cached probe with one trim setting and store it.

        Returns the most common (sent, rcvd) payload packet counts, or
        (None, None) when no probe could be analyzed.
        """
        sent_tally = []
        rcvd_tally = []
        analyses = []
        for probe_id, packets in packet_cache:
            try:
                analysis, s, r = analyzePackets(packets, timestamp_precision, strim, rtrim)
            except Exception:
                #traceback.print_exc()
                sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
                continue
            analysis['probe_id'] = probe_id
            analyses.append(analysis)
            sent_tally.append(s)
            rcvd_tally.append(r)

        start = time.time()
        db.addTrimAnalyses(analyses)
        db.conn.commit()
        print("addTrimAnalyses: %f" % (time.time()-start))

        # statistics.mode() raises on empty data; callers that pass an
        # explicit trim do not need the tallies, so do not crash for them.
        if not analyses:
            return None, None
        return statistics.mode(sent_tally), statistics.mode(rcvd_tally)

    packet_cache = loadPackets(db)

    if trim is not None:
        best_strim, best_rtrim = trim
        processPackets(packet_cache, best_strim, best_rtrim)
    else:
        num_sent, num_rcvd = processPackets(packet_cache, 0, 0)
        if num_sent is None:
            raise statistics.StatisticsError(
                "no probes could be analyzed; cannot select trim parameters")
        print("num_sent: %d, num_rcvd: %d" % (num_sent,num_rcvd))
        unusual_case, delta = findUnusualTestCase(db, (0,0))
        print("unusual_case: %s, delta: %f" % (unusual_case,delta))

        # A candidate trim is accepted only if it keeps the sign of the
        # delta, does not shrink its magnitude by more than this fraction,
        # and lowers the MAD.
        delta_margin = 0.15
        best_strim = 0
        best_rtrim = 0

        good_delta, good_mad = evaluateTrim(db, unusual_case, best_strim, best_rtrim)
        print("trim (%d,%d): delta=%f, mad=%f" % (best_strim,best_rtrim, good_delta, good_mad))

        # Grow the sent-side trim until a candidate stops improving.
        # (trim_mad avoids shadowing the module-level mad() function.)
        for strim in range(1, num_sent):
            processPackets(packet_cache, strim, best_rtrim)
            delta, trim_mad = evaluateTrim(db, unusual_case, strim, best_rtrim)
            print("trim (%d,%d): delta=%f, mad=%f" % (strim,best_rtrim, delta, trim_mad))
            if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and trim_mad < good_mad:
                best_strim = strim
                good_delta, good_mad = delta, trim_mad
            else:
                break

        # Then grow the received-side trim with the chosen sent-side trim.
        # NOTE(review): unlike the loop above, good_delta/good_mad are not
        # updated on success here -- confirm whether that is intentional.
        for rtrim in range(1, num_rcvd):
            processPackets(packet_cache, best_strim, rtrim)
            delta, trim_mad = evaluateTrim(db, unusual_case, best_strim, rtrim)
            print("trim (%d,%d): delta=%f, mad=%f" % (best_strim, rtrim, delta, trim_mad))
            if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and trim_mad < good_mad:
                best_rtrim = rtrim
            else:
                break

        print("selected trim parameters:",(best_strim,best_rtrim))

    # Promote the rows computed with the selected trim into analysis.
    pcursor.execute("""INSERT OR IGNORE INTO analysis
                         SELECT id,probe_id,suspect,packet_rtt,tsval_rtt
                           FROM trim_analysis
                           WHERE sent_trimmed=? AND rcvd_trimmed=?""",
                    (best_strim,best_rtrim))
    db.conn.commit()

    return len(packet_cache)
[4]320
321
322       
def parseJSONLines(fp):
    """Lazily yield one decoded JSON value per line of fp."""
    yield from map(json.loads, fp)
326
327
def associatePackets(sniffer_fp, db):
    """Load sniffer output into the database and report unmatched packets.

    sniffer_fp is rewound and read as JSON lines; the decoded packets are
    handed to db.addPackets together with a window size of 100 times the
    (integer-truncated) average time span per probe.  A warning is written
    to stderr if any stored packet ends up without a probe_id.
    """
    sniffer_fp.seek(0)

    cursor = db.conn.cursor()
    cursor.execute("SELECT count(*) count,min(time_of_day) start,max(time_of_day+userspace_rtt) end from probes")
    span = cursor.fetchone()
    mean_interval = (span['end'] - span['start']) / span['count']
    window_size = 100 * int(mean_interval)

    # now combine sampler data with packet data
    db.addPackets(parseJSONLines(sniffer_fp), window_size)

    cursor.execute("SELECT count(*) count FROM packets WHERE probe_id is NULL")
    orphaned = cursor.fetchone()['count']
    if orphaned > 0:
        sys.stderr.write("WARNING: %d observed packets didn't find a home...\n" % orphaned)

    return None
348
349
def enumStoredTestCases(db):
    """Return the distinct test_case values present in the probes table."""
    rows = db.conn.cursor()
    rows.execute("SELECT test_case FROM probes GROUP BY test_case")
    names = []
    for row in rows:
        names.append(row[0])
    return names
354
355
def findUnusualTestCase(db, trim=None):
    """Find the test case whose packet_rtt summary deviates most.

    With trim=(strim, rtrim) the trim_analysis rows for that trim are used;
    otherwise the analysis table is used.  Each test case's quadsummary of
    packet_rtt (over 'train'/'test' probes) is compared with the summary
    over all test cases, and the one with the largest absolute difference
    is selected.

    Returns (test_case, delta) where delta is the selected case's summary
    minus the summary of all remaining cases.
    """
    test_cases = enumStoredTestCases(db)
    if trim is None:
        params = {}
        qsuffix = ""
        table = "analysis"
    else:
        params = {'strim':trim[0], 'rtrim':trim[1]}
        qsuffix = " AND sent_trimmed=:strim AND rcvd_trimmed=:rtrim"
        table = "trim_analysis"

    cursor = db.conn.cursor()

    def summarize(sql):
        # Run sql with the current params and summarize the packet_rtt column.
        cursor.execute(sql, params)
        return quadsummary([row['packet_rtt'] for row in cursor])

    global_tm = summarize("SELECT packet_rtt FROM probes,"+table+" a WHERE probes.id=a.probe_id AND probes.type in ('train','test')"+qsuffix)

    tm_map = {}
    deviations = []

    # XXX: if more speed needed, percentile extension to sqlite might be handy...
    for tc in test_cases:
        params['test_case']=tc
        query = """SELECT packet_rtt FROM probes,"""+table+""" a
                   WHERE probes.id=a.probe_id AND probes.type in ('train','test')
                   AND probes.test_case=:test_case""" + qsuffix
        tm_map[tc] = summarize(query)
        deviations.append((abs(tm_map[tc]-global_tm), tc))

    _, tc = max(deviations)
    params['test_case']=tc
    query = """SELECT packet_rtt FROM probes,"""+table+""" a
               WHERE probes.id=a.probe_id AND probes.type in ('train','test')
               AND probes.test_case<>:test_case""" + qsuffix
    remaining_tm = summarize(query)

    delta = tm_map[tc]-remaining_tm
    # Hack to make the chosen unusual_case more intuitive to the user:
    # with exactly two cases, report the other one so delta is positive.
    if len(test_cases) == 2 and delta < 0.0:
        tc = [other for other in test_cases if other != tc][0]
        delta = abs(delta)

    return tc,delta
[4]399
[16]400
def reportProgress(db, sample_types, start_time):
    """Print a one-line status with remaining samples and an ETA.

    db           -- database wrapper exposing .conn
    sample_types -- iterable of (type_name, requested_count) pairs
    start_time   -- collection start as a time.time() value; compared with
                    probes.time_of_day after scaling by 1e9
                    (presumably nanoseconds -- confirm against the writer)

    When nothing has completed yet no rate can be derived, so the estimate
    and ETA are reported as "unknown" instead of dividing by zero.
    """
    cursor = db.conn.cursor()
    output = ''
    total_completed = 0
    total_requested = 0
    for st in sample_types:
        cursor.execute("SELECT count(id) c FROM (SELECT id FROM probes WHERE type=? AND time_of_day>? GROUP BY sample)", (st[0],int(start_time*1000000000)))
        count = cursor.fetchone()[0]
        output += " | %s remaining: %6d" % (st[0], st[1]-count)
        total_completed += count
        total_requested += st[1]

    elapsed = time.time() - start_time
    if total_completed <= 0 or elapsed <= 0:
        print("STATUS:",output[3:],"| est. total_time: unknown | ETA: unknown")
        return

    rate = total_completed / elapsed
    total_time = total_requested / rate
    eta = datetime.datetime.fromtimestamp(start_time+total_time)
    print("STATUS:",output[3:],"| est. total_time: %s | ETA: %s" % (str(datetime.timedelta(seconds=total_time)), eta.strftime("%Y-%m-%d %X")))
417
418
419
def evaluateTestResults(db):
    """Pick the best 'test' result row for every classifier.

    For each classifier, the preferred row is the one with the fewest
    num_observations among rows whose (false_positives+false_negatives)/2.0
    is below 5.0; such rows are collected in best_obs.  If a classifier has
    no such row, its row with the lowest false_positives+false_negatives is
    collected in best_error instead.  Classifiers without any 'test' rows
    are reported on stderr and skipped.

    Returns (best_obs, best_error), two lists of row dicts.
    """
    under_threshold_query="""
        SELECT classifier,params,num_observations,(false_positives+false_negatives)/2 error
        FROM classifier_results
        WHERE trial_type='test'
         AND classifier=:classifier
         AND (false_positives+false_negatives)/2.0 < 5.0
        ORDER BY num_observations,(false_positives+false_negatives)
        LIMIT 1
        """
    lowest_error_query="""
            SELECT classifier,params,num_observations,(false_positives+false_negatives)/2 error
            FROM classifier_results
            WHERE trial_type='test' and classifier=:classifier
            ORDER BY (false_positives+false_negatives),num_observations
            LIMIT 1
            """

    cursor = db.conn.cursor()
    query = """
      SELECT classifier FROM classifier_results GROUP BY classifier ORDER BY classifier;
    """
    cursor.execute(query)
    classifiers = [record[0] for record in cursor]

    best_obs = []
    best_error = []
    for classifier in classifiers:
        binding = {'classifier':classifier}

        cursor.execute(under_threshold_query, binding)
        row = cursor.fetchone()
        if row is not None:
            best_obs.append(dict(row))
            continue

        # Nothing under the error threshold: fall back to the lowest error.
        cursor.execute(lowest_error_query, binding)
        row = cursor.fetchone()
        if row is None:
            sys.stderr.write("WARN: couldn't find test results for classifier '%s'.\n" % classifier)
            continue
        best_error.append(dict(row))

    return best_obs,best_error
Note: See TracBrowser for help on using the repository browser.