|
| 1 | +import unittest |
| 2 | +from test.support import import_helper, threading_helper |
| 3 | +import random |
| 4 | +from threading import Thread, Barrier |
| 5 | + |
| 6 | +py_bisect = import_helper.import_fresh_module('bisect', blocked=['_bisect']) |
| 7 | +c_bisect = import_helper.import_fresh_module('bisect', fresh=['_bisect']) |
| 8 | + |
| 9 | + |
| 10 | +NTHREADS = 4 |
| 11 | +OBJECT_COUNT = 500 |
| 12 | + |
| 13 | + |
| 14 | +class TestBase: |
| 15 | + def do_racing_insort(self, insert_method): |
| 16 | + def insert(data): |
| 17 | + for _ in range(OBJECT_COUNT): |
| 18 | + x = random.randint(-OBJECT_COUNT, OBJECT_COUNT) |
| 19 | + insert_method(data, x) |
| 20 | + |
| 21 | + data = list(range(OBJECT_COUNT)) |
| 22 | + self.run_concurrently( |
| 23 | + worker_func=insert, args=(data,), nthreads=NTHREADS |
| 24 | + ) |
| 25 | + if False: |
| 26 | + # These functions are not thread-safe and so the list can become |
| 27 | + # unsorted. However, we don't want Python to crash if these |
| 28 | + # functions are used concurrently on the same sequence. This |
| 29 | + # should also not produce any TSAN warnings. |
| 30 | + self.assertTrue(self.is_sorted_ascending(data)) |
| 31 | + |
| 32 | + def test_racing_insert_right(self): |
| 33 | + self.do_racing_insort(self.mod.insort_right) |
| 34 | + |
| 35 | + def test_racing_insert_left(self): |
| 36 | + self.do_racing_insort(self.mod.insort_left) |
| 37 | + |
| 38 | + @staticmethod |
| 39 | + def is_sorted_ascending(lst): |
| 40 | + """ |
| 41 | + Check if the list is sorted in ascending order (non-decreasing). |
| 42 | + """ |
| 43 | + return all(lst[i - 1] <= lst[i] for i in range(1, len(lst))) |
| 44 | + |
| 45 | + def run_concurrently(self, worker_func, args, nthreads): |
| 46 | + """ |
| 47 | + Run the worker function concurrently in multiple threads. |
| 48 | + """ |
| 49 | + barrier = Barrier(nthreads) |
| 50 | + |
| 51 | + def wrapper_func(*args): |
| 52 | + # Wait for all threads to reach this point before proceeding. |
| 53 | + barrier.wait() |
| 54 | + worker_func(*args) |
| 55 | + |
| 56 | + with threading_helper.catch_threading_exception() as cm: |
| 57 | + workers = ( |
| 58 | + Thread(target=wrapper_func, args=args) |
| 59 | + for _ in range(nthreads) |
| 60 | + ) |
| 61 | + with threading_helper.start_threads(workers): |
| 62 | + pass |
| 63 | + |
| 64 | + # Worker threads should not raise any exceptions |
| 65 | + self.assertIsNone(cm.exc_value) |
| 66 | + |
| 67 | + |
| 68 | +@threading_helper.requires_working_threading() |
| 69 | +class TestPyBisect(unittest.TestCase, TestBase): |
| 70 | + mod = py_bisect |
| 71 | + |
| 72 | + |
| 73 | +@threading_helper.requires_working_threading() |
| 74 | +class TestCBisect(unittest.TestCase, TestBase): |
| 75 | + mod = c_bisect |
| 76 | + |
| 77 | + |
| 78 | +if __name__ == "__main__": |
| 79 | + unittest.main() |
0 commit comments