inherit metadata_scm
inherit logging
-OE_IMPORTS += "os sys time oe.path oe.utils oe.data oe.package oe.packagegroup oe.sstatesig oe.lsb oe.cachedpath"
+OE_IMPORTS += "os sys time oe.path oe.utils oe.types oe.package oe.packagegroup oe.sstatesig oe.lsb oe.cachedpath"
OE_IMPORTS[type] = "list"
def oe_import(d):
elif name == "RCONFLICTS":
pkginfo.rconflicts = value
elif name == "PKGSIZE":
- pkginfo.size = long(value)
+ pkginfo.size = int(value)
elif name == "FILES":
pkginfo.files = value
elif name == "FILELIST":
d.setVar('scriptrelpath', scriptrelpath)
# Write out config file for devtool
- import ConfigParser
- config = ConfigParser.SafeConfigParser()
+ import configparser
+ config = configparser.SafeConfigParser()
config.add_section('General')
config.set('General', 'bitbake_subdir', conf_bbpath)
config.set('General', 'init_path', conf_initpath)
data['target_sys'] = e.data.getVar("TARGET_SYS", True)
data['failures'] = []
data['component'] = e.getPkgs()[0]
- data['branch_commit'] = base_detect_branch(e.data) + ": " + base_detect_revision(e.data)
+ data['branch_commit'] = str(base_detect_branch(e.data)) + ": " + str(base_detect_revision(e.data))
lock = bb.utils.lockfile(datafile + '.lock')
errorreport_savedata(e, data, "error-report.txt")
bb.utils.unlockfile(lock)
-class ClassRegistry(type):
+
+class ClassRegistryMeta(type):
+ """Give each ClassRegistry their own registry"""
+ def __init__(cls, name, bases, attrs):
+ cls.registry = {}
+ type.__init__(cls, name, bases, attrs)
+
+class ClassRegistry(type, metaclass=ClassRegistryMeta):
"""Maintain a registry of classes, indexed by name.
Note that this implementation requires that the names be unique, as it uses
control over whether the class will be added to the registry (e.g. to keep
abstract base classes out of the registry)."""
priority = 0
- class __metaclass__(type):
- """Give each ClassRegistry their own registry"""
- def __init__(cls, name, bases, attrs):
- cls.registry = {}
- type.__init__(cls, name, bases, attrs)
-
def __init__(cls, name, bases, attrs):
super(ClassRegistry, cls).__init__(name, bases, attrs)
try:
from contextlib import contextmanager
@contextmanager
def create_socket(url, d):
- import urllib
- socket = urllib.urlopen(url, proxies=get_proxies(d))
+ import urllib.request, urllib.parse, urllib.error
+ socket = urllib.request.urlopen(url, proxies=get_proxies(d))
try:
yield socket
finally:
"""
import inspect
-import types
+import oe.types as types
+import collections
available_types = {}
if type(obj) is type:
obj = obj.__init__
- args, varargs, keywords, defaults = inspect.getargspec(obj)
+ sig = inspect.signature(obj)
+ args = list(sig.parameters.keys())
+ defaults = list(s for s in sig.parameters.keys() if sig.parameters[s].default != inspect.Parameter.empty)
flaglist = []
if args:
if len(args) > 1 and args[0] == 'self':
continue
obj = getattr(types, name)
- if not callable(obj):
+ if not isinstance(obj, collections.Callable):
continue
register(name, obj)
import bb
-class Manifest(object):
+class Manifest(object, metaclass=ABCMeta):
"""
This is an abstract class. Do not instantiate this directly.
"""
- __metaclass__ = ABCMeta
PKG_TYPE_MUST_INSTALL = "mip"
PKG_TYPE_MULTILIB = "mlp"
# 8 - shared library
# 16 - kernel module
- import commands, stat, subprocess
+ import stat, subprocess
(file, elftype, strip) = arg
return output
-class Indexer(object):
- __metaclass__ = ABCMeta
-
+class Indexer(object, metaclass=ABCMeta):
def __init__(self, d, deploy_dir):
self.d = d
self.deploy_dir = deploy_dir
-class PkgsList(object):
- __metaclass__ = ABCMeta
-
+class PkgsList(object, metaclass=ABCMeta):
def __init__(self, d, rootfs_dir):
self.d = d
self.rootfs_dir = rootfs_dir
return opkg_query(cmd_output)
-class PackageManager(object):
+class PackageManager(object, metaclass=ABCMeta):
"""
This is an abstract class. Do not instantiate this directly.
"""
- __metaclass__ = ABCMeta
def __init__(self, d):
self.d = d
if len(self.data) < ELFFile.EI_NIDENT + 4:
raise NotELFFileError("%s is not an ELF" % self.name)
- self.my_assert(self.data[0], chr(0x7f) )
- self.my_assert(self.data[1], 'E')
- self.my_assert(self.data[2], 'L')
- self.my_assert(self.data[3], 'F')
+ self.my_assert(self.data[0], 0x7f)
+ self.my_assert(self.data[1], ord('E'))
+ self.my_assert(self.data[2], ord('L'))
+ self.my_assert(self.data[3], ord('F'))
if self.bits == 0:
- if self.data[ELFFile.EI_CLASS] == chr(ELFFile.ELFCLASS32):
+ if self.data[ELFFile.EI_CLASS] == ELFFile.ELFCLASS32:
self.bits = 32
- elif self.data[ELFFile.EI_CLASS] == chr(ELFFile.ELFCLASS64):
+ elif self.data[ELFFile.EI_CLASS] == ELFFile.ELFCLASS64:
self.bits = 64
else:
# Not 32-bit or 64.. lets assert
raise NotELFFileError("ELF but not 32 or 64 bit.")
elif self.bits == 32:
- self.my_assert(self.data[ELFFile.EI_CLASS], chr(ELFFile.ELFCLASS32))
+ self.my_assert(self.data[ELFFile.EI_CLASS], ELFFile.ELFCLASS32)
elif self.bits == 64:
- self.my_assert(self.data[ELFFile.EI_CLASS], chr(ELFFile.ELFCLASS64))
+ self.my_assert(self.data[ELFFile.EI_CLASS], ELFFile.ELFCLASS64)
else:
raise NotELFFileError("Must specify unknown, 32 or 64 bit size.")
- self.my_assert(self.data[ELFFile.EI_VERSION], chr(ELFFile.EV_CURRENT) )
+ self.my_assert(self.data[ELFFile.EI_VERSION], ELFFile.EV_CURRENT)
self.sex = self.data[ELFFile.EI_DATA]
- if self.sex == chr(ELFFile.ELFDATANONE):
+ if self.sex == ELFFile.ELFDATANONE:
raise NotELFFileError("self.sex == ELFDATANONE")
- elif self.sex == chr(ELFFile.ELFDATA2LSB):
+ elif self.sex == ELFFile.ELFDATA2LSB:
self.sex = "<"
- elif self.sex == chr(ELFFile.ELFDATA2MSB):
+ elif self.sex == ELFFile.ELFDATA2MSB:
self.sex = ">"
else:
raise NotELFFileError("Unknown self.sex")
def osAbi(self):
- return ord(self.data[ELFFile.EI_OSABI])
+ return self.data[ELFFile.EI_OSABI]
def abiVersion(self):
- return ord(self.data[ELFFile.EI_ABIVERSION])
+ return self.data[ELFFile.EI_ABIVERSION]
def abiSize(self):
return self.bits
import tempfile
import textwrap
import difflib
-import utils
+from . import utils
import shutil
import re
import fnmatch
if removevar in removevalues:
remove = removevalues[removevar]
- if isinstance(remove, basestring):
+ if isinstance(remove, str):
if remove in splitval:
splitval.remove(remove)
changed = True
import re
-class Rootfs(object):
+class Rootfs(object, metaclass=ABCMeta):
"""
This is an abstract class. Do not instantiate this directly.
"""
- __metaclass__ = ABCMeta
def __init__(self, d):
self.d = d
import traceback
-class Sdk(object):
- __metaclass__ = ABCMeta
-
+class Sdk(object, metaclass=ABCMeta):
def __init__(self, d, manifest_dir):
self.d = d
self.sdk_output = self.d.getVar('SDK_OUTPUT', True)
return bool(cls.command)
-class Terminal(Popen):
- __metaclass__ = Registry
-
+class Terminal(Popen, metaclass=Registry):
def __init__(self, sh_cmd, title=None, env=None, d=None):
fmt_sh_cmd = self.format_command(sh_cmd, title)
try:
def format_command(self, sh_cmd, title):
fmt = {'title': title or 'Terminal', 'command': sh_cmd}
- if isinstance(self.command, basestring):
+ if isinstance(self.command, str):
return shlex.split(self.command.format(**fmt))
else:
return [element.format(**fmt) for element in self.command]
def test_loop(self):
for e in self.EXCEPTIONS:
- self.assertRaisesRegexp(OSError, r'\[Errno %u\]' % e[1],
+ self.assertRaisesRegex(OSError, r'\[Errno %u\]' % e[1],
self.__realpath, e[0], False, False)
Acts as a multiple choice for the user. To use this, set the variable
type flag to 'choice', and set the 'choices' flag to a space separated
list of valid values."""
- if not isinstance(value, basestring):
+ if not isinstance(value, str):
raise TypeError("choice accepts a string, not '%s'" % type(value))
value = value.lower()
Valid values for false: 'no', 'n', 'false', 'f', '0'
"""
- if not isinstance(value, basestring):
+ if not isinstance(value, str):
raise TypeError("boolean accepts a string, not '%s'" % type(value))
value = value.lower()
val2 = d.getVar(variable2, True)
val1 = set(val1.split())
val2 = set(val2.split())
- if isinstance(checkvalue, basestring):
+ if isinstance(checkvalue, str):
checkvalue = set(checkvalue.split())
else:
checkvalue = set(checkvalue)
# so implement a version here
#
-from Queue import Queue
+from queue import Queue
from threading import Thread
class ThreadedWorker(Thread):
self.worker_end = worker_end
def run(self):
- from Queue import Empty
+ from queue import Empty
if self.worker_init is not None:
self.worker_init(self)
from abc import ABCMeta, abstractmethod
-class MasterImageHardwareTarget(oeqa.targetcontrol.BaseTarget):
-
- __metaclass__ = ABCMeta
+class MasterImageHardwareTarget(oeqa.targetcontrol.BaseTarget, metaclass=ABCMeta):
supported_image_fstypes = ['tar.gz', 'tar.bz2']
import subprocess
import signal
import shutil
+import functools
try:
import bb
except ImportError:
for index, suite in enumerate(suites):
set_suite_depth(suite)
suite.index = index
- suites.sort(cmp=lambda a,b: cmp((a.depth, a.index), (b.depth, b.index)))
+
+ def cmp(a, b):
+ return (a > b) - (a < b)
+
+ def cmpfunc(a, b):
+ return cmp((a.depth, a.index), (b.depth, b.index))
+
+ suites.sort(key=functools.cmp_to_key(cmpfunc))
self.suite = testloader.suiteClass(suites)
import os
import sys
import shlex, subprocess
-import urllib, commands, time, getpass, re, json, shlex
+import urllib.request, urllib.parse, urllib.error, subprocess, time, getpass, re, json, shlex
import oeqa.utils.ftools as ftools
from oeqa.selftest.base import oeSelfTest
layers = Layer.objects.values('id', 'layer_index_url')
cnt_err = []
for layer in layers:
- resp = urllib.urlopen(layer['layer_index_url'])
+ resp = urllib.request.urlopen(layer['layer_index_url'])
if (resp.getcode() != 200):
cnt_err.append(layer['id'])
self.assertEqual(len(cnt_err), 0, msg = 'Errors for layer id: %s' % cnt_err)
import os
import logging
import tempfile
-import urlparse
+import urllib.parse
from oeqa.utils.commands import runCmd, bitbake, get_bb_var, create_temp_layer
from oeqa.utils.decorators import testcase
'''Return the first file:// in SRC_URI for the specified recipe.'''
src_uri = get_bb_var('SRC_URI', recipe).split()
for uri in src_uri:
- p = urlparse.urlparse(uri)
+ p = urllib.parse.urlparse(uri)
if p.scheme == 'file':
return p.netloc + p.path
files2 = get_files(topdir + "/tmp-sstatesamehash2/stamps/")
files2 = [x.replace("tmp-sstatesamehash2", "tmp-sstatesamehash").replace("i686-linux", "x86_64-linux").replace("i686" + targetvendor + "-linux", "x86_64" + targetvendor + "-linux", ) for x in files2]
self.maxDiff = None
- self.assertItemsEqual(files1, files2)
+ self.assertCountEqual(files1, files2)
@testcase(1271)
files2 = get_files(topdir + "/tmp-sstatesamehash2/stamps/")
files2 = [x.replace("tmp-sstatesamehash2", "tmp-sstatesamehash") for x in files2]
self.maxDiff = None
- self.assertItemsEqual(files1, files2)
+ self.assertCountEqual(files1, files2)
@testcase(1368)
def test_sstate_allarch_samesigs(self):
files2 = get_files(topdir + "/tmp-sstatesamehash2/stamps")
files2 = [x.replace("tmp-sstatesamehash2", "tmp-sstatesamehash") for x in files2]
self.maxDiff = None
- self.assertItemsEqual(files1, files2)
+ self.assertCountEqual(files1, files2)
def test_sstate_noop_samesigs(self):
return controller(d)
-class BaseTarget(object):
-
- __metaclass__ = ABCMeta
+class BaseTarget(object, metaclass=ABCMeta):
supported_image_fstypes = []
self.data = data
self.options = dict(self.defaultopts)
- if isinstance(self.cmd, basestring):
+ if isinstance(self.cmd, str):
self.options["shell"] = True
if self.data:
self.options['stdin'] = subprocess.PIPE
else:
extra_args = ""
- if isinstance(command, basestring):
+ if isinstance(command, str):
cmd = "bitbake " + extra_args + " " + command
else:
cmd = [ "bitbake" ] + [a for a in (command + extra_args.split(" ")) if a not in [""]]
def filter(self, record):
return record.levelno == 100
+import inspect
+
def LogResults(original_class):
orig_method = original_class.run
logfile = os.path.join(os.getcwd(),'results-'+caller+'.'+timestamp+'.log')
linkfile = os.path.join(os.getcwd(),'results-'+caller+'.log')
+ def get_class_that_defined_method(meth):
+ if inspect.ismethod(meth):
+ for cls in inspect.getmro(meth.__self__.__class__):
+ if cls.__dict__.get(meth.__name__) is meth:
+ return cls
+ meth = meth.__func__ # fallback to __qualname__ parsing
+ if inspect.isfunction(meth):
+ cls = getattr(inspect.getmodule(meth),
+ meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0])
+ if isinstance(cls, type):
+ return cls
+ return None
+
#rewrite the run method of unittest.TestCase to add testcase logging
def run(self, result, *args, **kws):
orig_method(self, result, *args, **kws)
except AttributeError:
test_case = self._testMethodName
- class_name = str(testMethod.im_class).split("'")[1]
+ class_name = str(get_class_that_defined_method(testMethod)).split("'")[1]
#create custom logging level for filtering.
custom_log_level = 100
import errno
import datetime
import itertools
-from commands import runCmd
+from .commands import runCmd
def get_host_dumper(d):
cmds = d.getVar("testimage_dump_host", True)
-import SimpleHTTPServer
+import http.server
import multiprocessing
import os
-class HTTPServer(SimpleHTTPServer.BaseHTTPServer.HTTPServer):
+class HTTPServer(http.server.HTTPServer):
def server_start(self, root_dir):
import signal
os.chdir(root_dir)
self.serve_forever()
-class HTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
+class HTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
def log_message(self, format_str, *args):
pass
import sys
import os
import re
-import ftools
+from . import ftools
# A parser that can be used to identify weather a line is a test result or a section statement.
# Get Unicode non printable control chars
control_range = list(range(0,32))+list(range(127,160))
-control_chars = [unichr(x) for x in control_range
- if unichr(x) not in string.printable]
+control_chars = [chr(x) for x in control_range
+ if chr(x) not in string.printable]
re_control_char = re.compile('[%s]' % re.escape("".join(control_chars)))
class QemuRunner:
stopread = False
qemusock = None
bootlog = ''
+ data = b''
while time.time() < endtime and not stopread:
sread, swrite, serror = select.select(socklist, [], [], 5)
for sock in sread:
if hasattr(self, "origchldhandler"):
signal.signal(signal.SIGCHLD, self.origchldhandler)
if self.runqemu:
- os.kill(self.monitorpid, signal.SIGKILL)
- logger.info("Sending SIGTERM to runqemu")
- try:
- os.killpg(os.getpgid(self.runqemu.pid), signal.SIGTERM)
- except OSError as e:
- if e.errno != errno.ESRCH:
- raise
+ if hasattr(self, "monitorpid"):
+ os.kill(self.monitorpid, signal.SIGKILL)
+ logger.info("Sending SIGTERM to runqemu")
+ try:
+ os.killpg(os.getpgid(self.runqemu.pid), signal.SIGTERM)
+ except OSError as e:
+ if e.errno != errno.ESRCH:
+ raise
endtime = time.time() + self.runqemutime
while self.runqemu.poll() is None and time.time() < endtime:
time.sleep(1)
def stop(self):
self.logger.info("Stopping logging thread")
if self.running:
- os.write(self.writepipe, "stop")
+ os.write(self.writepipe, bytes("stop", "utf-8"))
def teardown(self):
self.logger.info("Tearing down logging thread")
import socket
import select
import bb
-from qemurunner import QemuRunner
+from .qemurunner import QemuRunner
class QemuTinyRunner(QemuRunner):
import subprocess
from abc import ABCMeta, abstractmethod
-class BuildProject():
-
- __metaclass__ = ABCMeta
+class BuildProject(metaclass=ABCMeta):
def __init__(self, d, uri, foldername=None, tmpdir="/tmp/"):
self.d = d
import os, re, glob as g, shutil as sh,sys
from time import sleep
-from commands import runCmd
+from .commands import runCmd
from difflib import SequenceMatcher as SM
try:
-#!/usr/bin/env python
+#!/usr/bin/env python3
# Copyright (c) 2013 Intel Corporation
#
import time as t
import re
import fnmatch
+import collections
+import imp
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/lib')
import scriptpath
# Coverage under version 4 uses coverage.coverage
from coverage import coverage as Coverage
- import cStringIO as StringIO
+ import io as StringIO
from coverage.misc import CoverageException
cov_output = StringIO.StringIO()
bbpath = get_bb_var('BBPATH').split(':')
layer_libdirs = [p for p in (os.path.join(l, 'lib') for l in bbpath) if os.path.exists(p)]
sys.path.extend(layer_libdirs)
- reload(oeqa.selftest)
+ imp.reload(oeqa.selftest)
if args.run_tests_by and len(args.run_tests_by) >= 2:
valid_options = ['name', 'class', 'module', 'id', 'tag']
if isinstance(t, type(oeSelfTest)) and issubclass(t, oeSelfTest) and t!=oeSelfTest:
print(" --", v)
for method in dir(t):
- if method.startswith("test_") and callable(vars(t)[method]):
+ if method.startswith("test_") and isinstance(vars(t)[method], collections.Callable):
print(" -- --", method)
except (AttributeError, ImportError) as e: