Timestamp: 08/01/15 19:01:31 (9 years ago)
Location: trunk/lib/nanownlib
Files: 5 edited
Legend (line prefixes used in the diffs below):
    ' '  Unmodified
    '+'  Added
    '-'  Removed
trunk/lib/nanownlib/__init__.py
Changes from r13 to r16:

@@ -18 +18 @@
 import gzip
 import statistics
-import numpy
-import netifaces
+try:
+    import numpy
+except:
+    sys.stderr.write('ERROR: Could not import numpy module. Ensure it is installed.\n')
+    sys.stderr.write('       Under Debian, the package name is "python3-numpy"\n.')
+    sys.exit(1)
+
 try:
     import requests

@@ -39 +44 @@
 
 def getIfaceForIP(ip):
+    try:
+        import netifaces
+    except:
+        sys.stderr.write('ERROR: Could not import netifaces module. Ensure it is installed.\n')
+        sys.stderr.write('       Try: pip3 install netifaces\n.')
+        sys.exit(1)
+
     for iface in netifaces.interfaces():
         addrs = netifaces.ifaddresses(iface).get(netifaces.AF_INET, None)

@@ -176 +188 @@
     my_ip = getLocalIP(target_ip, target_port)
     my_iface = getIfaceForIP(my_ip)
-    return subprocess.Popen(['chrt', '-r', '99', 'nanown-csamp', my_iface, my_ip,
+    return subprocess.Popen(['chrt', '-r', '99', 'nanown-listen', my_iface, my_ip,
                              target_ip, "%d" % target_port, output_file, '0'])
 

@@ -303 +315 @@
 
 
-def analyzeProbes(db):
+def analyzeProbes(db, trim=None, recompute=False):
     db.conn.execute("CREATE INDEX IF NOT EXISTS packets_probe ON packets (probe_id)")
     db.conn.commit()

@@ -316 +328 @@
     pcursor.execute("DELETE FROM trim_analysis")
     db.conn.commit()
+    if recompute:
+        pcursor.execute("DELETE FROM analysis")
+        db.conn.commit()
 
     def loadPackets(db):
         cursor = db.conn.cursor()
-        cursor.execute("SELECT * FROM packets ORDER BY probe_id")
+        #cursor.execute("SELECT * FROM packets ORDER BY probe_id")
+        cursor.execute("SELECT * FROM packets WHERE probe_id NOT IN (SELECT probe_id FROM analysis) ORDER BY probe_id")
 
         probe_id = None

@@ -334 +350 @@
             ret_val.append((probe_id,entry))
         return ret_val
-
-    start = time.time()
+
+    def processPackets(packet_cache, strim, rtrim):
+        sent_tally = []
+        rcvd_tally = []
+        analyses = []
+        for probe_id,packets in packet_cache:
+            try:
+                analysis,s,r = analyzePackets(packets, timestamp_precision)
+                analysis['probe_id'] = probe_id
+                analyses.append(analysis)
+                sent_tally.append(s)
+                rcvd_tally.append(r)
+            except Exception as e:
+                #traceback.print_exc()
+                sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
+        db.addTrimAnalyses(analyses)
+        db.conn.commit()
+        return statistics.mode(sent_tally),statistics.mode(rcvd_tally)
+
+    #start = time.time()
     packet_cache = loadPackets(db)
-    print("packets loaded in: %f" % (time.time()-start))
-
-    count = 0
-    sent_tally = []
-    rcvd_tally = []
-    for probe_id,packets in packet_cache:
-        try:
-            analysis,s,r = analyzePackets(packets, timestamp_precision)
-            analysis['probe_id'] = probe_id
-            sent_tally.append(s)
-            rcvd_tally.append(r)
-            db.addTrimAnalyses([analysis])
-        except Exception as e:
-            #traceback.print_exc()
-            sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
-
-        #print(pid,analysis)
-        count += 1
+    #print("packets loaded in: %f" % (time.time()-start))
+
+    if trim != None:
+        best_strim,best_rtrim = trim
+        processPackets(packet_cache, best_strim, best_rtrim)
+    else:
+        num_sent,num_rcvd = processPackets(packet_cache, 0, 0)
+        print("num_sent: %d, num_rcvd: %d" % (num_sent,num_rcvd))
+
+        for strim in range(0,num_sent):
+            for rtrim in range(0,num_rcvd):
+                #print(strim,rtrim)
+                if strim == 0 and rtrim == 0:
+                    continue # no point in doing 0,0 again
+                processPackets(packet_cache, strim, rtrim)
+
+
+        unusual_case,delta = findUnusualTestCase(db, (0,0))
+        evaluations = {}
+        for strim in range(0,num_sent):
+            for rtrim in range(0,num_rcvd):
+                evaluations[(strim,rtrim)] = evaluateTrim(db, unusual_case, strim, rtrim)
+
+        import pprint
+        pprint.pprint(evaluations)
+
+        delta_margin = 0.15
+        best_strim = 0
+        best_rtrim = 0
+        good_delta,good_mad = evaluations[(0,0)]
+
+        for strim in range(1,num_sent):
+            delta,mad = evaluations[(strim,0)]
+            if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
+                best_strim = strim
+            else:
+                break
+
+        good_delta,good_mad = evaluations[(best_strim,0)]
+        for rtrim in range(1,num_rcvd):
+            delta,mad = evaluations[(best_strim,rtrim)]
+            if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
+                best_rtrim = rtrim
+            else:
+                break
+
+        print("selected trim parameters:",(best_strim,best_rtrim))
+
+    pcursor.execute("""INSERT OR IGNORE INTO analysis
+                       SELECT id,probe_id,suspect,packet_rtt,tsval_rtt
+                       FROM trim_analysis
+                       WHERE sent_trimmed=? AND rcvd_trimmed=?""",
+                    (best_strim,best_rtrim))
     db.conn.commit()
-    num_sent = statistics.mode(sent_tally)
-    num_rcvd = statistics.mode(rcvd_tally)
-    sent_tally = None
-    rcvd_tally = None
-    print("num_sent: %d, num_rcvd: %d" % (num_sent,num_rcvd))
-
-    for strim in range(0,num_sent):
-        for rtrim in range(0,num_rcvd):
-            #print(strim,rtrim)
-            if strim == 0 and rtrim == 0:
-                continue # no point in doing 0,0 again
-            for probe_id,packets in packet_cache:
-                try:
-                    analysis,s,r = analyzePackets(packets, timestamp_precision, strim, rtrim)
-                    analysis['probe_id'] = probe_id
-                except Exception as e:
-                    #traceback.print_exc()
-                    sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
-
-                db.addTrimAnalyses([analysis])
-    db.conn.commit()
-
-    # Populate analysis table so findUnusualTestCase can give us a starting point
-    pcursor.execute("DELETE FROM analysis")
-    db.conn.commit()
-    pcursor.execute("INSERT INTO analysis SELECT id,probe_id,suspect,packet_rtt,tsval_rtt FROM trim_analysis WHERE sent_trimmed=0 AND rcvd_trimmed=0")
-
-    unusual_case,delta = findUnusualTestCase(db)
-    evaluations = {}
-    for strim in range(0,num_sent):
-        for rtrim in range(0,num_rcvd):
-            evaluations[(strim,rtrim)] = evaluateTrim(db, unusual_case, strim, rtrim)
-
-    import pprint
-    pprint.pprint(evaluations)
-
-    delta_margin = 0.15
-    best_strim = 0
-    best_rtrim = 0
-    good_delta,good_mad = evaluations[(0,0)]
-
-    for strim in range(1,num_sent):
-        delta,mad = evaluations[(strim,0)]
-        if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
-            best_strim = strim
-        else:
-            break
-
-    good_delta,good_mad = evaluations[(best_strim,0)]
-    for rtrim in range(1,num_rcvd):
-        delta,mad = evaluations[(best_strim,rtrim)]
-        if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
-            best_rtrim = rtrim
-        else:
-            break
-
-    print("selected trim parameters:",(best_strim,best_rtrim))
-
-    if best_strim != 0 or best_rtrim !=0:
-        pcursor.execute("DELETE FROM analysis")
-        db.conn.commit()
-        pcursor.execute("INSERT INTO analysis SELECT id,probe_id,suspect,packet_rtt,tsval_rtt FROM trim_analysis WHERE sent_trimmed=? AND rcvd_trimmed=?",
-                        (best_strim,best_rtrim))
-
-    #pcursor.execute("DELETE FROM trim_analysis")
-    db.conn.commit()
-
-    return count
+
+    return len(packet_cache)
 
 

@@ -442 +445 @@
     ptimes = cursor.fetchone()
     window_size = 100*int((ptimes['end']-ptimes['start'])/ptimes['count'])
-    print("associate window_size:", window_size)
+    #print("associate window_size:", window_size)
 
     db.addPackets(parseJSONLines(sniffer_fp), window_size)

@@ -460 +463 @@
 
 
-def findUnusualTestCase(db):
+def findUnusualTestCase(db, trim=None):
     test_cases = enumStoredTestCases(db)
-
+    if trim != None:
+        params = {'strim':trim[0], 'rtrim':trim[1]}
+        qsuffix = " AND sent_trimmed=:strim AND rcvd_trimmed=:rtrim"
+        table = "trim_analysis"
+    else:
+        params = {}
+        qsuffix = ""
+        table = "analysis"
+
     cursor = db.conn.cursor()
-    cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test')")
+    cursor.execute("SELECT packet_rtt FROM probes,"+table+" a WHERE probes.id=a.probe_id AND probes.type in ('train','test')"+qsuffix, params)
     global_tm = quadsummary([row['packet_rtt'] for row in cursor])
 
     tm_abs = []
     tm_map = {}
+
     # XXX: if more speed needed, percentile extension to sqlite might be handy...
     for tc in test_cases:
-        cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case=?", (tc,))
+        params['test_case']=tc
+        query = """SELECT packet_rtt FROM probes,"""+table+""" a
+                   WHERE probes.id=a.probe_id AND probes.type in ('train','test')
+                   AND probes.test_case=:test_case""" + qsuffix
+        cursor.execute(query, params)
         tm_map[tc] = quadsummary([row['packet_rtt'] for row in cursor])
         tm_abs.append((abs(tm_map[tc]-global_tm), tc))
 
     magnitude,tc = max(tm_abs)
-    cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case<>?", (tc,))
+    params['test_case']=tc
+    query = """SELECT packet_rtt FROM probes,"""+table+""" a
+               WHERE probes.id=a.probe_id AND probes.type in ('train','test')
+               AND probes.test_case<>:test_case""" + qsuffix
+    cursor.execute(query,params)
     remaining_tm = quadsummary([row['packet_rtt'] for row in cursor])
 
-    ret_val = (tc, tm_map[tc]-remaining_tm)
-    print("unusual_case: %s, delta: %f" % ret_val)
-    return ret_val
+    delta = tm_map[tc]-remaining_tm
+    # Hack to make the chosen unusual_case more intuitive to the user
+    if len(test_cases) == 2 and delta < 0.0:
+        tc = [t for t in test_cases if t != tc][0]
+        delta = abs(delta)
+
+    return tc,delta
 
 

@@ -492 +516 @@
         cursor.execute("SELECT count(id) c FROM (SELECT id FROM probes WHERE type=? AND time_of_day>? GROUP BY sample)", (st[0],int(start_time*1000000000)))
         count = cursor.fetchone()[0]
-        output += " | %s remaining: %d" % (st[0], st[1]-count)
+        output += " | %s remaining: %6d" % (st[0], st[1]-count)
         total_completed += count
         total_requested += st[1]
 
     rate = total_completed / (time.time() - start_time)
     total_time = total_requested / rate
     eta = datetime.datetime.fromtimestamp(start_time+total_time)
-    print("STATUS:",output[3:],"| est. total_time: %s | est. ETA: %s" % (str(datetime.timedelta(seconds=total_time)), str(eta)))
+    print("STATUS:",output[3:],"| est. total_time: %s | ETA: %s" % (str(datetime.timedelta(seconds=total_time)), eta.strftime("%Y-%m-%d %X")))
+
+
+
+def evaluateTestResults(db):
+    cursor = db.conn.cursor()
+    query = """
+      SELECT classifier FROM classifier_results GROUP BY classifier ORDER BY classifier;
+    """
+    cursor.execute(query)
+    classifiers = []
+    for c in cursor:
+        classifiers.append(c[0])
+
+    best_obs = []
+    best_error = []
+    max_obs = 0
+    for classifier in classifiers:
+        query="""
+        SELECT classifier,params,num_observations,(false_positives+false_negatives)/2 error
+        FROM classifier_results
+        WHERE trial_type='test'
+          AND classifier=:classifier
+          AND (false_positives+false_negatives)/2.0 < 5.0
+        ORDER BY num_observations,(false_positives+false_negatives)
+        LIMIT 1
+        """
+        cursor.execute(query, {'classifier':classifier})
+        row = cursor.fetchone()
+        if row == None:
+            query="""
+            SELECT classifier,params,num_observations,(false_positives+false_negatives)/2 error
+            FROM classifier_results
+            WHERE trial_type='test' and classifier=:classifier
+            ORDER BY (false_positives+false_negatives),num_observations
+            LIMIT 1
+            """
+            cursor.execute(query, {'classifier':classifier})
+            row = cursor.fetchone()
+            if row == None:
+                sys.stderr.write("WARN: couldn't find test results for classifier '%s'.\n" % classifier)
+                continue
+            row = dict(row)
+
+            best_error.append(dict(row))
+        else:
+            best_obs.append(dict(row))
+
+
+    return best_obs,best_error
trunk/lib/nanownlib/parallel.py
Changes from r11 to r16:

@@ -48 +48 @@
 
     def stop(self):
-        for i in range(0,len(self.workers)):
+        try:
+            while True:
+                self.workq.get(block=False)
+                self.workq.task_done()
+        except queue.Empty as e:
+            pass
+
+        for i in range(len(self.workers)):
             self.workq.put(None)
         for w in self.workers:
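The revised stop() first drains whatever is still queued (calling task_done() for each discarded item so the queue's unfinished-task accounting stays balanced) and only then posts one None sentinel per worker. A self-contained sketch of that drain-then-sentinel shutdown pattern, assuming a plain queue.Queue and threading workers (the worker and stop functions here are illustrative, not the nanownlib API):

    import queue
    import threading

    def worker(workq, results):
        while True:
            item = workq.get()
            if item is None:          # sentinel: exit the worker loop
                workq.task_done()
                break
            results.append(item * 2)  # stand-in for real work
            workq.task_done()

    def stop(workq, workers):
        # Discard anything still queued so no pending item outlives the pool.
        try:
            while True:
                workq.get(block=False)
                workq.task_done()
        except queue.Empty:
            pass
        # One sentinel per worker, then wait for the threads to exit.
        for _ in range(len(workers)):
            workq.put(None)
        for w in workers:
            w.join()

    workq = queue.Queue()
    results = []
    workers = [threading.Thread(target=worker, args=(workq, results)) for _ in range(4)]
    for w in workers:
        w.start()
    for job in range(10):
        workq.put(job)
    stop(workq, workers)

Draining before posting the sentinels matters because a worker blocked in get() would otherwise keep processing leftover jobs before it ever reaches its None.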
trunk/lib/nanownlib/stats.py
Changes from r13 to r16:

@@ -7 +7 @@
 import gzip
 import random
-import scipy
-import scipy.stats
 import numpy

@@ -249 +247 @@
 
     mh = f(diffs, params['distance'])
+    #print("estimate:", mh)
     if greater:
         if mh > params['threshold']:
trunk/lib/nanownlib/storage.py
Changes from r11 to r16:

@@ -7 +7 @@
 import threading
 import sqlite3
-
-import numpy
+try:
+    import numpy
+except:
+    sys.stderr.write('ERROR: Could not import numpy module. Ensure it is installed.\n')
+    sys.stderr.write('       Under Debian, the package name is "python3-numpy"\n.')
+    sys.exit(1)
+
 # Don't trust numpy's seeding
 numpy.random.seed(random.SystemRandom().randint(0,2**32-1))

@@ -39 +44 @@
               tcpts_mean REAL,
               tcpts_stddev REAL,
-              tcpts_slopes TEXT)
+              tcpts_slopes TEXT,
+              unusual_case TEXT,
+              greater INTEGER)
            """)
 

@@ -196 +203 @@
     def addPackets(self, pkts, window_size):
         query = ("INSERT INTO packets (id,probe_id,sent,observed,tsval,payload_len,tcpseq,tcpack)"
-                 " VALUES(randomblob(16),"
+                 " VALUES(hex(randomblob(16)),"
                  "(SELECT id FROM probes WHERE local_port=:local_port AND :observed>time_of_day"
                  " AND :observed<time_of_day+userspace_rtt+%d"

@@ -254 +261 @@
         self.conn.execute(query, params)
         self.conn.commit()
 
+    def setUnusualCase(self, unusual_case, greater):
+        query = """SELECT * FROM meta LIMIT 1"""
+        cursor = self.conn.cursor()
+        cursor.execute(query)
+        row = cursor.fetchone()
+        if row == None:
+            params = {"id":_newid()}
+        else:
+            params = dict(row)
+
+        params["unusual_case"]=unusual_case
+        params["greater"]=greater
+
+        keys = params.keys()
+        columns = ','.join(keys)
+        placeholders = ':'+', :'.join(keys)
+
+        query = """INSERT OR REPLACE INTO meta (%s) VALUES (%s)""" % (columns, placeholders)
+        cursor.execute(query, params)
+
+
+    def getUnusualCase(self):
+        query = """SELECT unusual_case,greater FROM meta LIMIT 1"""
+        cursor = self.conn.cursor()
+        cursor.execute(query)
+        row = cursor.fetchone()
+        if row == None or row[0] == None or row[1] == None:
+            return None
+        else:
+            return tuple(row)
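setUnusualCase() treats meta as a single-row table: it loads the existing row if there is one, overwrites the unusual_case and greater fields, and writes the whole row back with INSERT OR REPLACE, building the column and placeholder lists from the row's keys. A reduced sketch of that upsert approach against a throwaway in-memory schema (the three-column table and the set_unusual_case name are simplified for illustration; the real meta table has more columns and generates ids with _newid()):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.execute("CREATE TABLE meta (id TEXT PRIMARY KEY, unusual_case TEXT, greater INTEGER)")

    def set_unusual_case(conn, unusual_case, greater):
        row = conn.execute("SELECT * FROM meta LIMIT 1").fetchone()
        # Start from the existing row so unrelated columns are preserved.
        params = dict(row) if row is not None else {"id": "singleton"}  # placeholder id
        params["unusual_case"] = unusual_case
        params["greater"] = int(greater)

        # Build the column/placeholder lists from whatever keys the row has.
        keys = list(params.keys())
        columns = ",".join(keys)
        placeholders = ":" + ", :".join(keys)
        conn.execute("INSERT OR REPLACE INTO meta (%s) VALUES (%s)" % (columns, placeholders), params)
        conn.commit()

    set_unusual_case(conn, "long", True)
    print(tuple(conn.execute("SELECT unusual_case,greater FROM meta").fetchone()))  # ('long', 1)

The companion getUnusualCase() simply reads those two columns back and returns None until both have been set.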
trunk/lib/nanownlib/train.py
Changes from r13 to r16:

@@ -43 +43 @@
     num_trials = 200
     lows = [p[1] for p in performance[0:5]]
-    widths = [w/10.0 for w in range(5,65,5)]
+    widths = [w/10.0 for w in range(5,155,10)]
     performance = []
     for width in widths:

@@ -85 +85 @@
 
     num_trials = 500
-    widths = [good_width+(x/100.0) for x in range(-70,75,5) if good_width+(x/100.0) > 0.0]
+    widths = [good_width+(x/100.0) for x in range(-120,125,5) if good_width+(x/100.0) > 0.0]
     performance = []
     for width in widths:
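These two edits widen the kernel-width search in train.py: the coarse grid now runs from 0.5 to 14.5 in steps of 1.0 (previously 0.5 to 6.0 in steps of 0.5), and the fine grid around good_width extends to roughly +/-1.20 instead of +/-0.70, still in 0.05 steps. A quick interactive check of the new grids (good_width = 2.0 is a placeholder value, not something from the changeset):

    >>> [w/10.0 for w in range(5,155,10)]
    [0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5, 10.5, 11.5, 12.5, 13.5, 14.5]
    >>> good_width = 2.0
    >>> len([good_width+(x/100.0) for x in range(-120,125,5) if good_width+(x/100.0) > 0.0])
    49
    >>> len([good_width+(x/100.0) for x in range(-70,75,5) if good_width+(x/100.0) > 0.0])
    29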