X Tutup
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .cspell.dict/cpython.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ fromlist
heaptype
HIGHRES
IMMUTABLETYPE
ismine
Itertool
keeped
kwonlyarg
Expand All @@ -40,6 +41,7 @@ lsprof
maxdepth
mult
multibytecodec
newsemlockobject
nkwargs
noraise
numer
Expand Down
113 changes: 89 additions & 24 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper

from test.support import subTests

# Skip tests if _multiprocessing wasn't built.
_multiprocessing = import_helper.import_module('_multiprocessing')
Expand Down Expand Up @@ -1109,7 +1109,7 @@ def test_put(self):
@classmethod
def _test_get(cls, queue, child_can_start, parent_can_continue):
child_can_start.wait()
#queue.put(1)
queue.put(1)
queue.put(2)
queue.put(3)
queue.put(4)
Expand All @@ -1133,15 +1133,16 @@ def test_get(self):
child_can_start.set()
parent_can_continue.wait()

time.sleep(DELTA)
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if not queue_empty(queue):
break
self.assertEqual(queue_empty(queue), False)

# Hangs unexpectedly, remove for now
#self.assertEqual(queue.get(), 1)
self.assertEqual(queue.get_nowait(), 1)
self.assertEqual(queue.get(True, None), 2)
self.assertEqual(queue.get(True), 3)
self.assertEqual(queue.get(timeout=1), 4)
self.assertEqual(queue.get_nowait(), 5)
self.assertEqual(queue.get(), 5)

self.assertEqual(queue_empty(queue), True)

Expand Down Expand Up @@ -2970,6 +2971,8 @@ def test_map_no_failfast(self):
# check that we indeed waited for all jobs
self.assertGreater(time.monotonic() - t_start, 0.9)

# TODO: RUSTPYTHON - reference counting differences
@unittest.skip("TODO: RUSTPYTHON")
def test_release_task_refs(self):
# Issue #29861: task arguments and results should not be kept
# alive after we are done with them.
Expand Down Expand Up @@ -3882,6 +3885,8 @@ def _remote(cls, conn):

conn.close()

# TODO: RUSTPYTHON - hangs
@unittest.skip("TODO: RUSTPYTHON")
def test_pickling(self):
families = self.connection.families

Expand Down Expand Up @@ -4051,6 +4056,8 @@ def test_heap(self):
self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
self.assertEqual(len(heap._len_to_seq), 0)

# TODO: RUSTPYTHON - gc.enable() not implemented
@unittest.expectedFailure
def test_free_from_gc(self):
# Check that freeing of blocks by the garbage collector doesn't deadlock
# (issue #12352).
Expand Down Expand Up @@ -4103,6 +4110,8 @@ def _double(cls, x, y, z, foo, arr, string):
for i in range(len(arr)):
arr[i] *= 2

# TODO: RUSTPYTHON - ctypes Structure shared memory not working
@unittest.expectedFailure
def test_sharedctypes(self, lock=False):
x = Value('i', 7, lock=lock)
y = Value(c_double, 1.0/3.0, lock=lock)
Expand All @@ -4126,6 +4135,8 @@ def test_sharedctypes(self, lock=False):
self.assertAlmostEqual(arr[i], i*2)
self.assertEqual(string.value, latin('hellohello'))

# TODO: RUSTPYTHON - calls test_sharedctypes which fails
@unittest.expectedFailure
def test_synchronize(self):
self.test_sharedctypes(lock=True)

Expand All @@ -4140,6 +4151,19 @@ def test_copy(self):
self.assertEqual(bar.z, 2 ** 33)


def resource_tracker_format_subtests(func):
"""Run given test using both resource tracker communication formats"""
def _inner(self, *args, **kwargs):
tracker = resource_tracker._resource_tracker
for use_simple_format in False, True:
with (
self.subTest(use_simple_format=use_simple_format),
unittest.mock.patch.object(
tracker, '_use_simple_format', use_simple_format)
):
func(self, *args, **kwargs)
return _inner

