mirror of
https://git.freebsd.org/src.git
synced 2026-01-16 23:02:24 +00:00
This changes intends to reduce the bar to the kernel unit-testing by
introducing a new kernel-testing framework ("ktest") based on Netlink,
loadable test modules and python test suite integration.
This framework provides the following features:
* Integration to the FreeBSD test suite
* Automatic test discovery
* Automatic test module loading
* Minimal boiler-plate code in both kernel and userland
* Passing any metadata to the test
* Convenient environment pre-setup using python testing framework
* Streaming messages from the kernel to the userland
* Running tests in the dedicated taskqueues
* Skipping or parametrizing tests
Differential Revision: https://reviews.freebsd.org/D39385
MFC after: 2 weeks
289 lines
7.6 KiB
Python
289 lines
7.6 KiB
Python
import socket
|
|
import struct
|
|
from enum import Enum
|
|
|
|
from atf_python.sys.netlink.utils import align4
|
|
from atf_python.sys.netlink.utils import enum_or_int
|
|
|
|
|
|
class NlAttr(object):
|
|
HDR_LEN = 4 # sizeof(struct nlattr)
|
|
|
|
def __init__(self, nla_type, data):
|
|
if isinstance(nla_type, Enum):
|
|
self._nla_type = nla_type.value
|
|
self._enum = nla_type
|
|
else:
|
|
self._nla_type = nla_type
|
|
self._enum = None
|
|
self.nla_list = []
|
|
self._data = data
|
|
|
|
@property
|
|
def nla_type(self):
|
|
return self._nla_type & 0x3F
|
|
|
|
@property
|
|
def nla_len(self):
|
|
return len(self._data) + 4
|
|
|
|
def add_nla(self, nla):
|
|
self.nla_list.append(nla)
|
|
|
|
def print_attr(self, prepend=""):
|
|
if self._enum is not None:
|
|
type_str = self._enum.name
|
|
else:
|
|
type_str = "nla#{}".format(self.nla_type)
|
|
print(
|
|
"{}len={} type={}({}){}".format(
|
|
prepend, self.nla_len, type_str, self.nla_type, self._print_attr_value()
|
|
)
|
|
)
|
|
|
|
@staticmethod
|
|
def _validate(data):
|
|
if len(data) < 4:
|
|
raise ValueError("attribute too short")
|
|
nla_len, nla_type = struct.unpack("@HH", data[:4])
|
|
if nla_len > len(data):
|
|
raise ValueError("attribute length too big")
|
|
if nla_len < 4:
|
|
raise ValueError("attribute length too short")
|
|
|
|
@classmethod
|
|
def _parse(cls, data):
|
|
nla_len, nla_type = struct.unpack("@HH", data[:4])
|
|
return cls(nla_type, data[4:])
|
|
|
|
@classmethod
|
|
def from_bytes(cls, data, attr_type_enum=None):
|
|
cls._validate(data)
|
|
attr = cls._parse(data)
|
|
attr._enum = attr_type_enum
|
|
return attr
|
|
|
|
def _to_bytes(self, data: bytes):
|
|
ret = data
|
|
if align4(len(ret)) != len(ret):
|
|
ret = data + bytes(align4(len(ret)) - len(ret))
|
|
return struct.pack("@HH", len(data) + 4, self._nla_type) + ret
|
|
|
|
def __bytes__(self):
|
|
return self._to_bytes(self._data)
|
|
|
|
def _print_attr_value(self):
|
|
return " " + " ".join(["x{:02X}".format(b) for b in self._data])
|
|
|
|
|
|
class NlAttrNested(NlAttr):
|
|
def __init__(self, nla_type, val):
|
|
super().__init__(nla_type, b"")
|
|
self.nla_list = val
|
|
|
|
@property
|
|
def nla_len(self):
|
|
return align4(len(b"".join([bytes(nla) for nla in self.nla_list]))) + 4
|
|
|
|
def print_attr(self, prepend=""):
|
|
if self._enum is not None:
|
|
type_str = self._enum.name
|
|
else:
|
|
type_str = "nla#{}".format(self.nla_type)
|
|
print(
|
|
"{}len={} type={}({}) {{".format(
|
|
prepend, self.nla_len, type_str, self.nla_type
|
|
)
|
|
)
|
|
for nla in self.nla_list:
|
|
nla.print_attr(prepend + " ")
|
|
print("{}}}".format(prepend))
|
|
|
|
def __bytes__(self):
|
|
return self._to_bytes(b"".join([bytes(nla) for nla in self.nla_list]))
|
|
|
|
|
|
class NlAttrU32(NlAttr):
|
|
def __init__(self, nla_type, val):
|
|
self.u32 = enum_or_int(val)
|
|
super().__init__(nla_type, b"")
|
|
|
|
@property
|
|
def nla_len(self):
|
|
return 8
|
|
|
|
def _print_attr_value(self):
|
|
return " val={}".format(self.u32)
|
|
|
|
@staticmethod
|
|
def _validate(data):
|
|
assert len(data) == 8
|
|
nla_len, nla_type = struct.unpack("@HH", data[:4])
|
|
assert nla_len == 8
|
|
|
|
@classmethod
|
|
def _parse(cls, data):
|
|
nla_len, nla_type, val = struct.unpack("@HHI", data)
|
|
return cls(nla_type, val)
|
|
|
|
def __bytes__(self):
|
|
return self._to_bytes(struct.pack("@I", self.u32))
|
|
|
|
|
|
class NlAttrU16(NlAttr):
|
|
def __init__(self, nla_type, val):
|
|
self.u16 = enum_or_int(val)
|
|
super().__init__(nla_type, b"")
|
|
|
|
@property
|
|
def nla_len(self):
|
|
return 6
|
|
|
|
def _print_attr_value(self):
|
|
return " val={}".format(self.u16)
|
|
|
|
@staticmethod
|
|
def _validate(data):
|
|
assert len(data) == 6
|
|
nla_len, nla_type = struct.unpack("@HH", data[:4])
|
|
assert nla_len == 6
|
|
|
|
@classmethod
|
|
def _parse(cls, data):
|
|
nla_len, nla_type, val = struct.unpack("@HHH", data)
|
|
return cls(nla_type, val)
|
|
|
|
def __bytes__(self):
|
|
return self._to_bytes(struct.pack("@H", self.u16))
|
|
|
|
|
|
class NlAttrU8(NlAttr):
|
|
def __init__(self, nla_type, val):
|
|
self.u8 = enum_or_int(val)
|
|
super().__init__(nla_type, b"")
|
|
|
|
@property
|
|
def nla_len(self):
|
|
return 5
|
|
|
|
def _print_attr_value(self):
|
|
return " val={}".format(self.u8)
|
|
|
|
@staticmethod
|
|
def _validate(data):
|
|
assert len(data) == 5
|
|
nla_len, nla_type = struct.unpack("@HH", data[:4])
|
|
assert nla_len == 5
|
|
|
|
@classmethod
|
|
def _parse(cls, data):
|
|
nla_len, nla_type, val = struct.unpack("@HHB", data)
|
|
return cls(nla_type, val)
|
|
|
|
def __bytes__(self):
|
|
return self._to_bytes(struct.pack("@B", self.u8))
|
|
|
|
|
|
class NlAttrIp(NlAttr):
|
|
def __init__(self, nla_type, addr: str):
|
|
super().__init__(nla_type, b"")
|
|
self.addr = addr
|
|
if ":" in self.addr:
|
|
self.family = socket.AF_INET6
|
|
else:
|
|
self.family = socket.AF_INET
|
|
|
|
@staticmethod
|
|
def _validate(data):
|
|
nla_len, nla_type = struct.unpack("@HH", data[:4])
|
|
data_len = nla_len - 4
|
|
if data_len != 4 and data_len != 16:
|
|
raise ValueError(
|
|
"Error validating attr {}: nla_len is not valid".format( # noqa: E501
|
|
nla_type
|
|
)
|
|
)
|
|
|
|
@property
|
|
def nla_len(self):
|
|
if self.family == socket.AF_INET6:
|
|
return 20
|
|
else:
|
|
return 8
|
|
return align4(len(self._data)) + 4
|
|
|
|
@classmethod
|
|
def _parse(cls, data):
|
|
nla_len, nla_type = struct.unpack("@HH", data[:4])
|
|
data_len = len(data) - 4
|
|
if data_len == 4:
|
|
addr = socket.inet_ntop(socket.AF_INET, data[4:8])
|
|
else:
|
|
addr = socket.inet_ntop(socket.AF_INET6, data[4:20])
|
|
return cls(nla_type, addr)
|
|
|
|
def __bytes__(self):
|
|
return self._to_bytes(socket.inet_pton(self.family, self.addr))
|
|
|
|
def _print_attr_value(self):
|
|
return " addr={}".format(self.addr)
|
|
|
|
|
|
class NlAttrStr(NlAttr):
|
|
def __init__(self, nla_type, text):
|
|
super().__init__(nla_type, b"")
|
|
self.text = text
|
|
|
|
@staticmethod
|
|
def _validate(data):
|
|
NlAttr._validate(data)
|
|
try:
|
|
data[4:].decode("utf-8")
|
|
except Exception as e:
|
|
raise ValueError("wrong utf-8 string: {}".format(e))
|
|
|
|
@property
|
|
def nla_len(self):
|
|
return len(self.text) + 5
|
|
|
|
@classmethod
|
|
def _parse(cls, data):
|
|
text = data[4:-1].decode("utf-8")
|
|
nla_len, nla_type = struct.unpack("@HH", data[:4])
|
|
return cls(nla_type, text)
|
|
|
|
def __bytes__(self):
|
|
return self._to_bytes(bytes(self.text, encoding="utf-8") + bytes(1))
|
|
|
|
def _print_attr_value(self):
|
|
return ' val="{}"'.format(self.text)
|
|
|
|
|
|
class NlAttrStrn(NlAttr):
|
|
def __init__(self, nla_type, text):
|
|
super().__init__(nla_type, b"")
|
|
self.text = text
|
|
|
|
@staticmethod
|
|
def _validate(data):
|
|
NlAttr._validate(data)
|
|
try:
|
|
data[4:].decode("utf-8")
|
|
except Exception as e:
|
|
raise ValueError("wrong utf-8 string: {}".format(e))
|
|
|
|
@property
|
|
def nla_len(self):
|
|
return len(self.text) + 4
|
|
|
|
@classmethod
|
|
def _parse(cls, data):
|
|
text = data[4:].decode("utf-8")
|
|
nla_len, nla_type = struct.unpack("@HH", data[:4])
|
|
return cls(nla_type, text)
|
|
|
|
def __bytes__(self):
|
|
return self._to_bytes(bytes(self.text, encoding="utf-8"))
|
|
|
|
def _print_attr_value(self):
|
|
return ' val="{}"'.format(self.text)
|