source: trunk/bin/train @ 4

Last change on this file since 4 was 4, checked in by tim, 9 years ago

.

  • Property svn:executable set to *
File size: 14.1 KB
RevLine 
[4]1#!/usr/bin/env python3
2#-*- mode: Python;-*-
3
4import sys
5import os
6import time
7import random
8import statistics
9import functools
10import argparse
11import threading
12import queue
13import pprint
14import json
15
16
# NOTE(review): "{DEVELOPMENT}" looks like a placeholder that a release step
# would substitute -- confirm against the packaging scripts.
VERSION = "{DEVELOPMENT}"
if VERSION == "{DEVELOPMENT}":
    # Running from a development checkout: add ../lib (relative to this
    # script) to the import path so the nanownlib package can be found.
    script_dir = '.'
    try:
        script_dir = os.path.dirname(os.path.realpath(__file__))
    except Exception:
        # Best-effort fallback to argv[0].  Catch Exception rather than using
        # a bare except so KeyboardInterrupt/SystemExit are not swallowed.
        try:
            script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        except Exception:
            pass  # keep the '.' default
    sys.path.append("%s/../lib" % script_dir)
28
29from nanownlib import *
30import nanownlib.storage
31from nanownlib.stats import boxTest,multiBoxTest,subsample,bootstrap,bootstrap2,trimean,midhinge,midhingeTest,samples2Distributions,samples2MeanDiffs
32
# Command-line interface: a single positional argument naming the session
# database file.
parser = argparse.ArgumentParser(description="")
#parser.add_argument('-c', dest='cases', type=str, default='{"short":10000,"long":1010000}',
#                    help='JSON representation of echo timing cases. Default: {"short":10000,"long":1010000}')
parser.add_argument(
    'session_data',
    default=None,
    help='Database file storing session information',
)
options = parser.parse_args()
40
41
42
class WorkerThreads(object):
    """Pool of daemon threads that apply one callable to queued jobs.

    Jobs are submitted with addJob() as (job_id, args).  A worker calls
    target(*args) and puts (job_id, result) on resultq, which callers read
    directly after wait() returns.
    """
    # Class-level placeholders; __init__ rebinds all three per instance.
    workq = None
    resultq = None
    target = None

    def __init__(self, num_workers, target):
        """Create the queues and start num_workers daemon worker threads."""
        self.workq = queue.Queue()
        self.resultq = queue.Queue()
        self.target = target

        self.workers = []
        for _ in range(num_workers):
            t = threading.Thread(target=self._worker)
            t.daemon = True
            t.start()
            self.workers.append(t)

    def _worker(self):
        """Worker loop: process jobs until a None sentinel is dequeued."""
        while True:
            item = self.workq.get()
            if item is None:
                # Sentinel queued by stop(): acknowledge it and exit.
                self.workq.task_done()
                break

            job_id, args = item
            # The result is queued before task_done(), so once wait() returns
            # every finished job's result is already on resultq.
            # NOTE(review): if target raises, this thread exits without
            # calling task_done() and wait() will block -- confirm whether
            # that needs handling.
            self.resultq.put((job_id, self.target(*args)))
            self.workq.task_done()

    def addJob(self, job_id, args):
        """Queue a job; args is the argument sequence unpacked into target."""
        self.workq.put((job_id, args))

    def wait(self):
        """Block until every queued item has been marked done."""
        self.workq.join()

    def stop(self):
        """Send one exit sentinel per worker, then join all worker threads."""
        # Fixed: the original referenced the undefined name `workers` here,
        # which raised NameError on every call.
        for _ in self.workers:
            self.workq.put(None)
        for w in self.workers:
            w.join()
