
Commit 465522d

Merge pull request #350 from splunk/end_to_end_searchcommand_tests

initial end-to-end tests for streaming, reporting, generating

2 parents: 9f6b2e0 + 139d907

File tree

5 files changed: +209 -1 lines changed

splunklib/searchcommands/search_command.py

Lines changed: 2 additions & 1 deletion

```diff
@@ -851,7 +851,8 @@ def _execute(self, ifile, process):
 
     @staticmethod
     def _as_binary_stream(ifile):
-        if six.PY2:
+        naught = ifile.read(0)
+        if isinstance(naught, bytes):
             return ifile
 
         try:
```
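This change drops the `six.PY2` interpreter check in favor of a duck-typing probe: `ifile.read(0)` consumes nothing but returns an empty `bytes` object on a binary stream and an empty `str` on a text stream, so `_as_binary_stream` can return `ifile` unchanged whenever it already yields bytes, regardless of Python version. A minimal sketch of the probe, using standard `io` streams for illustration:

```python
import io

# read(0) consumes no input but reveals the stream's element type.
binary_stream = io.BytesIO(b"some chunked payload")
text_stream = io.StringIO("some chunked payload")

assert isinstance(binary_stream.read(0), bytes)    # b'': use the stream as-is
assert not isinstance(text_stream.read(0), bytes)  # '':  needs wrapping first
```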
New file: the chunked_data_stream test helper module (imported by the tests below as chunky)

Lines changed: 100 additions & 0 deletions

```python
import collections
import csv
import io
import json

import splunklib.searchcommands.internals
from splunklib import six


class Chunk(object):
    def __init__(self, version, meta, data):
        self.version = six.ensure_str(version)
        self.meta = json.loads(meta)
        dialect = splunklib.searchcommands.internals.CsvDialect
        self.data = csv.DictReader(io.StringIO(data.decode("utf-8")),
                                   dialect=dialect)


class ChunkedDataStreamIter(collections.Iterator):
    def __init__(self, chunk_stream):
        self.chunk_stream = chunk_stream

    def __next__(self):
        return self.next()

    def next(self):
        try:
            return self.chunk_stream.read_chunk()
        except EOFError:
            raise StopIteration


class ChunkedDataStream(collections.Iterable):
    def __iter__(self):
        return ChunkedDataStreamIter(self)

    def __init__(self, stream):
        empty = stream.read(0)
        assert isinstance(empty, bytes)
        self.stream = stream

    def read_chunk(self):
        header = self.stream.readline()

        while len(header) > 0 and header.strip() == b'':
            header = self.stream.readline()  # Skip empty lines
        if len(header) == 0:
            raise EOFError

        version, meta, data = header.rstrip().split(b',')
        metabytes = self.stream.read(int(meta))
        databytes = self.stream.read(int(data))
        return Chunk(version, metabytes, databytes)


def build_chunk(keyval, data=None):
    metadata = six.ensure_binary(json.dumps(keyval), 'utf-8')
    data_output = _build_data_csv(data)
    return b"chunked 1.0,%d,%d\n%s%s" % (len(metadata), len(data_output), metadata, data_output)


def build_empty_searchinfo():
    return {
        'earliest_time': 0,
        'latest_time': 0,
        'search': "",
        'dispatch_dir': "",
        'sid': "",
        'args': [],
        'splunk_version': "42.3.4",
    }


def build_getinfo_chunk():
    return build_chunk({
        'action': 'getinfo',
        'preview': False,
        'searchinfo': build_empty_searchinfo()})


def build_data_chunk(data, finished=True):
    return build_chunk({'action': 'execute', 'finished': finished}, data)


def _build_data_csv(data):
    if data is None:
        return b''
    if isinstance(data, bytes):
        return data
    csvout = splunklib.six.StringIO()

    headers = set()
    for datum in data:
        headers.update(datum.keys())
    writer = csv.DictWriter(csvout, headers,
                            dialect=splunklib.searchcommands.internals.CsvDialect)
    writer.writeheader()
    for datum in data:
        writer.writerow(datum)
    return six.ensure_binary(csvout.getvalue())
```
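Together these helpers implement the wire format that `read_chunk` parses: a header line of the form `chunked 1.0,<metadata length>,<data length>`, followed by exactly that many bytes of JSON metadata and CSV-encoded rows. A hypothetical round trip through the helpers above, assuming they are imported from this module:

```python
import io

# Build an 'execute' chunk carrying two rows, then parse it back.
payload = build_data_chunk([{"value": "1"}, {"value": "2"}])
print(payload.split(b"\n", 1)[0])  # header, e.g. b'chunked 1.0,<metalen>,<datalen>'

chunk = ChunkedDataStream(io.BytesIO(payload)).read_chunk()
assert chunk.meta == {"action": "execute", "finished": True}
assert [row["value"] for row in chunk.data] == ["1", "2"]
```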
New file: end-to-end test for a generating command

Lines changed: 44 additions & 0 deletions

```python
import io
import time

from . import chunked_data_stream as chunky

from splunklib.searchcommands import Configuration, GeneratingCommand


def test_simple_generator():
    @Configuration()
    class GeneratorTest(GeneratingCommand):
        def generate(self):
            for num in range(1, 10):
                yield {'_time': time.time(), 'event_index': num}
    generator = GeneratorTest()
    in_stream = io.BytesIO()
    in_stream.write(chunky.build_getinfo_chunk())
    in_stream.write(chunky.build_chunk({'action': 'execute'}))
    in_stream.seek(0)
    out_stream = io.BytesIO()
    generator._process_protocol_v2([], in_stream, out_stream)
    out_stream.seek(0)

    ds = chunky.ChunkedDataStream(out_stream)
    is_first_chunk = True
    finished_seen = False
    expected = set(map(lambda i: str(i), range(1, 10)))
    seen = set()
    for chunk in ds:
        if is_first_chunk:
            assert chunk.meta["generating"] is True
            assert chunk.meta["type"] == "stateful"
            is_first_chunk = False
        finished_seen = chunk.meta.get("finished", False)
        for row in chunk.data:
            seen.add(row["event_index"])
    print(out_stream.getvalue())
    print(expected)
    print(seen)
    assert expected.issubset(seen)
    assert finished_seen
```
New file: end-to-end test for a reporting command

Lines changed: 34 additions & 0 deletions

```python
import io

import splunklib.searchcommands as searchcommands
from . import chunked_data_stream as chunky


def test_simple_reporting_command():
    @searchcommands.Configuration()
    class TestReportingCommand(searchcommands.ReportingCommand):
        def reduce(self, records):
            value = 0
            for record in records:
                value += int(record["value"])
            yield {'sum': value}

    cmd = TestReportingCommand()
    ifile = io.BytesIO()
    data = list()
    for i in range(0, 10):
        data.append({"value": str(i)})
    ifile.write(chunky.build_getinfo_chunk())
    ifile.write(chunky.build_data_chunk(data))
    ifile.seek(0)
    ofile = io.BytesIO()
    cmd._process_protocol_v2([], ifile, ofile)
    ofile.seek(0)
    chunk_stream = chunky.ChunkedDataStream(ofile)
    getinfo_response = chunk_stream.read_chunk()
    assert getinfo_response.meta['type'] == 'reporting'
    data_chunk = chunk_stream.read_chunk()
    assert data_chunk.meta['finished'] is True  # Should only be one row
    data = list(data_chunk.data)
    assert len(data) == 1
    assert int(data[0]['sum']) == sum(range(0, 10))
```
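The command above defines only a reduce phase, so the whole aggregation runs in one reporting stage, which is why the test expects a single output row. Assuming the standard splunklib ReportingCommand API, a two-phase variant would additionally override map (itself decorated with @Configuration()) to pre-process records before reduce runs; a sketch, not part of this commit:

```python
@searchcommands.Configuration()
class TwoPhaseReportingCommand(searchcommands.ReportingCommand):
    """Sketch only: adds a map phase in front of the reduce phase tested above."""

    @searchcommands.Configuration()
    def map(self, records):
        # Streaming pre-pass over raw records; here a simple pass-through.
        for record in records:
            yield record

    def reduce(self, records):
        total = 0
        for record in records:
            total += int(record["value"])
        yield {'sum': total}
```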
New file: end-to-end test for a streaming command

Lines changed: 29 additions & 0 deletions

```python
import io

from . import chunked_data_stream as chunky
from splunklib.searchcommands import StreamingCommand, Configuration


def test_simple_streaming_command():
    @Configuration()
    class TestStreamingCommand(StreamingCommand):

        def stream(self, records):
            for record in records:
                record["out_index"] = record["in_index"]
                yield record

    cmd = TestStreamingCommand()
    ifile = io.BytesIO()
    ifile.write(chunky.build_getinfo_chunk())
    data = list()
    for i in range(0, 10):
        data.append({"in_index": str(i)})
    ifile.write(chunky.build_data_chunk(data, finished=True))
    ifile.seek(0)
    ofile = io.BytesIO()
    cmd._process_protocol_v2([], ifile, ofile)
    ofile.seek(0)
    output = chunky.ChunkedDataStream(ofile)
    getinfo_response = output.read_chunk()
    assert getinfo_response.meta["type"] == "streaming"
```
