code.ossystems Code Review - openembedded-core.git/commitdiff
Implement parallel parsing support
author    Chris Larson <chris_larson@mentor.com>
          Fri, 19 Nov 2010 03:21:54 +0000 (20:21 -0700)
committer Richard Purdie <rpurdie@linux.intel.com>
          Tue, 4 Jan 2011 14:46:42 +0000 (14:46 +0000)
This utilizes python's multiprocessing module.  The default number of parser
processes is the same as the number of available processor cores; however,
you can manually set this with the BB_NUMBER_PARSE_THREADS variable.

(Bitbake rev: c7b3ec819549e51e438d293969e205883fee725f)

Signed-off-by: Chris Larson <chris_larson@mentor.com>
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
bitbake/lib/bb/cache.py
bitbake/lib/bb/cooker.py
bitbake/lib/bb/pysh/pyshyacc.py

index 23845bc07b82508bdf622b842a8bff93f9dcb69d..93dccf21f162845e879b2c726c7a5f8797a3de09 100644 (file)
@@ -232,48 +232,57 @@ class Cache(object):
         bb_data = cls.load_bbfile(fn, appends, cfgData)
         return bb_data[virtual]
 
-    def loadData(self, fn, appends, cfgData, cacheData):
-        """
-        Load a subset of data for fn.
-        If the cached data is valid we do nothing,
-        To do this, we need to parse the file and set the system
-        to record the variables accessed.
-        Return the cache status and whether the file was skipped when parsed
-        """
-        skipped, virtuals = 0, 0
+    @classmethod
+    def parse(cls, filename, appends, configdata):
+        """Parse the specified filename, returning the recipe information"""
+        infos = []
+        datastores = cls.load_bbfile(filename, appends, configdata)
+        depends = set()
+        for variant, data in sorted(datastores.iteritems(),
+                                    key=lambda i: i[0],
+                                    reverse=True):
+            virtualfn = cls.realfn2virtual(filename, variant)
+            depends |= (data.getVar("__depends", False) or set())
+            if depends and not variant:
+                data.setVar("__depends", depends)
+            info = RecipeInfo.from_metadata(filename, data)
+            infos.append((virtualfn, info))
+        return infos
+
+    def load(self, filename, appends, configdata):
+        """Obtain the recipe information for the specified filename,
+        using cached values if available, otherwise parsing.
+
+        Note that if it does parse to obtain the info, it will not
+        automatically add the information to the cache or to your
+        CacheData.  Use the add or add_info method to do so after
+        running this, or use loadData instead."""
+        cached = self.cacheValid(filename)
+        if cached:
+            infos = []
+            info = self.depends_cache[filename]
+            for variant in info.variants:
+                virtualfn = self.realfn2virtual(filename, variant)
+                infos.append((virtualfn, self.depends_cache[virtualfn]))
+        else:
+            logger.debug(1, "Parsing %s", filename)
+            return self.parse(filename, appends, configdata)
 
-        if fn not in self.checked:
-            self.cacheValidUpdate(fn)
+        return cached, infos
 
-        cached = self.cacheValid(fn)
-        if not cached:
-            logger.debug(1, "Parsing %s", fn)
-            datastores = self.load_bbfile(fn, appends, cfgData)
-            depends = set()
-            for variant, data in sorted(datastores.iteritems(),
-                                        key=lambda i: i[0],
-                                        reverse=True):
-                virtualfn = self.realfn2virtual(fn, variant)
-                depends |= (data.getVar("__depends", False) or set())
-                if depends and not variant:
-                    data.setVar("__depends", depends)
-                info = RecipeInfo.from_metadata(fn, data)
-                if not info.nocache:
-                    # The recipe was parsed, and is not marked as being
-                    # uncacheable, so we need to ensure that we write out the
-                    # new cache data.
-                    self.cacheclean = False
-                self.depends_cache[virtualfn] = info
+    def loadData(self, fn, appends, cfgData, cacheData):
+        """Load the recipe info for the specified filename,
+        parsing and adding to the cache if necessary, and adding
+        the recipe information to the supplied CacheData instance."""
+        skipped, virtuals = 0, 0
 
