source: trunk/lib/nanownlib/parallel.py @ 24

Last change on this file since 24 was 16, checked in by tim, 9 years ago

.

File size: 1.5 KB
Line 
1#
2
3import sys
4import threading
5import queue
6
7
8class WorkerThreads(object):
9    workq = None
10    resultq = None
11    target = None
12   
13    def __init__(self, num_workers, target):
14        self.workq = queue.Queue()
15        self.resultq = queue.Queue()
16        self.target = target
17       
18        self.workers = []
19        for i in range(num_workers):
20            t = threading.Thread(target=self._worker)
21            t.daemon = True
22            t.start()
23            self.workers.append(t)
24
25    def _worker(self):
26        while True:
27            item = self.workq.get()
28            if item == None:
29                self.workq.task_done()
30                break
31
32            job_id,args = item
33            try:
34                self.resultq.put((job_id, self.target(*args)))
35            except Exception as e:
36                sys.stderr.write("ERROR: Job '%s' failed with '%s'.  Dropping...\n" %
37                                 (str(job_id),str(e)))
38            self.workq.task_done()
39
40    def addJob(self, job_id, args):
41        self.workq.put((job_id, args))
42           
43    def wait(self):
44        self.workq.join()
45
46    def __del__(self):
47        self.stop()
48   
49    def stop(self):
50        try:
51            while True:
52                self.workq.get(block=False)
53                self.workq.task_done()
54        except queue.Empty as e:
55            pass
56       
57        for i in range(len(self.workers)):
58            self.workq.put(None)
59        for w in self.workers:
60            w.join()
Note: See TracBrowser for help on using the repository browser.