Skip to content

Commit ad568a0

Browse files
authored
Merge pull request #18 from MK2112/12-multi-threaded-multi-file-conversion
multi threaded multi file conversion
2 parents e3d05b8 + b05f459 commit ad568a0

File tree

8 files changed

+288
-109
lines changed

8 files changed

+288
-109
lines changed

README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
- Offering a flexible **Command Line Interface**, a **Web Interface**, and a **Graphical User Interface (GUI)**
1818
- Control output quality and video framerate for the conversion
1919
- Automatically monitor a **"dropzone" directory** for new files and process them as they are dropped
20+
- Fast batch conversion with configurable **parallel processing** for audio and video conversions
2021

2122
## Setup
2223
1. **Clone/Download**:
@@ -96,6 +97,7 @@ This is the most detailed way to use `any_to_any.py`. You can structure a comman
9697
| `-r` or </br>`--recursive` | Recursively process all input files in subdirectories from the input directory. Outputs by default will be placed in their respective subdirectory, unless different output path provided. |
9798
| `-z` or </br>`--dropzone` | While running, a specified directory will be monitored for new files. When a file is added, it will be converted to the specified format, saved in the output directory and deleted from the input directory. |
9899
| `-fps` or</br>`--framerate` | Set the framerate (fps) when converting to a movie format or codec; default maintains input fps. |
100+
| `--workers` | Set the maximum number of parallel worker threads for per-file conversions (`1` to `cpu_count - 1` are supported). Defaults to `1`. |
99101