82
83           
def trainBoxTest(db, test_cases, longest, subsample_size):
    """Search for box-test 'low'/'high' parameters with the lowest mean error.

    Each candidate (low, high) pair is scored by bootstrapping boxTest over
    the 'train' and 'train_null' data in db.  The search runs four stages:
    a coarse scan of low values, a scan of widths over the five best lows,
    a re-scan of lows next to those five, and a fine scan of widths around
    the best width found.

    Parameters:
      db             -- passed through to bootstrap()
      test_cases     -- passed through to bootstrap()
      longest        -- the estimate counted as correct on 'train' data
      subsample_size -- passed through to bootstrap()

    Returns a dict {"low": best_low, "high": best_low + best_width}.
    """

    def trainAux(low,high,num_trials):
        """Return (false_positives, false_negatives) as percentages of num_trials."""
        estimator = functools.partial(boxTest, {'low':low, 'high':high})
        estimates = bootstrap(estimator, db, 'train', test_cases, subsample_size, num_trials)
        null_estimates = bootstrap(estimator, db, 'train_null', test_cases, subsample_size, num_trials)

        #XXX: need to have a configurable policy on what we're looking for.
        #     which is longest or which is shortest?
        bad_estimates = len([e for e in estimates if e != longest])
        bad_null_estimates = len([e for e in null_estimates if e is not None])

        false_negatives = 100.0*bad_estimates/num_trials
        false_positives = 100.0*bad_null_estimates/num_trials
        return false_positives,false_negatives

    def collect():
        """Wait for all queued jobs; return their (job_id, (fp, fn)) results."""
        wt.wait()
        results = []
        while not wt.resultq.empty():
            results.append(wt.resultq.get())
        return results

    def rank(results):
        """Return (mean error, job_id, fn, fp) tuples sorted best-first."""
        return sorted(((fp+fn)/2.0, job_id, fn, fp) for job_id,(fp,fn) in results)

    start = time.time()
    wt = WorkerThreads(2, trainAux)

    # Stage 1: coarse scan of the low bound at a fixed width.
    width = 2.0
    percentiles = list(range(0,50,2))
    for low in percentiles:
        wt.addJob(low, (low,low+width,200))
    performance = rank(collect())
    pprint.pprint(performance)
    print(time.time()-start)

    # Stage 2: for the five best lows, scan widths 0.0 .. 10.0 in steps of 0.5
    # and score each width by its error rates averaged over those lows.
    lows = [p[1] for p in performance[0:5]]
    widths = [w/10.0 for w in range(0,105,5)]
    performance = []
    for width in widths:
        for low in lows:
            wt.addJob(low,(low,low+width,150))
        errors = [e for _,e in collect()]
        mean_fp = statistics.mean([fp for fp,fn in errors])
        mean_fn = statistics.mean([fn for fp,fn in errors])
        performance.append(((mean_fp+mean_fn)/2.0, width, mean_fn, mean_fp))
    performance.sort()
    pprint.pprint(performance)
    good_width = performance[0][1]
    print("good_width:",good_width)

    # Stage 3: re-scan the best lows together with their +/-1 neighbours.
    # A dict is used to drop duplicates.  Fixed: the lower neighbour is now
    # kept when it equals 0 (was `> 0`), matching trainBoxTest2 and the
    # stage 1 scan, which starts at 0.
    lc = {}
    for low in lows:
        if low-1 >= 0:
            lc[low-1] = None
        lc[low] = None
        lc[low+1] = None
    lows = list(lc)

    for low in lows:
        wt.addJob(low, (low,low+good_width,300))
    performance = rank(collect())
    pprint.pprint(performance)
    best_low = performance[0][1]
    print("best_low:", best_low)

    # Stage 4: fine scan of widths within +/-0.4 of good_width.
    # NOTE(review): when good_width < 0.4 some candidates are negative,
    # giving high < low -- confirm boxTest tolerates that.
    widths = [good_width-0.4,good_width-0.3,good_width-0.2,good_width-0.1,
              good_width,good_width+0.1,good_width+0.2,good_width+0.3,good_width+0.4]
    for width in widths:
        wt.addJob(width, (best_low,best_low+width,200))
    performance = rank(collect())
    pprint.pprint(performance)
    best_width=performance[0][1]
    print("best_width:",best_width)
    print("final_performance:", performance[0][0])

    # Fixed: return the refined best_width.  The original returned the
    # coarse good_width, discarding stage 4 even though the
    # "final_performance" printed above was measured with best_width.
    return {"low":best_low,"high":best_low+best_width}
