]> code.ossystems Code Review - openembedded-core.git/commitdiff
cooker: use a pool, abort on first parse error
authorChris Larson <chris_larson@mentor.com>
Tue, 7 Dec 2010 18:00:22 +0000 (13:00 -0500)
committerRichard Purdie <rpurdie@linux.intel.com>
Tue, 4 Jan 2011 14:46:46 +0000 (14:46 +0000)
(Bitbake rev: 9caf65e79f95fe0045e727391e974c4c1e7411ff)

Signed-off-by: Chris Larson <chris_larson@mentor.com>
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
bitbake/lib/bb/cooker.py

index 18a22de71193267ec6c695b70e8bec3030911cd2..bf896e942871340368f9387a3f39a2f64bec9dff 100644 (file)
 
 from __future__ import print_function
 import sys, os, glob, os.path, re, time
+import atexit
+import itertools
 import logging
-import sre_constants
-import threading
 import multiprocessing
 import signal
-import atexit
+import sre_constants
+import threading
 from cStringIO import StringIO
 from contextlib import closing
 import bb
@@ -45,11 +46,6 @@ class MultipleMatches(Exception):
     Exception raised when multiple file matches are found
     """
 
-class ParsingErrorsFound(Exception):
-    """
-    Exception raised when parsing errors are found
-    """
-
 class NothingToBuild(Exception):
     """
     Exception raised when there is nothing to build
@@ -976,6 +972,10 @@ class CookerExit(bb.event.Event):
     def __init__(self):
         bb.event.Event.__init__(self)
 
+def parse_file(task):
+    filename, appends = task
+    return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg)
+
 class CookerParser(object):
     def __init__(self, cooker, filelist, masked):
         self.filelist = filelist
@@ -993,113 +993,89 @@ class CookerParser(object):
         self.total = len(filelist)
 
         self.current = 0
-        self.bb_cache = None
-        self.task_queue = None
-        self.result_queue = None
-        self.fromcache = None
         self.num_processes = int(self.cfgdata.getVar("BB_NUMBER_PARSE_THREADS", True) or
                                  multiprocessing.cpu_count())
 
-    def launch_processes(self):
-        self.task_queue = multiprocessing.Queue()
-        self.result_queue = multiprocessing.Queue()
+        self.bb_cache = bb.cache.Cache(self.cfgdata)
         self.fromcache = []
+        self.willparse = []
         for filename in self.filelist:
             appends = self.cooker.get_file_appends(filename)
             if not self.bb_cache.cacheValid(filename):
-                self.task_queue.put((filename, appends))
+                self.willparse.append((filename, appends))
             else:
                 self.fromcache.append((filename, appends))
         self.toparse = self.total - len(self.fromcache)
         self.progress_chunk = max(self.toparse / 100, 1)
 
-        def worker(input, output, cfgdata):
+        self.start()
+
+    def start(self):
+        def init(cfg):
             signal.signal(signal.SIGINT, signal.SIG_IGN)
-            for filename, appends in iter(input.get, 'STOP'):
-                try:
-                    infos = bb.cache.Cache.parse(filename, appends, cfgdata)
-                except bb.parse.ParseError as exc:
-                    output.put(exc)
-                else:
-                    output.put(infos)
+            parse_file.cfg = cfg
 
-        self.processes = []
-        for i in xrange(self.num_processes):
-            process = multiprocessing.Process(target=worker,
-                                              args=(self.task_queue,
-                                                    self.result_queue,
-                                                    self.cfgdata))
-            process.start()
-            self.processes.append(process)
+        bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
+
+        self.pool = multiprocessing.Pool(self.num_processes, init, [self.cfgdata])
+        parsed = self.pool.imap(parse_file, self.willparse)
+        self.pool.close()
+
+        self.results = itertools.chain(self.load_cached(), parsed)
 
     def shutdown(self, clean=True):
-        self.result_queue.close()
-        for process in self.processes:
-            if clean:
-                self.task_queue.put('STOP')
-            else:
-                process.terminate()
-        self.task_queue.close()
-        for process in self.processes:
-            process.join()
+        if clean:
+            event = bb.event.ParseCompleted(self.cached, self.parsed,
+                                            self.skipped, self.masked,
+                                            self.virtuals, self.error,
+                                            self.total)
+            bb.event.fire(event, self.cfgdata)
+        else:
+            self.pool.terminate()
+        self.pool.join()
+
         sync = threading.Thread(target=self.bb_cache.sync)
         sync.start()
         atexit.register(lambda: sync.join())
+
         codesync = threading.Thread(target=bb.codeparser.parser_cache_save(self.cooker.configuration.data))
         codesync.start()
         atexit.register(lambda: codesync.join())
-        if self.error > 0:
-            raise ParsingErrorsFound()
+
+    def load_cached(self):
+        for filename, appends in self.fromcache:
+            cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
+            yield not cached, infos
 
     def parse_next(self):
-        if self.current >= self.total:
-            event = bb.event.ParseCompleted(self.cached, self.parsed,
-                                            self.skipped, self.masked,
-                                            self.virtuals, self.error,
-                                            self.total)
-            bb.event.fire(event, self.cfgdata)
+        try:
+            parsed, result = self.results.next()
+        except StopIteration:
             self.shutdown()
             return False
-        elif not self.bb_cache:
-            self.bb_cache = bb.cache.Cache(self.cfgdata)
-            self.launch_processes()
-            bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
-            return True
-
-        try:
-            if self.result_queue.empty() and self.fromcache:
-                filename, appends = self.fromcache.pop()
-                _, result = self.bb_cache.load(filename, appends, self.cfgdata)
-                parsed = False
-                self.cached += 1
-            else:
-                result = self.result_queue.get()
-                if isinstance(result, Exception):
-                    raise result
-
-                parsed = True
-                self.parsed += 1
-                if self.parsed % self.progress_chunk == 0:
-                    bb.event.fire(bb.event.ParseProgress(self.parsed),
-                                  self.cfgdata)
         except KeyboardInterrupt:
             self.shutdown(clean=False)
             raise
-        except Exception as e:
-            self.error += 1
-            parselog.critical(str(e))
-        else:
-            self.virtuals += len(result)
-
-            for virtualfn, info in result:
-                if info.skipped:
-                    self.skipped += 1
-                else:
-                    self.bb_cache.add_info(virtualfn, info, self.cooker.status,
-                                           parsed=parsed)
+        except Exception as exc:
+            self.shutdown(clean=False)
+            sys.exit(1)
 
         self.current += 1
+        self.virtuals += len(result)
+        if parsed:
+            self.parsed += 1
+            if self.parsed % self.progress_chunk == 0:
+                bb.event.fire(bb.event.ParseProgress(self.parsed),
+                              self.cfgdata)
+        else:
+            self.cached += 1
+
+        for virtualfn, info in result:
+            if info.skipped:
+                self.skipped += 1
+            else:
+                self.bb_cache.add_info(virtualfn, info, self.cooker.status,
+                                        parsed=parsed)
         return True
 
     def reparse(self, filename):