]> code.ossystems Code Review - openembedded-core.git/commitdiff
bitbake: Initial scenequeue implementation (needs major fixes)
authorRichard Purdie <rpurdie@linux.intel.com>
Thu, 19 Aug 2010 10:36:29 +0000 (11:36 +0100)
committerRichard Purdie <rpurdie@linux.intel.com>
Thu, 19 Aug 2010 19:06:25 +0000 (20:06 +0100)
bitbake: scenequeue: Skip setscene if the underlying task already ran
bitbake/setscene: Make sure uneeded dependencies are removed recursively

Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
bitbake/lib/bb/runqueue.py

index 488aa04d06b0434a085fd9069fb22711db55d726..9127f248d5830caf2ee82b6e8e8cc6f447973863 100644 (file)
@@ -27,6 +27,7 @@ from bb import msg, data, event
 import signal
 import stat
 import fcntl
+import copy
 
 class RunQueueStats:
     """
@@ -57,12 +58,14 @@ class RunQueueStats:
 # These values indicate the next step due to be run in the
 # runQueue state machine
 runQueuePrepare = 2
-runQueueRunInit = 3
-runQueueRunning = 4
-runQueueFailed = 6
-runQueueCleanUp = 7
-runQueueComplete = 8
-runQueueChildProcess = 9
+runQueueSceneInit = 3
+runQueueSceneRun = 4
+runQueueRunInit = 5
+runQueueRunning = 6
+runQueueFailed = 7
+runQueueCleanUp = 8
+runQueueComplete = 9
+runQueueChildProcess = 10
 
 class RunQueueScheduler(object):
     """
@@ -672,6 +675,16 @@ class RunQueueData:
 
         #self.dump_data(taskData)
 
