source: trunk/lib/nanownlib/parallel.py @ 9

Last change on this file since 9 was 9, checked in by tim, 9 years ago

.

File size: 1.3 KB
RevLine 
[9]1#
2
3import threading
4import queue
5
6
7class WorkerThreads(object):
8    workq = None
9    resultq = None
10    target = None
11   
12    def __init__(self, num_workers, target):
13        self.workq = queue.Queue()
14        self.resultq = queue.Queue()
15        self.target = target
16       
17        self.workers = []
18        for i in range(num_workers):
19            t = threading.Thread(target=self._worker)
20            t.daemon = True
21            t.start()
22            self.workers.append(t)
23
24    def _worker(self):
25        while True:
26            item = self.workq.get()
27            if item == None:
28                self.workq.task_done()
29                break
30
31            job_id,args = item
32            try:
33                self.resultq.put((job_id, self.target(*args)))
34            except Exception as e:
35                sys.stderr.write("ERROR: Job '%s' failed with '%s'.  Dropping...\n",
36                                 (str(job_id),str(e)))
37            self.workq.task_done()
38
39    def addJob(self, job_id, args):
40        self.workq.put((job_id, args))
41           
42    def wait(self):
43        self.workq.join()
44
45    def __del__(self):
46        self.stop()
47   
48    def stop(self):
49        for i in range(0,len(self.workers)):
50            self.workq.put(None)
51        for w in self.workers:
52            w.join()
Note: See TracBrowser for help on using the repository browser.