Changeset 20


Ignore:
Timestamp:
08/18/15 22:09:24 (9 years ago)
Author:
tim
Message:

major code refactoring, better organizing location of library functions

Location:
trunk
Files:
2 added
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/bin/sampler

    r9 r20  
    3030
    3131from nanownlib import *
     32from nanownlib.platform import *
     33from nanownlib.tcpts import *
    3234import nanownlib.storage
    3335
     
    3638parser.add_argument('-c', dest='cases', type=str, default='{"short":10000,"long":13000}',
    3739                    help='JSON representation of echo timing cases.')
    38 parser.add_argument('--no-tcpts', action='store_true', help='Disbale TCP timestamp profiling')
     40parser.add_argument('--no-tcpts', action='store_true', help='Disable TCP timestamp profiling')
    3941parser.add_argument('--no-control', action='store_true', help='Do not collect separate control data.  Instead, synthesize it from test and train data.')
    4042parser.add_argument('session_name', default=None,
     
    4951
    5052
    51 session_name = options.session_name
    52 num_samples=options.sample_count
    53 hostname = options.host
    54 port = options.port
    55 protocol = 'http'
     53num_samples = options.sample_count
     54hostname    = options.host
     55port        = options.port
     56protocol    = 'http'
    5657
    5758cases = json.loads(options.cases)
     59db_file = "%s.db" % options.session_name
     60db = nanownlib.storage.db(db_file)
     61
    5862
    5963def extractReportedRuntime(headers, body):
     
    6468def sendRequest(data=None):
    6569    method = 'GET'
    66     path = '/data/clamav-audit-results.txt?t=' + data
     70    path = '/?t=' + data
    6771    url = "%s://%s:%d%s" % (protocol,hostname,port,path)
    6872    headers = {}
     
    7478        try:
    7579            session = requests.Session()
    76             response = session.send(req)
    77             #print(repr(response.raw._original_response.local_address))
     80            response = session.send(req, allow_redirects=False)
     81            #print("sendRequest:", repr(response.raw._original_response.local_address))
    7882            reported = extractReportedRuntime(response.headers, response.text)
    7983            retry = False
     
    8286            time.sleep(1.0)
    8387            sys.stderr.write("ERROR: retrying...\n")
    84    
     88
     89       
    8590    return {'userspace_rtt':response.elapsed.microseconds*1000,
    8691            'reported':reported,
     
    97102
    98103setCPUAffinity()
    99 setTCPTimestamps()
    100 host_ip = socket.gethostbyname(hostname) #XXX: what about multiple A records?
     104time.sleep(0.25) # Give our process a chance to migrate to a different CPU if necessary
     105setPowersave(False) # XXX: test this to see if it helps
     106setLowLatency(True) # XXX: test this to see if it helps
     107tcpts_previous = setTCPTimestamps(True)
     108
     109
     110#XXX: what about multiple A records?
     111#     perform this during a script generation step, measuring lowest latency server
     112#     and hard-coding the IP and host name separately.  However, including all
     113#     server IPs in comments
     114host_ip = socket.gethostbyname(hostname)
     115
    101116
    102117meta = {'tcpts_mean':None,'tcpts_stddev':None,'tcpts_slopes':None}
     
    125140                     % (meta['tcpts_mean'], meta['tcpts_stddev'], 100*meta['tcpts_stddev']/meta['tcpts_mean']))
    126141
    127 
    128 sniffer_fp = tempfile.NamedTemporaryFile('w+t')
    129 db_file = "%s.db" % session_name
    130 
    131 sniffer = startSniffer(host_ip, port, sniffer_fp.name)
    132 db = nanownlib.storage.db(db_file)
    133142db.addMeta(meta)
    134 time.sleep(0.5) # ensure sniffer is fully ready and our process is migrated
    135 
    136 if options.no_control:
    137     num_control = 0
    138 else:
    139     num_control = int(num_samples*2/5)
    140 
    141 num_train = int((num_samples-num_control)/3)
    142 num_test = num_samples-num_train-num_control
    143 
    144 sample_types = [('train',num_train),
    145                 ('train_null',num_control),
    146                 ('test',num_test)]
    147 
    148 sid = 0
    149 report_interval = 20
    150 start = time.time()
    151 next_report = start+report_interval
    152 for st,count in sample_types:
    153     if sniffer.poll() != None:
    154         sys.stderr.write('ERROR: Sniffer quit, sender exiting...\n')
    155         break
     143
     144
     145def findNextSampleID(db):
     146    cursor = db.conn.cursor()
     147    cursor.execute("SELECT max(sample) FROM probes")
     148    row = cursor.fetchone()
     149    if row != None and row[0] != None:
     150        return row[0]+1
     151
     152    return 0
     153
     154
     155def collectSamples(db, sample_type, count, sniffer):
     156    sniffer.start()
     157
     158    if not sniffer.is_running():
     159        sys.stderr.write('ERROR: Sniffer did not start...\n')
     160        return
    156161       
     162    sid = findNextSampleID(db)
    157163    for k in range(0,count):
    158164        sample_order = list(cases.items())
    159165        random.shuffle(sample_order)
    160         if st.endswith('null'):
     166        if sample_type.endswith('null'):
    161167            for i in range(1,len(sample_order)):
    162168                sample_order[i] = (sample_order[i][0],sample_order[0][1])
    163169            random.shuffle(sample_order)
    164             #print('after', sample_order)
    165170           
    166171        results = []
     
    168173        for i in range(len(sample_order)):
    169174            results.append(fetch({'sample':sid, 'test_case':sample_order[i][0],
    170                                   'type':st, 'tc_order':i, 'time_of_day':now},
     175                                  'type':sample_type, 'tc_order':i, 'time_of_day':now},
    171176                                 sample_order[i][1]))
    172177
    173         #print(results)
     178        print(results)
    174179        db.addProbes(results)
    175180        db.conn.commit()
    176181        sid += 1
    177182
    178         if (time.time() > next_report):
    179             #s = time.time()
    180             reportProgress(db, sample_types, start)
    181             #print("reportProgress time:", time.time()-s)
    182             next_report += report_interval
    183 
    184 print("probes complete in %f" % (time.time()-start))
    185 time.sleep(2.0) # Give sniffer a chance to collect remaining packets
    186 stopSniffer(sniffer)
    187 
    188 start = time.time()
    189 associatePackets(sniffer_fp, db)
    190 sniffer_fp.close()
    191 end = time.time()
    192 print("associate time:", end-start)
    193 
     183    time.sleep(2.0) # Give sniffer a chance to collect remaining packets
     184    sniffer.stop()
     185    #print(sniffer.openPacketLog().read())
     186    start = time.time()
     187    associatePackets(sniffer.openPacketLog(), db)
     188    end = time.time()
     189    print("associate time:", end-start)
     190   
     191
     192
     193if options.no_control:
     194    num_control = 0
     195else:
     196    num_control = int(num_samples*2/5)
     197
     198num_train = int((num_samples-num_control)/3)
     199num_test = num_samples-num_train-num_control
     200
     201sample_types = [('train',num_train),
     202                ('train_null',num_control),
     203                ('test',num_test)]
     204
     205sniffer = snifferProcess(host_ip, port)
     206for st,count in sample_types:
     207    collectSamples(db, st,count,sniffer)
     208
     209
     210#start = time.time()
     211#report_interval = 20
     212#next_report = start+report_interval
     213#        if (time.time() > next_report):
     214#            reportProgress(db, sample_types, start)
     215#            next_report += report_interval
     216
     217   
    194218if options.no_control:
    195219    print("TODO: implement control synthesizing!")
     
    199223end = time.time()
    200224print("analyzed %d probes' packets in: %f" % (num_probes, end-start))
     225
     226
     227setPowersave(True) # XXX: test this to see if it actually helps
     228setLowLatency(False) # XXX: test this to see if it actually helps
     229setTCPTimestamps(tcpts_previous)
  • trunk/lib/nanownlib/__init__.py

    r16 r20  
    55import time
    66import traceback
    7 import random
    8 import argparse
    97import socket
    108import datetime
    119import http.client
    12 import threading
    13 import queue
    1410import subprocess
    15 import multiprocessing
    16 import csv
     11import tempfile
    1712import json
    1813import gzip
    1914import statistics
    20 try:
    21     import numpy
    22 except:
    23     sys.stderr.write('ERROR: Could not import numpy module.  Ensure it is installed.\n')
    24     sys.stderr.write('       Under Debian, the package name is "python3-numpy"\n.')
    25     sys.exit(1)
    2615
    2716try:
     
    5948
    6049
    61 def setTCPTimestamps(enabled=True):
    62     fh = open('/proc/sys/net/ipv4/tcp_timestamps', 'r+b')
    63     ret_val = False
    64     if fh.read(1) == b'1':
    65         ret_val = True
    66 
    67     fh.seek(0)
    68     if enabled:
    69         fh.write(b'1')
    70     else:
    71         fh.write(b'0')
    72     fh.close()
    73    
    74     return ret_val
    75 
    76 
    77 def trickleHTTPRequest(ip,port,hostname):
    78     my_port = None
    79     try:
    80         sock = socket.create_connection((ip, port))
    81         my_port = sock.getsockname()[1]
     50class snifferProcess(object):
     51    my_ip = None
     52    my_iface = None
     53    target_ip = None
     54    target_port = None
     55    _proc = None
     56    _spool = None
     57   
     58    def __init__(self, target_ip, target_port):
     59        self.target_ip = target_ip
     60        self.target_port = target_port
     61        self.my_ip = getLocalIP(target_ip, target_port)
     62        self.my_iface = getIfaceForIP(self.my_ip)
     63        print(self.my_ip, self.my_iface)
     64
     65    def start(self):
     66        self._spool = tempfile.NamedTemporaryFile('w+t')
     67        self._proc = subprocess.Popen(['chrt', '-r', '99', 'nanown-listen',
     68                                       self.my_iface, self.my_ip,
     69                                       self.target_ip, "%d" % self.target_port,
     70                                       self._spool.name, '0'])
     71        time.sleep(0.25)
     72
     73    def openPacketLog(self):
     74        return open(self._spool.name, 'rt')
    8275       
    83         #print('.')
    84         sock.sendall(b'GET / HTTP/1.1\r\n')
    85         time.sleep(0.5)
    86         rest = b'''Host: '''+hostname.encode('utf-8')+b'''\r\nUser-Agent: Secret Agent Man\r\nX-Extra: extra read all about it!\r\nConnection: close\r\n'''
    87         for r in rest:
    88             sock.sendall(bytearray([r]))
    89             time.sleep(0.05)
    90 
    91         time.sleep(0.5)
    92         sock.sendall('\r\n')
    93 
    94         r = None
    95         while r != b'':
    96             r = sock.recv(16)
    97 
    98         sock.close()
    99     except Exception as e:
    100         pass
    101 
    102     return my_port
    103 
    104 
    105 def runTimestampProbes(host_ip, port, hostname, num_trials, concurrency=4):
    106     myq = queue.Queue()
    107     def threadWrapper(*args):
    108         try:
    109             myq.put(trickleHTTPRequest(*args))
    110         except Exception as e:
    111             sys.stderr.write("ERROR from trickleHTTPRequest: %s\n" % repr(e))
    112             myq.put(None)
    113 
    114     threads = []
    115     ports = []
    116     for i in range(num_trials):
    117         if len(threads) >= concurrency:
    118             ports.append(myq.get())
    119         t = threading.Thread(target=threadWrapper, args=(host_ip, port, hostname))
    120         t.start()
    121         threads.append(t)
    122 
    123     for t in threads:
    124         t.join()
    125 
    126     while myq.qsize() > 0:
    127         ports.append(myq.get())
    128 
    129     return ports
    130 
    131 
    132 def computeTimestampPrecision(sniffer_fp, ports):
    133     rcvd = []
    134     for line in sniffer_fp:
    135         p = json.loads(line)
    136         if p['sent']==0:
    137             rcvd.append((p['observed'],p['tsval'],int(p['local_port'])))
    138 
    139     slopes = []
    140     for port in ports:
    141         trcvd = [tr for tr in rcvd if tr[2]==port and tr[1]!=0]
    142 
    143         if len(trcvd) < 2:
    144             sys.stderr.write("WARN: Inadequate data points.\n")
    145             continue
    146        
    147         if trcvd[0][1] > trcvd[-1][1]:
    148             sys.stderr.write("WARN: TSval wrap.\n")
    149             continue
    150 
    151         x = [tr[1] for tr in trcvd]
    152         y = [tr[0] for tr in trcvd]
    153 
    154         slope,intercept = OLSRegression(x, y)
    155         slopes.append(slope)
    156 
    157     if len(slopes) == 0:
    158         return None,None,None
    159 
    160     m = statistics.mean(slopes)
    161     if len(slopes) == 1:
    162         return (m, None, slopes)
    163     else:
    164         return (m, statistics.stdev(slopes), slopes)
    165 
    166    
    167 def OLSRegression(x,y):
    168     #print(x,y)
    169     x = numpy.array(x)
    170     y = numpy.array(y)
    171     #A = numpy.vstack([x, numpy.ones(len(x))]).T
    172     #m, c = numpy.linalg.lstsq(A, y)[0] # broken
    173     #c,m = numpy.polynomial.polynomial.polyfit(x, y, 1) # less accurate
    174     c,m = numpy.polynomial.Polynomial.fit(x,y,1).convert().coef
    175 
    176     #print(m,c)
    177 
    178     #import matplotlib.pyplot as plt
    179     #plt.clf()
    180     #plt.scatter(x, y)
    181     #plt.plot(x, m*x + c, 'r', label='Fitted line')
    182     #plt.show()
    183    
    184     return (m,c)
    185 
    186 
     76    def stop(self):
     77        if self._proc:
     78            self._proc.terminate()
     79            self._proc.wait(2)
     80            if self._proc.poll() == None:
     81                self._proc.kill()
     82                self._proc.wait(1)
     83            self._proc = None
     84   
     85    def is_running(self):
     86        return (self._proc.poll() == None)
     87           
     88    def __del__(self):
     89        self.stop()
     90
     91           
    18792def startSniffer(target_ip, target_port, output_file):
    18893    my_ip = getLocalIP(target_ip, target_port)
     
    198103        sniffer.wait(1)
    199104
    200        
    201 def setCPUAffinity():
    202     import ctypes
    203     from ctypes import cdll,c_int,byref
    204     cpus = multiprocessing.cpu_count()
    205    
    206     libc = cdll.LoadLibrary("libc.so.6")
    207     #libc.sched_setaffinity(os.getpid(), 1, ctypes.byref(ctypes.c_int(0x01)))
    208     return libc.sched_setaffinity(0, 4, byref(c_int(0x00000001<<(cpus-1))))
    209 
    210105
    211106# Monkey patching that instruments the HTTPResponse to collect connection source port info
     
    215110    def __init__(self, sock, *args, **kwargs):
    216111        self.local_address = sock.getsockname()
     112        #print(self.local_address)
    217113        super(MonitoredHTTPResponse, self).__init__(sock,*args,**kwargs)
    218114           
  • trunk/lib/nanownlib/stats.py

    r16 r20  
    77import gzip
    88import random
    9 import numpy
     9try:
     10    import numpy
     11except:
     12    sys.stderr.write('ERROR: Could not import numpy module.  Ensure it is installed.\n')
     13    sys.stderr.write('       Under Debian, the package name is "python3-numpy"\n.')
     14    sys.exit(1)
    1015
    1116# Don't trust numpy's seeding
     
    3136
    3237    return statistics.mean(products)
     38
     39   
     40def OLSRegression(x,y):
     41    #print(x,y)
     42    x = numpy.array(x)
     43    y = numpy.array(y)
     44    #A = numpy.vstack([x, numpy.ones(len(x))]).T
     45    #m, c = numpy.linalg.lstsq(A, y)[0] # broken
     46    #c,m = numpy.polynomial.polynomial.polyfit(x, y, 1) # less accurate
     47    c,m = numpy.polynomial.Polynomial.fit(x,y,1).convert().coef
     48
     49    #print(m,c)
     50
     51    #import matplotlib.pyplot as plt
     52    #plt.clf()
     53    #plt.scatter(x, y)
     54    #plt.plot(x, m*x + c, 'r', label='Fitted line')
     55    #plt.show()
     56   
     57    return (m,c)
    3358
    3459
Note: See TracChangeset for help on using the changeset viewer.