source: trunk/lib/nanownlib/__init__.py @ 10

#!/usr/bin/env python3
#-*- mode: Python;-*-

import sys
import time
import traceback
import random
import argparse
import socket
import datetime
import http.client
import threading
import queue
import subprocess
import multiprocessing
import csv
import json
import gzip
import statistics
import numpy
import netifaces
try:
    import requests
except ImportError:
    sys.stderr.write('ERROR: Could not import requests module.  Ensure it is installed.\n')
    sys.stderr.write('       Under Debian, the package name is "python3-requests".\n')
    sys.exit(1)

from .stats import *


# Returns the local IP address used to reach the given remote endpoint.
def getLocalIP(remote_host, remote_port):
    connection = socket.create_connection((remote_host, remote_port))
    ret_val = connection.getsockname()[0]
    connection.close()

    return ret_val


# Returns the name of the network interface bound to the given local IP
# address, or None if no interface matches.
def getIfaceForIP(ip):
    for iface in netifaces.interfaces():
        addrs = netifaces.ifaddresses(iface).get(netifaces.AF_INET, None)
        if addrs:
            for a in addrs:
                if a.get('addr', None) == ip:
                    return iface

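# Usage sketch (hypothetical addresses): these two helpers are chained, as in
# startSniffer() below, to find the interface a packet sniffer should bind to:
#
#   my_ip = getLocalIP('203.0.113.10', 443)
#   my_iface = getIfaceForIP(my_ip)   # e.g. 'eth0'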

# Enables or disables TCP timestamps (RFC 1323/7323) system-wide by writing
# to /proc/sys/net/ipv4/tcp_timestamps (requires root).  Returns the previous
# setting so that it can be restored afterward.
def setTCPTimestamps(enabled=True):
    fh = open('/proc/sys/net/ipv4/tcp_timestamps', 'r+b')
    ret_val = False
    if fh.read(1) == b'1':
        ret_val = True

    fh.seek(0)
    if enabled:
        fh.write(b'1')
    else:
        fh.write(b'0')
    fh.close()

    return ret_val

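# Usage sketch: capture the prior setting and restore it when done (the
# try/finally wrapper is illustrative, not part of this module):
#
#   old_setting = setTCPTimestamps(True)
#   try:
#       ...  # run measurements
#   finally:
#       setTCPTimestamps(old_setting)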

# Dribbles an HTTP request to the target one byte at a time so that the
# exchange stretches over several seconds, giving the server many chances to
# emit timestamped segments.  Returns the connection's local port (used later
# to match sniffed packets to this probe).
def trickleHTTPRequest(ip,port,hostname):
    my_port = None
    try:
        sock = socket.create_connection((ip, port))
        my_port = sock.getsockname()[1]

        #print('.')
        sock.sendall(b'GET / HTTP/1.1\r\n')
        time.sleep(0.5)
        rest = b'''Host: '''+hostname.encode('utf-8')+b'''\r\nUser-Agent: Secret Agent Man\r\nX-Extra: extra read all about it!\r\nConnection: close\r\n'''
        for r in rest:
            sock.sendall(bytearray([r]))
            time.sleep(0.05)

        time.sleep(0.5)
        sock.sendall(b'\r\n')

        r = None
        while r != b'':
            r = sock.recv(16)

        sock.close()
    except Exception:
        pass  # errors are ignored; the local port (if any) is still returned

    return my_port


# Launches num_trials trickled HTTP requests against the target, keeping at
# most `concurrency` of them in flight, and returns the local ports they used.
def runTimestampProbes(host_ip, port, hostname, num_trials, concurrency=4):
    myq = queue.Queue()
    def threadWrapper(*args):
        try:
            myq.put(trickleHTTPRequest(*args))
        except Exception as e:
            sys.stderr.write("ERROR from trickleHTTPRequest: %s\n" % repr(e))
            myq.put(None)

    threads = []
    ports = []
    for i in range(num_trials):
        if len(threads) >= concurrency:
            ports.append(myq.get())
        t = threading.Thread(target=threadWrapper, args=(host_ip, port, hostname))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    while myq.qsize() > 0:
        ports.append(myq.get())

    return ports

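# Usage sketch (hypothetical target), typically run while a sniffer is
# recording so the TSvals of the responses can be analyzed afterward:
#
#   ports = runTimestampProbes('203.0.113.10', 80, 'example.com', 20)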

# Estimates the remote host's TCP timestamp clock precision from sniffer
# output.  For each probe connection, local receive times are regressed
# against the remote TSvals; the slope of each fit is the duration of one
# remote timestamp tick.  Returns (mean slope, stdev of slopes, slopes).
def computeTimestampPrecision(sniffer_fp, ports):
    rcvd = []
    for line in sniffer_fp:
        p = json.loads(line)
        if p['sent']==0:
            rcvd.append((p['observed'],p['tsval'],int(p['local_port'])))

    slopes = []
    for port in ports:
        trcvd = [tr for tr in rcvd if tr[2]==port and tr[1]!=0]

        if len(trcvd) < 2:
            sys.stderr.write("WARN: Inadequate data points.\n")
            continue

        if trcvd[0][1] > trcvd[-1][1]:
            sys.stderr.write("WARN: TSval wrap.\n")
            continue

        x = [tr[1] for tr in trcvd]
        y = [tr[0] for tr in trcvd]

        slope,intercept = OLSRegression(x, y)
        slopes.append(slope)

    if len(slopes) == 0:
        return None,None,None

    m = statistics.mean(slopes)
    if len(slopes) == 1:
        return (m, None, slopes)
    else:
        return (m, statistics.stdev(slopes), slopes)

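# The sniffer output is expected to be one JSON object per line; based on the
# fields referenced above and in associatePackets() below, a record looks
# roughly like this (field values fabricated for illustration):
#
#   {"sent": 0, "observed": 1389277223000000000, "tsval": 551234567,
#    "local_port": 55123, "tcpseq": 4321, "tcpack": 8765, "payload_len": 1448}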

# Ordinary least-squares linear fit; returns (slope, intercept).
def OLSRegression(x,y):
    #print(x,y)
    x = numpy.array(x)
    y = numpy.array(y)
    #A = numpy.vstack([x, numpy.ones(len(x))]).T
    #m, c = numpy.linalg.lstsq(A, y)[0] # broken
    #c,m = numpy.polynomial.polynomial.polyfit(x, y, 1) # less accurate
    c,m = numpy.polynomial.Polynomial.fit(x,y,1).convert().coef

    #print(m,c)

    #import matplotlib.pyplot as plt
    #plt.clf()
    #plt.scatter(x, y)
    #plt.plot(x, m*x + c, 'r', label='Fitted line')
    #plt.show()

    return (m,c)

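# Worked example with fabricated values: if the remote TSval advances 100
# ticks for every observed second, the fitted slope is the tick period:
#
#   slope, intercept = OLSRegression([100, 200, 300, 400], [0.0, 1.0, 2.0, 3.0])
#   # slope ~= 0.01, i.e. one tick per 10ms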

# Starts the bundled csamp packet sniffer (under the real-time scheduler) on
# the interface that routes to the target.
def startSniffer(target_ip, target_port, output_file):
    my_ip = getLocalIP(target_ip, target_port)
    my_iface = getIfaceForIP(my_ip)
    return subprocess.Popen(['chrt', '-r', '99', './bin/csamp', my_iface, my_ip,
                             target_ip, "%d" % target_port, output_file, '0'])

def stopSniffer(sniffer):
    sniffer.terminate()
    sniffer.wait(2)
    if sniffer.poll() is None:
        sniffer.kill()
        sniffer.wait(1)

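# Usage sketch (hypothetical output path): the two calls bracket a probe run,
# and the output file is later fed to associatePackets():
#
#   sniffer = startSniffer('203.0.113.10', 443, '/tmp/sniffer.out')
#   ...  # run probes
#   stopSniffer(sniffer)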

# Pins this process to the last CPU using libc's sched_setaffinity().  The
# mask is passed as a single 4-byte c_int, so this assumes at most 32 CPUs.
def setCPUAffinity():
    import ctypes
    from ctypes import cdll,c_int,byref
    cpus = multiprocessing.cpu_count()

    libc = cdll.LoadLibrary("libc.so.6")
    #libc.sched_setaffinity(os.getpid(), 1, ctypes.byref(ctypes.c_int(0x01)))
    return libc.sched_setaffinity(0, 4, byref(c_int(0x00000001<<(cpus-1))))


# Monkey patching that instruments the HTTPResponse to collect connection source port info
class MonitoredHTTPResponse(http.client.HTTPResponse):
    local_address = None

    def __init__(self, sock, *args, **kwargs):
        self.local_address = sock.getsockname()
        super(MonitoredHTTPResponse, self).__init__(sock,*args,**kwargs)

requests.packages.urllib3.connection.HTTPConnection.response_class = MonitoredHTTPResponse

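# With the patch in place, the local (address, port) of each requests
# connection can be recovered from the response.  The attribute path below is
# an assumption based on urllib3's internals and may vary across versions:
#
#   r = requests.get('http://example.com/')
#   #addr, port = r.raw._original_response.local_address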

# Collapses duplicate packet records (e.g. retransmissions), keeping the
# earliest observation of each.  Returns (suspect,packets), where suspect
# gains an 's' or 'r' flag for each sent or received duplicate replaced.
def removeDuplicatePackets(packets):
    #return packets
    suspect = ''
    seen = {}
    # XXX: Need to review this deduplication algorithm and make sure it is correct
    for p in packets:
        key = (p['sent'],p['tcpseq'],p['tcpack'],p['payload_len'])
        if (key not in seen):
            seen[key] = p
            continue
        if p['sent']==1 and (seen[key]['observed'] > p['observed']): #earliest sent
            seen[key] = p
            suspect += 's'
            continue
        if p['sent']==0 and (seen[key]['observed'] > p['observed']): #earliest rcvd
            seen[key] = p
            suspect += 'r'
            continue

    #if len(seen) < len(packets):
    #   sys.stderr.write("INFO: removed %d duplicate packets.\n" % (len(packets) - len(seen)))

    return suspect,seen.values()

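# Illustrative example (fabricated values): two copies of the same sent
# segment share the key (1, seq, ack, len); the earlier-observed copy wins
# and an 's' is recorded:
#
#   late  = {'sent':1, 'tcpseq':1000, 'tcpack':2000, 'payload_len':32, 'observed':5.2}
#   early = dict(late, observed=5.0)
#   suspect, deduped = removeDuplicatePackets([late, early])
#   # suspect == 's'; list(deduped) == [early]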

# Reduces one probe's packets to RTT measurements.  packet_rtt is the
# difference in observed times between a chosen sent payload packet and a
# chosen received payload packet (by default the first sent and the last
# received); trim_sent/trim_rcvd shift which packets are chosen.  tsval_rtt
# is an analogous difference computed from the remote host's TCP timestamps,
# scaled into local clock units by timestamp_precision.
def analyzePackets(packets, timestamp_precision, trim_sent=0, trim_rcvd=0):
    suspect,packets = removeDuplicatePackets(packets)

    sort_key = lambda d: (d['tcpseq'],d['observed'])
    sent = sorted((p for p in packets if p['sent']==1 and p['payload_len']>0), key=sort_key)
    rcvd = sorted((p for p in packets if p['sent']==0 and p['payload_len']>0), key=sort_key)

    alt_key = lambda d: (d['observed'],d['tcpseq'])
    rcvd_alt = sorted((p for p in packets if p['sent']==0 and p['payload_len']>0), key=alt_key)

    s_off = trim_sent
    if s_off >= len(sent):
        s_off = -1
    last_sent = sent[s_off]

    r_off = len(rcvd) - trim_rcvd - 1
    if r_off <= 0:
        r_off = 0
    last_rcvd = rcvd[r_off]
    if last_rcvd != rcvd_alt[r_off]:
        suspect += 'R'

    packet_rtt = last_rcvd['observed'] - last_sent['observed']
    if packet_rtt < 0:
        sys.stderr.write("WARN: Negative packet_rtt. last_rcvd=%s,last_sent=%s\n" % (last_rcvd, last_sent))

    last_sent_ack = None
    try:
        last_sent_ack = min(((p['observed'],p) for p in packets
                             if p['sent']==0 and p['payload_len']+last_sent['tcpseq']==p['tcpack']),
                            key=lambda t: t[0])[1]
    except Exception:
        sys.stderr.write("WARN: Could not find last_sent_ack.\n")

    tsval_rtt = None
    if None not in (timestamp_precision, last_sent_ack):
        tsval_rtt = int(round((last_rcvd['tsval'] - last_sent_ack['tsval'])*timestamp_precision))

    return {'packet_rtt':packet_rtt,
            'tsval_rtt':tsval_rtt,
            'suspect':suspect,
            'sent_trimmed':trim_sent,
            'rcvd_trimmed':trim_rcvd},len(sent),len(rcvd)

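# Back-of-the-envelope example of the tsval_rtt computation (fabricated
# numbers): with a 10ms remote tick and nanosecond 'observed' values,
# timestamp_precision would be about 1e7 ns/tick, so a TSval difference of 3
# ticks yields tsval_rtt = int(round(3 * 1e7)) = 30000000, i.e. ~30ms.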

# trimean and mad for each dist of differences
def evaluateTrim(db, unusual_case, strim, rtrim):
    cursor = db.conn.cursor()
    # Alternate form of the query below that also excludes samples flagged
    # with the 'R' suspect code; retained for reference but unused:
    #query="""
    #  SELECT packet_rtt-(SELECT avg(packet_rtt) FROM probes,trim_analysis
    #                     WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.s AND probes.type in ('train','test'))
    #  FROM (SELECT probes.sample s,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test') AND 1 NOT IN (select 1 from probes p,trim_analysis t WHERE p.sample=s AND t.probe_id=p.id AND t.suspect LIKE '%R%')) u
    #"""
    query="""
      SELECT packet_rtt-(SELECT avg(packet_rtt) FROM probes,trim_analysis
                         WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.s AND probes.type in ('train','test'))
      FROM (SELECT probes.sample s,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test')) u
    """

    params = {"strim":strim,"rtrim":rtrim,"unusual_case":unusual_case}
    cursor.execute(query, params)
    differences = [row[0] for row in cursor]

    return ubersummary(differences),mad(differences)

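# For each sample, the query computes the unusual case's packet_rtt minus the
# mean packet_rtt of the other test cases; ubersummary() and mad() (from
# nanownlib.stats) then give a robust location and spread for that
# distribution of differences, which analyzeProbes() compares across trim
# offsets:
#
#   delta, spread = evaluateTrim(db, 'unusual', 0, 0)   # hypothetical call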


# Computes per-probe RTT analyses at every trim combination, selects the trim
# offsets that best sharpen the unusual case's delta, and populates the
# analysis table accordingly.  Returns the number of probes processed.
def analyzeProbes(db):
    db.conn.execute("CREATE INDEX IF NOT EXISTS packets_probe ON packets (probe_id)")
    pcursor = db.conn.cursor()
    db.conn.commit()

    pcursor.execute("SELECT tcpts_mean FROM meta")
    try:
        timestamp_precision = pcursor.fetchone()[0]
    except (TypeError, IndexError):
        timestamp_precision = None

    pcursor.execute("DELETE FROM trim_analysis")
    db.conn.commit()

    # Groups the packets table into (probe_id, [packet dicts]) pairs.
    def loadPackets(db):
        cursor = db.conn.cursor()
        cursor.execute("SELECT * FROM packets ORDER BY probe_id")

        probe_id = None
        entry = []
        ret_val = []
        for p in cursor:
            if probe_id is None:
                probe_id = p['probe_id']
            if p['probe_id'] != probe_id:
                ret_val.append((probe_id,entry))
                probe_id = p['probe_id']
                entry = []
            entry.append(dict(p))
        ret_val.append((probe_id,entry))
        return ret_val

    start = time.time()
    packet_cache = loadPackets(db)
    print("packets loaded in: %f" % (time.time()-start))

    count = 0
    sent_tally = []
    rcvd_tally = []
    for probe_id,packets in packet_cache:
        try:
            analysis,s,r = analyzePackets(packets, timestamp_precision)
            analysis['probe_id'] = probe_id
            sent_tally.append(s)
            rcvd_tally.append(r)
            db.addTrimAnalyses([analysis])
        except Exception as e:
            traceback.print_exc()
            sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)

        #print(probe_id,analysis)
        count += 1
    db.conn.commit()
    # The most common sent/rcvd packet counts bound the range of trim offsets
    # worth evaluating.
    num_sent = statistics.mode(sent_tally)
    num_rcvd = statistics.mode(rcvd_tally)
    sent_tally = None
    rcvd_tally = None
    print("num_sent: %d, num_rcvd: %d" % (num_sent,num_rcvd))

    for strim in range(0,num_sent):
        for rtrim in range(0,num_rcvd):
            if strim == 0 and rtrim == 0:
                continue # no point in doing 0,0 again
            for probe_id,packets in packet_cache:
                try:
                    analysis,s,r = analyzePackets(packets, timestamp_precision, strim, rtrim)
                    analysis['probe_id'] = probe_id
                except Exception as e:
                    print(e)
                    sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
                    continue

                db.addTrimAnalyses([analysis])
    db.conn.commit()

    # Populate analysis table so findUnusualTestCase can give us a starting point
    pcursor.execute("DELETE FROM analysis")
    db.conn.commit()
    pcursor.execute("INSERT INTO analysis SELECT id,probe_id,suspect,packet_rtt,tsval_rtt FROM trim_analysis WHERE sent_trimmed=0 AND rcvd_trimmed=0")

    unusual_case,delta = findUnusualTestCase(db)
    evaluations = {}
    for strim in range(0,num_sent):
        for rtrim in range(0,num_rcvd):
            evaluations[(strim,rtrim)] = evaluateTrim(db, unusual_case, strim, rtrim)

    import pprint
    pprint.pprint(evaluations)

    # Greedily increase each trim offset while the delta keeps its sign,
    # shrinks by less than delta_margin, and the MAD improves.
    delta_margin = 0.15
    best_strim = 0
    best_rtrim = 0
    good_delta,good_mad = evaluations[(0,0)]

    for strim in range(1,num_sent):
        delta,mad = evaluations[(strim,0)]
        if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
            best_strim = strim
        else:
            break

    good_delta,good_mad = evaluations[(best_strim,0)]
    for rtrim in range(1,num_rcvd):
        delta,mad = evaluations[(best_strim,rtrim)]
        if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
            best_rtrim = rtrim
        else:
            break

    print("selected trim parameters:",(best_strim,best_rtrim))

    if best_strim != 0 or best_rtrim != 0:
        pcursor.execute("DELETE FROM analysis")
        db.conn.commit()
        pcursor.execute("INSERT INTO analysis SELECT id,probe_id,suspect,packet_rtt,tsval_rtt FROM trim_analysis WHERE sent_trimmed=? AND rcvd_trimmed=?",
                        (best_strim,best_rtrim))

    #pcursor.execute("DELETE FROM trim_analysis")
    db.conn.commit()

    return count


def parseJSONLines(fp):
    for line in fp:
        yield json.loads(line)


# Matches sniffed packets (one JSON record per line) to the probes recorded
# in the database.
def associatePackets(sniffer_fp, db):
    sniffer_fp.seek(0)

    # now combine sampler data with packet data
    buffered = []

    cursor = db.conn.cursor()
    cursor.execute("SELECT count(*) count,min(time_of_day) start,max(time_of_day+userspace_rtt) end from probes")
    ptimes = cursor.fetchone()
    # Association window: 100x the mean per-probe duration.
    window_size = 100*int((ptimes['end']-ptimes['start'])/ptimes['count'])
    print("associate window_size:", window_size)

    db.addPackets(parseJSONLines(sniffer_fp), window_size)

    cursor.execute("SELECT count(*) count FROM packets WHERE probe_id is NULL")
    unmatched = cursor.fetchone()['count']
    if unmatched > 0:
        sys.stderr.write("WARNING: %d observed packets didn't find a home...\n" % unmatched)

    return None


def enumStoredTestCases(db):
    cursor = db.conn.cursor()
    cursor.execute("SELECT test_case FROM probes GROUP BY test_case")
    return [tc[0] for tc in cursor]


# Identifies the test case whose packet_rtt summary deviates most from the
# global summary.  Returns (test_case, delta), where delta is measured
# against the remaining test cases.
def findUnusualTestCase(db):
    test_cases = enumStoredTestCases(db)

    cursor = db.conn.cursor()
    cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test')")
    global_tm = quadsummary([row['packet_rtt'] for row in cursor])

    tm_abs = []
    tm_map = {}
    # XXX: if more speed needed, percentile extension to sqlite might be handy...
    for tc in test_cases:
        cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case=?", (tc,))
        tm_map[tc] = quadsummary([row['packet_rtt'] for row in cursor])
        tm_abs.append((abs(tm_map[tc]-global_tm), tc))

    magnitude,tc = max(tm_abs)
    cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case<>?", (tc,))
    remaining_tm = quadsummary([row['packet_rtt'] for row in cursor])

    ret_val = (tc, tm_map[tc]-remaining_tm)
    print("unusual_case: %s, delta: %f" % ret_val)
    return ret_val


# Prints a progress summary for an in-flight collection run and estimates the
# completion time from the rate so far.
def reportProgress(db, sample_types, start_time):
    cursor = db.conn.cursor()
    output = ''
    total_completed = 0
    total_requested = 0
    for st in sample_types:
        cursor.execute("SELECT count(id) c FROM (SELECT id FROM probes WHERE type=? AND time_of_day>? GROUP BY sample)", (st[0],int(start_time*1000000000)))
        count = cursor.fetchone()[0]
        output += " | %s remaining: %d" % (st[0], st[1]-count)
        total_completed += count
        total_requested += st[1]

    rate = total_completed / (time.time() - start_time)
    total_time = total_requested / rate
    eta = datetime.datetime.fromtimestamp(start_time+total_time)
    print("STATUS:",output[3:],"| est. total_time: %s | est. ETA: %s" % (str(datetime.timedelta(seconds=total_time)), str(eta)))