Skip to content

Commit b6cdcd6

Browse files
committed
Add test for issue #31
1 parent d58cb86 commit b6cdcd6

File tree

2 files changed

+228
-0
lines changed

2 files changed

+228
-0
lines changed

tests/test_issue31.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#!python3
2+
3+
import os
4+
import time
5+
import unittest
6+
import chdb
7+
import zipfile
8+
import urllib.request
9+
10+
from timeout_decorator import timeout
11+
12+
csv_url = "https://media.githubusercontent.com/media/datablist/sample-csv-files/main/files/organizations/organizations-2000000.zip"
13+
14+
15+
# download csv file, and unzip it
16+
def download_and_extract(url, save_path):
17+
print("Downloading file...")
18+
urllib.request.urlretrieve(url, save_path)
19+
20+
print("Extracting file...")
21+
with zipfile.ZipFile(save_path, "r") as zip_ref:
22+
zip_ref.extractall(os.path.dirname(save_path))
23+
24+
print("Done!")
25+
26+
27+
@timeout(20, use_signals=False)
28+
def payload():
29+
now = time.time()
30+
res = chdb.query(
31+
'select Name, count(*) cnt from file("organizations-2000000.csv", CSVWithNames) group by Name order by cnt desc',
32+
"CSV",
33+
)
34+
print(res.get_memview().tobytes().decode("utf-8"))
35+
used_time = time.time() - now
36+
print("used time: ", used_time)
37+
38+
39+
class TestAggOnCSVSpeed(unittest.TestCase):
40+
def setUp(self):
41+
download_and_extract(csv_url, "organizations-2000000.zip")
42+
43+
def tearDown(self):
44+
os.remove("organizations-2000000.csv")
45+
os.remove("organizations-2000000.zip")
46+
47+
def test_agg(self):
48+
payload()
49+
50+
51+
if __name__ == "__main__":
52+
unittest.main()

