Changeset 16 for trunk/lib


Timestamp: 08/01/15 19:01:31 (9 years ago)
Author: tim
Message: .
Location: trunk/lib/nanownlib
Files: 5 edited

  • trunk/lib/nanownlib/__init__.py (r13 → r16)

    @@ -18 +18 @@
     import gzip
     import statistics
    -import numpy
    -import netifaces
    +try:
    +    import numpy
    +except:
    +    sys.stderr.write('ERROR: Could not import numpy module.  Ensure it is installed.\n')
    +    sys.stderr.write('       Under Debian, the package name is "python3-numpy"\n.')
    +    sys.exit(1)
    +
     try:
         import requests
     
    @@ -39 +44 @@
     
     def getIfaceForIP(ip):
    +    try:
    +        import netifaces
    +    except:
    +        sys.stderr.write('ERROR: Could not import netifaces module.  Ensure it is installed.\n')
    +        sys.stderr.write('       Try: pip3 install netifaces\n.')
    +        sys.exit(1)
    +
         for iface in netifaces.interfaces():
             addrs = netifaces.ifaddresses(iface).get(netifaces.AF_INET, None)
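
    Both hunks above replace hard module-level imports with guarded ones: numpy fails fast with an
    installation hint, and netifaces is only imported inside getIfaceForIP(), so the module still
    loads on machines without it as long as that function is never called. A minimal standalone
    sketch of the same pattern (shown with ImportError rather than the bare except used in the
    changeset; the hint text mirrors the diff):

        import sys

        try:
            import numpy          # optional heavy dependency
        except ImportError:
            # Fail early with an actionable message instead of a raw traceback.
            sys.stderr.write('ERROR: Could not import numpy module.  Ensure it is installed.\n')
            sys.stderr.write('       Under Debian, the package name is "python3-numpy".\n')
            sys.exit(1)
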
     
    @@ -176 +188 @@
         my_ip = getLocalIP(target_ip, target_port)
         my_iface = getIfaceForIP(my_ip)
    -    return subprocess.Popen(['chrt', '-r', '99', 'nanown-csamp', my_iface, my_ip,
    +    return subprocess.Popen(['chrt', '-r', '99', 'nanown-listen', my_iface, my_ip,
                                  target_ip, "%d" % target_port, output_file, '0'])
     
     
    @@ -303 +315 @@
     
     
    -def analyzeProbes(db):
    +def analyzeProbes(db, trim=None, recompute=False):
         db.conn.execute("CREATE INDEX IF NOT EXISTS packets_probe ON packets (probe_id)")
         db.conn.commit()
     
    @@ -316 +328 @@
         pcursor.execute("DELETE FROM trim_analysis")
         db.conn.commit()
    +    if recompute:
    +        pcursor.execute("DELETE FROM analysis")
    +        db.conn.commit()
     
         def loadPackets(db):
             cursor = db.conn.cursor()
    -        cursor.execute("SELECT * FROM packets ORDER BY probe_id")
    +        #cursor.execute("SELECT * FROM packets ORDER BY probe_id")
    +        cursor.execute("SELECT * FROM packets WHERE probe_id NOT IN (SELECT probe_id FROM analysis) ORDER BY probe_id")
     
             probe_id = None
     
    @@ -334 +350 @@
             ret_val.append((probe_id,entry))
             return ret_val
    -
    -    start = time.time()
    +
    +    def processPackets(packet_cache, strim, rtrim):
    +        sent_tally = []
    +        rcvd_tally = []
    +        analyses = []
    +        for probe_id,packets in packet_cache:
    +            try:
    +                analysis,s,r = analyzePackets(packets, timestamp_precision)
    +                analysis['probe_id'] = probe_id
    +                analyses.append(analysis)
    +                sent_tally.append(s)
    +                rcvd_tally.append(r)
    +            except Exception as e:
    +                #traceback.print_exc()
    +                sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
    +        db.addTrimAnalyses(analyses)
    +        db.conn.commit()
    +        return statistics.mode(sent_tally),statistics.mode(rcvd_tally)
    +
    +    #start = time.time()
         packet_cache = loadPackets(db)
    -    print("packets loaded in: %f" % (time.time()-start))
    -
    -    count = 0
    -    sent_tally = []
    -    rcvd_tally = []
    -    for probe_id,packets in packet_cache:
    -        try:
    -            analysis,s,r = analyzePackets(packets, timestamp_precision)
    -            analysis['probe_id'] = probe_id
    -            sent_tally.append(s)
    -            rcvd_tally.append(r)
    -            db.addTrimAnalyses([analysis])
    -        except Exception as e:
    -            #traceback.print_exc()
    -            sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
    -
    -        #print(pid,analysis)
    -        count += 1
    +    #print("packets loaded in: %f" % (time.time()-start))
    +
    +    if trim != None:
    +        best_strim,best_rtrim = trim
    +        processPackets(packet_cache, best_strim, best_rtrim)
    +    else:
    +        num_sent,num_rcvd = processPackets(packet_cache, 0, 0)
    +        print("num_sent: %d, num_rcvd: %d" % (num_sent,num_rcvd))
    +
    +        for strim in range(0,num_sent):
    +            for rtrim in range(0,num_rcvd):
    +                #print(strim,rtrim)
    +                if strim == 0 and rtrim == 0:
    +                    continue # no point in doing 0,0 again
    +                processPackets(packet_cache, strim, rtrim)
    +
    +        unusual_case,delta = findUnusualTestCase(db, (0,0))
    +        evaluations = {}
    +        for strim in range(0,num_sent):
    +            for rtrim in range(0,num_rcvd):
    +                evaluations[(strim,rtrim)] = evaluateTrim(db, unusual_case, strim, rtrim)
    +
    +        import pprint
    +        pprint.pprint(evaluations)
    +
    +        delta_margin = 0.15
    +        best_strim = 0
    +        best_rtrim = 0
    +        good_delta,good_mad = evaluations[(0,0)]
    +
    +        for strim in range(1,num_sent):
    +            delta,mad = evaluations[(strim,0)]
    +            if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
    +                best_strim = strim
    +            else:
    +                break
    +
    +        good_delta,good_mad = evaluations[(best_strim,0)]
    +        for rtrim in range(1,num_rcvd):
    +            delta,mad = evaluations[(best_strim,rtrim)]
    +            if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
    +                best_rtrim = rtrim
    +            else:
    +                break
    +
    +        print("selected trim parameters:",(best_strim,best_rtrim))
    +
    +    pcursor.execute("""INSERT OR IGNORE INTO analysis
    +                         SELECT id,probe_id,suspect,packet_rtt,tsval_rtt
    +                           FROM trim_analysis
    +                           WHERE sent_trimmed=? AND rcvd_trimmed=?""",
    +                    (best_strim,best_rtrim))
         db.conn.commit()
    -    num_sent = statistics.mode(sent_tally)
    -    num_rcvd = statistics.mode(rcvd_tally)
    -    sent_tally = None
    -    rcvd_tally = None
    -    print("num_sent: %d, num_rcvd: %d" % (num_sent,num_rcvd))
    -
    -    for strim in range(0,num_sent):
    -        for rtrim in range(0,num_rcvd):
    -            #print(strim,rtrim)
    -            if strim == 0 and rtrim == 0:
    -                continue # no point in doing 0,0 again
    -            for probe_id,packets in packet_cache:
    -                try:
    -                    analysis,s,r = analyzePackets(packets, timestamp_precision, strim, rtrim)
    -                    analysis['probe_id'] = probe_id
    -                except Exception as e:
    -                    #traceback.print_exc()
    -                    sys.stderr.write("WARN: couldn't find enough packets for probe_id=%s\n" % probe_id)
    -
    -                db.addTrimAnalyses([analysis])
    -    db.conn.commit()
    -
    -    # Populate analysis table so findUnusualTestCase can give us a starting point
    -    pcursor.execute("DELETE FROM analysis")
    -    db.conn.commit()
    -    pcursor.execute("INSERT INTO analysis SELECT id,probe_id,suspect,packet_rtt,tsval_rtt FROM trim_analysis WHERE sent_trimmed=0 AND rcvd_trimmed=0")
    -
    -    unusual_case,delta = findUnusualTestCase(db)
    -    evaluations = {}
    -    for strim in range(0,num_sent):
    -        for rtrim in range(0,num_rcvd):
    -            evaluations[(strim,rtrim)] = evaluateTrim(db, unusual_case, strim, rtrim)
    -
    -    import pprint
    -    pprint.pprint(evaluations)
    -
    -    delta_margin = 0.15
    -    best_strim = 0
    -    best_rtrim = 0
    -    good_delta,good_mad = evaluations[(0,0)]
    -
    -    for strim in range(1,num_sent):
    -        delta,mad = evaluations[(strim,0)]
    -        if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
    -            best_strim = strim
    -        else:
    -            break
    -
    -    good_delta,good_mad = evaluations[(best_strim,0)]
    -    for rtrim in range(1,num_rcvd):
    -        delta,mad = evaluations[(best_strim,rtrim)]
    -        if delta*good_delta > 0.0 and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta) and mad < good_mad:
    -            best_rtrim = rtrim
    -        else:
    -            break
    -
    -    print("selected trim parameters:",(best_strim,best_rtrim))
    -
    -    if best_strim != 0 or best_rtrim !=0:
    -        pcursor.execute("DELETE FROM analysis")
    -        db.conn.commit()
    -        pcursor.execute("INSERT INTO analysis SELECT id,probe_id,suspect,packet_rtt,tsval_rtt FROM trim_analysis WHERE sent_trimmed=? AND rcvd_trimmed=?",
    -                        (best_strim,best_rtrim))
    -
    -    #pcursor.execute("DELETE FROM trim_analysis")
    -    db.conn.commit()
    -
    -    return count
    +
    +    return len(packet_cache)
     
     
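
    The rewritten analyzeProbes() factors the per-probe work into a processPackets() helper, only
    loads probes that are not yet in the analysis table, and (unless a trim is passed in) sweeps
    send/receive trim values, keeping the largest trim that preserves the sign of the reference
    delta, stays within delta_margin of its magnitude, and reduces the spread statistic (mad).
    A self-contained sketch of just that selection rule, driven by a plain evaluations dict of
    (strim, rtrim) -> (delta, mad); the numbers below are made up:

        def select_trim(evaluations, num_sent, num_rcvd, delta_margin=0.15):
            # Accept a larger trim only while it keeps the delta's sign, gives up at most
            # delta_margin of its magnitude, and lowers the spread (mad).
            def acceptable(delta, mad, good_delta, good_mad):
                return (delta*good_delta > 0.0
                        and (abs(good_delta) - abs(delta)) < abs(delta_margin*good_delta)
                        and mad < good_mad)

            best_strim = 0
            good_delta, good_mad = evaluations[(0, 0)]
            for strim in range(1, num_sent):
                delta, mad = evaluations[(strim, 0)]
                if acceptable(delta, mad, good_delta, good_mad):
                    best_strim = strim
                else:
                    break

            best_rtrim = 0
            good_delta, good_mad = evaluations[(best_strim, 0)]
            for rtrim in range(1, num_rcvd):
                delta, mad = evaluations[(best_strim, rtrim)]
                if acceptable(delta, mad, good_delta, good_mad):
                    best_rtrim = rtrim
                else:
                    break

            return best_strim, best_rtrim

        # Illustrative values only:
        evaluations = {(0,0): (42.0, 9.0), (1,0): (41.0, 7.5), (2,0): (30.0, 7.0),
                       (0,1): (40.0, 8.0), (1,1): (41.5, 6.8)}
        print(select_trim(evaluations, num_sent=3, num_rcvd=2))   # -> (1, 1)
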
     
    @@ -442 +445 @@
         ptimes = cursor.fetchone()
         window_size = 100*int((ptimes['end']-ptimes['start'])/ptimes['count'])
    -    print("associate window_size:", window_size)
    +    #print("associate window_size:", window_size)
     
         db.addPackets(parseJSONLines(sniffer_fp), window_size)
     
    @@ -460 +463 @@
     
     
    -def findUnusualTestCase(db):
    +def findUnusualTestCase(db, trim=None):
         test_cases = enumStoredTestCases(db)
    -
    +    if trim != None:
    +        params = {'strim':trim[0], 'rtrim':trim[1]}
    +        qsuffix = " AND sent_trimmed=:strim AND rcvd_trimmed=:rtrim"
    +        table = "trim_analysis"
    +    else:
    +        params = {}
    +        qsuffix = ""
    +        table = "analysis"
    +
         cursor = db.conn.cursor()
    -    cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test')")
    +    cursor.execute("SELECT packet_rtt FROM probes,"+table+" a WHERE probes.id=a.probe_id AND probes.type in ('train','test')"+qsuffix, params)
         global_tm = quadsummary([row['packet_rtt'] for row in cursor])
     
         tm_abs = []
         tm_map = {}
    +
         # XXX: if more speed needed, percentile extension to sqlite might be handy...
         for tc in test_cases:
    -        cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case=?", (tc,))
    +        params['test_case']=tc
    +        query = """SELECT packet_rtt FROM probes,"""+table+""" a
    +                   WHERE probes.id=a.probe_id AND probes.type in ('train','test')
    +                   AND probes.test_case=:test_case""" + qsuffix
    +        cursor.execute(query, params)
             tm_map[tc] = quadsummary([row['packet_rtt'] for row in cursor])
             tm_abs.append((abs(tm_map[tc]-global_tm), tc))
     
         magnitude,tc = max(tm_abs)
    -    cursor.execute("SELECT packet_rtt FROM probes,analysis WHERE probes.id=analysis.probe_id AND probes.type in ('train','test') AND probes.test_case<>?", (tc,))
    +    params['test_case']=tc
    +    query = """SELECT packet_rtt FROM probes,"""+table+""" a
    +               WHERE probes.id=a.probe_id AND probes.type in ('train','test')
    +               AND probes.test_case<>:test_case""" + qsuffix
    +    cursor.execute(query,params)
         remaining_tm = quadsummary([row['packet_rtt'] for row in cursor])
     
    -    ret_val = (tc, tm_map[tc]-remaining_tm)
    -    print("unusual_case: %s, delta: %f" % ret_val)
    -    return ret_val
    +    delta = tm_map[tc]-remaining_tm
    +    # Hack to make the chosen unusual_case more intuitive to the user
    +    if len(test_cases) == 2 and delta < 0.0:
    +        tc = [t for t in test_cases if t != tc][0]
    +        delta = abs(delta)
    +
    +    return tc,delta
     
     
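
    findUnusualTestCase() can now be pointed at the trim_analysis table for a specific
    (sent_trimmed, rcvd_trimmed) pair via the new trim argument, and it returns (tc, delta)
    instead of printing; with exactly two test cases it reports the case whose delta is positive.
    A toy, self-contained sketch of the selection idea on in-memory data (statistics.median stands
    in for quadsummary, and the sample values are invented):

        import statistics

        def find_unusual_case(samples_by_case):
            # samples_by_case: {test_case: [packet_rtt, ...]}
            all_samples = [v for vals in samples_by_case.values() for v in vals]
            global_tm = statistics.median(all_samples)
            tm_map = {tc: statistics.median(vals) for tc, vals in samples_by_case.items()}

            # Pick the test case whose summary is farthest from the global summary.
            magnitude, tc = max((abs(tm - global_tm), tc) for tc, tm in tm_map.items())

            remaining = [v for other, vals in samples_by_case.items() if other != tc for v in vals]
            delta = tm_map[tc] - statistics.median(remaining)

            # Same two-case convention as the changeset: report the positive-delta case.
            if len(samples_by_case) == 2 and delta < 0.0:
                tc = [t for t in samples_by_case if t != tc][0]
                delta = abs(delta)
            return tc, delta

        print(find_unusual_case({'train': [100, 101, 99, 100, 102],
                                 'test':  [140, 139, 141]}))   # -> ('test', 40)
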
     
    @@ -492 +516 @@
             cursor.execute("SELECT count(id) c FROM (SELECT id FROM probes WHERE type=? AND time_of_day>? GROUP BY sample)", (st[0],int(start_time*1000000000)))
             count = cursor.fetchone()[0]
    -        output += " | %s remaining: %d" % (st[0], st[1]-count)
    +        output += " | %s remaining: %6d" % (st[0], st[1]-count)
             total_completed += count
             total_requested += st[1]
     
         rate = total_completed / (time.time() - start_time)
    -    total_time = total_requested / rate
    +    total_time = total_requested / rate
         eta = datetime.datetime.fromtimestamp(start_time+total_time)
    -    print("STATUS:",output[3:],"| est. total_time: %s | est. ETA: %s" % (str(datetime.timedelta(seconds=total_time)), str(eta)))
    +    print("STATUS:",output[3:],"| est. total_time: %s | ETA: %s" % (str(datetime.timedelta(seconds=total_time)), eta.strftime("%Y-%m-%d %X")))
    +
    +
    +
    +def evaluateTestResults(db):
    +    cursor = db.conn.cursor()
    +    query = """
    +      SELECT classifier FROM classifier_results GROUP BY classifier ORDER BY classifier;
    +    """
    +    cursor.execute(query)
    +    classifiers = []
    +    for c in cursor:
    +        classifiers.append(c[0])
    +
    +    best_obs = []
    +    best_error = []
    +    max_obs = 0
    +    for classifier in classifiers:
    +        query="""
    +        SELECT classifier,params,num_observations,(false_positives+false_negatives)/2 error
    +        FROM classifier_results
    +        WHERE trial_type='test'
    +         AND classifier=:classifier
    +         AND (false_positives+false_negatives)/2.0 < 5.0
    +        ORDER BY num_observations,(false_positives+false_negatives)
    +        LIMIT 1
    +        """
    +        cursor.execute(query, {'classifier':classifier})
    +        row = cursor.fetchone()
    +        if row == None:
    +            query="""
    +            SELECT classifier,params,num_observations,(false_positives+false_negatives)/2 error
    +            FROM classifier_results
    +            WHERE trial_type='test' and classifier=:classifier
    +            ORDER BY (false_positives+false_negatives),num_observations
    +            LIMIT 1
    +            """
    +            cursor.execute(query, {'classifier':classifier})
    +            row = cursor.fetchone()
    +            if row == None:
    +                sys.stderr.write("WARN: couldn't find test results for classifier '%s'.\n" % classifier)
    +                continue
    +            row = dict(row)
    +
    +            best_error.append(dict(row))
    +        else:
    +            best_obs.append(dict(row))
    +
    +
    +    return best_obs,best_error
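
    The new evaluateTestResults() picks, per classifier, the result whose average of false
    positives and false negatives stays below 5 with the fewest observations, and otherwise falls
    back to the lowest-error result. A runnable sketch of that selection against a throwaway
    in-memory classifier_results table (schema trimmed to the columns the queries use; classifier
    names and numbers are invented):

        import sqlite3

        conn = sqlite3.connect(":memory:")
        conn.row_factory = sqlite3.Row
        conn.execute("""CREATE TABLE classifier_results
                        (classifier TEXT, params TEXT, num_observations INTEGER,
                         false_positives REAL, false_negatives REAL, trial_type TEXT)""")
        rows = [("alpha", "{}", 100, 6.0, 5.0, "test"),
                ("alpha", "{}", 200, 3.0, 2.0, "test"),
                ("alpha", "{}", 400, 1.0, 1.0, "test"),
                ("beta",  "{}", 400, 9.0, 8.0, "test")]
        conn.executemany("INSERT INTO classifier_results VALUES (?,?,?,?,?,?)", rows)

        best_obs, best_error = [], []
        for (classifier,) in conn.execute(
                "SELECT classifier FROM classifier_results GROUP BY classifier ORDER BY classifier"):
            # Prefer the cheapest result (fewest observations) that stays under the error cutoff...
            row = conn.execute("""SELECT classifier, num_observations,
                                         (false_positives+false_negatives)/2 error
                                  FROM classifier_results
                                  WHERE trial_type='test' AND classifier=?
                                   AND (false_positives+false_negatives)/2.0 < 5.0
                                  ORDER BY num_observations, (false_positives+false_negatives)
                                  LIMIT 1""", (classifier,)).fetchone()
            if row is not None:
                best_obs.append(dict(row))
                continue
            # ...otherwise fall back to the lowest-error result regardless of cost.
            row = conn.execute("""SELECT classifier, num_observations,
                                         (false_positives+false_negatives)/2 error
                                  FROM classifier_results
                                  WHERE trial_type='test' AND classifier=?
                                  ORDER BY (false_positives+false_negatives), num_observations
                                  LIMIT 1""", (classifier,)).fetchone()
            best_error.append(dict(row))

        print(best_obs)    # alpha reaches the cutoff at 200 observations
        print(best_error)  # beta never does, so its lowest-error row is reported instead
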
  • trunk/lib/nanownlib/parallel.py (r11 → r16)

    @@ -48 +48 @@
     
         def stop(self):
    -        for i in range(0,len(self.workers)):
    +        try:
    +            while True:
    +                self.workq.get(block=False)
    +                self.workq.task_done()
    +        except queue.Empty as e:
    +            pass
    +
    +        for i in range(len(self.workers)):
                 self.workq.put(None)
             for w in self.workers:
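
    stop() now drains whatever is still queued before pushing one None sentinel per worker, so
    shutdown no longer waits for the remaining backlog to be processed. A small self-contained
    sketch of the same drain-then-sentinel pattern (this WorkerPool is a toy stand-in, not the
    nanownlib class):

        import queue
        import threading

        class WorkerPool:
            def __init__(self, num_workers=2):
                self.workq = queue.Queue()
                self.workers = [threading.Thread(target=self._run) for _ in range(num_workers)]
                for w in self.workers:
                    w.start()

            def _run(self):
                while True:
                    job = self.workq.get()
                    self.workq.task_done()
                    if job is None:       # sentinel: this worker should exit
                        break
                    job()                 # a job is just a callable in this toy

            def stop(self):
                # Drop any unprocessed jobs so stop() returns promptly...
                try:
                    while True:
                        self.workq.get(block=False)
                        self.workq.task_done()
                except queue.Empty:
                    pass
                # ...then wake each worker with a sentinel and join them.
                for _ in range(len(self.workers)):
                    self.workq.put(None)
                for w in self.workers:
                    w.join()

        pool = WorkerPool()
        for i in range(1000):
            pool.workq.put(lambda: None)   # queue some trivial work
        pool.stop()                        # returns without processing the whole backlog
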
  • trunk/lib/nanownlib/stats.py (r13 → r16)

    @@ -7 +7 @@
     import gzip
     import random
    -import scipy
    -import scipy.stats
     import numpy
     
     
    @@ -249 +247 @@
     
         mh = f(diffs, params['distance'])
    +    #print("estimate:", mh)
         if greater:
             if mh > params['threshold']:
  • trunk/lib/nanownlib/storage.py (r11 → r16)

    @@ -7 +7 @@
     import threading
     import sqlite3
    -
    -import numpy
    +try:
    +    import numpy
    +except:
    +    sys.stderr.write('ERROR: Could not import numpy module.  Ensure it is installed.\n')
    +    sys.stderr.write('       Under Debian, the package name is "python3-numpy"\n.')
    +    sys.exit(1)
    +
     # Don't trust numpy's seeding
     numpy.random.seed(random.SystemRandom().randint(0,2**32-1))
     
    @@ -39 +44 @@
                                           tcpts_mean REAL,
                                           tcpts_stddev REAL,
    -                                      tcpts_slopes TEXT)
    +                                      tcpts_slopes TEXT,
    +                                      unusual_case TEXT,
    +                                      greater INTEGER)
                     """)
     
     
    @@ -196 +203 @@
         def addPackets(self, pkts, window_size):
             query = ("INSERT INTO packets (id,probe_id,sent,observed,tsval,payload_len,tcpseq,tcpack)"
    -                 " VALUES(randomblob(16),"
    +                 " VALUES(hex(randomblob(16)),"
                      "(SELECT id FROM probes WHERE local_port=:local_port AND :observed>time_of_day"
                      " AND :observed<time_of_day+userspace_rtt+%d"
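
    Switching the packet id from randomblob(16) to hex(randomblob(16)) stores a printable
    32-character hex string instead of a raw 16-byte BLOB, which is easier to join, compare and
    inspect. A quick sqlite3 illustration of the difference (table and column names are throwaway):

        import sqlite3

        conn = sqlite3.connect(":memory:")
        conn.execute("CREATE TABLE demo (blob_id BLOB, hex_id TEXT)")
        conn.execute("INSERT INTO demo VALUES (randomblob(16), hex(randomblob(16)))")
        blob_id, hex_id = conn.execute("SELECT blob_id, hex_id FROM demo").fetchone()
        print(type(blob_id), len(blob_id))   # <class 'bytes'> 16  -- raw bytes, not printable
        print(type(hex_id), len(hex_id))     # <class 'str'> 32    -- hex digits, e.g. '9F2C...'
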
     
    @@ -254 +261 @@
             self.conn.execute(query, params)
             self.conn.commit()
    -
    +
    +    def setUnusualCase(self, unusual_case, greater):
    +        query = """SELECT * FROM meta LIMIT 1"""
    +        cursor = self.conn.cursor()
    +        cursor.execute(query)
    +        row = cursor.fetchone()
    +        if row == None:
    +            params = {"id":_newid()}
    +        else:
    +            params = dict(row)
    +
    +        params["unusual_case"]=unusual_case
    +        params["greater"]=greater
    +
    +        keys = params.keys()
    +        columns = ','.join(keys)
    +        placeholders = ':'+', :'.join(keys)
    +
    +        query = """INSERT OR REPLACE INTO meta (%s) VALUES (%s)""" % (columns, placeholders)
    +        cursor.execute(query, params)
    +
    +
    +    def getUnusualCase(self):
    +        query = """SELECT unusual_case,greater FROM meta LIMIT 1"""
    +        cursor = self.conn.cursor()
    +        cursor.execute(query)
    +        row = cursor.fetchone()
    +        if row == None or row[0] == None or row[1] == None:
    +            return None
    +        else:
    +            return tuple(row)
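
    setUnusualCase() reads the single meta row (if any), merges in the new unusual_case/greater
    values, and writes it back with INSERT OR REPLACE, reusing the existing id when there is one.
    A self-contained sketch of that read-merge-upsert pattern on a throwaway table (the _newid()
    stand-in below just generates a random hex id; the real meta table carries more columns than
    this):

        import os
        import sqlite3

        def _newid():
            return os.urandom(16).hex()   # stand-in for the library's id generator

        conn = sqlite3.connect(":memory:")
        conn.row_factory = sqlite3.Row
        conn.execute("CREATE TABLE meta (id TEXT PRIMARY KEY, unusual_case TEXT, greater INTEGER)")

        def set_unusual_case(conn, unusual_case, greater):
            row = conn.execute("SELECT * FROM meta LIMIT 1").fetchone()
            params = {"id": _newid()} if row is None else dict(row)   # keep the existing id
            params["unusual_case"] = unusual_case
            params["greater"] = int(greater)
            columns = ','.join(params.keys())
            placeholders = ':' + ', :'.join(params.keys())
            # With the same primary key, INSERT OR REPLACE acts as an upsert of the one meta row.
            conn.execute("INSERT OR REPLACE INTO meta (%s) VALUES (%s)" % (columns, placeholders),
                         params)

        set_unusual_case(conn, "long", True)
        set_unusual_case(conn, "short", False)    # overwrites the same row
        print([tuple(r) for r in conn.execute("SELECT unusual_case, greater FROM meta")])
        # -> [('short', 0)]  (still exactly one row)
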
  • trunk/lib/nanownlib/train.py (r13 → r16)

    @@ -43 +43 @@
         num_trials = 200
         lows = [p[1] for p in performance[0:5]]
    -    widths = [w/10.0 for w in range(5,65,5)]
    +    widths = [w/10.0 for w in range(5,155,10)]
         performance = []
         for width in widths:
     
    @@ -85 +85 @@
     
         num_trials = 500
    -    widths = [good_width+(x/100.0) for x in range(-70,75,5) if good_width+(x/100.0) > 0.0]
    +    widths = [good_width+(x/100.0) for x in range(-120,125,5) if good_width+(x/100.0) > 0.0]
         performance = []
         for width in widths:
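
    Both train.py hunks widen the width search. A quick check of what the old and new grids
    actually produce (the good_width value in the fine pass is illustrative; in the real code it
    comes from the coarse pass):

        # Coarse sweep: old vs new grid of candidate widths.
        old_widths = [w/10.0 for w in range(5, 65, 5)]     # 0.5 .. 6.0 in steps of 0.5 (12 values)
        new_widths = [w/10.0 for w in range(5, 155, 10)]   # 0.5 .. 14.5 in steps of 1.0 (15 values)
        print(old_widths)
        print(new_widths)

        # Fine sweep around the chosen width: offsets now span +/-1.20 instead of +/-0.70.
        good_width = 3.0
        fine = [good_width + (x/100.0) for x in range(-120, 125, 5)
                if good_width + (x/100.0) > 0.0]
        print(round(min(fine), 2), round(max(fine), 2), len(fine))   # -> 1.8 4.2 49
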