|
| 1 | +r""" |
| 2 | +Multithreading from Python |
| 3 | +************************** |
| 4 | +""" |
| 5 | + |
| 6 | +#***************************************************************************** |
| 7 | +# Copyright (C) 2022 Vincent Delecroix <[email protected]> |
| 8 | +# |
| 9 | +# Distributed under the terms of the GNU General Public License (GPL) |
| 10 | +# as published by the Free Software Foundation; either version 2 of |
| 11 | +# the License, or (at your option) any later version. |
| 12 | +# http://www.gnu.org/licenses/ |
| 13 | +#***************************************************************************** |
| 14 | + |
| 15 | +from libc.stdlib cimport malloc, calloc, free |
| 16 | + |
| 17 | +from .types cimport * |
| 18 | +from .paridecl cimport * |
| 19 | +from gen cimport Gen, objtogen |
| 20 | + |
| 21 | +cdef class PariThreadPool: |
| 22 | + r""" |
| 23 | + Pari thread allocator |
| 24 | +
|
| 25 | + This class is intended to be used in conjunction with the multithreading |
| 26 | + capabilities of the ``ThreadPoolExecutor`` from the ``concurrent.futures`` |
| 27 | + Python library. |
| 28 | +
|
| 29 | + Examples: |
| 30 | +
|
| 31 | + >>> from concurrent.futures import ThreadPoolExecutor, as_completed |
| 32 | + >>> from cypari2 import Pari, PariThreadPool |
| 33 | + >>> pari = Pari() |
| 34 | + >>> pari.default('nbthreads', 1) |
| 35 | + >>> max_workers = 4 |
| 36 | + >>> pari_pool = PariThreadPool(max_workers) |
| 37 | + >>> square_free = [] |
| 38 | + >>> with ThreadPoolExecutor(max_workers=max_workers, initializer=pari_pool.initializer) as executor: |
| 39 | + ... futures = {executor.submit(pari.issquarefree, n): n for n in range(10**6, 10**6 + 1000)} |
| 40 | + ... for future in as_completed(futures): |
| 41 | + ... n = futures[future] |
| 42 | + ... if future.result(): |
| 43 | + ... square_free.append(n) |
| 44 | + >>> square_free.sort() |
| 45 | + >>> square_free |
| 46 | + [1000001, 1000002, 1000003, 1000005, 1000006, ..., 1000994, 1000995, 1000997, 1000999] |
| 47 | + """ |
| 48 | + def __init__(self, size_t nbthreads, size_t size=8000000, size_t sizemax=0): |
| 49 | + r""" |
| 50 | + INPUT: |
| 51 | +
|
| 52 | + - ``nbthreads`` -- the number of threads to allocate |
| 53 | +
|
| 54 | + - ``size`` -- (default: 8000000) the number of bytes for the |
| 55 | + initial PARI stack (see notes below) |
| 56 | +
|
| 57 | + - ``sizemax`` -- (default: 0) the maximal number of bytes for the |
| 58 | + dynamically increasing PARI stack. |
| 59 | + """ |
| 60 | + cdef size_t i |
| 61 | + size = max(size, pari_mainstack.rsize) |
| 62 | + sizemax = max(max(size, pari_mainstack.vsize), sizemax) |
| 63 | + self.pths = <pari_thread *> calloc(nbthreads, sizeof(pari_thread)) |
| 64 | + for i in range(nbthreads): |
| 65 | + pari_thread_valloc(self.pths + i, size, sizemax, NULL) |
| 66 | + self.ithread = 0 |
| 67 | + self.nbthreads = nbthreads |
| 68 | + |
| 69 | + def __dealloc__(self): |
| 70 | + cdef size_t i |
| 71 | + for i in range(self.ithread): |
| 72 | + pari_thread_free(self.pths + i) |
| 73 | + free(self.pths) |
| 74 | + |
| 75 | + def __repr__(self): |
| 76 | + return 'Pari thread pool with {} threads'.format(self.nbthreads) |
| 77 | + |
| 78 | + def initializer(self): |
| 79 | + if self.ithread >= self.nbthreads: |
| 80 | + raise ValueError('no more thread available') |
| 81 | + pari_thread_start(self.pths + self.ithread) |
| 82 | + self.ithread += 1 |
0 commit comments