/usr/lib/plainbox-providers-1/checkbox/bin/memory_test is in plainbox-provider-checkbox 0.3-2.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
#!/usr/bin/env python3
import os
import sys
import re
from argparse import ArgumentParser
from subprocess import Popen, PIPE
class MemoryTest():
    """Exercise system RAM (and swap) through the threaded_memtest script.

    Every memory figure stored on the instance is in MB: the numbers are
    read from /proc/meminfo (assumed to be reported in kB) and
    integer-divided by 1024.
    """

    def __init__(self):
        self.free_memory = 0     # MemFree + Cached + Buffers, in MB
        self.system_memory = 0   # MemTotal, in MB
        self.swap_memory = 0     # SwapTotal, in MB
        self.process_memory = 0  # MB a single test process is asked to use
        self.is_process_limited = False  # True when capped at 1024 MB

    @property
    def threaded_memtest_script(self):
        """Path of the ``threaded_memtest`` helper next to this file."""
        directory = os.path.dirname(os.path.abspath(__file__))
        return os.path.join(directory, "threaded_memtest")

    def _parse_meminfo(self, lines):
        """Recompute the memory counters from an iterable of meminfo lines.

        Only lines made of exactly three whitespace-separated tokens
        (``Name: value unit``) are considered.  The counters are reset
        first so that repeated calls do not accumulate ``free_memory``.

        Raises ValueError when a value of interest is not an integer.
        """
        free_fields = ("MemFree:", "Cached:", "Buffers:")
        self.system_memory = 0
        self.free_memory = 0
        self.swap_memory = 0
        for line in lines:
            tokens = line.split()
            if len(tokens) != 3:
                continue
            field, value = tokens[0], tokens[1]
            if field == "MemTotal:":
                self.system_memory = int(value) // 1024
            elif field in free_fields:
                self.free_memory += int(value) // 1024
            elif field == "SwapTotal:":
                self.swap_memory = int(value) // 1024

    def _get_memory(self):
        """Load system, free and swap memory (MB) from /proc/meminfo.

        Failures are reported on stderr instead of being raised; the
        caller detects them through ``system_memory`` staying at 0.
        """
        try:
            # Opening inside the try block lets a missing or unreadable
            # file be reported like any other read error.
            with open("/proc/meminfo", "r") as mem_info:
                self._parse_meminfo(mem_info)
        except (OSError, ValueError) as e:
            print("ERROR: Unable to get data from /proc/meminfo",
                  file=sys.stderr)
            print(e, file=sys.stderr)

    def _command(self, command, shell=True):
        """Start *command* with stdout and stderr piped; return the Popen."""
        proc = Popen(command,
                     shell=shell,
                     stdout=PIPE,
                     stderr=PIPE)
        return proc

    def _command_out(self, command, shell=True):
        """Run *command* to completion and return its stripped stdout bytes."""
        proc = self._command(command, shell)
        return proc.communicate()[0].strip()

    def get_limits(self):
        """Read the memory figures and decide the per-process memory limit.

        Returns False when the system RAM or the architecture cannot be
        determined, True otherwise.
        """
        self._get_memory()
        print("System Memory: %u MB" % self.system_memory)
        print("Free Memory: %u MB" % self.free_memory)
        print("Swap Memory: %u MB" % self.swap_memory)
        if self.system_memory == 0:
            print("ERROR: could not determine system RAM", file=sys.stderr)
            return False
        # Process Memory
        self.process_memory = self.free_memory
        try:
            arch = self._command_out("arch").decode()
            # NOTE(review): re.match anchors only at the start, so this
            # pattern also matches names such as "s390x" - confirm that
            # is intended.
            if (re.match(r"(i[0-9]86|s390|arm.*)", arch)
                    and self.free_memory > 1024):
                self.is_process_limited = True
                self.process_memory = 1024  # MB, due to 32-bit address space
                print("%s arch, Limiting Process Memory: %u" % (
                    arch, self.process_memory))
            # others? what about PAE kernel?
        except Exception as e:
            print("ERROR: could not determine system architecture via arch",
                  file=sys.stderr)
            print(e, file=sys.stderr)
            return False
        return True

    def run(self):
        """Run the appropriate memory test; return 0 on pass, 1 on failure."""
        PASSED = 0
        FAILED = 1
        limits = self.get_limits()
        if not limits:
            return FAILED
        # if process memory is limited, run multiple processes
        if self.is_process_limited:
            print("Running Multiple Process Memory Test")
            if not self.run_multiple_process_test():
                return FAILED
        else:
            print("Running Single Process Memory Test")
            if not self.run_single_process_test():
                return FAILED
        # otherwise, passed
        return PASSED

    def run_single_process_test(self):
        """Run the single-process threaded test; True on success."""
        if not self.run_threaded_memory_test():
            return False
        return True

    def run_multiple_process_test(self):
        """Cover free memory with several size-limited test processes.

        Returns True when every process exits with status 0.
        """
        processes = self.free_memory // self.process_memory
        # if not swap-less, add a process to hit swap
        if not self.swap_memory == 0:
            processes += 1
            # check to make sure there's enough swap; this only warns,
            # the test is still attempted
            required_memory = self.process_memory * processes
            if required_memory > self.system_memory + self.swap_memory:
                # The figures are in MB (the message used to say KB).
                print("ERROR: this test requires a minimum of %u MB of swap "
                      "memory (%u configured)" % (
                          required_memory - self.system_memory,
                          self.swap_memory), file=sys.stderr)
        print("Testing memory with %u processes" % processes)
        print("Running threaded memory test:")
        run_time = 60  # sec.
        if not self.run_processes(processes, "%s -qpv -m%um -t%u" % (
                self.threaded_memtest_script, self.process_memory, run_time)):
            print("Multi-process, threaded memory Test FAILED",
                  file=sys.stderr)
            return False
        return True

    def run_threaded_memory_test(self):
        """Run the threaded test in one process; True when it passes.

        With swap configured, a first pass asks for more than the free
        memory so that swap is exercised.  A second pass then runs the
        script without an explicit size or duration.
        """
        # single-process threaded test
        print("Starting Threaded Memory Test")
        # run for Free Memory plus the lesser of 5% or 1GB; integer
        # division keeps the size a whole number of MB
        extra = min(self.free_memory * 5 // 100, 1024)  # MB
        memory = extra + self.free_memory
        print("Running for %d MB total memory" % memory)
        # run a test that will swap
        if not self.swap_memory == 0:
            # is there enough swap memory for the test?
            if memory > self.system_memory + self.swap_memory:
                # The figures are in MB (the message used to say KB).
                print("ERROR: this test requires a minimum of %u MB of swap "
                      "memory (%u configured)"
                      % (memory - self.system_memory, self.swap_memory),
                      file=sys.stderr)
                return False
            # otherwise
            run_time = 60  # sec.
            print("Running for more than free memory at %u MB for %u sec." % (
                memory, run_time))
            command = "%s -qpv -m%um -t%u" % (
                self.threaded_memtest_script, memory, run_time)
            print("Command is: %s" % command)
            process = self._command(command)
            process.communicate()
            if process.returncode != 0:
                print("%s returned code %s" % (self.threaded_memtest_script,
                      process.returncode), file=sys.stderr)
                print("More Than Free Memory Test failed", file=sys.stderr)
                return False
            print("More than free memory test complete.")
        # Run again with the script's own defaults (the original comment
        # says "15 minutes"; no duration is passed here, so that depends
        # on threaded_memtest itself).
        print("Running for free memory")
        process = self._command("%s -qpv" % self.threaded_memtest_script)
        process.communicate()
        if process.returncode != 0:
            print("Free Memory Test failed", file=sys.stderr)
        else:
            print("Free Memory Test succeeded")
        sys.stdout.flush()
        return (process.returncode == 0)

    def run_processes(self, number, command):
        """Start *number* copies of *command* and wait for all of them.

        Every process is started before any of them is waited on.  Each
        process's captured stdout is printed once it has finished.
        Returns True only when all processes exit with status 0.
        """
        passed = True
        procs = []
        for i in range(number):
            proc = self._command(command)
            procs.append(proc)
            print("Started: process %u pid %u: %s" % (i, proc.pid, command))
        sys.stdout.flush()
        for i, proc in enumerate(procs):
            # communicate() blocks until the process has exited, so the
            # return code is always available afterwards and no polling
            # loop is needed.
            output = proc.communicate()[0]
            if output and len(output) > 1:
                text = output.decode(errors="replace").rstrip()
                print("process %u pid %u: %s" % (i, proc.pid, text))
                sys.stdout.flush()
            return_value = proc.returncode
            if return_value != 0:
                # %d: the code is negative when a signal killed the process
                print("ERROR: process %u pid %u returned %d"
                      % (i, proc.pid, return_value),
                      file=sys.stderr)
                passed = False
            else:
                print("process %u pid %u returned success"
                      % (i, proc.pid))
        sys.stdout.flush()
        return passed
def main(args):
    """Parse the command line in *args* and run the memory test.

    With ``-q``/``--quiet`` both standard streams are redirected to the
    null device before the test starts.  Returns whatever
    ``MemoryTest.run()`` returns.
    """
    arg_parser = ArgumentParser()
    arg_parser.add_argument(
        "-q", "--quiet",
        action="store_true",
        help="Suppress output.",
    )
    options = arg_parser.parse_args(args)

    if options.quiet:
        # Silence everything the test prints from here on.
        sys.stdout = open(os.devnull, 'a')
        sys.stderr = open(os.devnull, 'a')

    return MemoryTest().run()
if __name__ == "__main__":
    # Use the value returned by main() as the process exit status.
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)
|