Ignore:
Timestamp:
07/13/15 19:16:30 (9 years ago)
Author:
tim
Message:

.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/lib/nanownlib/__init__.py

    r6 r10  
    44import sys
    55import time
     6import traceback
    67import random
    78import argparse
     
    209210def removeDuplicatePackets(packets):
    210211    #return packets
    211     suspect = None
     212    suspect = ''
    212213    seen = {}
    213214    # XXX: Need to review this deduplication algorithm and make sure it is correct
    214215    for p in packets:
    215216        key = (p['sent'],p['tcpseq'],p['tcpack'],p['payload_len'])
    216         if (key not in seen)\
    217            or p['sent']==1 and (seen[key]['observed'] < p['observed'])\
    218            or p['sent']==0 and (seen[key]['observed'] > p['observed']):
    219             #if (key not in seen) or (seen[key]['observed'] > p['observed']):
     217        if (key not in seen):
    220218            seen[key] = p
    221    
    222     if len(seen) < len(packets):
    223         suspect = 'd'
    224         #sys.stderr.write("INFO: removed %d duplicate packets.\n" % (len(packets) - len(seen)))
     219            continue
     220        if p['sent']==1 and (seen[key]['observed'] > p['observed']): #earliest sent
     221            seen[key] = p
     222            suspect += 's'
     223            continue
     224        if p['sent']==0 and (seen[key]['observed'] > p['observed']): #earliest rcvd
     225            seen[key] = p
     226            suspect += 'r'
     227            continue
     228   
     229    #if len(seen) < len(packets):
     230    #   sys.stderr.write("INFO: removed %d duplicate packets.\n" % (len(packets) - len(seen)))
    225231
    226232    return suspect,seen.values()
     
    230236    suspect,packets = removeDuplicatePackets(packets)
    231237
    232     #sort_key = lambda d: (d['tcpseq'],d['tcpack'])
    233     sort_key = lambda d: (d['observed'],d['tcpseq'])
     238    sort_key = lambda d: (d['tcpseq'],d['observed'])
    234239    sent = sorted((p for p in packets if p['sent']==1 and p['payload_len']>0), key=sort_key)
    235240    rcvd = sorted((p for p in packets if p['sent']==0 and p['payload_len']>0), key=sort_key)
    236241
    237     if len(sent) <= trim_sent:
    238         last_sent = sent[-1]
    239     else:
    240         last_sent = sent[trim_sent]
    241 
    242     if len(rcvd) <= trim_rcvd:
    243         last_rcvd = rcvd[0]
    244     else:
    245         last_rcvd = rcvd[len(rcvd)-1-trim_rcvd]
     242    alt_key = lambda d: (d['observed'],d['tcpseq'])
     243    rcvd_alt = sorted((p for p in packets if p['sent']==0 and p['payload_len']>0), key=alt_key)
     244
     245    s_off = trim_sent
     246    if s_off >= len(sent):
     247        s_off = -1
     248    last_sent = sent[s_off]
     249
     250    r_off = len(rcvd) - trim_rcvd - 1
     251    if r_off <= 0:
     252        r_off = 0
     253    last_rcvd = rcvd[r_off]
     254    if last_rcvd != rcvd_alt[r_off]:
     255        suspect += 'R'
    246256   
    247257    packet_rtt = last_rcvd['observed'] - last_sent['observed']
     
    272282    query="""
    273283      SELECT packet_rtt-(SELECT avg(packet_rtt) FROM probes,trim_analysis
    274                          WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.sample AND probes.type in ('train','test'))
    275       FROM (SELECT probes.sample,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test')) u
     284                         WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.s AND probes.type in ('train','test'))
     285      FROM (SELECT probes.sample s,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test') AND 1 NOT IN (select 1 from probes p,trim_analysis t WHERE p.sample=s AND t.probe_id=p.id AND t.suspect LIKE '%R%')) u
     286    """
     287    query="""
     288      SELECT packet_rtt-(SELECT avg(packet_rtt) FROM probes,trim_analysis
     289                         WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case!=:unusual_case AND sample=u.s AND probes.type in ('train','test'))
     290      FROM (SELECT probes.sample s,packet_rtt FROM probes,trim_analysis WHERE sent_trimmed=:strim AND rcvd_trimmed=:rtrim AND trim_analysis.probe_id=probes.id AND probes.test_case=:unusual_case AND probes.type in ('train','test')) u
    276291    """
    277292
     
    280295    differences = [row[0] for row in cursor]
    281296   
    282     return trimean(differences),mad(differences)
     297    return ubersummary(differences),mad(differences)
    283298
    284299
     
    287302    db.conn.execute("CREATE INDEX IF NOT EXISTS packets_probe ON packets (probe_id)")
    288303    pcursor = db.conn.cursor()
    289     kcursor = db.conn.cursor()
     304    db.conn.commit()
    290305
    291306    pcursor.execute("SELECT tcpts_mean FROM meta")
     
    297312    pcursor.execute("DELETE FROM trim_analysis")
    298313    db.conn.commit()
     314
     315    def loadPackets(db):
     316        cursor = db.conn.cursor()
     317        cursor.execute("SELECT * FROM packets ORDER BY probe_id")
     318
     319        probe_id = None
     320        entry = []
     321        ret_val = []
     322        for p in cursor:
     323            if probe_id == None:
     324                probe_id = p['probe_id']
     325            if p['probe_id'] != probe_id:
     326                ret_val.append((probe_id,entry))
     327                probe_id = p['probe_id']
     328                entry = []
     329            entry.append(dict(p))
     330        ret_val.append((probe_id,entry))
     331        return ret_val
     332   
     333    start = time.time()
     334    packet_cache = loadPackets(db)
     335    print("packets loaded in: %f" % (time.time()-start))
    299336   
    300337    count = 0
    301338    sent_tally = []
    302339    rcvd_tally = []
    303     for pid, in pcursor.execute("SELECT id FROM probes"):
    304         kcursor.execute("SELECT * FROM packets WHERE probe_id=?", (pid,))
     340    for probe_id,packets in packet_cache:
    305341        try:
    306             analysis,s,r = analyzePackets(kcursor.fetchall(), timestamp_precision)
    307             analysis['probe_id'] = pid
     342            analysis,s,r = analyzePackets(packets, timestamp_precision)
     343            analysis['probe_id'] = probe_id
    308344            sent_tally.append(s)
    309345            rcvd_tally.append(r)
     346            db.addTrimAnalyses([analysis])
    310347        except Exception as e:
    311             print(e)
    312             sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % pid)
     348            traceback.print_exc()
     349            sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
    313350       
    314351        #print(pid,analysis)
    315         db.addTrimAnalyses([analysis])
    316352        count += 1
    317353    db.conn.commit()
     
    326362            if strim == 0 and rtrim == 0:
    327363                continue # no point in doing 0,0 again
    328             for pid, in pcursor.execute("SELECT id FROM probes"):
    329                 kcursor.execute("SELECT * FROM packets WHERE probe_id=?", (pid,))
     364            for probe_id,packets in packet_cache:
    330365                try:
    331                     analysis,s,r = analyzePackets(kcursor.fetchall(), timestamp_precision, strim, rtrim)
    332                     analysis['probe_id'] = pid
     366                    analysis,s,r = analyzePackets(packets, timestamp_precision, strim, rtrim)
     367                    analysis['probe_id'] = probe_id
    333368                except Exception as e:
    334369                    print(e)
     
    336371                   
    337372                db.addTrimAnalyses([analysis])
    338             db.conn.commit()
     373    db.conn.commit()
    339374
    340375    # Populate analysis table so findUnusualTestCase can give us a starting point
     
    359394    for strim in range(1,num_sent):
    360395        delta,mad = evaluations[(strim,0)]
    361         if abs(good_delta - delta) < abs(delta_margin*good_delta) and mad < good_mad:
     396        if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
    362397            best_strim = strim
    363398        else:
     
    367402    for rtrim in range(1,num_rcvd):
    368403        delta,mad = evaluations[(best_strim,rtrim)]
    369         if (abs(delta) > abs(good_delta) or abs(good_delta - delta) < abs(delta_margin*good_delta)) and mad < good_mad:
     404        if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
    370405            best_rtrim = rtrim
    371406        else:
     
    425460    cursor = db.conn.cursor()
    426461    cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test')")
    427     global_tm = trimean([row['packet_rtt'] for row in cursor])
     462    global_tm = quadsummary([row['packet_rtt'] for row in cursor])
    428463
    429464    tm_abs = []
     
    432467    for tc in test_cases:
    433468        cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case=?", (tc,))
    434         tm_map[tc] = trimean([row['packet_rtt'] for row in cursor])
     469        tm_map[tc] = quadsummary([row['packet_rtt'] for row in cursor])
    435470        tm_abs.append((abs(tm_map[tc]-global_tm), tc))
    436471
    437472    magnitude,tc = max(tm_abs)
    438473    cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case<>?", (tc,))
    439     remaining_tm = trimean([row['packet_rtt'] for row in cursor])
     474    remaining_tm = quadsummary([row['packet_rtt'] for row in cursor])
    440475
    441476    ret_val = (tc, tm_map[tc]-remaining_tm)
Note: See TracChangeset for help on using the changeset viewer.