100102
### Single File Processing
101103
Convert a WEBP file to PNG:
@@ -202,6 +204,23 @@ python any_to_any.py -i -1 /path/to/folder1 -2 /path/to/folder2 -o /path/to/outp
202204
```
203205
Omitting the `-a`/`--across` parameter will execute merges or concatenations seperately, per each input directory.
204206

207+
#### Parallel Processing
208+
- Per-file conversions (e.g., audio-to-audio, movie-to-movie, gif-to-video) are processed in parallel.
209+
- Override default (`1` worker) by setting `--workers N` where `N` can be any integer from `1` to `cpu_count - 1`.
210+
211+
Directory with many audio files:
212+
```bash
213+
python any_to_any.py -i /path/to/folder -f mp3 --workers 4
214+
```
215+
Explicit multiple files:
216+
```bash
217+
python any_to_any.py -i /path/to/a.wav /path/to/b.flac /path/to/c.ogg -f mp3 --workers 4 -o /path/to/output_dir
218+
```
219+
Recursive scan (include subdirectories):
220+
```bash
221+
python any_to_any.py -i /path/to/input_dir -f mp3 --workers 4 --recursive
222+
```
223+
205224
## Supported Formats
206225
**Audio:** MP2, MP3, FLAC, AAC, AC3, DTS, OGG, OGA, WMA, WAV, M4A, AIFF, WEBA, MKA, WV, CAF, TTA, M4B, EAC3, SPX, AU, OPUS, M3U8, W64, MLP, ADTS, SBC, THD<br><br>
207226
**Image:** JPEG, PNG, GIF, BMP, WEBP, TIFF, TGA, EPS, PS, ICO, EPS, JPEG2000, IM, PCX, PPM<br><br>

any_to_any.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,13 @@
129129
type=str,
130130
required=False,
131131
)
132+
parser.add_argument(
133+
"--workers",
134+
help="Maximum worker threads to use for per-file conversions (default: 1)",
135+
type=int,
136+
default=1,
137+
required=False,
138+
)
132139

133140
args = vars(parser.parse_args())
134141

@@ -168,4 +175,5 @@
168175
recursive=args["recursive"],
169176
dropzone=args["dropzone"],
170177
language=args["language"],
178+
workers=args["workers"],
171179
)

core/audio_converter.py

Lines changed: 126 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import utils.language_support as lang
33
from utils.category import Category
44
from moviepy import AudioFileClip, VideoFileClip
5+
from concurrent.futures import ThreadPoolExecutor, as_completed
56

67

78
class AudioConverter:
@@ -26,87 +27,141 @@ def to_audio(
2627
output: str,
2728
delete: str,
2829
) -> None:
29-
# Audio to audio conversion
30-
for audio_path_set in file_paths[Category.AUDIO]:
30+
try:
31+
# Decide worker count with env A2A_MAX_WORKERS if set
32+
env_workers = int(os.environ.get("A2A_MAX_WORKERS", "1"))
33+
env_workers = 1 if env_workers < 1 else env_workers
34+
env_workers = os.cpu_count() - 1 if env_workers >= os.cpu_count() else env_workers
35+
except ValueError:
36+
# If this variable doesn't exist, flag wasn't invoked: Default to 1
37+
env_workers = 1
38+
39+
# Helper to convert a single audio file
40+
def _convert_audio_file(audio_path_set: tuple):
3141
if audio_path_set[2] == format:
32-
continue
33-
audio = AudioFileClip(self.file_handler.join_back(audio_path_set))
34-
# If recursive, create file outright where its source was found
35-
if not recursive or input != output:
36-
out_path = os.path.abspath(
37-
os.path.join(output, f"{audio_path_set[1]}.{format}")
38-
)
39-
else:
40-
out_path = os.path.abspath(
41-
os.path.join(audio_path_set[0], f"{audio_path_set[1]}.{format}")
42-
)
43-
# Write audio to file
42+
return None
43+
audio = None
44+
out_path = None
4445
try:
45-
audio.write_audiofile(
46-
out_path,
47-
codec=codec,
48-
bitrate=bitrate,
49-
fps=audio.fps,
50-
logger=self.prog_logger,
51-
)
52-
except Exception as _:
53-
self.event_logger.info(
54-
f"\n\n[!] {lang.get_translation('error', self.locale)}: {lang.get_translation('source_rate_incompatible', self.locale).replace('[format]', f'{format}')}\n"
55-
)
56-
audio.write_audiofile(
57-
out_path,
58-
codec=codec,
59-
bitrate=bitrate,
60-
fps=48000,
61-
logger=self.prog_logger,
62-
)
63-
audio.close()
64-
self.file_handler.post_process(audio_path_set, out_path, delete)
65-
66-
# Movie to audio conversion
67-
for movie_path_set in file_paths[Category.MOVIE]:
68-
out_path = os.path.abspath(
69-
os.path.join(output, f"{movie_path_set[1]}.{format}")
70-
)
71-
72-
if self.file_handler.has_visuals(movie_path_set):
73-
video = VideoFileClip(
74-
self.file_handler.join_back(movie_path_set),
75-
audio=True,
76-
fps_source="tbr",
77-
)
78-
audio = video.audio
79-
# Check if audio was found
80-
if audio is None:
81-
self.event_logger.info(
82-
f"[!] {lang.get_translation('no_audio', self.locale).replace('[path]', f'"{self.file_handler.join_back(movie_path_set)}"')} - {lang.get_translation('skipping', self.locale)}\n"
46+
audio = AudioFileClip(self.file_handler.join_back(audio_path_set))
47+
# If recursive, create file outright where its source was found
48+
if not recursive or input != output:
49+
out_path = os.path.abspath(
50+
os.path.join(output, f"{audio_path_set[1]}.{format}")
51+
)
52+
else:
53+
out_path = os.path.abspath(
54+
os.path.join(audio_path_set[0], f"{audio_path_set[1]}.{format}")
8355
)
84-
video.close()
85-
continue
86-
87-
audio.write_audiofile(
88-
out_path,
89-
codec=codec,
90-
bitrate=bitrate,
91-
logger=self.prog_logger,
92-
)
93-
94-
audio.close()
95-
video.close()
96-
else:
9756
try:
98-
# AudioFileClip works for audio-only video files
99-
audio = AudioFileClip(self.file_handler.join_back(movie_path_set))
10057
audio.write_audiofile(
10158
out_path,
10259
codec=codec,
10360
bitrate=bitrate,
61+
fps=audio.fps,
10462
logger=self.prog_logger,
10563
)
106-
audio.close()
10764
except Exception as _:
10865
self.event_logger.info(
109-
f"[!] {lang.get_translation('audio_extract_fail', self.locale).replace('[path]', f'"{self.file_handler.join_back(movie_path_set)}"')} - {lang.get_translation('skipping', self.locale)}\n"
66+
f"\n\n[!] {lang.get_translation('error', self.locale)}: {lang.get_translation('source_rate_incompatible', self.locale).replace('[format]', f'{format}')}\n"
67+
)
68+
audio.write_audiofile(
69+
out_path,
70+
codec=codec,
71+
bitrate=bitrate,
72+
fps=48000,
73+
logger=self.prog_logger,
11074
)
111-
continue
112-
self.file_handler.post_process(movie_path_set, out_path, delete)
75+
return (audio_path_set, out_path)
76+
finally:
77+
if audio is not None:
78+
audio.close()
79+
80+
audio_items = list(file_paths[Category.AUDIO])
81+
if len(audio_items) <= 1:
82+
# Fallback to sequential for 0/1 items
83+
result = _convert_audio_file(audio_items[0]) if audio_items else None
84+
if result is not None:
85+
src, out_path = result
86+
self.file_handler.post_process(src, out_path, delete)
87+
else:
88+
max_workers = env_workers if env_workers > 1 else 1
89+
with ThreadPoolExecutor(max_workers=max_workers) as ex:
90+
futures = [ex.submit(_convert_audio_file, a) for a in audio_items]
91+
for fut in as_completed(futures):
92+
res = fut.result()
93+
if res is None:
94+
continue
95+
src, out_path = res
96+
self.file_handler.post_process(src, out_path, delete)
97+
98+
# Movie to audio conversion
99+
def _extract_from_movie(movie_path_set: tuple):
100+
out_path_local = os.path.abspath(
101+
os.path.join(output, f"{movie_path_set[1]}.{format}")
102+
)
103+
video = None
104+
audio = None
105+
try:
106+
if self.file_handler.has_visuals(movie_path_set):
107+
video = VideoFileClip(
108+
self.file_handler.join_back(movie_path_set),
109+
audio=True,
110+
fps_source="tbr",
111+
)
112+
audio = video.audio
113+
# Check if audio was found
114+
if audio is None:
115+
self.event_logger.info(
116+
f"[!] {lang.get_translation('no_audio', self.locale).replace('[path]', f'\"{self.file_handler.join_back(movie_path_set)}\"')} - {lang.get_translation('skipping', self.locale)}\n"
117+
)
118+
return None
119+
audio.write_audiofile(
120+
out_path_local,
121+
codec=codec,
122+
bitrate=bitrate,
123+
logger=self.prog_logger,
124+
)
125+
else:
126+
try:
127+
# AudioFileClip works for audio-only video files
128+
audio = AudioFileClip(self.file_handler.join_back(movie_path_set))
129+
audio.write_audiofile(
130+
out_path_local,
131+
codec=codec,
132+
bitrate=bitrate,
133+
logger=self.prog_logger,
134+
)
135+
except Exception as _:
136+
self.event_logger.info(
137+
f"[!] {lang.get_translation('audio_extract_fail', self.locale).replace('[path]', f'\"{self.file_handler.join_back(movie_path_set)}\"')} - {lang.get_translation('skipping', self.locale)}\n"
138+
)
139+
return None
140+
return (movie_path_set, out_path_local)
141+
finally:
142+
try:
143+
if audio is not None:
144+
audio.close()
145+
except Exception:
146+
pass
147+
try:
148+
if video is not None:
149+
video.close()
150+
except Exception:
151+
pass
152+
153+
movie_items = list(file_paths[Category.MOVIE])
154+
if len(movie_items) <= 1:
155+
res = _extract_from_movie(movie_items[0]) if movie_items else None
156+
if res is not None:
157+
src, out_path = res
158+
self.file_handler.post_process(src, out_path, delete)
159+
else:
160+
with ThreadPoolExecutor(max_workers=env_workers) as ex:
161+
futures = [ex.submit(_extract_from_movie, m) for m in movie_items]
162+
for fut in as_completed(futures):
163+
res = fut.result()
164+
if res is None:
165+
continue
166+
src, out_path = res
167+
self.file_handler.post_process(src, out_path, delete)

core/controller.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,21 @@ def run(
238238
recursive: bool,
239239
dropzone: bool,
240240
language: str,
241+
workers: int,
241242
) -> None:
242243
# Main function, convert media files to defined formats
243244
# or merge or concatenate, according to the arguments
244245
input_paths = []
246+
# Apply worker override via environment for converters
247+
try:
248+
if workers is not None:
249+
# Workers flag was set, do some validation, hand it to env
250+
os.environ["A2A_MAX_WORKERS"] = str(max(1, int(workers)))
251+
except Exception:
252+
# Ignore invalid values; converters will fall back to CPU-based default
253+
pass
254+
255+
# Apply input paths
245256
input_path_args = (
246257
input_path_args
247258
if input_path_args is not None

0 commit comments

Comments
 (0)