diff options
Diffstat (limited to 'python/psutil/test/_windows.py')
-rw-r--r-- | python/psutil/test/_windows.py | 464 |
1 files changed, 464 insertions, 0 deletions
diff --git a/python/psutil/test/_windows.py b/python/psutil/test/_windows.py new file mode 100644 index 0000000000..b7477bfeb4 --- /dev/null +++ b/python/psutil/test/_windows.py @@ -0,0 +1,464 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Windows specific tests. These are implicitly run by test_psutil.py.""" + +import errno +import os +import platform +import signal +import subprocess +import sys +import time +import traceback + +from test_psutil import APPVEYOR, WINDOWS +from test_psutil import get_test_subprocess, reap_children, unittest + +import mock +try: + import wmi +except ImportError: + wmi = None +try: + import win32api + import win32con +except ImportError: + win32api = win32con = None + +from psutil._compat import PY3, callable, long +import psutil + + +cext = psutil._psplatform.cext + + +def wrap_exceptions(fun): + def wrapper(self, *args, **kwargs): + try: + return fun(self, *args, **kwargs) + except OSError as err: + from psutil._pswindows import ACCESS_DENIED_SET + if err.errno in ACCESS_DENIED_SET: + raise psutil.AccessDenied(None, None) + if err.errno == errno.ESRCH: + raise psutil.NoSuchProcess(None, None) + raise + return wrapper + + +@unittest.skipUnless(WINDOWS, "not a Windows system") +class WindowsSpecificTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess().pid + + @classmethod + def tearDownClass(cls): + reap_children() + + def test_issue_24(self): + p = psutil.Process(0) + self.assertRaises(psutil.AccessDenied, p.kill) + + def test_special_pid(self): + p = psutil.Process(4) + self.assertEqual(p.name(), 'System') + # use __str__ to access all common Process properties to check + # that nothing strange happens + str(p) + p.username() + self.assertTrue(p.create_time() >= 0.0) + try: + rss, vms = p.memory_info() + except psutil.AccessDenied: + # expected on Windows Vista and Windows 7 + if not platform.uname()[1] in ('vista', 'win-7', 'win7'): + raise + else: + self.assertTrue(rss > 0) + + def test_send_signal(self): + p = psutil.Process(self.pid) + self.assertRaises(ValueError, p.send_signal, signal.SIGINT) + + def test_nic_names(self): + p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE) + out = p.communicate()[0] + if PY3: + out = str(out, sys.stdout.encoding) + nics = psutil.net_io_counters(pernic=True).keys() + for nic in nics: + if "pseudo-interface" in nic.replace(' ', '-').lower(): + continue + if nic not in out: + self.fail( + "%r nic wasn't found in 'ipconfig /all' output" % nic) + + def test_exe(self): + for p in psutil.process_iter(): + try: + self.assertEqual(os.path.basename(p.exe()), p.name()) + except psutil.Error: + pass + + # --- Process class tests + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_name(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + self.assertEqual(p.name(), w.Caption) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_exe(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + # Note: wmi reports the exe as a lower case string. + # Being Windows paths case-insensitive we ignore that. + self.assertEqual(p.exe().lower(), w.ExecutablePath.lower()) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_cmdline(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + self.assertEqual(' '.join(p.cmdline()), + w.CommandLine.replace('"', '')) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_username(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + domain, _, username = w.GetOwner() + username = "%s\\%s" % (domain, username) + self.assertEqual(p.username(), username) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_rss_memory(self): + time.sleep(0.1) + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + rss = p.memory_info().rss + self.assertEqual(rss, int(w.WorkingSetSize)) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_vms_memory(self): + time.sleep(0.1) + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + vms = p.memory_info().vms + # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx + # ...claims that PageFileUsage is represented in Kilo + # bytes but funnily enough on certain platforms bytes are + # returned instead. + wmi_usage = int(w.PageFileUsage) + if (vms != wmi_usage) and (vms != wmi_usage * 1024): + self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_create_time(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + wmic_create = str(w.CreationDate.split('.')[0]) + psutil_create = time.strftime("%Y%m%d%H%M%S", + time.localtime(p.create_time())) + self.assertEqual(wmic_create, psutil_create) + + # --- psutil namespace functions and constants tests + + @unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ, + 'NUMBER_OF_PROCESSORS env var is not available') + def test_cpu_count(self): + num_cpus = int(os.environ['NUMBER_OF_PROCESSORS']) + self.assertEqual(num_cpus, psutil.cpu_count()) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_total_phymem(self): + w = wmi.WMI().Win32_ComputerSystem()[0] + self.assertEqual(int(w.TotalPhysicalMemory), + psutil.virtual_memory().total) + + # @unittest.skipIf(wmi is None, "wmi module is not installed") + # def test__UPTIME(self): + # # _UPTIME constant is not public but it is used internally + # # as value to return for pid 0 creation time. + # # WMI behaves the same. + # w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + # p = psutil.Process(0) + # wmic_create = str(w.CreationDate.split('.')[0]) + # psutil_create = time.strftime("%Y%m%d%H%M%S", + # time.localtime(p.create_time())) + # + + # Note: this test is not very reliable + @unittest.skipIf(wmi is None, "wmi module is not installed") + @unittest.skipIf(APPVEYOR, "test not relieable on appveyor") + def test_pids(self): + # Note: this test might fail if the OS is starting/killing + # other processes in the meantime + w = wmi.WMI().Win32_Process() + wmi_pids = set([x.ProcessId for x in w]) + psutil_pids = set(psutil.pids()) + self.assertEqual(wmi_pids, psutil_pids) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_disks(self): + ps_parts = psutil.disk_partitions(all=True) + wmi_parts = wmi.WMI().Win32_LogicalDisk() + for ps_part in ps_parts: + for wmi_part in wmi_parts: + if ps_part.device.replace('\\', '') == wmi_part.DeviceID: + if not ps_part.mountpoint: + # this is usually a CD-ROM with no disk inserted + break + try: + usage = psutil.disk_usage(ps_part.mountpoint) + except OSError as err: + if err.errno == errno.ENOENT: + # usually this is the floppy + break + else: + raise + self.assertEqual(usage.total, int(wmi_part.Size)) + wmi_free = int(wmi_part.FreeSpace) + self.assertEqual(usage.free, wmi_free) + # 10 MB tollerance + if abs(usage.free - wmi_free) > 10 * 1024 * 1024: + self.fail("psutil=%s, wmi=%s" % ( + usage.free, wmi_free)) + break + else: + self.fail("can't find partition %s" % repr(ps_part)) + + @unittest.skipIf(win32api is None, "pywin32 module is not installed") + def test_num_handles(self): + p = psutil.Process(os.getpid()) + before = p.num_handles() + handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, + win32con.FALSE, os.getpid()) + after = p.num_handles() + self.assertEqual(after, before + 1) + win32api.CloseHandle(handle) + self.assertEqual(p.num_handles(), before) + + @unittest.skipIf(win32api is None, "pywin32 module is not installed") + def test_num_handles_2(self): + # Note: this fails from time to time; I'm keen on thinking + # it doesn't mean something is broken + def call(p, attr): + attr = getattr(p, name, None) + if attr is not None and callable(attr): + attr() + else: + attr + + p = psutil.Process(self.pid) + failures = [] + for name in dir(psutil.Process): + if name.startswith('_') \ + or name in ('terminate', 'kill', 'suspend', 'resume', + 'nice', 'send_signal', 'wait', 'children', + 'as_dict'): + continue + else: + try: + call(p, name) + num1 = p.num_handles() + call(p, name) + num2 = p.num_handles() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + else: + if num2 > num1: + fail = \ + "failure while processing Process.%s method " \ + "(before=%s, after=%s)" % (name, num1, num2) + failures.append(fail) + if failures: + self.fail('\n' + '\n'.join(failures)) + + def test_name_always_available(self): + # On Windows name() is never supposed to raise AccessDenied, + # see https://github.com/giampaolo/psutil/issues/627 + for p in psutil.process_iter(): + try: + p.name() + except psutil.NoSuchProcess(): + pass + + +@unittest.skipUnless(WINDOWS, "not a Windows system") +class TestDualProcessImplementation(unittest.TestCase): + """ + Certain APIs on Windows have 2 internal implementations, one + based on documented Windows APIs, another one based + NtQuerySystemInformation() which gets called as fallback in + case the first fails because of limited permission error. + Here we test that the two methods return the exact same value, + see: + https://github.com/giampaolo/psutil/issues/304 + """ + + fun_names = [ + # function name, tolerance + ('proc_cpu_times', 0.2), + ('proc_create_time', 0.5), + ('proc_num_handles', 1), # 1 because impl #1 opens a handle + ('proc_memory_info', 1024), # KB + ('proc_io_counters', 0), + ] + + def test_compare_values(self): + def assert_ge_0(obj): + if isinstance(obj, tuple): + for value in obj: + self.assertGreaterEqual(value, 0, msg=obj) + elif isinstance(obj, (int, long, float)): + self.assertGreaterEqual(obj, 0) + else: + assert 0 # case not handled which needs to be fixed + + def compare_with_tolerance(ret1, ret2, tolerance): + if ret1 == ret2: + return + else: + if isinstance(ret2, (int, long, float)): + diff = abs(ret1 - ret2) + self.assertLessEqual(diff, tolerance) + elif isinstance(ret2, tuple): + for a, b in zip(ret1, ret2): + diff = abs(a - b) + self.assertLessEqual(diff, tolerance) + + from psutil._pswindows import ntpinfo + failures = [] + for p in psutil.process_iter(): + try: + nt = ntpinfo(*cext.proc_info(p.pid)) + except psutil.NoSuchProcess: + continue + assert_ge_0(nt) + + for name, tolerance in self.fun_names: + if name == 'proc_memory_info' and p.pid == os.getpid(): + continue + if name == 'proc_create_time' and p.pid in (0, 4): + continue + meth = wrap_exceptions(getattr(cext, name)) + try: + ret = meth(p.pid) + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + # compare values + try: + if name == 'proc_cpu_times': + compare_with_tolerance(ret[0], nt.user_time, tolerance) + compare_with_tolerance(ret[1], + nt.kernel_time, tolerance) + elif name == 'proc_create_time': + compare_with_tolerance(ret, nt.create_time, tolerance) + elif name == 'proc_num_handles': + compare_with_tolerance(ret, nt.num_handles, tolerance) + elif name == 'proc_io_counters': + compare_with_tolerance(ret[0], nt.io_rcount, tolerance) + compare_with_tolerance(ret[1], nt.io_wcount, tolerance) + compare_with_tolerance(ret[2], nt.io_rbytes, tolerance) + compare_with_tolerance(ret[3], nt.io_wbytes, tolerance) + elif name == 'proc_memory_info': + try: + rawtupl = cext.proc_memory_info_2(p.pid) + except psutil.NoSuchProcess: + continue + compare_with_tolerance(ret, rawtupl, tolerance) + except AssertionError: + trace = traceback.format_exc() + msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' % ( + trace, p.pid, name, ret, nt) + failures.append(msg) + break + + if failures: + self.fail('\n\n'.join(failures)) + + # --- + # same tests as above but mimicks the AccessDenied failure of + # the first (fast) method failing with AD. + # TODO: currently does not take tolerance into account. + + def test_name(self): + name = psutil.Process().name() + with mock.patch("psutil._psplatform.cext.proc_exe", + side_effect=psutil.AccessDenied(os.getpid())) as fun: + psutil.Process().name() == name + assert fun.called + + def test_memory_info(self): + mem = psutil.Process().memory_info() + with mock.patch("psutil._psplatform.cext.proc_memory_info", + side_effect=OSError(errno.EPERM, "msg")) as fun: + psutil.Process().memory_info() == mem + assert fun.called + + def test_create_time(self): + ctime = psutil.Process().create_time() + with mock.patch("psutil._psplatform.cext.proc_create_time", + side_effect=OSError(errno.EPERM, "msg")) as fun: + psutil.Process().create_time() == ctime + assert fun.called + + def test_cpu_times(self): + cpu_times = psutil.Process().cpu_times() + with mock.patch("psutil._psplatform.cext.proc_cpu_times", + side_effect=OSError(errno.EPERM, "msg")) as fun: + psutil.Process().cpu_times() == cpu_times + assert fun.called + + def test_io_counters(self): + io_counters = psutil.Process().io_counters() + with mock.patch("psutil._psplatform.cext.proc_io_counters", + side_effect=OSError(errno.EPERM, "msg")) as fun: + psutil.Process().io_counters() == io_counters + assert fun.called + + def test_num_handles(self): + io_counters = psutil.Process().io_counters() + with mock.patch("psutil._psplatform.cext.proc_io_counters", + side_effect=OSError(errno.EPERM, "msg")) as fun: + psutil.Process().io_counters() == io_counters + assert fun.called + + # --- other tests + + def test_compare_name_exe(self): + for p in psutil.process_iter(): + try: + a = os.path.basename(p.exe()) + b = p.name() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + else: + self.assertEqual(a, b) + + def test_zombies(self): + # test that NPS is raised by the 2nd implementation in case a + # process no longer exists + ZOMBIE_PID = max(psutil.pids()) + 5000 + for name, _ in self.fun_names: + meth = wrap_exceptions(getattr(cext, name)) + self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID) + + +def main(): + test_suite = unittest.TestSuite() + test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase)) + test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not main(): + sys.exit(1) |