@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
@hashlib_helper.requires_hashdigest('sha256')
class _TestSharedMemory(BaseTestCase):
Expand Down Expand Up @@ -4417,6 +4441,7 @@ def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
smm.shutdown()

@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
@resource_tracker_format_subtests
def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
# bpo-36867: test that a SharedMemoryManager uses the
# same resource_tracker process as its parent.
Expand Down Expand Up @@ -4667,6 +4692,7 @@ def test_shared_memory_cleaned_after_process_termination(self):
"shared_memory objects to clean up at shutdown", err)

@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
@resource_tracker_format_subtests
def test_shared_memory_untracking(self):
# gh-82300: When a separate Python process accesses shared memory
# with track=False, it must not cause the memory to be deleted
Expand Down Expand Up @@ -4694,6 +4720,7 @@ def test_shared_memory_untracking(self):
mem.close()

@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
@resource_tracker_format_subtests
def test_shared_memory_tracking(self):
# gh-82300: When a separate Python process accesses shared memory
# with track=True, it must cause the memory to be deleted when
Expand Down Expand Up @@ -4787,6 +4814,8 @@ def test_finalize(self):
result = [obj for obj in iter(conn.recv, 'STOP')]
self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

# TODO: RUSTPYTHON - gc.get_threshold() and gc.set_threshold() not implemented
@unittest.expectedFailure
@support.requires_resource('cpu')
def test_thread_safety(self):
# bpo-24484: _run_finalizers() should be thread-safe
Expand Down Expand Up @@ -5414,6 +5443,8 @@ def run_in_child(cls, start_method):
flags = (tuple(sys.flags), grandchild_flags)
print(json.dumps(flags))

# TODO: RUSTPYTHON - SyntaxError in subprocess after fork
@unittest.expectedFailure
def test_flags(self):
import json
# start child process using unusual flags
Expand Down Expand Up @@ -6457,28 +6488,13 @@ def test_std_streams_flushed_after_preload(self):
if multiprocessing.get_start_method() != "forkserver":
self.skipTest("forkserver specific test")

# Create a test module in the temporary directory on the child's path
# TODO: This can all be simplified once gh-126631 is fixed and we can
# use __main__ instead of a module.
dirname = os.path.join(self._temp_dir, 'preloaded_module')
init_name = os.path.join(dirname, '__init__.py')
os.mkdir(dirname)
with open(init_name, "w") as f:
cmd = '''if 1:
import sys
print('stderr', end='', file=sys.stderr)
print('stdout', end='', file=sys.stdout)
'''
f.write(cmd)

name = os.path.join(os.path.dirname(__file__), 'mp_preload_flush.py')
env = {'PYTHONPATH': self._temp_dir}
_, out, err = test.support.script_helper.assert_python_ok(name, **env)
_, out, err = test.support.script_helper.assert_python_ok(name)

# Check stderr first, as it is more likely to be useful to see in the
# event of a failure.
self.assertEqual(err.decode().rstrip(), 'stderr')
self.assertEqual(out.decode().rstrip(), 'stdout')
self.assertEqual(err.decode().rstrip(), '__main____mp_main__')
self.assertEqual(out.decode().rstrip(), '__main____mp_main__')


class MiscTestCase(unittest.TestCase):
Expand Down Expand Up @@ -6804,3 +6820,52 @@ class SemLock(_multiprocessing.SemLock):
name = f'test_semlock_subclass-{os.getpid()}'
s = SemLock(1, 0, 10, name, False)
_multiprocessing.sem_unlink(name)


