- Timestamp:
- 08/18/15 22:09:24 (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/bin/sampler
r9 r20 30 30 31 31 from nanownlib import * 32 from nanownlib.platform import * 33 from nanownlib.tcpts import * 32 34 import nanownlib.storage 33 35 … … 36 38 parser.add_argument('-c', dest='cases', type=str, default='{"short":10000,"long":13000}', 37 39 help='JSON representation of echo timing cases.') 38 parser.add_argument('--no-tcpts', action='store_true', help='Dis bale TCP timestamp profiling')40 parser.add_argument('--no-tcpts', action='store_true', help='Disable TCP timestamp profiling') 39 41 parser.add_argument('--no-control', action='store_true', help='Do not collect separate control data. Instead, synthesize it from test and train data.') 40 42 parser.add_argument('session_name', default=None, … … 49 51 50 52 51 session_name = options.session_name 52 num_samples=options.sample_count 53 hostname = options.host 54 port = options.port 55 protocol = 'http' 53 num_samples = options.sample_count 54 hostname = options.host 55 port = options.port 56 protocol = 'http' 56 57 57 58 cases = json.loads(options.cases) 59 db_file = "%s.db" % options.session_name 60 db = nanownlib.storage.db(db_file) 61 58 62 59 63 def extractReportedRuntime(headers, body): … … 64 68 def sendRequest(data=None): 65 69 method = 'GET' 66 path = '/ data/clamav-audit-results.txt?t=' + data70 path = '/?t=' + data 67 71 url = "%s://%s:%d%s" % (protocol,hostname,port,path) 68 72 headers = {} … … 74 78 try: 75 79 session = requests.Session() 76 response = session.send(req )77 #print( repr(response.raw._original_response.local_address))80 response = session.send(req, allow_redirects=False) 81 #print("sendRequest:", repr(response.raw._original_response.local_address)) 78 82 reported = extractReportedRuntime(response.headers, response.text) 79 83 retry = False … … 82 86 time.sleep(1.0) 83 87 sys.stderr.write("ERROR: retrying...\n") 84 88 89 85 90 return {'userspace_rtt':response.elapsed.microseconds*1000, 86 91 'reported':reported, … … 97 102 98 103 setCPUAffinity() 99 setTCPTimestamps() 100 host_ip = socket.gethostbyname(hostname) #XXX: what about multiple A records? 104 time.sleep(0.25) # Give our process a chance to migrate to a different CPU if necessary 105 setPowersave(False) # XXX: test this to see if it helps 106 setLowLatency(True) # XXX: test this to see if it helps 107 tcpts_previous = setTCPTimestamps(True) 108 109 110 #XXX: what about multiple A records? 111 # perform this during a script generation step, measuring lowest latency server 112 # and hard-coding the IP and host name separately. However, including all 113 # server IPs in comments 114 host_ip = socket.gethostbyname(hostname) 115 101 116 102 117 meta = {'tcpts_mean':None,'tcpts_stddev':None,'tcpts_slopes':None} … … 125 140 % (meta['tcpts_mean'], meta['tcpts_stddev'], 100*meta['tcpts_stddev']/meta['tcpts_mean'])) 126 141 127 128 sniffer_fp = tempfile.NamedTemporaryFile('w+t')129 db_file = "%s.db" % session_name130 131 sniffer = startSniffer(host_ip, port, sniffer_fp.name)132 db = nanownlib.storage.db(db_file)133 142 db.addMeta(meta) 134 time.sleep(0.5) # ensure sniffer is fully ready and our process is migrated 135 136 if options.no_control: 137 num_control = 0 138 else: 139 num_control = int(num_samples*2/5) 140 141 num_train = int((num_samples-num_control)/3) 142 num_test = num_samples-num_train-num_control 143 144 sample_types = [('train',num_train), 145 ('train_null',num_control), 146 ('test',num_test)] 147 148 sid = 0 149 report_interval = 20 150 start = time.time() 151 next_report = start+report_interval 152 for st,count in sample_types: 153 if sniffer.poll() != None: 154 sys.stderr.write('ERROR: Sniffer quit, sender exiting...\n') 155 break 143 144 145 def findNextSampleID(db): 146 cursor = db.conn.cursor() 147 cursor.execute("SELECT max(sample) FROM probes") 148 row = cursor.fetchone() 149 if row != None and row[0] != None: 150 return row[0]+1 151 152 return 0 153 154 155 def collectSamples(db, sample_type, count, sniffer): 156 sniffer.start() 157 158 if not sniffer.is_running(): 159 sys.stderr.write('ERROR: Sniffer did not start...\n') 160 return 156 161 162 sid = findNextSampleID(db) 157 163 for k in range(0,count): 158 164 sample_order = list(cases.items()) 159 165 random.shuffle(sample_order) 160 if s t.endswith('null'):166 if sample_type.endswith('null'): 161 167 for i in range(1,len(sample_order)): 162 168 sample_order[i] = (sample_order[i][0],sample_order[0][1]) 163 169 random.shuffle(sample_order) 164 #print('after', sample_order)165 170 166 171 results = [] … … 168 173 for i in range(len(sample_order)): 169 174 results.append(fetch({'sample':sid, 'test_case':sample_order[i][0], 170 'type':s t, 'tc_order':i, 'time_of_day':now},175 'type':sample_type, 'tc_order':i, 'time_of_day':now}, 171 176 sample_order[i][1])) 172 177 173 #print(results)178 print(results) 174 179 db.addProbes(results) 175 180 db.conn.commit() 176 181 sid += 1 177 182 178 if (time.time() > next_report): 179 #s = time.time() 180 reportProgress(db, sample_types, start) 181 #print("reportProgress time:", time.time()-s) 182 next_report += report_interval 183 184 print("probes complete in %f" % (time.time()-start)) 185 time.sleep(2.0) # Give sniffer a chance to collect remaining packets 186 stopSniffer(sniffer) 187 188 start = time.time() 189 associatePackets(sniffer_fp, db) 190 sniffer_fp.close() 191 end = time.time() 192 print("associate time:", end-start) 193 183 time.sleep(2.0) # Give sniffer a chance to collect remaining packets 184 sniffer.stop() 185 #print(sniffer.openPacketLog().read()) 186 start = time.time() 187 associatePackets(sniffer.openPacketLog(), db) 188 end = time.time() 189 print("associate time:", end-start) 190 191 192 193 if options.no_control: 194 num_control = 0 195 else: 196 num_control = int(num_samples*2/5) 197 198 num_train = int((num_samples-num_control)/3) 199 num_test = num_samples-num_train-num_control 200 201 sample_types = [('train',num_train), 202 ('train_null',num_control), 203 ('test',num_test)] 204 205 sniffer = snifferProcess(host_ip, port) 206 for st,count in sample_types: 207 collectSamples(db, st,count,sniffer) 208 209 210 #start = time.time() 211 #report_interval = 20 212 #next_report = start+report_interval 213 # if (time.time() > next_report): 214 # reportProgress(db, sample_types, start) 215 # next_report += report_interval 216 217 194 218 if options.no_control: 195 219 print("TODO: implement control synthesizing!") … … 199 223 end = time.time() 200 224 print("analyzed %d probes' packets in: %f" % (num_probes, end-start)) 225 226 227 setPowersave(True) # XXX: test this to see if it actually helps 228 setLowLatency(False) # XXX: test this to see if it actually helps 229 setTCPTimestamps(tcpts_previous)
Note: See TracChangeset
for help on using the changeset viewer.