import bt2
except ImportError:
# quick fix for debian-based distros
- sys.path.append("/usr/local/lib/python%d.%d/site-packages" %
- (sys.version_info.major, sys.version_info.minor))
+ sys.path.append(
+ "/usr/local/lib/python%d.%d/site-packages"
+ % (sys.version_info.major, sys.version_info.minor)
+ )
import bt2
NSEC_PER_SEC = 1000000000
+
class TraceParser:
def __init__(self, trace_msg_iter, pid):
self.trace = trace_msg_iter
# Each test classes checks the payload of different events. Each of
# those checks are stored in a event_name specific dictionnary in this
# data structure.
- self.expect = defaultdict(lambda : defaultdict(int))
+ self.expect = defaultdict(lambda: defaultdict(int))
# This dictionnary holds the value recorded in the trace that are
# tested. Its content is use to print the values that caused a test to
self.recorded_values = {}
def ns_to_hour_nsec(self, ns):
- d = time.localtime(ns/NSEC_PER_SEC)
- return "%02d:%02d:%02d.%09d" % (d.tm_hour, d.tm_min, d.tm_sec,
- ns % NSEC_PER_SEC)
+ d = time.localtime(ns / NSEC_PER_SEC)
+ return "%02d:%02d:%02d.%09d" % (
+ d.tm_hour,
+ d.tm_min,
+ d.tm_sec,
+ ns % NSEC_PER_SEC,
+ )
def parse(self):
# iterate over all the events
continue
method_name = "handle_%s" % msg.event.name.replace(":", "_").replace(
- "+", "_")
+ "+", "_"
+ )
# call the function to handle each event individually
if hasattr(TraceParser, method_name):
func = getattr(TraceParser, method_name)
pass
-class Test1(TraceParser):
+class WorkingCases(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
# Values expected in the trace
- self.epoll_wait_fd = validation_args['epoll_wait_fd']
- self.epoll_pwait_fd = validation_args['epoll_pwait_fd']
+ self.epoll_wait_fd = validation_args["epoll_wait_fd"]
+ self.epoll_pwait_fd = validation_args["epoll_pwait_fd"]
self.expect["select_entry"]["select_in_fd0"] = 0
self.expect["select_entry"]["select_in_fd1023"] = 0
exceptfd_127 = event["exceptfds"][127]
# check that the FD 1023 is actually set in the readfds
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and overflow == 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and overflow == 0
+ ):
self.expect["select_entry"]["select_in_fd1023"] = 1
# Save values of local variables to print in case of test failure
readfd_127 = event["readfds"][127]
writefd_127 = event["writefds"][127]
exceptfd_127 = event["exceptfds"][127]
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and tvp == 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and tvp == 0
+ ):
self.expect["select_exit"]["select_out_fd1023"] = 1
# Save values of local variables to print in case of test failure
# the raw value matches the events bit field.
if nfds == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x3 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_entry"]["poll_in_nfds1"] = 1
# Save values of local variables to print in case of test failure
# the raw value matches the events bit field.
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x1 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x1
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_exit"]["poll_out_nfds1"] = 1
# Save values of local variables to print in case of test failure
# check that we have FD 0 waiting for EPOLLIN|EPOLLPRI and that
# data.fd = 0
- if (epfd == self.epoll_wait_fd or epfd == self.epoll_pwait_fd) and 'EPOLL_CTL_ADD' in op_enum.labels and fd == 0 and \
- _event["data_union"]["fd"] == 0 and \
- _event["events"]["EPOLLIN"] == 1 and \
- _event["events"]["EPOLLPRI"] == 1:
+ if (
+ (epfd == self.epoll_wait_fd or epfd == self.epoll_pwait_fd)
+ and "EPOLL_CTL_ADD" in op_enum.labels
+ and fd == 0
+ and _event["data_union"]["fd"] == 0
+ and _event["events"]["EPOLLIN"] == 1
+ and _event["events"]["EPOLLPRI"] == 1
+ ):
self.expect["epoll_ctl_entry"]["epoll_ctl_in_add"] = 1
# Save values of local variables to print in case of test failure
# check that FD 0 returned with EPOLLIN and the right data.fd
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if overflow == 0 and fd_0["data_union"]["fd"] == 0 and \
- fd_0["events"]["EPOLLIN"] == 1:
+ if (
+ overflow == 0
+ and fd_0["data_union"]["fd"] == 0
+ and fd_0["events"]["EPOLLIN"] == 1
+ ):
self.expect["epoll_wait_exit"]["epoll_wait_out_fd0"] = 1
# Save values of local variables to print in case of test failure
# check that FD 0 returned with EPOLLIN and the right data.fd
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if overflow == 0 and fd_0["data_union"]["fd"] == 0 and \
- fd_0["events"]["EPOLLIN"] == 1:
+ if (
+ overflow == 0
+ and fd_0["data_union"]["fd"] == 0
+ and fd_0["events"]["EPOLLIN"] == 1
+ ):
self.expect["epoll_pwait_exit"]["epoll_pwait_out_fd0"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["epoll_pwait_exit"] = locals()
-class Test2(TraceParser):
+
+class WorkingCasesTimeout(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["select_timeout_in_fd0"] = 0
self.expect["select_entry"]["select_timeout_in_fd1023"] = 0
self.expect["select_exit"]["select_timeout_out"] = 0
writefd_127 = event["writefds"][127]
exceptfd_127 = event["exceptfds"][127]
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and tvp != 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and tvp != 0
+ ):
self.expect["select_entry"]["select_timeout_in_fd1023"] = 1
# Save values of local variables to print in case of test failure
# field matches the value of POLLIN
if nfds == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x3 and \
- fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_entry"]["poll_timeout_in"] = 1
# Save values of local variables to print in case of test failure
_event = event["event"]
# make sure we see a EPOLLIN|EPOLLPRI
- if 'EPOLL_CTL_ADD' in op_enum.labels and \
- _event["events"]["EPOLLIN"] == 1 and \
- _event["events"]["EPOLLPRI"] == 1:
+ if (
+ "EPOLL_CTL_ADD" in op_enum.labels
+ and _event["events"]["EPOLLIN"] == 1
+ and _event["events"]["EPOLLPRI"] == 1
+ ):
self.expect["epoll_ctl_entry"]["epoll_ctl_timeout_in_add"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["epoll_wait_exit"] = locals()
-class Test3(TraceParser):
+class PselectInvalidFd(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["select_invalid_fd_in"] = 0
self.expect["select_exit"]["select_invalid_fd_out"] = 0
self.recorded_values["select_exit"] = locals()
-class Test4(TraceParser):
+class PpollBig(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["big_poll_in"] = 0
self.expect["poll_exit"]["big_poll_out"] = 0
if nfds == 2047 and fds_length == 512 and overflow == 1:
fd_0 = event["fds"][0]
fd_511 = event["fds"][511]
- if fd_0["raw_events"] == 0x3 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0 and \
- fd_511["events"]["POLLIN"] == 1 and \
- fd_511["events"]["POLLPRI"] == 1:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ and fd_511["events"]["POLLIN"] == 1
+ and fd_511["events"]["POLLPRI"] == 1
+ ):
self.expect["poll_entry"]["big_poll_in"] = 1
# Save values of local variables to print in case of test failure
# test of big list of FDs and the behaviour of the overflow
if ret == 2047 and nfds == 2047 and fds_length == 512 and overflow == 1:
- fd_0 = event["fds"][0]
- fd_511 = event["fds"][511]
- if fd_0["events"]["POLLIN"] == 1 and fd_511["events"]["POLLIN"] == 1:
- self.expect["poll_exit"]["big_poll_out"] = 1
+ fd_0 = event["fds"][0]
+ fd_511 = event["fds"][511]
+ if fd_0["events"]["POLLIN"] == 1 and fd_511["events"]["POLLIN"] == 1:
+ self.expect["poll_exit"]["big_poll_out"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["poll_exit"] = locals()
-class Test5(TraceParser):
+
+class PpollFdsBufferOverflow(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["poll_overflow_in"] = 0
self.expect["poll_exit"]["poll_overflow_out"] = 0
self.recorded_values["poll_exit"] = locals()
-class Test6(TraceParser):
+class PselectInvalidPointer(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["pselect_invalid_in"] = 0
self.expect["select_exit"]["pselect_invalid_out"] = 0
self.recorded_values["select_exit"] = locals()
-class Test7(TraceParser):
+class PpollFdsULongMax(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["poll_max_in"] = 0
self.expect["poll_exit"]["poll_max_out"] = 0
# Save values of local variables to print in case of test failure
self.recorded_values["poll_entry"] = locals()
-
def poll_exit(self, event):
ret = event["ret"]
nfds = event["nfds"]
self.recorded_values["poll_exit"] = locals()
-class Test8(TraceParser):
+class EpollPwaitInvalidPointer(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
# Values expected in the trace
- self.epoll_fd = validation_args['epollfd']
+ self.epoll_fd = validation_args["epollfd"]
self.expect["epoll_wait_entry"]["epoll_wait_invalid_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_invalid_out"] = 0
self.recorded_values["epoll_wait_exit"] = locals()
-class Test9(TraceParser):
+class EpollPwaitIntMax(TraceParser):
def __init__(self, trace, validation_args):
- super().__init__(trace, validation_args['pid'])
+ super().__init__(trace, validation_args["pid"])
# Values expected in the trace
- self.epoll_fd = validation_args['epollfd']
+ self.epoll_fd = validation_args["epollfd"]
self.expect["epoll_wait_entry"]["epoll_wait_max_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_max_out"] = 0
if __name__ == "__main__":
- parser = argparse.ArgumentParser(description='Trace parser')
- parser.add_argument('path', metavar="<path/to/trace>", help='Trace path')
- parser.add_argument('-t', '--test', type=int, help='Test to validate')
- parser.add_argument('-o', '--validation-file', type=str, help='Validation file path')
+ parser = argparse.ArgumentParser(description="Trace parser")
+ parser.add_argument("path", metavar="<path/to/trace>", help="Trace path")
+ parser.add_argument("-t", "--test", type=str, help="Test to validate")
+ parser.add_argument(
+ "-o", "--validation-file", type=str, help="Validation file path"
+ )
args = parser.parse_args()
if not args.test:
try:
test_validation_args = json.load(f)
except Exception as e:
- print('Failed to parse validation file: ' + str(e))
+ print("Failed to parse validation file: " + str(e))
sys.exit(1)
t = None
- if args.test == 1:
- t = Test1(traces, test_validation_args)
- elif args.test == 2:
- t = Test2(traces, test_validation_args)
- elif args.test == 3:
- t = Test3(traces, test_validation_args)
- elif args.test == 4:
- t = Test4(traces, test_validation_args)
- elif args.test == 5:
- t = Test5(traces, test_validation_args)
- elif args.test == 6:
- t = Test6(traces, test_validation_args)
- elif args.test == 7:
- t = Test7(traces, test_validation_args)
- elif args.test == 8:
- t = Test8(traces, test_validation_args)
- elif args.test == 9:
- t = Test9(traces, test_validation_args)
- elif args.test == 10:
+ if args.test == "working_cases":
+ t = WorkingCases(traces, test_validation_args)
+ elif args.test == "working_cases_timeout":
+ t = WorkingCasesTimeout(traces, test_validation_args)
+ elif args.test == "pselect_invalid_fd":
+ t = PselectInvalidFd(traces, test_validation_args)
+ elif args.test == "ppoll_big":
+ t = PpollBig(traces, test_validation_args)
+ elif args.test == "ppoll_fds_buffer_overflow":
+ t = PpollFdsBufferOverflow(traces, test_validation_args)
+ elif args.test == "pselect_invalid_pointer":
+ t = PselectInvalidPointer(traces, test_validation_args)
+ elif args.test == "ppoll_fds_ulong_max":
+ t = PpollFdsULongMax(traces, test_validation_args)
+ elif args.test == "epoll_pwait_invalid_pointer":
+ t = EpollPwaitInvalidPointer(traces, test_validation_args)
+ elif args.test == "epoll_pwait_int_max":
+ t = EpollPwaitIntMax(traces, test_validation_args)
+ elif args.test == "ppoll_concurrent_write":
# stress test, nothing reliable to check
ret = 0
- elif args.test == 11:
+ elif args.test == "epoll_pwait_concurrent_munmap":
# stress test, nothing reliable to check
ret = 0
else: