]> code.ossystems Code Review - openembedded-core.git/commitdiff
oe/utils.py: Fix thread leakage in ThreadPool
authorAníbal Limón <anibal.limon@linux.intel.com>
Tue, 23 Jun 2015 16:49:53 +0000 (11:49 -0500)
committerRichard Purdie <richard.purdie@linuxfoundation.org>
Fri, 26 Jun 2015 08:25:50 +0000 (09:25 +0100)
In order to fix Thread leakage caused by not call join() in Threads,

Pass num_tasks in ThreadPool for add all the tasks into a Queue this
enable catch of Queue.Empty exception and exit the threads.

classes/sstate.bbclass: Change checkstatus function to match new
ThreadPool operation.

Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
meta/classes/sstate.bbclass
meta/lib/oe/utils.py

index 1e5e98a1da0516f76827ef2f56b9cb7e421159be..a80d1ced72b2aa75bc19c23828dffd3ca6a48390 100644 (file)
@@ -771,9 +771,10 @@ def sstate_checkhashes(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False):
             bb.note("Checking sstate mirror object availability (for %s objects)" % len(tasklist))
             import multiprocessing
             nproc = min(multiprocessing.cpu_count(), len(tasklist))
-            pool = oe.utils.ThreadedPool(nproc)
+            pool = oe.utils.ThreadedPool(nproc, len(tasklist))
             for t in tasklist:
                 pool.add_task(checkstatus, t)
+            pool.start()
             pool.wait_completion()
 
     inheritlist = d.getVar("INHERIT", True)
index 0de880013a6fac446e322ec5fca62d204606c629..f0d3c14137c717958e8d8aaefd447e2c32d96e65 100644 (file)
@@ -222,11 +222,16 @@ class ThreadedWorker(Thread):
         Thread.__init__(self)
         self.tasks = tasks
         self.daemon = True
-        self.start()
 
     def run(self):
+        from Queue import Empty
+
         while True:
-            func, args, kargs = self.tasks.get()
+            try:
+                func, args, kargs = self.tasks.get(block=False)
+            except Empty:
+                break
+
             try:
                 func(*args, **kargs)
             except Exception, e:
@@ -236,9 +241,17 @@ class ThreadedWorker(Thread):
 
 class ThreadedPool:
     """Pool of threads consuming tasks from a queue"""
-    def __init__(self, num_threads):
-        self.tasks = Queue(num_threads)
-        for _ in range(num_threads): ThreadedWorker(self.tasks)
+    def __init__(self, num_workers, num_tasks):
+        self.tasks = Queue(num_tasks)
+        self.workers = []
+
+        for _ in range(num_workers):
+            worker = ThreadedWorker(self.tasks)
+            self.workers.append(worker)
+
+    def start(self):
+        for worker in self.workers:
+            worker.start()
 
     def add_task(self, func, *args, **kargs):
         """Add a task to the queue"""
@@ -247,4 +260,5 @@ class ThreadedPool:
     def wait_completion(self):
         """Wait for completion of all the tasks in the queue"""
         self.tasks.join()
-
+        for worker in self.workers:
+            worker.join()