Timestamp: 08/01/15 19:01:31 (9 years ago)
Author: tim
Message: .
File: 1 edited
  • trunk/lib/nanownlib/__init__.py (r13 → r16)
 import gzip
 import statistics
-import numpy
-import netifaces
+try:
+    import numpy
+except:
+    sys.stderr.write('ERROR: Could not import numpy module.  Ensure it is installed.\n')
+    sys.stderr.write('       Under Debian, the package name is "python3-numpy"\n.')
+    sys.exit(1)
+
 try:
     import requests
     

 def getIfaceForIP(ip):
+    try:
+        import netifaces
+    except:
+        sys.stderr.write('ERROR: Could not import netifaces module.  Ensure it is installed.\n')
+        sys.stderr.write('       Try: pip3 install netifaces\n.')
+        sys.exit(1)
+
     for iface in netifaces.interfaces():
         addrs = netifaces.ifaddresses(iface).get(netifaces.AF_INET, None)
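
This hunk and the previous one replace hard module-level imports of numpy and netifaces with guarded imports that print an install hint and exit; the netifaces import is additionally deferred until getIfaceForIP() first needs it. A minimal standalone sketch of the same pattern (the require() helper below is illustrative only and is not part of nanownlib):

    import sys
    import importlib

    def require(module_name, hint):
        # Import a module by name; on failure, print an installation hint and exit.
        # Illustrative helper only -- the changeset inlines this logic instead.
        try:
            return importlib.import_module(module_name)
        except ImportError:
            sys.stderr.write("ERROR: Could not import %s module.  Ensure it is installed.\n" % module_name)
            sys.stderr.write("       %s\n" % hint)
            sys.exit(1)

    numpy = require('numpy', 'Under Debian, the package name is "python3-numpy"')
    # Deferred variant, as in getIfaceForIP():
    # netifaces = require('netifaces', 'Try: pip3 install netifaces')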
     
     my_ip = getLocalIP(target_ip, target_port)
     my_iface = getIfaceForIP(my_ip)
-    return subprocess.Popen(['chrt', '-r', '99', 'nanown-csamp', my_iface, my_ip,
+    return subprocess.Popen(['chrt', '-r', '99', 'nanown-listen', my_iface, my_ip,
                              target_ip, "%d" % target_port, output_file, '0'])

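The sampler command launched here is renamed from nanown-csamp to nanown-listen; it is still started under the real-time round-robin scheduler via chrt. A hedged sketch of launching an arbitrary command that way (the command and its argument are placeholders, not taken from the changeset):

    import subprocess

    # chrt -r 99 requests SCHED_RR at priority 99; this normally needs root or
    # CAP_SYS_NICE, and chrt itself comes from util-linux.
    proc = subprocess.Popen(['chrt', '-r', '99', 'sleep', '1'])
    proc.wait()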
     


-def analyzeProbes(db):
+def analyzeProbes(db, trim=None, recompute=False):
     db.conn.execute("CREATE INDEX IF NOT EXISTS packets_probe ON packets (probe_id)")
     db.conn.commit()
     
     pcursor.execute("DELETE FROM trim_analysis")
     db.conn.commit()
+    if recompute:
+        pcursor.execute("DELETE FROM analysis")
+        db.conn.commit()

     def loadPackets(db):
         cursor = db.conn.cursor()
-        cursor.execute("SELECT * FROM packets ORDER BY probe_id")
+        #cursor.execute("SELECT * FROM packets ORDER BY probe_id")
+        cursor.execute("SELECT * FROM packets WHERE probe_id NOT IN (SELECT probe_id FROM analysis) ORDER BY probe_id")

         probe_id = None
     
         ret_val.append((probe_id,entry))
         return ret_val
-
-    start = time.time()
+
+    def processPackets(packet_cache, strim, rtrim):
+        sent_tally = []
+        rcvd_tally = []
+        analyses = []
+        for probe_id,packets in packet_cache:
+            try:
+                analysis,s,r = analyzePackets(packets, timestamp_precision)
+                analysis['probe_id'] = probe_id
+                analyses.append(analysis)
+                sent_tally.append(s)
+                rcvd_tally.append(r)
+            except Exception as e:
+                #traceback.print_exc()
+                sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
+        db.addTrimAnalyses(analyses)
+        db.conn.commit()
+        return statistics.mode(sent_tally),statistics.mode(rcvd_tally)
+
+    #start = time.time()
     packet_cache = loadPackets(db)
-    print("packets loaded in: %f" % (time.time()-start))
-
-    count = 0
-    sent_tally = []
-    rcvd_tally = []
-    for probe_id,packets in packet_cache:
-        try:
-            analysis,s,r = analyzePackets(packets, timestamp_precision)
-            analysis['probe_id'] = probe_id
-            sent_tally.append(s)
-            rcvd_tally.append(r)
-            db.addTrimAnalyses([analysis])
-        except Exception as e:
-            #traceback.print_exc()
-            sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
-
-        #print(pid,analysis)
-        count += 1
+    #print("packets loaded in: %f" % (time.time()-start))
+
+    if trim != None:
+        best_strim,best_rtrim = trim
+        processPackets(packet_cache, best_strim, best_rtrim)
+    else:
+        num_sent,num_rcvd = processPackets(packet_cache, 0, 0)
+        print("num_sent: %d, num_rcvd: %d" % (num_sent,num_rcvd))
+
+        for strim in range(0,num_sent):
+            for rtrim in range(0,num_rcvd):
+                #print(strim,rtrim)
+                if strim == 0 and rtrim == 0:
+                    continue # no point in doing 0,0 again
+                processPackets(packet_cache, strim, rtrim)
+
+
+        unusual_case,delta = findUnusualTestCase(db, (0,0))
+        evaluations = {}
+        for strim in range(0,num_sent):
+            for rtrim in range(0,num_rcvd):
+                evaluations[(strim,rtrim)] = evaluateTrim(db, unusual_case, strim, rtrim)
+
+        import pprint
+        pprint.pprint(evaluations)
+
+        delta_margin = 0.15
+        best_strim = 0
+        best_rtrim = 0
+        good_delta,good_mad = evaluations[(0,0)]
+
+        for strim in range(1,num_sent):
+            delta,mad = evaluations[(strim,0)]
+            if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
+                best_strim = strim
+            else:
+                break
+
+        good_delta,good_mad = evaluations[(best_strim,0)]
+        for rtrim in range(1,num_rcvd):
+            delta,mad = evaluations[(best_strim,rtrim)]
+            if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
+                best_rtrim = rtrim
+            else:
+                break
+
+        print("selected trim parameters:",(best_strim,best_rtrim))
+
+    pcursor.execute("""INSERT OR IGNORE INTO analysis
+                         SELECT id,probe_id,suspect,packet_rtt,tsval_rtt
+                           FROM trim_analysis
+                           WHERE sent_trimmed=? AND rcvd_trimmed=?""",
+                    (best_strim,best_rtrim))
     db.conn.commit()
-    num_sent = statistics.mode(sent_tally)
-    num_rcvd = statistics.mode(rcvd_tally)
-    sent_tally = None
-    rcvd_tally = None
-    print("num_sent: %d, num_rcvd: %d" % (num_sent,num_rcvd))
-
-    for strim in range(0,num_sent):
-        for rtrim in range(0,num_rcvd):
-            #print(strim,rtrim)
-            if strim == 0 and rtrim == 0:
-                continue # no point in doing 0,0 again
-            for probe_id,packets in packet_cache:
-                try:
-                    analysis,s,r = analyzePackets(packets, timestamp_precision, strim, rtrim)
-                    analysis['probe_id'] = probe_id
-                except Exception as e:
-                    #traceback.print_exc()
-                    sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
-
-                db.addTrimAnalyses([analysis])
-    db.conn.commit()
-
-    # Populate analysis table so findUnusualTestCase can give us a starting point
-    pcursor.execute("DELETE FROM analysis")
-    db.conn.commit()
-    pcursor.execute("INSERT INTO analysis SELECT id,probe_id,suspect,packet_rtt,tsval_rtt FROM trim_analysis WHERE sent_trimmed=0 AND rcvd_trimmed=0")
-
-    unusual_case,delta = findUnusualTestCase(db)
-    evaluations = {}
-    for strim in range(0,num_sent):
-        for rtrim in range(0,num_rcvd):
-            evaluations[(strim,rtrim)] = evaluateTrim(db, unusual_case, strim, rtrim)
-
-    import pprint
-    pprint.pprint(evaluations)
-
-    delta_margin = 0.15
-    best_strim = 0
-    best_rtrim = 0
-    good_delta,good_mad = evaluations[(0,0)]
-
-    for strim in range(1,num_sent):
-        delta,mad = evaluations[(strim,0)]
-        if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
-            best_strim = strim
-        else:
-            break
-
-    good_delta,good_mad = evaluations[(best_strim,0)]
-    for rtrim in range(1,num_rcvd):
-        delta,mad = evaluations[(best_strim,rtrim)]
-        if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
-            best_rtrim = rtrim
-        else:
-            break
-
-    print("selected trim parameters:",(best_strim,best_rtrim))
-
-    if best_strim != 0 or best_rtrim !=0:
-        pcursor.execute("DELETE FROM analysis")
-        db.conn.commit()
-        pcursor.execute("INSERT INTO analysis SELECT id,probe_id,suspect,packet_rtt,tsval_rtt FROM trim_analysis WHERE sent_trimmed=? AND rcvd_trimmed=?",
-                        (best_strim,best_rtrim))
-
-    #pcursor.execute("DELETE FROM trim_analysis")
-    db.conn.commit()
-
-    return count
+
+    return len(packet_cache)


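analyzeProbes() now takes optional trim and recompute arguments: with trim=(strim, rtrim) it only re-applies the given trim pair, otherwise it sweeps all send/receive trims and selects the best pair within the delta_margin rule, and recompute=True clears previously computed analysis rows so everything is re-derived. It returns the number of probes processed in this pass. A hedged usage sketch (assumes db is the nanownlib database wrapper used throughout this file):

    # Full sweep, discarding any cached analysis rows:
    num_probes = analyzeProbes(db, recompute=True)

    # Re-apply a previously selected trim pair without re-running the sweep:
    num_probes = analyzeProbes(db, trim=(1, 0))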
     
     ptimes = cursor.fetchone()
     window_size = 100*int((ptimes['end']-ptimes['start'])/ptimes['count'])
-    print("associate window_size:", window_size)
+    #print("associate window_size:", window_size)

     db.addPackets(parseJSONLines(sniffer_fp), window_size)
     


-def findUnusualTestCase(db):
+def findUnusualTestCase(db, trim=None):
     test_cases = enumStoredTestCases(db)
-
+    if trim != None:
+        params = {'strim':trim[0], 'rtrim':trim[1]}
+        qsuffix = " AND sent_trimmed=:strim AND rcvd_trimmed=:rtrim"
+        table = "trim_analysis"
+    else:
+        params = {}
+        qsuffix = ""
+        table = "analysis"
+
     cursor = db.conn.cursor()
-    cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test')")
+    cursor.execute("SELECT packet_rtt FROM probes,"+table+" a WHERE probes.id=a.probe_id AND probes.type in ('train','test')"+qsuffix, params)
     global_tm = quadsummary([row['packet_rtt'] for row in cursor])

     tm_abs = []
     tm_map = {}
+
     # XXX: if more speed needed, percentile extension to sqlite might be handy...
     for tc in test_cases:
-        cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case=?", (tc,))
+        params['test_case']=tc
+        query = """SELECT packet_rtt FROM probes,"""+table+""" a
+                   WHERE probes.id=a.probe_id AND probes.type in ('train','test')
+                   AND probes.test_case=:test_case""" + qsuffix
+        cursor.execute(query, params)
         tm_map[tc] = quadsummary([row['packet_rtt'] for row in cursor])
         tm_abs.append((abs(tm_map[tc]-global_tm), tc))

     magnitude,tc = max(tm_abs)
-    cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case<>?", (tc,))
+    params['test_case']=tc
+    query = """SELECT packet_rtt FROM probes,"""+table+""" a
+               WHERE probes.id=a.probe_id AND probes.type in ('train','test')
+               AND probes.test_case<>:test_case""" + qsuffix
+    cursor.execute(query,params)
     remaining_tm = quadsummary([row['packet_rtt'] for row in cursor])

-    ret_val = (tc, tm_map[tc]-remaining_tm)
-    print("unusual_case: %s, delta: %f" % ret_val)
-    return ret_val
+    delta = tm_map[tc]-remaining_tm
+    # Hack to make the chosen unusual_case more intuitive to the user
+    if len(test_cases) == 2 and delta < 0.0:
+        tc = [t for t in test_cases if t != tc][0]
+        delta = abs(delta)
+
+    return tc,delta


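findUnusualTestCase() now returns a (test_case, delta) pair instead of printing it, and the optional trim argument selects rows from trim_analysis for a specific (sent, received) trim rather than from the finalized analysis table. A hedged usage sketch (again assuming the surrounding db wrapper):

    # Pick the unusual case from the untrimmed rows of trim_analysis:
    unusual_case, delta = findUnusualTestCase(db, trim=(0, 0))

    # Or from the finalized analysis table (the default):
    unusual_case, delta = findUnusualTestCase(db)
    print("unusual_case: %s, delta: %f" % (unusual_case, delta))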
     
         cursor.execute("SELECT count(id) c FROM (SELECT id FROM probes WHERE type=? AND time_of_day>? GROUP BY sample)", (st[0],int(start_time*1000000000)))
         count = cursor.fetchone()[0]
-        output += " | %s remaining: %d" % (st[0], st[1]-count)
+        output += " | %s remaining: %6d" % (st[0], st[1]-count)
         total_completed += count
         total_requested += st[1]

     rate = total_completed / (time.time() - start_time)
-    total_time = total_requested / rate
+    total_time = total_requested / rate
     eta = datetime.datetime.fromtimestamp(start_time+total_time)
-    print("STATUS:",output[3:],"| est. total_time: %s | est. ETA: %s" % (str(datetime.timedelta(seconds=total_time)), str(eta)))
+    print("STATUS:",output[3:],"| est. total_time: %s | ETA: %s" % (str(datetime.timedelta(seconds=total_time)), eta.strftime("%Y-%m-%d %X")))
+
+
+
+def evaluateTestResults(db):
+    cursor = db.conn.cursor()
+    query = """
+      SELECT classifier FROM classifier_results GROUP BY classifier ORDER BY classifier;
+    """
+    cursor.execute(query)
+    classifiers = []
+    for c in cursor:
+        classifiers.append(c[0])
+
+    best_obs = []
+    best_error = []
+    max_obs = 0
+    for classifier in classifiers:
+        query="""
+        SELECT classifier,params,num_observations,(false_positives+false_negatives)/2 error
+        FROM classifier_results
+        WHERE trial_type='test'
+         AND classifier=:classifier
+         AND (false_positives+false_negatives)/2.0 < 5.0
+        ORDER BY num_observations,(false_positives+false_negatives)
+        LIMIT 1
+        """
+        cursor.execute(query, {'classifier':classifier})
+        row = cursor.fetchone()
+        if row == None:
+            query="""
+            SELECT classifier,params,num_observations,(false_positives+false_negatives)/2 error
+            FROM classifier_results
+            WHERE trial_type='test' and classifier=:classifier
+            ORDER BY (false_positives+false_negatives),num_observations
+            LIMIT 1
+            """
+            cursor.execute(query, {'classifier':classifier})
+            row = cursor.fetchone()
+            if row == None:
+                sys.stderr.write("WARN: couldn't find test results for classifier '%s'.\n" % classifier)
+                continue
+            row = dict(row)
+
+            best_error.append(dict(row))
+        else:
+            best_obs.append(dict(row))
+
+
+    return best_obs,best_error
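
The new evaluateTestResults() returns two lists of classifier_results rows: best_obs holds, per classifier, the result needing the fewest observations among those with error below 5.0, and best_error holds the lowest-error result when no such row exists. A hedged usage sketch based on the column aliases in the queries above:

    best_obs, best_error = evaluateTestResults(db)
    for row in best_obs:
        print("%s: error %.2f with %d observations" %
              (row['classifier'], row['error'], row['num_observations']))
    for row in best_error:
        print("%s: best available error %.2f (%d observations)" %
              (row['classifier'], row['error'], row['num_observations']))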