+        # Interate over the task list looking for tasks with a 'setscene' function
+
+        self.runq_setscene = []
+        for task in range(len(self.runq_fnid)):
+            setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
+            if not setscene:
+                continue
+            bb.note("Found setscene for %s %s" % (self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task]))
+            self.runq_setscene.append(task)
+
     def dump_data(self, taskQueue):
         """
         Dump some debug information on the internal data structures
@@ -802,6 +815,13 @@ class RunQueue:
         return current
 
     def check_stamp_task(self, task, taskname = None):
+        def get_timestamp(f):
+            try:
+                if not os.access(f, os.F_OK):
+                    return None
+                return os.stat(f)[stat.ST_MTIME]
+            except:
+                return None
 
         if self.stamppolicy == "perfile":
             fulldeptree = False
@@ -825,23 +845,24 @@ class RunQueue:
             bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname))
             return False
 
+        if taskname.endswith("_setscene"):
+            return True
+
         iscurrent = True
-        t1 = os.stat(stampfile)[stat.ST_MTIME]
+        t1 = get_timestamp(stampfile)
         for dep in self.rqdata.runq_depends[task]:
             if iscurrent:
                 fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
                 taskname2 = self.rqdata.runq_task[dep]
                 stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2)
+                t2 = get_timestamp(stampfile2)
+                t3 = get_timestamp(stampfile2 + "_setscene")
+                if t3 and t3 > t2:
+                   continue
                 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
-                    try:
-                        t2 = os.stat(stampfile2)[stat.ST_MTIME]
-                        if t1 < t2:
-                            bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s" % (stampfile, stampfile2))
-                            iscurrent = False
-                    except:
-                        bb.msg.debug(2, bb.msg.domain.RunQueue, "Exception reading %s for %s" % (stampfile2, stampfile))
+                    if not t2 or t1 < t2:
+                        bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s (or does not exist)" % (stampfile, stampfile2))
                         iscurrent = False
-
         return iscurrent
 
     def execute_runqueue(self):
@@ -855,7 +876,13 @@ class RunQueue:
 
         if self.state is runQueuePrepare:
             self.rqdata.prepare()
-            self.state = runQueueRunInit
+            self.state = runQueueSceneInit
+
+        if self.state is runQueueSceneInit:
+            self.rqexe = RunQueueExecuteScenequeue(self)
+
+        if self.state is runQueueSceneRun:
+            self.rqexe.execute()
 
         if self.state is runQueueRunInit:
             bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
@@ -948,14 +975,10 @@ class RunQueueExecute:
         for pipe in self.build_pipes:
             self.build_pipes[pipe].read()
 
-        try:
-            while self.stats.active > 0:
-                bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
-                if self.runqueue_process_waitpid() is None:
-                    return
-        except:
-            self.finish_now()
-            raise
+        if self.stats.active > 0:
+            bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
+            self.runqueue_process_waitpid()
+            return
 
         if len(self.failed_fnids) != 0:
             self.rq.state = runQueueFailed
@@ -1034,6 +1057,23 @@ class RunQueueExecuteTasks(RunQueueExecute):
                 self.runq_buildable.append(1)
             else:
                 self.runq_buildable.append(0)
+            if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
+                self.rq.scenequeue_covered.add(task)
+
+        found = True
+        while found:
+            found = False
+            for task in range(self.stats.total):
+                if task in self.rq.scenequeue_covered:
+                    continue
+                if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
+                    self.rq.scenequeue_covered.add(task)
+                    found = True
+
+        bb.note("Full skip list %s" % self.rq.scenequeue_covered)
+
+        for task in self.rq.scenequeue_covered:
+            self.task_skip(task)
 
         event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
 
@@ -1145,6 +1185,204 @@ class RunQueueExecuteTasks(RunQueueExecute):
             self.rq.state = runQueueComplete
             return
 
+class RunQueueExecuteScenequeue(RunQueueExecute):
+    def __init__(self, rq):
+        RunQueueExecute.__init__(self, rq)
+
+        self.scenequeue_covered = set()
+        self.scenequeue_notcovered = set()
+
+        # If we don't have any setscene functions, skip this step
+        if len(self.rqdata.runq_setscene) == 0:
+            rq.scenequeue_covered = set()
+            rq.state = runQueueRunInit
+            return
+
+        self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
+
+        endpoints = {}
+        sq_revdeps = []
+        sq_revdeps_new = []
+        sq_revdeps_squash = []
+
+        # We need to construct a dependency graph for the setscene functions. Intermediate
+        # dependencies between the setscene tasks only complicate the code. This code
+        # therefore aims to collapse the huge runqueue dependency tree into a smaller one
+        # only containing the setscene functions.
+
+        for task in range(self.stats.total):
+            self.runq_running.append(0)
+            self.runq_complete.append(0)
+            self.runq_buildable.append(0)
+
+        for task in range(len(self.rqdata.runq_fnid)):
+            sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
+            sq_revdeps_new.append(set())
+            if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
+                endpoints[task] = None
+
+        for task in self.rqdata.runq_setscene:
+            for dep in self.rqdata.runq_depends[task]:
+                    endpoints[dep] = task
+
+        def process_endpoints(endpoints):
+            newendpoints = {}
+            for point, task in endpoints.items():
+                tasks = set()
+                if task:
+                    tasks.add(task)
+                if sq_revdeps_new[point]:
+                    tasks |= sq_revdeps_new[point]
+                sq_revdeps_new[point] = set()
+                for dep in self.rqdata.runq_depends[point]:
+                    if point in sq_revdeps[dep]:
+                        sq_revdeps[dep].remove(point)
+                    if tasks:
+                        sq_revdeps_new[dep] |= tasks
+                    if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
+                        newendpoints[dep] = task
+            if len(newendpoints) != 0:
+                process_endpoints(newendpoints)
+
+        process_endpoints(endpoints)
+
+        for task in range(len(self.rqdata.runq_fnid)):
+            if task in self.rqdata.runq_setscene:
+                deps = set()
+                for dep in sq_revdeps_new[task]:
+                    deps.add(self.rqdata.runq_setscene.index(dep))
+                sq_revdeps_squash.append(deps)
+            elif len(sq_revdeps_new[task]) != 0:
+                bb.msg.fatal(bb.msg.domain.RunQueue, "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
+
+        #for task in range(len(sq_revdeps_squash)):
+        #    print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
+
+        self.sq_deps = []
+        self.sq_revdeps = sq_revdeps_squash
+        self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
+
+        for task in range(len(self.sq_revdeps)):
+            self.sq_deps.append(set())
+        for task in range(len(self.sq_revdeps)):
+            for dep in self.sq_revdeps[task]:
+                self.sq_deps[dep].add(task)
+
+        for task in range(len(self.sq_revdeps)):
+            if len(self.sq_revdeps[task]) == 0:
+                self.runq_buildable[task] = 1
+
+        bb.msg.note(1, bb.msg.domain.RunQueue, "Executing setscene Tasks")
+
+        self.rq.state = runQueueSceneRun
+
+    def scenequeue_updatecounters(self, task):
+        for dep in self.sq_deps[task]:
+            self.sq_revdeps2[dep].remove(task)
+            if len(self.sq_revdeps2[dep]) == 0:
+                self.runq_buildable[dep] = 1
+
+    def task_complete(self, task):
+        """
+        Mark a task as completed
+        Look at the reverse dependencies and mark any task with
+        completed dependencies as buildable
+        """
+
+        index = self.rqdata.runq_setscene[task]
+        bb.msg.note(1, bb.msg.domain.RunQueue, "Found task %s could be accelerated" % self.rqdata.get_user_idstring(index))
+
+        self.scenequeue_covered.add(task)
+        self.scenequeue_updatecounters(task)
+
+    def task_fail(self, task, result):
+        self.stats.taskFailed()
+        index = self.rqdata.runq_setscene[task]
+        bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
+        self.scenequeue_notcovered.add(task)
+        self.scenequeue_updatecounters(task)
+
+    def task_failoutright(self, task):
+        self.runq_running[task] = 1
+        self.runq_buildable[task] = 1
+        self.stats.taskCompleted()
+        self.stats.taskSkipped()
+        index = self.rqdata.runq_setscene[task]
+        self.scenequeue_notcovered.add(task)
+        self.scenequeue_updatecounters(task)
+
+    def task_skip(self, task):
+        self.runq_running[task] = 1
+        self.runq_buildable[task] = 1
+        self.task_complete(task)
+        self.stats.taskCompleted()
+        self.stats.taskSkipped()
+
+    def execute(self):
+        """
+        Run the tasks in a queue prepared by prepare_runqueue
+        """
+
+        task = None
+        if self.stats.active < self.number_tasks:
+            # Find the next setscene to run
+            for nexttask in range(self.stats.total):
+                if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
+                    #bb.note("Comparing %s to %s" % (self.sq_revdeps[nexttask], self.scenequeue_covered))
+                    #if len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered):
+                    #    bb.note("Skipping task %s" % nexttask)
+                    #    self.scenequeue_skip(nexttask)
+                    #    return True
+                    task = nexttask
+                    break
+        if task is not None:
+            realtask = self.rqdata.runq_setscene[task]
+            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
+
+            taskname = self.rqdata.runq_task[realtask] + "_setscene"
+            if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
+                bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp for underlying task %s (%s) is current so skipping setscene varient" % (task, self.rqdata.get_user_idstring(task)))
+                self.task_failoutright(task)
+                return True
+
+            if self.cooker.configuration.force:
+                for target in self.target_pairs:
+                    if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
+                        self.task_failoutright(task)
+                        return True
+
+            if self.rq.check_stamp_task(realtask, taskname):
+                bb.msg.debug(2, bb.msg.domain.RunQueue, "Setscene stamp current task %s (%s) so skip it and its dependencies" % (task, self.rqdata.get_user_idstring(realtask)))
+                self.task_skip(task)
+                return True
+
+            pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
+
+            self.build_pids[pid] = task
+            self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
+            self.runq_running[task] = 1
+            self.stats.taskActive()
+            if self.stats.active < self.number_tasks:
+                return True
+
+        for pipe in self.build_pipes:
+            self.build_pipes[pipe].read()
+
+        if self.stats.active > 0:
+            if self.runqueue_process_waitpid() is None:
+                return True
+            return True
+
+        # Convert scenequeue_covered task numbers into full taskgraph ids
+        oldcovered = self.scenequeue_covered
+        self.rq.scenequeue_covered = set()
+        for task in oldcovered:
+            self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
+
+        bb.note("We can skip tasks %s" % self.rq.scenequeue_covered)
+
+        self.rq.state = runQueueRunInit
+        return True
 
 
 class TaskFailure(Exception):