181
182
def trainBoxTest2(db, unusual_case, greater, subsample_size):
    """Search for multiBoxTest 'low'/'high' parameters and report the result.

    Each candidate (low, high) pair is scored by bootstrapping multiBoxTest
    over the 'train' and 'train_null' data in db.  The search runs four
    stages: a coarse scan of low values, a scan of widths over the five best
    lows, a re-scan of lows next to those five, and a fine scan of widths
    around the best width found.

    Parameters:
      db             -- passed through to bootstrap2()
      unusual_case   -- passed through to multiBoxTest
      greater        -- passed through to multiBoxTest
      subsample_size -- passed through to bootstrap2(); also recorded in the
                        returned dict as 'sample_size'

    Returns a dict with keys 'algorithm', 'params' (a JSON string),
    'sample_size', 'num_trials', 'trial_type', 'false_positives' and
    'false_negatives'.
    """

    def trainAux(low,high,num_trials):
        """Return (false_positives, false_negatives) as percentages of num_trials."""
        estimator = functools.partial(multiBoxTest, {'low':low, 'high':high}, unusual_case, greater)
        estimates = bootstrap2(estimator, db, 'train', subsample_size, num_trials)
        null_estimates = bootstrap2(estimator, db, 'train_null', subsample_size, num_trials)

        # An estimate of 1 is counted as correct on 'train' data and 0 as
        # correct on 'train_null' data.
        bad_estimates = len([e for e in estimates if e != 1])
        bad_null_estimates = len([e for e in null_estimates if e != 0])

        false_negatives = 100.0*bad_estimates/num_trials
        false_positives = 100.0*bad_null_estimates/num_trials
        return false_positives,false_negatives

    def collect():
        """Wait for all queued jobs; return their (job_id, (fp, fn)) results."""
        wt.wait()
        results = []
        while not wt.resultq.empty():
            results.append(wt.resultq.get())
        return results

    def rank(results):
        """Return (mean error, job_id, fn, fp) tuples sorted best-first."""
        return sorted(((fp+fn)/2.0, job_id, fn, fp) for job_id,(fp,fn) in results)

    start = time.time()
    wt = WorkerThreads(2, trainAux)

    # Stage 1: coarse scan of the low bound at a fixed width.
    num_trials = 200
    width = 2.0
    percentiles = list(range(0,50,2))
    for low in percentiles:
        wt.addJob(low, (low,low+width,num_trials))
    performance = rank(collect())
    pprint.pprint(performance)
    print(time.time()-start)

    # Stage 2: for the five best lows, scan widths 0.0 .. 10.0 in steps of 0.5
    # and score each width by its error rates averaged over those lows.
    num_trials = 150
    lows = [p[1] for p in performance[0:5]]
    widths = [w/10.0 for w in range(0,105,5)]
    performance = []
    for width in widths:
        for low in lows:
            wt.addJob(low,(low,low+width,num_trials))
        errors = [e for _,e in collect()]
        mean_fp = statistics.mean([fp for fp,fn in errors])
        mean_fn = statistics.mean([fn for fp,fn in errors])
        performance.append(((mean_fp+mean_fn)/2.0, width, mean_fn, mean_fp))
    performance.sort()
    pprint.pprint(performance)
    good_width = performance[0][1]
    print("good_width:",good_width)

    # Stage 3: re-scan the best lows together with their +/-1 neighbours
    # (never below 0).  A dict is used to drop duplicates.
    lc = {}
    for low in lows:
        if low-1 >= 0:
            lc[low-1] = None
        lc[low] = None
        lc[low+1] = None
    lows = lc.keys()
    print("candidate lows:")
    pprint.pprint(lows)

    num_trials = 300
    for low in lows:
        wt.addJob(low, (low,low+good_width,num_trials))
    performance = rank(collect())
    pprint.pprint(performance)
    best_low = performance[0][1]
    print("best_low:", best_low)

    # Stage 4: fine scan of widths within +/-0.4 of good_width.
    # NOTE(review): when good_width < 0.4 some candidates are negative,
    # giving high < low -- confirm multiBoxTest tolerates that.
    num_trials = 200
    widths = [good_width-0.4,good_width-0.3,good_width-0.2,good_width-0.1,
              good_width,good_width+0.1,good_width+0.2,good_width+0.3,good_width+0.4]
    for width in widths:
        wt.addJob(width, (best_low,best_low+width,num_trials))
    performance = rank(collect())
    pprint.pprint(performance)
    best_width=performance[0][1]
    print("best_width:",best_width)
    print("final_performance:", performance[0][0])

    # Fixed: store the refined best_width.  The original stored the coarse
    # good_width, so the saved parameters did not match the false
    # positive/negative rates below, which were measured with best_width.
    params = json.dumps({"low":best_low,"high":best_low+best_width})
    return {'algorithm':"boxtest",
            'params':params,
            'sample_size':subsample_size,
            'num_trials':num_trials,
            'trial_type':"train",
            'false_positives':performance[0][3],
            'false_negatives':performance[0][2]}