-        info = self.depends_cache[fn]
-        for variant in info.variants:
-            virtualfn = self.realfn2virtual(fn, variant)
-            vinfo = self.depends_cache[virtualfn]
-            if vinfo.skipped:
+        cached, infos = self.load(fn, appends, cfgData)
+        for virtualfn, info in infos:
+            if info.skipped:
                 logger.debug(1, "Skipping %s", virtualfn)
                 skipped += 1
             else:
-                cacheData.add_from_recipeinfo(virtualfn, vinfo)
+                self.add_info(virtualfn, info, cacheData, not cached)
                 virtuals += 1
 
         return cached, skipped, virtuals
@@ -283,6 +292,9 @@ class Cache(object):
         Is the cache valid for fn?
         Fast version, no timestamps checked.
         """
+        if fn not in self.checked:
+            self.cacheValidUpdate(fn)
+
         # Is cache enabled?
         if not self.has_cache:
             return False
@@ -412,14 +424,22 @@ class Cache(object):
     def mtime(cachefile):
         return bb.parse.cached_mtime_noerror(cachefile)
 
-    def add(self, file_name, data, cacheData):
+    def add_info(self, filename, info, cacheData, parsed=None):
+        self.depends_cache[filename] = info
+        cacheData.add_from_recipeinfo(filename, info)
+        if parsed and not info.nocache:
+            # The recipe was parsed, and is not marked as being
+            # uncacheable, so we need to ensure that we write out the
+            # new cache data.
+            self.cacheclean = False
+
+    def add(self, file_name, data, cacheData, parsed=None):
         """
         Save data we need into the cache
         """
         realfn = self.virtualfn2realfn(file_name)[0]
         info = RecipeInfo.from_metadata(realfn, data)
-        self.depends_cache[file_name] = info
-        cacheData.add_from_recipeinfo(file_name, info)
+        self.add_info(file_name, info, cacheData, parsed)
 
     @staticmethod
     def load_bbfile(bbfile, appends, config):
index 6194919e4cf5cddf3f851ab96b7f9af59667efd9..0143c149b8f1e6e9dfc63916371891cc842a1749 100644 (file)
@@ -25,6 +25,8 @@ from __future__ import print_function
 import sys, os, glob, os.path, re, time
 import logging
 import sre_constants
+import multiprocessing
+import signal
 from cStringIO import StringIO
 from contextlib import closing
 import bb
@@ -976,7 +978,7 @@ class CookerExit(bb.event.Event):
     def __init__(self):
         bb.event.Event.__init__(self)
 
-class CookerParser:
+class CookerParser(object):
     def __init__(self, cooker, filelist, masked):
         # Internal data
         self.filelist = filelist
@@ -987,49 +989,106 @@ class CookerParser:
         self.cached = 0
         self.error = 0
         self.masked = masked
-        self.total = len(filelist)
 
         self.skipped = 0
         self.virtuals = 0
+        self.total = len(filelist)
 
-        # Pointer to the next file to parse
-        self.pointer = 0
-
-    def parse_next(self):
-        cooker = self.cooker
-        if self.pointer < len(self.filelist):
-            f = self.filelist[self.pointer]
-
-            try:
-                fromCache, skipped, virtuals = cooker.bb_cache.loadData(f, cooker.get_file_appends(f), cooker.configuration.data, cooker.status)
-                if fromCache:
-                    self.cached += 1
-                else:
-                    self.parsed += 1
-
-                self.skipped += skipped
-                self.virtuals += virtuals
+        # Index of the next file to parse
+        self.current = 0
+        self.result_queue = None
+        self.fromcache = None
 
-            except KeyboardInterrupt:
-                cooker.bb_cache.remove(f)
-                cooker.bb_cache.sync()
-                raise
-            except Exception as e:
-                self.error += 1
-                cooker.bb_cache.remove(f)
-                parselog.exception("Unable to open %s", f)
-            except:
-                cooker.bb_cache.remove(f)
-                raise
-            finally:
-                bb.event.fire(bb.event.ParseProgress(self.cached, self.parsed, self.skipped, self.masked, self.virtuals, self.error, self.total), cooker.configuration.event_data)
+        self.launch_processes()
 
-            self.pointer += 1
+    def launch_processes(self):
+        self.task_queue = multiprocessing.Queue()
+        self.result_queue = multiprocessing.Queue()
+        self.fromcache = []
+        cfgdata = self.cooker.configuration.data
+        for filename in self.filelist:
+            appends = self.cooker.get_file_appends(filename)
+            if not self.cooker.bb_cache.cacheValid(filename):
+                self.task_queue.put((filename, appends))
+            else:
+                self.fromcache.append((filename, appends))
+
+        def worker(input, output, cfgdata):
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
+            for filename, appends in iter(input.get, 'STOP'):
+                infos = bb.cache.Cache.parse(filename, appends, cfgdata)
+                output.put(infos)
+
+        self.processes = []
+        num_processes = int(cfgdata.getVar("BB_NUMBER_PARSE_THREADS", True) or
+                            multiprocessing.cpu_count())
+        for i in xrange(num_processes):
+            process = multiprocessing.Process(target=worker,
+                                              args=(self.task_queue,
+                                                    self.result_queue,
+                                                    cfgdata))
+            process.start()
+            self.processes.append(process)
+
+    def shutdown(self, clean=True):
+        self.result_queue.close()
+        for process in self.processes:
+            if clean:
+                self.task_queue.put('STOP')
+            else:
+                process.terminate()
+        self.task_queue.close()
+        for process in self.processes:
+            process.join()
+        self.cooker.bb_cache.sync()
+        bb.codeparser.parser_cache_save(self.cooker.configuration.data)
+        if self.error > 0:
+            raise ParsingErrorsFound()
+
+    def progress(self):
+        bb.event.fire(bb.event.ParseProgress(self.cached, self.parsed,
+                                                self.skipped, self.masked,
+                                                self.virtuals, self.error,
+                                                self.total),
+                        self.cooker.configuration.event_data)
 
-        if self.pointer >= self.total:
-            cooker.bb_cache.sync()
-            bb.codeparser.parser_cache_save(cooker.configuration.data)
-            if self.error > 0:
-                raise ParsingErrorsFound
+    def parse_next(self):
+        cooker = self.cooker
+        if self.current >= self.total:
+            self.shutdown()
             return False
+
+        try:
+            if self.result_queue.empty() and self.fromcache:
+                filename, appends = self.fromcache.pop()
+                _, infos = cooker.bb_cache.load(filename, appends,
+                                                self.cooker.configuration.data)
+                parsed = False
+            else:
+                infos = self.result_queue.get()
+                parsed = True
+        except KeyboardInterrupt:
+            self.shutdown(clean=False)
+            raise
+        except Exception as e:
+            self.error += 1
+            parselog.critical(str(e))
+        else:
+            if parsed:
+                self.parsed += 1
+            else:
+                self.cached += 1
+            self.virtuals += len(infos)
+
+            for virtualfn, info in infos:
+                cooker.bb_cache.add_info(virtualfn, info, cooker.status,
+                                         parsed=parsed)
+                if info.skipped:
+                    self.skipped += 1
+        finally:
+            self.progress()
+
+        self.current += 1
         return True
+
index 8bb992732112adb23dbe75c5737c57bb04bb6a7d..3d6f54a58cb184a181139354203f790a273cefe8 100644 (file)
@@ -648,6 +648,7 @@ def p_error(p):
 try:
     import pyshtables
 except ImportError:
+    import os
     outputdir = os.path.dirname(__file__)
     if not os.access(outputdir, os.W_OK):
         outputdir = ''