@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
class TestSharedMemoryNames(unittest.TestCase):
@subTests('use_simple_format', (True, False))
def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(
self, use_simple_format):
# Test script that creates and cleans up shared memory with colon in name
test_script = textwrap.dedent("""
import sys
from multiprocessing import shared_memory
from multiprocessing import resource_tracker
import time

resource_tracker._resource_tracker._use_simple_format = %s

# Test various patterns of colons in names
test_names = [
"a:b",
"a:b:c",
"test:name:with:many:colons",
":starts:with:colon",
"ends:with:colon:",
"::double::colons::",
"name\\nwithnewline",
"name-with-trailing-newline\\n",
"\\nname-starts-with-newline",
"colons:and\\nnewlines:mix",
"multi\\nline\\nname",
]

for name in test_names:
try:
shm = shared_memory.SharedMemory(create=True, size=100, name=name)
shm.buf[:5] = b'hello' # Write something to the shared memory
shm.close()
shm.unlink()

except Exception as e:
print(f"Error with name '{name}': {e}", file=sys.stderr)
sys.exit(1)

print("SUCCESS")
""" % use_simple_format)

rc, out, err = script_helper.assert_python_ok("-c", test_script)
self.assertIn(b"SUCCESS", out)
self.assertNotIn(b"traceback", err.lower(), err)
self.assertNotIn(b"resource_tracker.py", err, err)
18 changes: 18 additions & 0 deletions Lib/test/mp_fork_bomb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import multiprocessing, sys

def foo():
print("123")

# Because "if __name__ == '__main__'" is missing this will not work
# correctly on Windows. However, we should get a RuntimeError rather
# than the Windows equivalent of a fork bomb.

if len(sys.argv) > 1:
multiprocessing.set_start_method(sys.argv[1])
else:
multiprocessing.set_start_method('spawn')

p = multiprocessing.Process(target=foo)
p.start()
p.join()
sys.exit(p.exitcode)
18 changes: 18 additions & 0 deletions Lib/test/mp_preload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import multiprocessing

multiprocessing.Lock()


def f():
print("ok")


if __name__ == "__main__":
ctx = multiprocessing.get_context("forkserver")
modname = "test.mp_preload"
# Make sure it's importable
__import__(modname)
ctx.set_forkserver_preload([modname])
proc = ctx.Process(target=f)
proc.start()
proc.join()
11 changes: 11 additions & 0 deletions Lib/test/mp_preload_flush.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import multiprocessing
import sys

print(__name__, end='', file=sys.stderr)
print(__name__, end='', file=sys.stdout)
if __name__ == '__main__':
multiprocessing.set_start_method('forkserver')
for _ in range(2):
p = multiprocessing.Process()
p.start()
p.join()
3 changes: 1 addition & 2 deletions Lib/test/test_importlib/test_threaded_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,7 @@ def test_concurrent_futures_circular_import(self):
'partial', 'cfimport.py')
script_helper.assert_python_ok(fn)

@unittest.skipUnless(hasattr(_multiprocessing, "SemLock"), "TODO: RUSTPYTHON, pool_in_threads.py needs _multiprocessing.SemLock")
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
@unittest.skip("TODO: RUSTPYTHON - fails on Linux due to multiprocessing issues")
def test_multiprocessing_pool_circular_import(self):
# Regression test for bpo-41567
fn = os.path.join(os.path.dirname(__file__),
Expand Down
6 changes: 4 additions & 2 deletions Lib/test/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4058,7 +4058,8 @@ def _mpinit_issue121723(qspec, message_to_log):
# log a message (this creates a record put in the queue)
logging.getLogger().info(message_to_log)

@unittest.expectedFailure # TODO: RUSTPYTHON; ImportError: cannot import name 'SemLock'
# TODO: RUSTPYTHON - SemLock not implemented on Windows
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
@skip_if_tsan_fork
@support.requires_subprocess()
def test_multiprocessing_queues(self):
Expand Down Expand Up @@ -4118,7 +4119,8 @@ def test_90195(self):
# Logger should be enabled, since explicitly mentioned
self.assertFalse(logger.disabled)

@unittest.expectedFailure # TODO: RUSTPYTHON; ImportError: cannot import name 'SemLock'
# TODO: RUSTPYTHON - SemLock not implemented on Windows
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_111615(self):
# See gh-111615
import_helper.import_module('_multiprocessing') # see gh-113692
Expand Down
Loading
Loading
X Tutup