tests/timeout_decorator.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
"""
2+
Timeout decorator.
3+
4+
:copyright: (c) 2012-2013 by PN.
5+
:license: MIT, see LICENSE for more details.
6+
"""
7+
8+
from __future__ import print_function
9+
from __future__ import unicode_literals
10+
from __future__ import division
11+
12+
import sys
13+
import time
14+
import multiprocessing
15+
import signal
16+
from functools import wraps
17+
18+
############################################################
19+
# Timeout
20+
############################################################
21+
22+
# http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
23+
# Used work of Stephen "Zero" Chappell <[email protected]>
24+
# in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
25+
26+
27+
class TimeoutError(AssertionError):
28+
29+
"""Thrown when a timeout occurs in the `timeout` context manager."""
30+
31+
def __init__(self, value="Timed Out"):
32+
self.value = value
33+
34+
def __str__(self):
35+
return repr(self.value)
36+
37+
38+
def _raise_exception(exception, exception_message):
39+
""" This function checks if a exception message is given.
40+
41+
If there is no exception message, the default behaviour is maintained.
42+
If there is an exception message, the message is passed to the exception with the 'value' keyword.
43+
"""
44+
if exception_message is None:
45+
raise exception()
46+
else:
47+
raise exception(exception_message)
48+
49+
50+
def timeout(seconds=None, use_signals=True, timeout_exception=TimeoutError, exception_message=None):
51+
"""Add a timeout parameter to a function and return it.
52+
53+
:param seconds: optional time limit in seconds or fractions of a second. If None is passed, no timeout is applied.
54+
This adds some flexibility to the usage: you can disable timing out depending on the settings.
55+
:type seconds: float
56+
:param use_signals: flag indicating whether signals should be used for timing function out or the multiprocessing
57+
When using multiprocessing, timeout granularity is limited to 10ths of a second.
58+
:type use_signals: bool
59+
60+
:raises: TimeoutError if time limit is reached
61+
62+
It is illegal to pass anything other than a function as the first
63+
parameter. The function is wrapped and returned to the caller.
64+
"""
65+
def decorate(function):
66+
67+
if use_signals:
68+
def handler(signum, frame):
69+
_raise_exception(timeout_exception, exception_message)
70+
71+
@wraps(function)
72+
def new_function(*args, **kwargs):
73+
new_seconds = kwargs.pop('timeout', seconds)
74+
if new_seconds:
75+
old = signal.signal(signal.SIGALRM, handler)
76+
signal.setitimer(signal.ITIMER_REAL, new_seconds)
77+
78+
if not seconds:
79+
return function(*args, **kwargs)
80+
81+
try:
82+
return function(*args, **kwargs)
83+
finally:
84+
if new_seconds:
85+
signal.setitimer(signal.ITIMER_REAL, 0)
86+
signal.signal(signal.SIGALRM, old)
87+
return new_function
88+
else:
89+
@wraps(function)
90+
def new_function(*args, **kwargs):
91+
timeout_wrapper = _Timeout(function, timeout_exception, exception_message, seconds)
92+
return timeout_wrapper(*args, **kwargs)
93+
return new_function
94+
95+
return decorate
96+
97+
98+
def _target(queue, function, *args, **kwargs):
99+
"""Run a function with arguments and return output via a queue.
100+
101+
This is a helper function for the Process created in _Timeout. It runs
102+
the function with positional arguments and keyword arguments and then
103+
returns the function's output by way of a queue. If an exception gets
104+
raised, it is returned to _Timeout to be raised by the value property.
105+
"""
106+
try:
107+
queue.put((True, function(*args, **kwargs)))
108+
except:
109+
queue.put((False, sys.exc_info()[1]))
110+
111+
112+
class _Timeout(object):
113+
114+
"""Wrap a function and add a timeout (limit) attribute to it.
115+
116+
Instances of this class are automatically generated by the add_timeout
117+
function defined above. Wrapping a function allows asynchronous calls
118+
to be made and termination of execution after a timeout has passed.
119+
"""
120+
121+
def __init__(self, function, timeout_exception, exception_message, limit):
122+
"""Initialize instance in preparation for being called."""
123+
self.__limit = limit
124+
self.__function = function
125+
self.__timeout_exception = timeout_exception
126+
self.__exception_message = exception_message
127+
self.__name__ = function.__name__
128+
self.__doc__ = function.__doc__
129+
self.__timeout = time.time()
130+
self.__process = multiprocessing.Process()
131+
self.__queue = multiprocessing.Queue()
132+
133+
def __call__(self, *args, **kwargs):
134+
"""Execute the embedded function object asynchronously.
135+
136+
The function given to the constructor is transparently called and
137+
requires that "ready" be intermittently polled. If and when it is
138+
True, the "value" property may then be checked for returned data.
139+
"""
140+
self.__limit = kwargs.pop('timeout', self.__limit)
141+
self.__queue = multiprocessing.Queue(1)
142+
args = (self.__queue, self.__function) + args
143+
self.__process = multiprocessing.Process(target=_target,
144+
args=args,
145+
kwargs=kwargs)
146+
self.__process.daemon = True
147+
self.__process.start()
148+
if self.__limit is not None:
149+
self.__timeout = self.__limit + time.time()
150+
while not self.ready:
151+
time.sleep(0.01)
152+
return self.value
153+
154+
def cancel(self):
155+
"""Terminate any possible execution of the embedded function."""
156+
if self.__process.is_alive():
157+
print("Terminating process: %s" % self.__process, file=sys.stderr)
158+
self.__process.kill()
159+
160+
_raise_exception(self.__timeout_exception, self.__exception_message)
161+
162+
@property
163+
def ready(self):
164+
"""Read-only property indicating status of "value" property."""
165+
if self.__limit and self.__timeout < time.time():
166+
self.cancel()
167+
return self.__queue.full() and not self.__queue.empty()
168+
169+
@property
170+
def value(self):
171+
"""Read-only property containing data returned from function."""
172+
if self.ready is True:
173+
flag, load = self.__queue.get()
174+
if flag:
175+
return load
176+
raise load

0 commit comments

Comments
 (0)