Changeset 10 for trunk/lib/nanownlib/__init__.py
- Timestamp:
- 07/13/15 19:16:30 (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/lib/nanownlib/__init__.py
r6 r10 4 4 import sys 5 5 import time 6 import traceback 6 7 import random 7 8 import argparse … … 209 210 def removeDuplicatePackets(packets): 210 211 #return packets 211 suspect = None212 suspect = '' 212 213 seen = {} 213 214 # XXX: Need to review this deduplication algorithm and make sure it is correct 214 215 for p in packets: 215 216 key = (p['sent'],p['tcpseq'],p['tcpack'],p['payload_len']) 216 if (key not in seen)\ 217 or p['sent']==1 and (seen[key]['observed'] < p['observed'])\ 218 or p['sent']==0 and (seen[key]['observed'] > p['observed']): 219 #if (key not in seen) or (seen[key]['observed'] > p['observed']): 217 if (key not in seen): 220 218 seen[key] = p 221 222 if len(seen) < len(packets): 223 suspect = 'd' 224 #sys.stderr.write("INFO: removed %d duplicate packets.\n" % (len(packets) - len(seen))) 219 continue 220 if p['sent']==1 and (seen[key]['observed'] > p['observed']): #earliest sent 221 seen[key] = p 222 suspect += 's' 223 continue 224 if p['sent']==0 and (seen[key]['observed'] > p['observed']): #earliest rcvd 225 seen[key] = p 226 suspect += 'r' 227 continue 228 229 #if len(seen) < len(packets): 230 # sys.stderr.write("INFO: removed %d duplicate packets.\n" % (len(packets) - len(seen))) 225 231 226 232 return suspect,seen.values() … … 230 236 suspect,packets = removeDuplicatePackets(packets) 231 237 232 #sort_key = lambda d: (d['tcpseq'],d['tcpack']) 233 sort_key = lambda d: (d['observed'],d['tcpseq']) 238 sort_key = lambda d: (d['tcpseq'],d['observed']) 234 239 sent = sorted((p for p in packets if p['sent']==1 and p['payload_len']>0), key=sort_key) 235 240 rcvd = sorted((p for p in packets if p['sent']==0 and p['payload_len']>0), key=sort_key) 236 241 237 if len(sent) <= trim_sent: 238 last_sent = sent[-1] 239 else: 240 last_sent = sent[trim_sent] 241 242 if len(rcvd) <= trim_rcvd: 243 last_rcvd = rcvd[0] 244 else: 245 last_rcvd = rcvd[len(rcvd)-1-trim_rcvd] 242 alt_key = lambda d: (d['observed'],d['tcpseq']) 243 rcvd_alt = sorted((p for p in packets if p['sent']==0 and p['payload_len']>0), key=alt_key) 244 245 s_off = trim_sent 246 if s_off >= len(sent): 247 s_off = -1 248 last_sent = sent[s_off] 249 250 r_off = len(rcvd) - trim_rcvd - 1 251 if r_off <= 0: 252 r_off = 0 253 last_rcvd = rcvd[r_off] 254 if last_rcvd != rcvd_alt[r_off]: 255 suspect += 'R' 246 256 247 257 packet_rtt = last_rcvd['observed'] - last_sent['observed'] … … 272 282 query=""" 273 283 SELECT packet_rtt-(SELECT avg(packet_rtt) FROM probes,trim_analysis 274 WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.sample AND probes.type in ('train','test')) 275 FROM (SELECT probes.sample,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test')) u 284 WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.s AND probes.type in ('train','test')) 285 FROM (SELECT probes.sample s,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test') AND 1 NOT IN (select 1 from probes p,trim_analysis t WHERE p.sample=s AND t.probe_id=p.id AND t.suspect LIKE '%R%')) u 286 """ 287 query=""" 288 SELECT packet_rtt-(SELECT avg(packet_rtt) FROM probes,trim_analysis 289 WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.s AND probes.type in ('train','test')) 290 FROM (SELECT probes.sample s,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test')) u 276 291 """ 277 292 … … 280 295 differences = [row[0] for row in cursor] 281 296 282 return trimean(differences),mad(differences)297 return ubersummary(differences),mad(differences) 283 298 284 299 … … 287 302 db.conn.execute("CREATE INDEX IF NOT EXISTS packets_probe ON packets (probe_id)") 288 303 pcursor = db.conn.cursor() 289 kcursor = db.conn.cursor()304 db.conn.commit() 290 305 291 306 pcursor.execute("SELECT tcpts_mean FROM meta") … … 297 312 pcursor.execute("DELETE FROM trim_analysis") 298 313 db.conn.commit() 314 315 def loadPackets(db): 316 cursor = db.conn.cursor() 317 cursor.execute("SELECT * FROM packets ORDER BY probe_id") 318 319 probe_id = None 320 entry = [] 321 ret_val = [] 322 for p in cursor: 323 if probe_id == None: 324 probe_id = p['probe_id'] 325 if p['probe_id'] != probe_id: 326 ret_val.append((probe_id,entry)) 327 probe_id = p['probe_id'] 328 entry = [] 329 entry.append(dict(p)) 330 ret_val.append((probe_id,entry)) 331 return ret_val 332 333 start = time.time() 334 packet_cache = loadPackets(db) 335 print("packets loaded in: %f" % (time.time()-start)) 299 336 300 337 count = 0 301 338 sent_tally = [] 302 339 rcvd_tally = [] 303 for pid, in pcursor.execute("SELECT id FROM probes"): 304 kcursor.execute("SELECT * FROM packets WHERE probe_id=?", (pid,)) 340 for probe_id,packets in packet_cache: 305 341 try: 306 analysis,s,r = analyzePackets( kcursor.fetchall(), timestamp_precision)307 analysis['probe_id'] = p id342 analysis,s,r = analyzePackets(packets, timestamp_precision) 343 analysis['probe_id'] = probe_id 308 344 sent_tally.append(s) 309 345 rcvd_tally.append(r) 346 db.addTrimAnalyses([analysis]) 310 347 except Exception as e: 311 print(e)312 sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % p id)348 traceback.print_exc() 349 sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id) 313 350 314 351 #print(pid,analysis) 315 db.addTrimAnalyses([analysis])316 352 count += 1 317 353 db.conn.commit() … … 326 362 if strim == 0 and rtrim == 0: 327 363 continue # no point in doing 0,0 again 328 for pid, in pcursor.execute("SELECT id FROM probes"): 329 kcursor.execute("SELECT * FROM packets WHERE probe_id=?", (pid,)) 364 for probe_id,packets in packet_cache: 330 365 try: 331 analysis,s,r = analyzePackets( kcursor.fetchall(), timestamp_precision, strim, rtrim)332 analysis['probe_id'] = p id366 analysis,s,r = analyzePackets(packets, timestamp_precision, strim, rtrim) 367 analysis['probe_id'] = probe_id 333 368 except Exception as e: 334 369 print(e) … … 336 371 337 372 db.addTrimAnalyses([analysis]) 338 373 db.conn.commit() 339 374 340 375 # Populate analysis table so findUnusualTestCase can give us a starting point … … 359 394 for strim in range(1,num_sent): 360 395 delta,mad = evaluations[(strim,0)] 361 if abs(good_delta - delta) < abs(delta_margin*good_delta) and mad < good_mad:396 if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad: 362 397 best_strim = strim 363 398 else: … … 367 402 for rtrim in range(1,num_rcvd): 368 403 delta,mad = evaluations[(best_strim,rtrim)] 369 if (abs(delta) > abs(good_delta) or abs(good_delta - delta) < abs(delta_margin*good_delta)) and mad < good_mad:404 if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad: 370 405 best_rtrim = rtrim 371 406 else: … … 425 460 cursor = db.conn.cursor() 426 461 cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test')") 427 global_tm = trimean([row['packet_rtt'] for row in cursor])462 global_tm = quadsummary([row['packet_rtt'] for row in cursor]) 428 463 429 464 tm_abs = [] … … 432 467 for tc in test_cases: 433 468 cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case=?", (tc,)) 434 tm_map[tc] = trimean([row['packet_rtt'] for row in cursor])469 tm_map[tc] = quadsummary([row['packet_rtt'] for row in cursor]) 435 470 tm_abs.append((abs(tm_map[tc]-global_tm), tc)) 436 471 437 472 magnitude,tc = max(tm_abs) 438 473 cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case<>?", (tc,)) 439 remaining_tm = trimean([row['packet_rtt'] for row in cursor])474 remaining_tm = quadsummary([row['packet_rtt'] for row in cursor]) 440 475 441 476 ret_val = (tc, tm_map[tc]-remaining_tm)
Note: See TracChangeset
for help on using the changeset viewer.