Changeset 10 for trunk/lib/nanownlib
- Timestamp:
- 07/13/15 19:16:30 (9 years ago)
- Location:
- trunk/lib/nanownlib
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/lib/nanownlib/__init__.py
r6 r10 4 4 import sys 5 5 import time 6 import traceback 6 7 import random 7 8 import argparse … … 209 210 def removeDuplicatePackets(packets): 210 211 #return packets 211 suspect = None212 suspect = '' 212 213 seen = {} 213 214 # XXX: Need to review this deduplication algorithm and make sure it is correct 214 215 for p in packets: 215 216 key = (p['sent'],p['tcpseq'],p['tcpack'],p['payload_len']) 216 if (key not in seen)\ 217 or p['sent']==1 and (seen[key]['observed'] < p['observed'])\ 218 or p['sent']==0 and (seen[key]['observed'] > p['observed']): 219 #if (key not in seen) or (seen[key]['observed'] > p['observed']): 217 if (key not in seen): 220 218 seen[key] = p 221 222 if len(seen) < len(packets): 223 suspect = 'd' 224 #sys.stderr.write("INFO: removed %d duplicate packets.\n" % (len(packets) - len(seen))) 219 continue 220 if p['sent']==1 and (seen[key]['observed'] > p['observed']): #earliest sent 221 seen[key] = p 222 suspect += 's' 223 continue 224 if p['sent']==0 and (seen[key]['observed'] > p['observed']): #earliest rcvd 225 seen[key] = p 226 suspect += 'r' 227 continue 228 229 #if len(seen) < len(packets): 230 # sys.stderr.write("INFO: removed %d duplicate packets.\n" % (len(packets) - len(seen))) 225 231 226 232 return suspect,seen.values() … … 230 236 suspect,packets = removeDuplicatePackets(packets) 231 237 232 #sort_key = lambda d: (d['tcpseq'],d['tcpack']) 233 sort_key = lambda d: (d['observed'],d['tcpseq']) 238 sort_key = lambda d: (d['tcpseq'],d['observed']) 234 239 sent = sorted((p for p in packets if p['sent']==1 and p['payload_len']>0), key=sort_key) 235 240 rcvd = sorted((p for p in packets if p['sent']==0 and p['payload_len']>0), key=sort_key) 236 241 237 if len(sent) <= trim_sent: 238 last_sent = sent[-1] 239 else: 240 last_sent = sent[trim_sent] 241 242 if len(rcvd) <= trim_rcvd: 243 last_rcvd = rcvd[0] 244 else: 245 last_rcvd = rcvd[len(rcvd)-1-trim_rcvd] 242 alt_key = lambda d: (d['observed'],d['tcpseq']) 243 rcvd_alt = sorted((p for p in packets if p['sent']==0 and p['payload_len']>0), key=alt_key) 244 245 s_off = trim_sent 246 if s_off >= len(sent): 247 s_off = -1 248 last_sent = sent[s_off] 249 250 r_off = len(rcvd) - trim_rcvd - 1 251 if r_off <= 0: 252 r_off = 0 253 last_rcvd = rcvd[r_off] 254 if last_rcvd != rcvd_alt[r_off]: 255 suspect += 'R' 246 256 247 257 packet_rtt = last_rcvd['observed'] - last_sent['observed'] … … 272 282 query=""" 273 283 SELECT packet_rtt-(SELECT avg(packet_rtt) FROM probes,trim_analysis 274 WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.sample AND probes.type in ('train','test')) 275 FROM (SELECT probes.sample,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test')) u 284 WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.s AND probes.type in ('train','test')) 285 FROM (SELECT probes.sample s,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test') AND 1 NOT IN (select 1 from probes p,trim_analysis t WHERE p.sample=s AND t.probe_id=p.id AND t.suspect LIKE '%R%')) u 286 """ 287 query=""" 288 SELECT packet_rtt-(SELECT avg(packet_rtt) FROM probes,trim_analysis 289 WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.s AND probes.type in ('train','test')) 290 FROM (SELECT probes.sample s,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test')) u 276 291 """ 277 292 … … 280 295 differences = [row[0] for row in cursor] 281 296 282 return trimean(differences),mad(differences)297 return ubersummary(differences),mad(differences) 283 298 284 299 … … 287 302 db.conn.execute("CREATE INDEX IF NOT EXISTS packets_probe ON packets (probe_id)") 288 303 pcursor = db.conn.cursor() 289 kcursor = db.conn.cursor()304 db.conn.commit() 290 305 291 306 pcursor.execute("SELECT tcpts_mean FROM meta") … … 297 312 pcursor.execute("DELETE FROM trim_analysis") 298 313 db.conn.commit() 314 315 def loadPackets(db): 316 cursor = db.conn.cursor() 317 cursor.execute("SELECT * FROM packets ORDER BY probe_id") 318 319 probe_id = None 320 entry = [] 321 ret_val = [] 322 for p in cursor: 323 if probe_id == None: 324 probe_id = p['probe_id'] 325 if p['probe_id'] != probe_id: 326 ret_val.append((probe_id,entry)) 327 probe_id = p['probe_id'] 328 entry = [] 329 entry.append(dict(p)) 330 ret_val.append((probe_id,entry)) 331 return ret_val 332 333 start = time.time() 334 packet_cache = loadPackets(db) 335 print("packets loaded in: %f" % (time.time()-start)) 299 336 300 337 count = 0 301 338 sent_tally = [] 302 339 rcvd_tally = [] 303 for pid, in pcursor.execute("SELECT id FROM probes"): 304 kcursor.execute("SELECT * FROM packets WHERE probe_id=?", (pid,)) 340 for probe_id,packets in packet_cache: 305 341 try: 306 analysis,s,r = analyzePackets( kcursor.fetchall(), timestamp_precision)307 analysis['probe_id'] = p id342 analysis,s,r = analyzePackets(packets, timestamp_precision) 343 analysis['probe_id'] = probe_id 308 344 sent_tally.append(s) 309 345 rcvd_tally.append(r) 346 db.addTrimAnalyses([analysis]) 310 347 except Exception as e: 311 print(e)312 sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % p id)348 traceback.print_exc() 349 sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id) 313 350 314 351 #print(pid,analysis) 315 db.addTrimAnalyses([analysis])316 352 count += 1 317 353 db.conn.commit() … … 326 362 if strim == 0 and rtrim == 0: 327 363 continue # no point in doing 0,0 again 328 for pid, in pcursor.execute("SELECT id FROM probes"): 329 kcursor.execute("SELECT * FROM packets WHERE probe_id=?", (pid,)) 364 for probe_id,packets in packet_cache: 330 365 try: 331 analysis,s,r = analyzePackets( kcursor.fetchall(), timestamp_precision, strim, rtrim)332 analysis['probe_id'] = p id366 analysis,s,r = analyzePackets(packets, timestamp_precision, strim, rtrim) 367 analysis['probe_id'] = probe_id 333 368 except Exception as e: 334 369 print(e) … … 336 371 337 372 db.addTrimAnalyses([analysis]) 338 373 db.conn.commit() 339 374 340 375 # Populate analysis table so findUnusualTestCase can give us a starting point … … 359 394 for strim in range(1,num_sent): 360 395 delta,mad = evaluations[(strim,0)] 361 if abs(good_delta - delta) < abs(delta_margin*good_delta) and mad < good_mad:396 if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad: 362 397 best_strim = strim 363 398 else: … … 367 402 for rtrim in range(1,num_rcvd): 368 403 delta,mad = evaluations[(best_strim,rtrim)] 369 if (abs(delta) > abs(good_delta) or abs(good_delta - delta) < abs(delta_margin*good_delta)) and mad < good_mad:404 if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad: 370 405 best_rtrim = rtrim 371 406 else: … … 425 460 cursor = db.conn.cursor() 426 461 cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test')") 427 global_tm = trimean([row['packet_rtt'] for row in cursor])462 global_tm = quadsummary([row['packet_rtt'] for row in cursor]) 428 463 429 464 tm_abs = [] … … 432 467 for tc in test_cases: 433 468 cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case=?", (tc,)) 434 tm_map[tc] = trimean([row['packet_rtt'] for row in cursor])469 tm_map[tc] = quadsummary([row['packet_rtt'] for row in cursor]) 435 470 tm_abs.append((abs(tm_map[tc]-global_tm), tc)) 436 471 437 472 magnitude,tc = max(tm_abs) 438 473 cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case<>?", (tc,)) 439 remaining_tm = trimean([row['packet_rtt'] for row in cursor])474 remaining_tm = quadsummary([row['packet_rtt'] for row in cursor]) 440 475 441 476 ret_val = (tc, tm_map[tc]-remaining_tm) -
trunk/lib/nanownlib/stats.py
r8 r10 2 2 import sys 3 3 import os 4 import functools 4 5 import math 5 6 import statistics … … 133 134 134 135 135 def midhinge(values, distance=25): 136 return (numpy.percentile(values, 50-distance) + numpy.percentile(values, 50+distance))/2.0 136 def midsummary(values, distance=25): 137 #return (numpy.percentile(values, 50-distance) + numpy.percentile(values, 50+distance))/2.0 138 l,h = numpy.percentile(values, (50-distance,50+distance)) 139 return (l+h)/2.0 137 140 138 141 def trimean(values, distance=25): 139 return (midhinge(values, distance) + statistics.median(values))/2 140 142 return (midsummary(values, distance) + statistics.median(values))/2 143 144 def ubersummary(values, distance=25): 145 left2 = 50-distance 146 left1 = left2/2.0 147 left3 = (left2+50)/2.0 148 right2 = 50+distance 149 right3 = (right2+50)/2.0 150 right1 = (right2+100)/2.0 151 l1,l2,l3,r3,r2,r1 = numpy.percentile(values, (left1,left2,left3,right3,right2,right1)) 152 #print(left1,left2,left3,50,right3,right2,right1) 153 #print(l1,l2,l3,m,r3,r2,r1) 154 return (l1+l2*4+l3+r3+r2*4+r1)/12.0 155 #return statistics.mean((l1,l2,l3,m,r3,r2,r1)) 156 157 def quadsummary(values, distance=25): 158 left2 = 50-distance 159 left1 = left2/2.0 160 right2 = 50+distance 161 right1 = (right2+100)/2.0 162 l1,l2,r2,r1 = numpy.percentile(values, (left1,left2,right2,right1)) 163 #print(left1,left2,left3,50,right3,right2,right1) 164 #print(l1,l2,l3,m,r3,r2,r1) 165 return (l1+l2+r2+r1)/4.0 166 #return statistics.mean((l1,l2,l3,m,r3,r2,r1)) 167 168 def quadsummary(values, distance=25): 169 left1 = 50-distance 170 left2 = (left1+50)/2.0 171 right1 = 50+distance 172 right2 = (right1+50)/2.0 173 l1,l2,r2,r1 = numpy.percentile(values, (left1,left2,right2,right1)) 174 #print(left1,left2,left3,50,right3,right2,right1) 175 #print(l1,l2,l3,m,r3,r2,r1) 176 return (l1+l2+r2+r1)/4.0 177 #return statistics.mean((l1,l2,l3,m,r3,r2,r1)) 178 179 141 180 def weightedMean(derived, weights): 142 181 normalizer = sum(weights.values())/len(weights) … … 170 209 171 210 172 def estimateMid hinge(derived):173 return mid hinge([(d['long']-d['short']) for d in derived.values()])211 def estimateMidsummary(derived): 212 return midsummary([(d['long']-d['short']) for d in derived.values()]) 174 213 175 214 … … 348 387 rest = [s['other_cases'] for s in samples] 349 388 350 uc_high = numpy.percentile(uc, params['high'])351 rest_ low = numpy.percentile(rest, params['low'])389 uc_high,uc_low = numpy.percentile(uc, (params['high'],params['low'])) 390 rest_high,rest_low = numpy.percentile(rest, (params['high'],params['low'])) 352 391 if uc_high < rest_low: 353 392 if greater: … … 356 395 return 1 357 396 358 uc_low = numpy.percentile(uc, params['low'])359 rest_high = numpy.percentile(rest, params['high'])360 397 if rest_high < uc_low: 361 398 if greater: … … 369 406 # Returns 1 if unusual_case is unusual in the expected direction 370 407 # 0 otherwise 371 def midhingeTest(params, greater, samples):408 def summaryTest(f, params, greater, samples): 372 409 diffs = [s['unusual_case']-s['other_cases'] for s in samples] 373 410 374 mh = midhinge(diffs, params['distance']) 375 #mh = trimean(diffs, params['distance']) 411 mh = f(diffs, params['distance']) 376 412 if greater: 377 413 if mh > params['threshold']: … … 385 421 return 0 386 422 423 midsummaryTest = functools.partial(summaryTest, midsummary) 424 trimeanTest = functools.partial(summaryTest, trimean) 425 ubersummaryTest = functools.partial(summaryTest, ubersummary) 426 quadsummaryTest = functools.partial(summaryTest, quadsummary) 387 427 388 428 def rmse(expected, measurements): … … 392 432 def nrmse(expected, measurements): 393 433 return rmse(expected, measurements)/(max(measurements)-min(measurements)) 434 435 436 class KalmanFilter1D: 437 def __init__(self, x0, P, R, Q): 438 self.x = x0 439 self.P = P 440 self.R = R 441 self.Q = Q 442 443 def update(self, z): 444 self.x = (self.P * z + self.x * self.R) / (self.P + self.R) 445 self.P = 1. / (1./self.P + 1./self.R) 446 447 def predict(self, u=0.0): 448 self.x += u 449 self.P += self.Q 450 451 452 def kfilter(params, observations): 453 x = numpy.array(observations) 454 movement = 0 455 est = [] 456 var = [] 457 kf = KalmanFilter1D(x0 = quadsummary(x), # initial state 458 #P = 10000, # initial variance 459 P = 10, # initial variance 460 R = numpy.std(x), # msensor noise 461 Q = 0) # movement noise 462 for round in range(1): 463 for d in x: 464 kf.predict(movement) 465 kf.update(d) 466 est.append(kf.x) 467 var.append(kf.P) 468 469 return({'est':est, 'var':var}) 470 471 472 def kalmanTest(params, greater, samples): 473 diffs = [s['unusual_case']-s['other_cases'] for s in samples] 474 475 m = kfilter(params, diffs)['est'][-1] 476 if greater: 477 if m > params['threshold']: 478 return 1 479 else: 480 return 0 481 else: 482 if m < params['threshold']: 483 return 1 484 else: 485 return 0 486 487 488 def kalmanTest2(params, greater, samples): 489 diffs = [s['unusual_case']-s['other_cases'] for s in samples] 490 491 estimates = [] 492 size = 500 493 for i in range(100): 494 off = random.randrange(0,len(diffs)) 495 sub = diffs[off:size] 496 if len(sub) < size: 497 sub += diffs[0:size-len(sub)] 498 estimates.append(kfilter(params, sub)['est'][-1]) 499 500 m = quadsummary(estimates) 501 if greater: 502 if m > params['threshold']: 503 return 1 504 else: 505 return 0 506 else: 507 if m < params['threshold']: 508 return 1 509 else: 510 return 0 511 -
trunk/lib/nanownlib/storage.py
r9 r10 4 4 import os 5 5 import uuid 6 import random 6 7 import threading 7 8 import sqlite3 8 9 9 10 import numpy 11 # Don't trust numpy's seeding 12 numpy.random.seed(random.SystemRandom().randint(0,2**32-1)) 10 13 11 14 def _newid(): … … 18 21 _population_sizes = None 19 22 _population_cache = None 23 _offset_cache = None 24 _cur_offsets = None 20 25 21 26 def __init__(self, path): … … 26 31 self._population_sizes = {} 27 32 self._population_cache = {} 33 self._offset_cache = {} 34 self._cur_offsets = {} 28 35 29 36 if not exists: … … 79 86 self.conn.execute( 80 87 """CREATE TABLE classifier_results (id BLOB PRIMARY KEY, 81 algorithm TEXT, 88 classifier TEXT, 89 trial_type TEXT, 90 num_observations INTEGER, 91 num_trials INTEGER, 82 92 params TEXT, 83 sample_size INTEGER,84 num_trials INTEGER,85 trial_type TEXT,86 93 false_positives REAL, 87 94 false_negatives REAL) … … 109 116 110 117 def subseries(self, probe_type, unusual_case, size=None, offset=None, field='packet_rtt'): 111 if (probe_type,unusual_case,field) not in self._population_cache: 118 cache_key = (probe_type,unusual_case,field) 119 120 if cache_key not in self._population_cache: 112 121 query=""" 113 122 SELECT %(field)s AS unusual_case, … … 121 130 cursor = self.conn.cursor() 122 131 cursor.execute(query, params) 123 self._population_cache[(probe_type,unusual_case,field)] = [dict(row) for row in cursor.fetchall()] 124 125 population = self._population_cache[(probe_type,unusual_case,field)] 132 p = [dict(row) for row in cursor.fetchall()] 133 self._population_cache[cache_key] = p 134 self._offset_cache[cache_key] = tuple(numpy.random.random_integers(0,len(p)-1, len(p)/5)) 135 self._cur_offsets[cache_key] = 0 136 137 population = self._population_cache[cache_key] 126 138 127 139 if size == None or size > len(population): 128 140 size = len(population) 129 141 if offset == None or offset >= len(population) or offset < 0: 130 offset = numpy.random.random_integers(0,len(population)-1) 131 142 offset = self._offset_cache[cache_key][self._cur_offsets[cache_key]] 143 self._cur_offsets[cache_key] = (offset + 1) % len(self._offset_cache[cache_key]) 144 132 145 try: 133 ret_val = population[offset:offset+size] 146 offset = int(offset) 147 size = int(size) 134 148 except Exception as e: 135 149 print(e, offset, size) 136 150 return None 151 152 ret_val = population[offset:offset+size] 137 153 if len(ret_val) < size: 138 154 ret_val += population[0:size-len(ret_val)] 139 155 140 156 return ret_val 141 142 157 158 159 def resetOffsets(self): 160 for k in self._cur_offsets.keys(): 161 self._cur_offsets[k] = 0 162 163 143 164 def clearCache(self): 144 165 self._population_cache = {} 166 self._offset_cache = {} 167 self._cur_offsets = {} 145 168 146 169 … … 188 211 self.conn.commit() 189 212 return ret_val 213 214 def fetchClassifierResult(self, classifier, trial_type, num_observations): 215 query = """ 216 SELECT * FROM classifier_results 217 WHERE classifier=? AND trial_type=? AND num_observations=? 218 ORDER BY false_positives+false_negatives 219 LIMIT 1; 220 """ 221 cursor = self.conn.cursor() 222 cursor.execute(query, (classifier, trial_type, num_observations)) 223 ret_val = cursor.fetchone() 224 225 if ret_val != None: 226 ret_val = dict(ret_val) 227 return ret_val 228 229 def deleteClassifierResults(self, classifier, trial_type, num_observations=None): 230 params = {"classifier":classifier,"trial_type":trial_type,"num_observations":num_observations} 231 query = """ 232 DELETE FROM classifier_results 233 WHERE classifier=:classifier AND trial_type=:trial_type 234 """ 235 if num_observations != None: 236 query += " AND num_observations=:num_observations" 237 238 self.conn.execute(query, params) 239 self.conn.commit() 240
Note: See TracChangeset
for help on using the changeset viewer.