Changeset 20
- Timestamp:
- 08/18/15 22:09:24 (9 years ago)
- Location:
- trunk
- Files:
-
- 2 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/bin/sampler
r9 r20 30 30 31 31 from nanownlib import * 32 from nanownlib.platform import * 33 from nanownlib.tcpts import * 32 34 import nanownlib.storage 33 35 … … 36 38 parser.add_argument('-c', dest='cases', type=str, default='{"short":10000,"long":13000}', 37 39 help='JSON representation of echo timing cases.') 38 parser.add_argument('--no-tcpts', action='store_true', help='Dis bale TCP timestamp profiling')40 parser.add_argument('--no-tcpts', action='store_true', help='Disable TCP timestamp profiling') 39 41 parser.add_argument('--no-control', action='store_true', help='Do not collect separate control data. Instead, synthesize it from test and train data.') 40 42 parser.add_argument('session_name', default=None, … … 49 51 50 52 51 session_name = options.session_name 52 num_samples=options.sample_count 53 hostname = options.host 54 port = options.port 55 protocol = 'http' 53 num_samples = options.sample_count 54 hostname = options.host 55 port = options.port 56 protocol = 'http' 56 57 57 58 cases = json.loads(options.cases) 59 db_file = "%s.db" % options.session_name 60 db = nanownlib.storage.db(db_file) 61 58 62 59 63 def extractReportedRuntime(headers, body): … … 64 68 def sendRequest(data=None): 65 69 method = 'GET' 66 path = '/ data/clamav-audit-results.txt?t=' + data70 path = '/?t=' + data 67 71 url = "%s://%s:%d%s" % (protocol,hostname,port,path) 68 72 headers = {} … … 74 78 try: 75 79 session = requests.Session() 76 response = session.send(req )77 #print( repr(response.raw._original_response.local_address))80 response = session.send(req, allow_redirects=False) 81 #print("sendRequest:", repr(response.raw._original_response.local_address)) 78 82 reported = extractReportedRuntime(response.headers, response.text) 79 83 retry = False … … 82 86 time.sleep(1.0) 83 87 sys.stderr.write("ERROR: retrying...\n") 84 88 89 85 90 return {'userspace_rtt':response.elapsed.microseconds*1000, 86 91 'reported':reported, … … 97 102 98 103 setCPUAffinity() 99 setTCPTimestamps() 100 host_ip = socket.gethostbyname(hostname) #XXX: what about multiple A records? 104 time.sleep(0.25) # Give our process a chance to migrate to a different CPU if necessary 105 setPowersave(False) # XXX: test this to see if it helps 106 setLowLatency(True) # XXX: test this to see if it helps 107 tcpts_previous = setTCPTimestamps(True) 108 109 110 #XXX: what about multiple A records? 111 # perform this during a script generation step, measuring lowest latency server 112 # and hard-coding the IP and host name separately. However, including all 113 # server IPs in comments 114 host_ip = socket.gethostbyname(hostname) 115 101 116 102 117 meta = {'tcpts_mean':None,'tcpts_stddev':None,'tcpts_slopes':None} … … 125 140 % (meta['tcpts_mean'], meta['tcpts_stddev'], 100*meta['tcpts_stddev']/meta['tcpts_mean'])) 126 141 127 128 sniffer_fp = tempfile.NamedTemporaryFile('w+t')129 db_file = "%s.db" % session_name130 131 sniffer = startSniffer(host_ip, port, sniffer_fp.name)132 db = nanownlib.storage.db(db_file)133 142 db.addMeta(meta) 134 time.sleep(0.5) # ensure sniffer is fully ready and our process is migrated 135 136 if options.no_control: 137 num_control = 0 138 else: 139 num_control = int(num_samples*2/5) 140 141 num_train = int((num_samples-num_control)/3) 142 num_test = num_samples-num_train-num_control 143 144 sample_types = [('train',num_train), 145 ('train_null',num_control), 146 ('test',num_test)] 147 148 sid = 0 149 report_interval = 20 150 start = time.time() 151 next_report = start+report_interval 152 for st,count in sample_types: 153 if sniffer.poll() != None: 154 sys.stderr.write('ERROR: Sniffer quit, sender exiting...\n') 155 break 143 144 145 def findNextSampleID(db): 146 cursor = db.conn.cursor() 147 cursor.execute("SELECT max(sample) FROM probes") 148 row = cursor.fetchone() 149 if row != None and row[0] != None: 150 return row[0]+1 151 152 return 0 153 154 155 def collectSamples(db, sample_type, count, sniffer): 156 sniffer.start() 157 158 if not sniffer.is_running(): 159 sys.stderr.write('ERROR: Sniffer did not start...\n') 160 return 156 161 162 sid = findNextSampleID(db) 157 163 for k in range(0,count): 158 164 sample_order = list(cases.items()) 159 165 random.shuffle(sample_order) 160 if s t.endswith('null'):166 if sample_type.endswith('null'): 161 167 for i in range(1,len(sample_order)): 162 168 sample_order[i] = (sample_order[i][0],sample_order[0][1]) 163 169 random.shuffle(sample_order) 164 #print('after', sample_order)165 170 166 171 results = [] … … 168 173 for i in range(len(sample_order)): 169 174 results.append(fetch({'sample':sid, 'test_case':sample_order[i][0], 170 'type':s t, 'tc_order':i, 'time_of_day':now},175 'type':sample_type, 'tc_order':i, 'time_of_day':now}, 171 176 sample_order[i][1])) 172 177 173 #print(results)178 print(results) 174 179 db.addProbes(results) 175 180 db.conn.commit() 176 181 sid += 1 177 182 178 if (time.time() > next_report): 179 #s = time.time() 180 reportProgress(db, sample_types, start) 181 #print("reportProgress time:", time.time()-s) 182 next_report += report_interval 183 184 print("probes complete in %f" % (time.time()-start)) 185 time.sleep(2.0) # Give sniffer a chance to collect remaining packets 186 stopSniffer(sniffer) 187 188 start = time.time() 189 associatePackets(sniffer_fp, db) 190 sniffer_fp.close() 191 end = time.time() 192 print("associate time:", end-start) 193 183 time.sleep(2.0) # Give sniffer a chance to collect remaining packets 184 sniffer.stop() 185 #print(sniffer.openPacketLog().read()) 186 start = time.time() 187 associatePackets(sniffer.openPacketLog(), db) 188 end = time.time() 189 print("associate time:", end-start) 190 191 192 193 if options.no_control: 194 num_control = 0 195 else: 196 num_control = int(num_samples*2/5) 197 198 num_train = int((num_samples-num_control)/3) 199 num_test = num_samples-num_train-num_control 200 201 sample_types = [('train',num_train), 202 ('train_null',num_control), 203 ('test',num_test)] 204 205 sniffer = snifferProcess(host_ip, port) 206 for st,count in sample_types: 207 collectSamples(db, st,count,sniffer) 208 209 210 #start = time.time() 211 #report_interval = 20 212 #next_report = start+report_interval 213 # if (time.time() > next_report): 214 # reportProgress(db, sample_types, start) 215 # next_report += report_interval 216 217 194 218 if options.no_control: 195 219 print("TODO: implement control synthesizing!") … … 199 223 end = time.time() 200 224 print("analyzed %d probes' packets in: %f" % (num_probes, end-start)) 225 226 227 setPowersave(True) # XXX: test this to see if it actually helps 228 setLowLatency(False) # XXX: test this to see if it actually helps 229 setTCPTimestamps(tcpts_previous) -
trunk/lib/nanownlib/__init__.py
r16 r20 5 5 import time 6 6 import traceback 7 import random8 import argparse9 7 import socket 10 8 import datetime 11 9 import http.client 12 import threading13 import queue14 10 import subprocess 15 import multiprocessing 16 import csv 11 import tempfile 17 12 import json 18 13 import gzip 19 14 import statistics 20 try:21 import numpy22 except:23 sys.stderr.write('ERROR: Could not import numpy module. Ensure it is installed.\n')24 sys.stderr.write(' Under Debian, the package name is "python3-numpy"\n.')25 sys.exit(1)26 15 27 16 try: … … 59 48 60 49 61 def setTCPTimestamps(enabled=True): 62 fh = open('/proc/sys/net/ipv4/tcp_timestamps', 'r+b') 63 ret_val = False 64 if fh.read(1) == b'1': 65 ret_val = True 66 67 fh.seek(0) 68 if enabled: 69 fh.write(b'1') 70 else: 71 fh.write(b'0') 72 fh.close() 73 74 return ret_val 75 76 77 def trickleHTTPRequest(ip,port,hostname): 78 my_port = None 79 try: 80 sock = socket.create_connection((ip, port)) 81 my_port = sock.getsockname()[1] 50 class snifferProcess(object): 51 my_ip = None 52 my_iface = None 53 target_ip = None 54 target_port = None 55 _proc = None 56 _spool = None 57 58 def __init__(self, target_ip, target_port): 59 self.target_ip = target_ip 60 self.target_port = target_port 61 self.my_ip = getLocalIP(target_ip, target_port) 62 self.my_iface = getIfaceForIP(self.my_ip) 63 print(self.my_ip, self.my_iface) 64 65 def start(self): 66 self._spool = tempfile.NamedTemporaryFile('w+t') 67 self._proc = subprocess.Popen(['chrt', '-r', '99', 'nanown-listen', 68 self.my_iface, self.my_ip, 69 self.target_ip, "%d" % self.target_port, 70 self._spool.name, '0']) 71 time.sleep(0.25) 72 73 def openPacketLog(self): 74 return open(self._spool.name, 'rt') 82 75 83 #print('.') 84 sock.sendall(b'GET / HTTP/1.1\r\n') 85 time.sleep(0.5) 86 rest = b'''Host: '''+hostname.encode('utf-8')+b'''\r\nUser-Agent: Secret Agent Man\r\nX-Extra: extra read all about it!\r\nConnection: close\r\n''' 87 for r in rest: 88 sock.sendall(bytearray([r])) 89 time.sleep(0.05) 90 91 time.sleep(0.5) 92 sock.sendall('\r\n') 93 94 r = None 95 while r != b'': 96 r = sock.recv(16) 97 98 sock.close() 99 except Exception as e: 100 pass 101 102 return my_port 103 104 105 def runTimestampProbes(host_ip, port, hostname, num_trials, concurrency=4): 106 myq = queue.Queue() 107 def threadWrapper(*args): 108 try: 109 myq.put(trickleHTTPRequest(*args)) 110 except Exception as e: 111 sys.stderr.write("ERROR from trickleHTTPRequest: %s\n" % repr(e)) 112 myq.put(None) 113 114 threads = [] 115 ports = [] 116 for i in range(num_trials): 117 if len(threads) >= concurrency: 118 ports.append(myq.get()) 119 t = threading.Thread(target=threadWrapper, args=(host_ip, port, hostname)) 120 t.start() 121 threads.append(t) 122 123 for t in threads: 124 t.join() 125 126 while myq.qsize() > 0: 127 ports.append(myq.get()) 128 129 return ports 130 131 132 def computeTimestampPrecision(sniffer_fp, ports): 133 rcvd = [] 134 for line in sniffer_fp: 135 p = json.loads(line) 136 if p['sent']==0: 137 rcvd.append((p['observed'],p['tsval'],int(p['local_port']))) 138 139 slopes = [] 140 for port in ports: 141 trcvd = [tr for tr in rcvd if tr[2]==port and tr[1]!=0] 142 143 if len(trcvd) < 2: 144 sys.stderr.write("WARN: Inadequate data points.\n") 145 continue 146 147 if trcvd[0][1] > trcvd[-1][1]: 148 sys.stderr.write("WARN: TSval wrap.\n") 149 continue 150 151 x = [tr[1] for tr in trcvd] 152 y = [tr[0] for tr in trcvd] 153 154 slope,intercept = OLSRegression(x, y) 155 slopes.append(slope) 156 157 if len(slopes) == 0: 158 return None,None,None 159 160 m = statistics.mean(slopes) 161 if len(slopes) == 1: 162 return (m, None, slopes) 163 else: 164 return (m, statistics.stdev(slopes), slopes) 165 166 167 def OLSRegression(x,y): 168 #print(x,y) 169 x = numpy.array(x) 170 y = numpy.array(y) 171 #A = numpy.vstack([x, numpy.ones(len(x))]).T 172 #m, c = numpy.linalg.lstsq(A, y)[0] # broken 173 #c,m = numpy.polynomial.polynomial.polyfit(x, y, 1) # less accurate 174 c,m = numpy.polynomial.Polynomial.fit(x,y,1).convert().coef 175 176 #print(m,c) 177 178 #import matplotlib.pyplot as plt 179 #plt.clf() 180 #plt.scatter(x, y) 181 #plt.plot(x, m*x + c, 'r', label='Fitted line') 182 #plt.show() 183 184 return (m,c) 185 186 76 def stop(self): 77 if self._proc: 78 self._proc.terminate() 79 self._proc.wait(2) 80 if self._proc.poll() == None: 81 self._proc.kill() 82 self._proc.wait(1) 83 self._proc = None 84 85 def is_running(self): 86 return (self._proc.poll() == None) 87 88 def __del__(self): 89 self.stop() 90 91 187 92 def startSniffer(target_ip, target_port, output_file): 188 93 my_ip = getLocalIP(target_ip, target_port) … … 198 103 sniffer.wait(1) 199 104 200 201 def setCPUAffinity():202 import ctypes203 from ctypes import cdll,c_int,byref204 cpus = multiprocessing.cpu_count()205 206 libc = cdll.LoadLibrary("libc.so.6")207 #libc.sched_setaffinity(os.getpid(), 1, ctypes.byref(ctypes.c_int(0x01)))208 return libc.sched_setaffinity(0, 4, byref(c_int(0x00000001<<(cpus-1))))209 210 105 211 106 # Monkey patching that instruments the HTTPResponse to collect connection source port info … … 215 110 def __init__(self, sock, *args, **kwargs): 216 111 self.local_address = sock.getsockname() 112 #print(self.local_address) 217 113 super(MonitoredHTTPResponse, self).__init__(sock,*args,**kwargs) 218 114 -
trunk/lib/nanownlib/stats.py
r16 r20 7 7 import gzip 8 8 import random 9 import numpy 9 try: 10 import numpy 11 except: 12 sys.stderr.write('ERROR: Could not import numpy module. Ensure it is installed.\n') 13 sys.stderr.write(' Under Debian, the package name is "python3-numpy"\n.') 14 sys.exit(1) 10 15 11 16 # Don't trust numpy's seeding … … 31 36 32 37 return statistics.mean(products) 38 39 40 def OLSRegression(x,y): 41 #print(x,y) 42 x = numpy.array(x) 43 y = numpy.array(y) 44 #A = numpy.vstack([x, numpy.ones(len(x))]).T 45 #m, c = numpy.linalg.lstsq(A, y)[0] # broken 46 #c,m = numpy.polynomial.polynomial.polyfit(x, y, 1) # less accurate 47 c,m = numpy.polynomial.Polynomial.fit(x,y,1).convert().coef 48 49 #print(m,c) 50 51 #import matplotlib.pyplot as plt 52 #plt.clf() 53 #plt.scatter(x, y) 54 #plt.plot(x, m*x + c, 'r', label='Fitted line') 55 #plt.show() 56 57 return (m,c) 33 58 34 59
Note: See TracChangeset
for help on using the changeset viewer.