290
291
def trainMidhinge(db, unusual_case, greater, subsample_size):
    """Search for midhingeTest 'distance'/'threshold' parameters.

    An initial threshold is taken as half the trimean of the mean differences
    computed from a 'train' subsample.  Four sweeps follow, each scored by
    bootstrapping midhingeTest over the 'train' and 'train_null' data:
    a coarse distance sweep, a coarse threshold sweep (80%..120% of the
    initial threshold), a fine distance sweep (+/-4) and a fine threshold
    sweep (95%..105%).

    Returns a dict with keys 'algorithm', 'params' (a JSON string),
    'sample_size', 'num_trials', 'trial_type', 'false_positives' and
    'false_negatives'; the error rates are those of the best entry of the
    last sweep.
    """

    def trainAux(distance, threshold, num_trials):
        """Return (false_positives, false_negatives) as percentages of num_trials."""
        test_params = {'distance':distance,'threshold':threshold}
        estimator = functools.partial(midhingeTest, test_params, unusual_case, greater)
        estimates = bootstrap2(estimator, db, 'train', subsample_size, num_trials)
        null_estimates = bootstrap2(estimator, db, 'train_null', subsample_size, num_trials)

        # An estimate of 1 is counted as correct on 'train' data and 0 as
        # correct on 'train_null' data.
        missed = sum(1 for e in estimates if e != 1)
        spurious = sum(1 for e in null_estimates if e != 0)

        return 100.0*spurious/num_trials, 100.0*missed/num_trials

    def sweep(began, jobs):
        """Run (job_id, args) jobs, then print and return the ranked results.

        The result is a list of (mean error, job_id, fn, fp) sorted
        best-first; `began` is the start time used for the elapsed print.
        """
        for job_id, args in jobs:
            wt.addJob(job_id, args)
        wt.wait()
        ranked = []
        while not wt.resultq.empty():
            job_id, (fp, fn) = wt.resultq.get()
            ranked.append(((fp+fn)/2.0, job_id, fn, fp))
        ranked.sort()
        pprint.pprint(ranked)
        print(time.time()-began)
        return ranked

    #determine expected delta based on differences
    start = time.time()
    train_diffs = list(samples2MeanDiffs(subsample(db, 'train'), 'packet_rtt', unusual_case))
    threshold = trimean(train_diffs)/2.0
    print("initial threshold:", threshold)
    print("median threshold:", statistics.median(train_diffs)/2.0)
    print("midhinge threshold:", midhinge(train_diffs)/2.0)
    print("trimean threshold:", trimean(train_diffs)/2.0)

    null_diffs = list(samples2MeanDiffs(subsample(db, 'train_null'), 'packet_rtt', unusual_case))
    print(len(null_diffs))
    print("null mean:", statistics.mean(null_diffs))
    print("null median:", statistics.median(null_diffs))
    print("null midhinge:", midhinge(null_diffs))
    print("null trimean:", trimean(null_diffs))
    print(time.time()-start)

    start = time.time()
    wt = WorkerThreads(1, trainAux)

    # Sweep 1: coarse distance scan (an earlier range(1,46,4) was narrowed).
    num_trials = 200
    performance = sweep(start, [(distance, (distance,threshold,num_trials))
                                for distance in range(25,46,4)])
    good_distance = performance[0][1]
    print("good_distance:",good_distance)

    # Sweep 2: thresholds from 80% to 120% of the initial value, 5% steps.
    num_trials = 200
    scaled = [threshold*(t/100.0) for t in range(80,125,5)]
    performance = sweep(time.time(), [(th, (good_distance,th,num_trials))
                                      for th in scaled])
    good_threshold = performance[0][1]
    print("good_threshold:", good_threshold)

    # Sweep 3: distances within +/-4 of good_distance.
    num_trials = 200
    nearby = [good_distance+d for d in range(-4,5)]
    performance = sweep(time.time(), [(dist, (dist,good_threshold,num_trials))
                                      for dist in nearby])
    best_distance = performance[0][1]
    print("best_distance:",best_distance)

    # Sweep 4: thresholds from 95% to 105% of good_threshold, 1% steps.
    num_trials = 200
    scaled = [good_threshold*(t/100.0) for t in range(95,106)]
    performance = sweep(time.time(), [(th, (best_distance,th,num_trials))
                                      for th in scaled])
    best_threshold = performance[0][1]
    print("best_threshold:", best_threshold)

    best = performance[0]
    params = json.dumps({'distance':best_distance,'threshold':best_threshold})
    return {'algorithm':"midhinge",
            'params':params,
            'sample_size':subsample_size,
            'num_trials':num_trials,
            'trial_type':"train",
            'false_positives':best[3],
            'false_negatives':best[2]}
