diff options
Diffstat (limited to 'python/futures/test_futures.py')
-rw-r--r-- | python/futures/test_futures.py | 724 |
1 files changed, 724 insertions, 0 deletions
diff --git a/python/futures/test_futures.py b/python/futures/test_futures.py new file mode 100644 index 000000000..ace340cb0 --- /dev/null +++ b/python/futures/test_futures.py @@ -0,0 +1,724 @@ +import os +import subprocess +import sys +import threading +import functools +import contextlib +import logging +import re +import time +from StringIO import StringIO +from test import test_support + +from concurrent import futures +from concurrent.futures._base import ( + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) + +try: + import unittest2 as unittest +except ImportError: + import unittest + + +def reap_threads(func): + """Use this function when threads are being used. This will + ensure that the threads are cleaned up even when the test fails. + If threading is unavailable this function does nothing. + """ + @functools.wraps(func) + def decorator(*args): + key = test_support.threading_setup() + try: + return func(*args) + finally: + test_support.threading_cleanup(*key) + return decorator + + +# Executing the interpreter in a subprocess +def _assert_python(expected_success, *args, **env_vars): + cmd_line = [sys.executable] + if not env_vars: + cmd_line.append('-E') + # Need to preserve the original environment, for in-place testing of + # shared library builds. + env = os.environ.copy() + # But a special flag that can be set to override -- in this case, the + # caller is responsible to pass the full environment. + if env_vars.pop('__cleanenv', None): + env = {} + env.update(env_vars) + cmd_line.extend(args) + p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + env=env) + try: + out, err = p.communicate() + finally: + subprocess._cleanup() + p.stdout.close() + p.stderr.close() + rc = p.returncode + err = strip_python_stderr(err) + if (rc and expected_success) or (not rc and not expected_success): + raise AssertionError( + "Process return code is %d, " + "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore'))) + return rc, out, err + + +def assert_python_ok(*args, **env_vars): + """ + Assert that running the interpreter with `args` and optional environment + variables `env_vars` is ok and return a (return code, stdout, stderr) tuple. + """ + return _assert_python(True, *args, **env_vars) + + +def strip_python_stderr(stderr): + """Strip the stderr of a Python process from potential debug output + emitted by the interpreter. + + This will typically be run on the result of the communicate() method + of a subprocess.Popen object. + """ + stderr = re.sub(r"\[\d+ refs\]\r?\n?$".encode(), "".encode(), stderr).strip() + return stderr + + +@contextlib.contextmanager +def captured_stderr(): + """Return a context manager used by captured_stdout/stdin/stderr + that temporarily replaces the sys stream *stream_name* with a StringIO.""" + logging_stream = StringIO() + handler = logging.StreamHandler(logging_stream) + logging.root.addHandler(handler) + + try: + yield logging_stream + finally: + logging.root.removeHandler(handler) + + +def create_future(state=PENDING, exception=None, result=None): + f = Future() + f._state = state + f._exception = exception + f._result = result + return f + + +PENDING_FUTURE = create_future(state=PENDING) +RUNNING_FUTURE = create_future(state=RUNNING) +CANCELLED_FUTURE = create_future(state=CANCELLED) +CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) +EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) +SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) + + +def mul(x, y): + return x * y + + +def sleep_and_raise(t): + time.sleep(t) + raise Exception('this is an exception') + +def sleep_and_print(t, msg): + time.sleep(t) + print(msg) + sys.stdout.flush() + + +class ExecutorMixin: + worker_count = 5 + + def setUp(self): + self.t1 = time.time() + try: + self.executor = self.executor_type(max_workers=self.worker_count) + except NotImplementedError: + e = sys.exc_info()[1] + self.skipTest(str(e)) + self._prime_executor() + + def tearDown(self): + self.executor.shutdown(wait=True) + dt = time.time() - self.t1 + if test_support.verbose: + print("%.2fs" % dt) + self.assertLess(dt, 60, "synchronization issue: test lasted too long") + + def _prime_executor(self): + # Make sure that the executor is ready to do work before running the + # tests. This should reduce the probability of timeouts in the tests. + futures = [self.executor.submit(time.sleep, 0.1) + for _ in range(self.worker_count)] + + for f in futures: + f.result() + + +class ThreadPoolMixin(ExecutorMixin): + executor_type = futures.ThreadPoolExecutor + + +class ProcessPoolMixin(ExecutorMixin): + executor_type = futures.ProcessPoolExecutor + + +class ExecutorShutdownTest(unittest.TestCase): + def test_run_after_shutdown(self): + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.submit, + pow, 2, 5) + + def test_interpreter_shutdown(self): + # Test the atexit hook for shutdown of worker threads and processes + rc, out, err = assert_python_ok('-c', """if 1: + from concurrent.futures import %s + from time import sleep + from test_futures import sleep_and_print + t = %s(5) + t.submit(sleep_and_print, 1.0, "apple") + """ % (self.executor_type.__name__, self.executor_type.__name__)) + # Errors in atexit hooks don't change the process exit code, check + # stderr manually. + self.assertFalse(err) + self.assertEqual(out.strip(), "apple".encode()) + + def test_hang_issue12364(self): + fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] + self.executor.shutdown() + for f in fs: + f.result() + + +class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest): + def _prime_executor(self): + pass + + def test_threads_terminate(self): + self.executor.submit(mul, 21, 2) + self.executor.submit(mul, 6, 7) + self.executor.submit(mul, 3, 14) + self.assertEqual(len(self.executor._threads), 3) + self.executor.shutdown() + for t in self.executor._threads: + t.join() + + def test_context_manager_shutdown(self): + with futures.ThreadPoolExecutor(max_workers=5) as e: + executor = e + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for t in executor._threads: + t.join() + + def test_del_shutdown(self): + executor = futures.ThreadPoolExecutor(max_workers=5) + executor.map(abs, range(-5, 5)) + threads = executor._threads + del executor + + for t in threads: + t.join() + + +class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): + def _prime_executor(self): + pass + + def test_processes_terminate(self): + self.executor.submit(mul, 21, 2) + self.executor.submit(mul, 6, 7) + self.executor.submit(mul, 3, 14) + self.assertEqual(len(self.executor._processes), 5) + processes = self.executor._processes + self.executor.shutdown() + + for p in processes: + p.join() + + def test_context_manager_shutdown(self): + with futures.ProcessPoolExecutor(max_workers=5) as e: + processes = e._processes + self.assertEqual(list(e.map(abs, range(-5, 5))), + [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) + + for p in processes: + p.join() + + def test_del_shutdown(self): + executor = futures.ProcessPoolExecutor(max_workers=5) + list(executor.map(abs, range(-5, 5))) + queue_management_thread = executor._queue_management_thread + processes = executor._processes + del executor + + queue_management_thread.join() + for p in processes: + p.join() + + +class WaitTests(unittest.TestCase): + + def test_first_completed(self): + future1 = self.executor.submit(mul, 21, 2) + future2 = self.executor.submit(time.sleep, 1.5) + + done, not_done = futures.wait( + [CANCELLED_FUTURE, future1, future2], + return_when=futures.FIRST_COMPLETED) + + self.assertEqual(set([future1]), done) + self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) + + def test_first_completed_some_already_completed(self): + future1 = self.executor.submit(time.sleep, 1.5) + + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], + return_when=futures.FIRST_COMPLETED) + + self.assertEqual( + set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), + finished) + self.assertEqual(set([future1]), pending) + + def test_first_exception(self): + future1 = self.executor.submit(mul, 2, 21) + future2 = self.executor.submit(sleep_and_raise, 1.5) + future3 = self.executor.submit(time.sleep, 3) + + finished, pending = futures.wait( + [future1, future2, future3], + return_when=futures.FIRST_EXCEPTION) + + self.assertEqual(set([future1, future2]), finished) + self.assertEqual(set([future3]), pending) + + def test_first_exception_some_already_complete(self): + future1 = self.executor.submit(divmod, 21, 0) + future2 = self.executor.submit(time.sleep, 1.5) + + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2], + return_when=futures.FIRST_EXCEPTION) + + self.assertEqual(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1]), finished) + self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) + + def test_first_exception_one_already_failed(self): + future1 = self.executor.submit(time.sleep, 2) + + finished, pending = futures.wait( + [EXCEPTION_FUTURE, future1], + return_when=futures.FIRST_EXCEPTION) + + self.assertEqual(set([EXCEPTION_FUTURE]), finished) + self.assertEqual(set([future1]), pending) + + def test_all_completed(self): + future1 = self.executor.submit(divmod, 2, 0) + future2 = self.executor.submit(mul, 2, 21) + + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + future1, + future2], + return_when=futures.ALL_COMPLETED) + + self.assertEqual(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + future1, + future2]), finished) + self.assertEqual(set(), pending) + + def test_timeout(self): + future1 = self.executor.submit(mul, 6, 7) + future2 = self.executor.submit(time.sleep, 3) + + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2], + timeout=1.5, + return_when=futures.ALL_COMPLETED) + + self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1]), finished) + self.assertEqual(set([future2]), pending) + + +class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests): + + def test_pending_calls_race(self): + # Issue #14406: multi-threaded race condition when waiting on all + # futures. + event = threading.Event() + def future_func(): + event.wait() + oldswitchinterval = sys.getcheckinterval() + sys.setcheckinterval(1) + try: + fs = set(self.executor.submit(future_func) for i in range(100)) + event.set() + futures.wait(fs, return_when=futures.ALL_COMPLETED) + finally: + sys.setcheckinterval(oldswitchinterval) + + +class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests): + pass + + +class AsCompletedTests(unittest.TestCase): + # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. + def test_no_timeout(self): + future1 = self.executor.submit(mul, 2, 21) + future2 = self.executor.submit(mul, 7, 6) + + completed = set(futures.as_completed( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2])) + self.assertEqual(set( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2]), + completed) + + def test_zero_timeout(self): + future1 = self.executor.submit(time.sleep, 2) + completed_futures = set() + try: + for future in futures.as_completed( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1], + timeout=0): + completed_futures.add(future) + except futures.TimeoutError: + pass + + self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]), + completed_futures) + + def test_duplicate_futures(self): + # Issue 20367. Duplicate futures should not raise exceptions or give + # duplicate responses. + future1 = self.executor.submit(time.sleep, 2) + completed = [f for f in futures.as_completed([future1,future1])] + self.assertEqual(len(completed), 1) + + +class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests): + pass + + +class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests): + pass + + +class ExecutorTest(unittest.TestCase): + # Executor.shutdown() and context manager usage is tested by + # ExecutorShutdownTest. + def test_submit(self): + future = self.executor.submit(pow, 2, 8) + self.assertEqual(256, future.result()) + + def test_submit_keyword(self): + future = self.executor.submit(mul, 2, y=8) + self.assertEqual(16, future.result()) + + def test_map(self): + self.assertEqual( + list(self.executor.map(pow, range(10), range(10))), + list(map(pow, range(10), range(10)))) + + def test_map_exception(self): + i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) + self.assertEqual(next(i), (0, 1)) + self.assertEqual(next(i), (0, 1)) + self.assertRaises(ZeroDivisionError, next, i) + + def test_map_timeout(self): + results = [] + try: + for i in self.executor.map(time.sleep, + [0, 0, 3], + timeout=1.5): + results.append(i) + except futures.TimeoutError: + pass + else: + self.fail('expected TimeoutError') + + self.assertEqual([None, None], results) + + +class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest): + def test_map_submits_without_iteration(self): + """Tests verifying issue 11777.""" + finished = [] + def record_finished(n): + finished.append(n) + + self.executor.map(record_finished, range(10)) + self.executor.shutdown(wait=True) + self.assertEqual(len(finished), 10) + + +class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest): + pass + + +class FutureTests(unittest.TestCase): + def test_done_callback_with_result(self): + callback_result = [None] + def fn(callback_future): + callback_result[0] = callback_future.result() + + f = Future() + f.add_done_callback(fn) + f.set_result(5) + self.assertEqual(5, callback_result[0]) + + def test_done_callback_with_exception(self): + callback_exception = [None] + def fn(callback_future): + callback_exception[0] = callback_future.exception() + + f = Future() + f.add_done_callback(fn) + f.set_exception(Exception('test')) + self.assertEqual(('test',), callback_exception[0].args) + + def test_done_callback_with_cancel(self): + was_cancelled = [None] + def fn(callback_future): + was_cancelled[0] = callback_future.cancelled() + + f = Future() + f.add_done_callback(fn) + self.assertTrue(f.cancel()) + self.assertTrue(was_cancelled[0]) + + def test_done_callback_raises(self): + with captured_stderr() as stderr: + raising_was_called = [False] + fn_was_called = [False] + + def raising_fn(callback_future): + raising_was_called[0] = True + raise Exception('doh!') + + def fn(callback_future): + fn_was_called[0] = True + + f = Future() + f.add_done_callback(raising_fn) + f.add_done_callback(fn) + f.set_result(5) + self.assertTrue(raising_was_called) + self.assertTrue(fn_was_called) + self.assertIn('Exception: doh!', stderr.getvalue()) + + def test_done_callback_already_successful(self): + callback_result = [None] + def fn(callback_future): + callback_result[0] = callback_future.result() + + f = Future() + f.set_result(5) + f.add_done_callback(fn) + self.assertEqual(5, callback_result[0]) + + def test_done_callback_already_failed(self): + callback_exception = [None] + def fn(callback_future): + callback_exception[0] = callback_future.exception() + + f = Future() + f.set_exception(Exception('test')) + f.add_done_callback(fn) + self.assertEqual(('test',), callback_exception[0].args) + + def test_done_callback_already_cancelled(self): + was_cancelled = [None] + def fn(callback_future): + was_cancelled[0] = callback_future.cancelled() + + f = Future() + self.assertTrue(f.cancel()) + f.add_done_callback(fn) + self.assertTrue(was_cancelled[0]) + + def test_repr(self): + self.assertRegexpMatches(repr(PENDING_FUTURE), + '<Future at 0x[0-9a-f]+ state=pending>') + self.assertRegexpMatches(repr(RUNNING_FUTURE), + '<Future at 0x[0-9a-f]+ state=running>') + self.assertRegexpMatches(repr(CANCELLED_FUTURE), + '<Future at 0x[0-9a-f]+ state=cancelled>') + self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE), + '<Future at 0x[0-9a-f]+ state=cancelled>') + self.assertRegexpMatches( + repr(EXCEPTION_FUTURE), + '<Future at 0x[0-9a-f]+ state=finished raised IOError>') + self.assertRegexpMatches( + repr(SUCCESSFUL_FUTURE), + '<Future at 0x[0-9a-f]+ state=finished returned int>') + + def test_cancel(self): + f1 = create_future(state=PENDING) + f2 = create_future(state=RUNNING) + f3 = create_future(state=CANCELLED) + f4 = create_future(state=CANCELLED_AND_NOTIFIED) + f5 = create_future(state=FINISHED, exception=IOError()) + f6 = create_future(state=FINISHED, result=5) + + self.assertTrue(f1.cancel()) + self.assertEqual(f1._state, CANCELLED) + + self.assertFalse(f2.cancel()) + self.assertEqual(f2._state, RUNNING) + + self.assertTrue(f3.cancel()) + self.assertEqual(f3._state, CANCELLED) + + self.assertTrue(f4.cancel()) + self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) + + self.assertFalse(f5.cancel()) + self.assertEqual(f5._state, FINISHED) + + self.assertFalse(f6.cancel()) + self.assertEqual(f6._state, FINISHED) + + def test_cancelled(self): + self.assertFalse(PENDING_FUTURE.cancelled()) + self.assertFalse(RUNNING_FUTURE.cancelled()) + self.assertTrue(CANCELLED_FUTURE.cancelled()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) + self.assertFalse(EXCEPTION_FUTURE.cancelled()) + self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) + + def test_done(self): + self.assertFalse(PENDING_FUTURE.done()) + self.assertFalse(RUNNING_FUTURE.done()) + self.assertTrue(CANCELLED_FUTURE.done()) + self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) + self.assertTrue(EXCEPTION_FUTURE.done()) + self.assertTrue(SUCCESSFUL_FUTURE.done()) + + def test_running(self): + self.assertFalse(PENDING_FUTURE.running()) + self.assertTrue(RUNNING_FUTURE.running()) + self.assertFalse(CANCELLED_FUTURE.running()) + self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) + self.assertFalse(EXCEPTION_FUTURE.running()) + self.assertFalse(SUCCESSFUL_FUTURE.running()) + + def test_result_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.result, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.result, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) + self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) + self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) + + def test_result_with_success(self): + # TODO(brian@sweetapp.com): This test is timing dependant. + def notification(): + # Wait until the main thread is waiting for the result. + time.sleep(1) + f1.set_result(42) + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertEqual(f1.result(timeout=5), 42) + + def test_result_with_cancel(self): + # TODO(brian@sweetapp.com): This test is timing dependant. + def notification(): + # Wait until the main thread is waiting for the result. + time.sleep(1) + f1.cancel() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertRaises(futures.CancelledError, f1.result, timeout=5) + + def test_exception_with_timeout(self): + self.assertRaises(futures.TimeoutError, + PENDING_FUTURE.exception, timeout=0) + self.assertRaises(futures.TimeoutError, + RUNNING_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_FUTURE.exception, timeout=0) + self.assertRaises(futures.CancelledError, + CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) + self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), + IOError)) + self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) + + def test_exception_with_success(self): + def notification(): + # Wait until the main thread is waiting for the exception. + time.sleep(1) + with f1._condition: + f1._state = FINISHED + f1._exception = IOError() + f1._condition.notify_all() + + f1 = create_future(state=PENDING) + t = threading.Thread(target=notification) + t.start() + + self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) + +@reap_threads +def test_main(): + try: + test_support.run_unittest(ProcessPoolExecutorTest, + ThreadPoolExecutorTest, + ProcessPoolWaitTests, + ThreadPoolWaitTests, + ProcessPoolAsCompletedTests, + ThreadPoolAsCompletedTests, + FutureTests, + ProcessPoolShutdownTest, + ThreadPoolShutdownTest) + finally: + test_support.reap_children() + +if __name__ == "__main__": + test_main() |