Ignore:
Timestamp:
08/18/15 22:09:24 (9 years ago)
Author:
tim
Message:

major code refactoring, better organizing location of library functions

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/nanownlib/__init__.py

    r16 r20  
    55import time
    66import traceback
    7 import random
    8 import argparse
    97import socket
    108import datetime
    119import http.client
    12 import threading
    13 import queue
    1410import subprocess
    15 import multiprocessing
    16 import csv
     11import tempfile
    1712import json
    1813import gzip
    1914import statistics
    20 try:
    21     import numpy
    22 except:
    23     sys.stderr.write('ERROR: Could not import numpy module.  Ensure it is installed.\n')
    24     sys.stderr.write('       Under Debian, the package name is "python3-numpy"\n.')
    25     sys.exit(1)
    2615
    2716try:
     
    5948
    6049
    61 def setTCPTimestamps(enabled=True):
    62     fh = open('/proc/sys/net/ipv4/tcp_timestamps', 'r+b')
    63     ret_val = False
    64     if fh.read(1) == b'1':
    65         ret_val = True
    66 
    67     fh.seek(0)
    68     if enabled:
    69         fh.write(b'1')
    70     else:
    71         fh.write(b'0')
    72     fh.close()
    73    
    74     return ret_val
    75 
    76 
    77 def trickleHTTPRequest(ip,port,hostname):
    78     my_port = None
    79     try:
    80         sock = socket.create_connection((ip, port))
    81         my_port = sock.getsockname()[1]
     50class snifferProcess(object):
     51    my_ip = None
     52    my_iface = None
     53    target_ip = None
     54    target_port = None
     55    _proc = None
     56    _spool = None
     57   
     58    def __init__(self, target_ip, target_port):
     59        self.target_ip = target_ip
     60        self.target_port = target_port
     61        self.my_ip = getLocalIP(target_ip, target_port)
     62        self.my_iface = getIfaceForIP(self.my_ip)
     63        print(self.my_ip, self.my_iface)
     64
     65    def start(self):
     66        self._spool = tempfile.NamedTemporaryFile('w+t')
     67        self._proc = subprocess.Popen(['chrt', '-r', '99', 'nanown-listen',
     68                                       self.my_iface, self.my_ip,
     69                                       self.target_ip, "%d" % self.target_port,
     70                                       self._spool.name, '0'])
     71        time.sleep(0.25)
     72
     73    def openPacketLog(self):
     74        return open(self._spool.name, 'rt')
    8275       
    83         #print('.')
    84         sock.sendall(b'GET / HTTP/1.1\r\n')
    85         time.sleep(0.5)
    86         rest = b'''Host: '''+hostname.encode('utf-8')+b'''\r\nUser-Agent: Secret Agent Man\r\nX-Extra: extra read all about it!\r\nConnection: close\r\n'''
    87         for r in rest:
    88             sock.sendall(bytearray([r]))
    89             time.sleep(0.05)
    90 
    91         time.sleep(0.5)
    92         sock.sendall('\r\n')
    93 
    94         r = None
    95         while r != b'':
    96             r = sock.recv(16)
    97 
    98         sock.close()
    99     except Exception as e:
    100         pass
    101 
    102     return my_port
    103 
    104 
    105 def runTimestampProbes(host_ip, port, hostname, num_trials, concurrency=4):
    106     myq = queue.Queue()
    107     def threadWrapper(*args):
    108         try:
    109             myq.put(trickleHTTPRequest(*args))
    110         except Exception as e:
    111             sys.stderr.write("ERROR from trickleHTTPRequest: %s\n" % repr(e))
    112             myq.put(None)
    113 
    114     threads = []
    115     ports = []
    116     for i in range(num_trials):
    117         if len(threads) >= concurrency:
    118             ports.append(myq.get())
    119         t = threading.Thread(target=threadWrapper, args=(host_ip, port, hostname))
    120         t.start()
    121         threads.append(t)
    122 
    123     for t in threads:
    124         t.join()
    125 
    126     while myq.qsize() > 0:
    127         ports.append(myq.get())
    128 
    129     return ports
    130 
    131 
    132 def computeTimestampPrecision(sniffer_fp, ports):
    133     rcvd = []
    134     for line in sniffer_fp:
    135         p = json.loads(line)
    136         if p['sent']==0:
    137             rcvd.append((p['observed'],p['tsval'],int(p['local_port'])))
    138 
    139     slopes = []
    140     for port in ports:
    141         trcvd = [tr for tr in rcvd if tr[2]==port and tr[1]!=0]
    142 
    143         if len(trcvd) < 2:
    144             sys.stderr.write("WARN: Inadequate data points.\n")
    145             continue
    146        
    147         if trcvd[0][1] > trcvd[-1][1]:
    148             sys.stderr.write("WARN: TSval wrap.\n")
    149             continue
    150 
    151         x = [tr[1] for tr in trcvd]
    152         y = [tr[0] for tr in trcvd]
    153 
    154         slope,intercept = OLSRegression(x, y)
    155         slopes.append(slope)
    156 
    157     if len(slopes) == 0:
    158         return None,None,None
    159 
    160     m = statistics.mean(slopes)
    161     if len(slopes) == 1:
    162         return (m, None, slopes)
    163     else:
    164         return (m, statistics.stdev(slopes), slopes)
    165 
    166    
    167 def OLSRegression(x,y):
    168     #print(x,y)
    169     x = numpy.array(x)
    170     y = numpy.array(y)
    171     #A = numpy.vstack([x, numpy.ones(len(x))]).T
    172     #m, c = numpy.linalg.lstsq(A, y)[0] # broken
    173     #c,m = numpy.polynomial.polynomial.polyfit(x, y, 1) # less accurate
    174     c,m = numpy.polynomial.Polynomial.fit(x,y,1).convert().coef
    175 
    176     #print(m,c)
    177 
    178     #import matplotlib.pyplot as plt
    179     #plt.clf()
    180     #plt.scatter(x, y)
    181     #plt.plot(x, m*x + c, 'r', label='Fitted line')
    182     #plt.show()
    183    
    184     return (m,c)
    185 
    186 
     76    def stop(self):
     77        if self._proc:
     78            self._proc.terminate()
     79            self._proc.wait(2)
     80            if self._proc.poll() == None:
     81                self._proc.kill()
     82                self._proc.wait(1)
     83            self._proc = None
     84   
     85    def is_running(self):
     86        return (self._proc.poll() == None)
     87           
     88    def __del__(self):
     89        self.stop()
     90
     91           
    18792def startSniffer(target_ip, target_port, output_file):
    18893    my_ip = getLocalIP(target_ip, target_port)
     
    198103        sniffer.wait(1)
    199104
    200        
    201 def setCPUAffinity():
    202     import ctypes
    203     from ctypes import cdll,c_int,byref
    204     cpus = multiprocessing.cpu_count()
    205    
    206     libc = cdll.LoadLibrary("libc.so.6")
    207     #libc.sched_setaffinity(os.getpid(), 1, ctypes.byref(ctypes.c_int(0x01)))
    208     return libc.sched_setaffinity(0, 4, byref(c_int(0x00000001<<(cpus-1))))
    209 
    210105
    211106# Monkey patching that instruments the HTTPResponse to collect connection source port info
     
    215110    def __init__(self, sock, *args, **kwargs):
    216111        self.local_address = sock.getsockname()
     112        #print(self.local_address)
    217113        super(MonitoredHTTPResponse, self).__init__(sock,*args,**kwargs)
    218114           
Note: See TracChangeset for help on using the changeset viewer.