401
402
403#classifiers = {'boxtest':{'train':trainBoxTest2, 'test':multiBoxTest},
404#               'midhinge':{'train':trainMidhinge, 'test':midhinge}}
405
406
# Main script: open the session database, identify the unusual test case,
# then train each classifier and store its results in the database.
db = nanownlib.storage.db(options.session_data)
#cursor = db.cursor()
#cursor.execute("SELECT min(sample) min, max(sample) max FROM probes")
#train_start,test_end = cursor.fetchone()
#train_end = int(test_end-train_start)
#test_start = train_end+1
#subsample_size = min(10000,(train_end-train_start+1)/4)

start = time.time()
unusual_case,unusual_diff = findUnusualTestCase(db)
# The sign of the difference is handed to the trainers as `greater`.
greater = (unusual_diff > 0)
print("unusual_case:", unusual_case)
print("unusual_diff:", unusual_diff)
end = time.time()
print(":", end-start)

# Train each classifier in turn with a subsample size of 6000, timing each.
for label, trainer in (("midhinge result:", trainMidhinge),
                       ("multi box test result:", trainBoxTest2)):
    start = time.time()
    results = trainer(db, unusual_case, greater, 6000)
    db.addClassifierResults(results)
    print(label, results)
    end = time.time()
    print(":", end-start)
436
437#start = time.time()
438#print("box test params:", trainBoxTest(db, test_cases, 'long', 100))
439#end = time.time()
440#print(":", end-start)
441
442
Note: See TracBrowser for help on using the repository browser.