diff --git a/csv_schema_inference/csv_schema_inference.py b/csv_schema_inference/csv_schema_inference.py index 0937f73..5cbfa00 100644 --- a/csv_schema_inference/csv_schema_inference.py +++ b/csv_schema_inference/csv_schema_inference.py @@ -3,7 +3,10 @@ import multiprocessing as mp import datetime as dt import operator - +# backend can be: +# - multiprocessing +# - threading +backend = "multiprocessing" class DetectType: @@ -135,32 +138,23 @@ def execute(self, records, x, obj, d_schema): return d_schema - def parallel(self, records, obj, d_schema): - - - + def parallel(self, records, obj, d_schema): cpus = (mp.cpu_count() - 2) - if cpus <= 0: cpus = mp.cpu_count() - chunk_size = len(records) / cpus - if chunk_size < 1: cpus = int(chunk_size * 10) chunk_size = 1 else: chunk_size = round(chunk_size) - + from joblib import Parallel, delayed - pool = mp.Pool(processes=cpus) - - results = [pool.apply_async(self.execute, args=(records[x:x+chunk_size], x, obj, d_schema)) for x in range(0, len(records), chunk_size)] - pool.close() - pool.join() - - return [p.get() for p in results] + # num_workers can be set based on a variable or command line argument + num_workers = cpus + results = Parallel(n_jobs=num_workers, backend=backend)(delayed(self.execute)(records[x:x+chunk_size], x, obj, d_schema) for x in range(0, len(records), chunk_size)) + return results class CsvSchemaInference: diff --git a/setup.cfg b/setup.cfg index 8256096..186810b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -15,4 +15,6 @@ classifiers = [options] packages = find: python_requires = >=3.7 -include_package_data = False \ No newline at end of file +include_package_data = False +install_requires = + joblib \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..0f05f82 --- /dev/null +++ b/setup.py @@ -0,0 +1,3 @@ +from setuptools import setup +if __name__ == '__main__': + setup() \ No newline at end of file