Commit 85c2397
Support array properties in CSVs. (#37)
* Support array properties in CSVs.
* Address PR comments
1 parent eaf097e commit 85c2397

5 files changed (+135, −7 lines)

README.md

Lines changed: 16 additions & 0 deletions
@@ -87,6 +87,7 @@ The flags for `max-token-count`, `max-buffer-size`, and `max-token-size` are typ
 - `integer`: an unquoted value that can be read as an integer type.
 - `double`: an unquoted value that can be read as a floating-point type.
 - `string`: any field that is either quote-interpolated or cannot be cast to a numeric or boolean type.
+- `array`: a bracket-interpolated array of elements of any type. Strings within the array must be explicitly quote-interpolated. Array properties require the use of a non-comma delimiter for the CSV (`-o`).
 - Cypher does not allow NULL values to be assigned to properties.
 - The default behaviour is to infer the property type, attempting to cast it to integer, float, boolean, or string in that order.
 - The `--enforce-schema` flag and an [Input Schema](#input-schemas) should be used if type inference is not desired.
@@ -104,6 +105,20 @@ The flags for `max-token-count`, `max-buffer-size`, and `max-token-size` are typ
 - If the file has more than 2 fields, all subsequent fields are relationship properties that adhere to the same rules as node properties.
 - Described relationships are always considered to be directed (source->destination).

+### Input CSV example
+Store.csv
+```
+storeNum | Location | daysOpen
+118 | 123 Main St | ['Mon', 'Wed', 'Fri']
+136 | 55 Elm St | ['Sat', 'Sun']
+```
+This CSV would be inserted with the command:
+`redisgraph-bulk-loader StoreGraph --separator \| --nodes Store.csv`
+
+(Since the pipe character has meaning in the terminal, it must be backslash-escaped.)
+
+All `storeNum` properties will be inserted as integers, all `Location` values as strings, and all `daysOpen` values as arrays of strings.
+
 ## Input Schemas
 If the `--enforce-schema` flag is specified, all input CSVs will be expected to specify each column's data type in the header.
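For context on how those bracketed cells become graph values: the loader parses them with Python's `ast.literal_eval`, as the `entity_file.py` diff below shows. A minimal standalone sketch of that parsing step (not part of this commit):

```python
import ast

field = "['Mon', 'Wed', 'Fri']"  # one daysOpen cell from Store.csv above
# The loader treats a field as an array only when it is bracket-interpolated.
if field.strip()[0] == '[' and field.strip()[-1] == ']':
    days_open = ast.literal_eval(field)
    print(days_open)  # ['Mon', 'Wed', 'Fri']
```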

@@ -122,6 +137,7 @@ The accepted data types are:
 | INT / INTEGER / LONG | A signed 64-bit integer value | Yes |
 | BOOLEAN | A boolean value indicated by the string 'true' or 'false' | Yes |
 | STRING | A string value | Yes |
+| ARRAY | An array value | Yes |

 If an `ID` column has a name string, the value will be added to each node as a property. Otherwise, it is internal to the bulk loader operation and will not appear in the graph. `START_ID` and `END_ID` columns will never be added as properties.
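As an illustration of the schema-enforced path with the new type, a header for the Store.csv example above could declare its columns like this (a sketch; this file is not part of the repository):

```
storeNum:INT | Location:STRING | daysOpen:ARRAY
118 | 123 Main St | ['Mon', 'Wed', 'Fri']
```

It would be loaded with `redisgraph-bulk-loader StoreGraph --separator \| --enforce-schema --nodes Store.csv`.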

redisgraph_bulk_loader/entity_file.py

Lines changed: 41 additions & 6 deletions
@@ -1,11 +1,15 @@
 import os
 import io
 import csv
+import ast
+import sys
 import math
 import struct
 from enum import Enum
 from exceptions import CSVError, SchemaError

+csv.field_size_limit(sys.maxsize)  # Don't limit the size of user input fields.
+

 class Type(Enum):
     UNKNOWN = 0
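A side note on the new `csv.field_size_limit` call: Python's `csv` module rejects fields longer than 131072 characters by default, which a long array cell could plausibly exceed; raising the limit avoids a `_csv.Error` complaining that the field is larger than the field limit. A quick check of the default:

```python
import csv

# CPython's default CSV field size limit is 131072 characters (128 KiB);
# field_size_limit() returns the current limit when called with no argument.
print(csv.field_size_limit())  # 131072 by default
```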
@@ -16,10 +20,11 @@ class Type(Enum):
     LONG = 4
     INT = 4  # alias to LONG
     INTEGER = 4  # alias to LONG
-    ID = 5
-    START_ID = 6
-    END_ID = 7
-    IGNORE = 8
+    ARRAY = 5
+    ID = 6
+    START_ID = 7
+    END_ID = 8
+    IGNORE = 9


 def convert_schema_type(in_type):
@@ -38,11 +43,28 @@ def convert_schema_type(in_type):
         raise SchemaError("Encountered invalid field type '%s'" % in_type)


+def array_prop_to_binary(format_str, prop_val):
+    # Evaluate the array literal so that its elements can be converted.
+    # (This allows us to handle nested arrays.)
+    array_val = ast.literal_eval(prop_val)
+    # Send the array length as a long.
+    array_to_send = struct.pack(format_str + "q", Type.ARRAY.value, len(array_val))
+    # Recursively convert and send each array element.
+    for elem in array_val:
+        array_to_send += inferred_prop_to_binary(str(elem))
+    # Return the full array struct.
+    return array_to_send
+
+
 # Convert a property field with an enforced type into a binary stream.
 # Supported property types are string, integer, float, and boolean.
 def typed_prop_to_binary(prop_val, prop_type):
     # All format strings start with an unsigned char to represent our prop_type enum
     format_str = "=B"
+
+    # Remove leading and trailing whitespace.
+    prop_val = prop_val.strip()
+
     # TODO allow ID type specification
     if prop_type == Type.ID or prop_type == Type.LONG:
         try:
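The binary layout `array_prop_to_binary` produces is a type byte, a 64-bit element count, then each element's own encoding concatenated. A standalone sketch of just the header packing (enum value hard-coded for illustration):

```python
import struct

ARRAY_TYPE = 5  # Type.ARRAY.value after this change

# "=B" is an unsigned char in native byte order with no padding; appending
# "q" adds a signed 64-bit element count. Element encodings follow it.
header = struct.pack("=B" + "q", ARRAY_TYPE, 3)
print(len(header), header.hex())  # 9 bytes: 1 type byte + 8 length bytes
```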
@@ -79,6 +101,11 @@ def typed_prop_to_binary(prop_val, prop_type):
         format_str += "%ds" % (len(encoded_str) + 1)
         return struct.pack(format_str, Type.STRING.value, encoded_str)

+    elif prop_type == Type.ARRAY:
+        if prop_val[0] != '[' or prop_val[-1] != ']':
+            raise SchemaError("Could not parse '%s' as an array" % prop_val)
+        return array_prop_to_binary(format_str, prop_val)
+
     # If it hasn't returned by this point, it is trying to set it to a type that it can't adopt
     raise Exception("unable to parse [" + prop_val + "] with type [" + repr(prop_type) + "]")

@@ -93,6 +120,9 @@ def inferred_prop_to_binary(prop_val):
         # TODO This is not allowed in Cypher, consider how to handle it here rather than in-module.
         return struct.pack(format_str, 0)

+    # Remove leading and trailing whitespace.
+    prop_val = prop_val.strip()
+
     # Try to parse value as an integer.
     try:
         numeric_prop = int(prop_val)
@@ -114,6 +144,10 @@ def inferred_prop_to_binary(prop_val):
     elif prop_val.lower() == 'true':
         return struct.pack(format_str + '?', Type.BOOL.value, True)

+    # If the property string is bracket-interpolated, it is an array.
+    if prop_val[0] == '[' and prop_val[-1] == ']':
+        return array_prop_to_binary(format_str, prop_val)
+
     # If we've reached this point, the property is a string.
     encoded_str = str.encode(prop_val)  # struct.pack requires bytes objects as arguments
     # Encoding len+1 adds a null terminator to the string
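Putting the inference rules together, a value is tried as integer, double, boolean, then array, with string as the fallback. A self-contained sketch of that ordering (a simplification; the real function also emits the binary encoding):

```python
def inferred_type(val):
    # Simplified mirror of inferred_prop_to_binary's type checks, in order.
    val = val.strip()
    try:
        int(val)
        return 'integer'
    except ValueError:
        pass
    try:
        float(val)
        return 'double'
    except ValueError:
        pass
    if val.lower() in ('false', 'true'):
        return 'boolean'
    if val and val[0] == '[' and val[-1] == ']':
        return 'array'
    return 'string'

print(inferred_type("['Mon', 'Wed']"))  # array
```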
@@ -192,7 +226,7 @@ def convert_header_with_schema(self, header):
                 raise CSVError("Field '%s' had %d colons" % field, len(field))

             # Convert the column type.
-            col_type = convert_schema_type(pair[1].upper())
+            col_type = convert_schema_type(pair[1].upper().strip())

             # If the column did not have a name but the type requires one, emit an error.
             if len(pair[0]) == 0 and col_type not in (Type.ID, Type.START_ID, Type.END_ID, Type.IGNORE):
@@ -201,7 +235,8 @@ def convert_header_with_schema(self, header):
             # We have a column name and a type.
             # Only store the name if the column's values should be added as properties.
             if len(pair[0]) > 0 and col_type not in (Type.START_ID, Type.END_ID, Type.IGNORE):
-                self.column_names[idx] = pair[0]
+                column_name = pair[0].strip()
+                self.column_names[idx] = column_name

             # Store the column type.
             self.types[idx] = col_type

redisgraph_bulk_loader/label.py

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ def process_schemaless_header(self, header):
         self.id = 0

         for idx, field in enumerate(header):
+            field = field.strip()
             self.column_names[idx] = field

         if header[0][0] == '_':

redisgraph_bulk_loader/relation_type.py

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ def process_schemaless_header(self, header):
         self.end_namespace = None

         for idx, field in enumerate(header[2:]):
-            self.column_names[idx+2] = field
+            self.column_names[idx+2] = field.strip()

     def post_process_header_with_schema(self, header):
         # Can interleave these tasks if preferred.
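The `.strip()` calls here and in `label.py` guard against stray whitespace around header fields, which is easy to introduce when a CSV uses a padded separator like ` | `. A trivial illustration (sample header invented):

```python
header = ['storeNum ', ' Location', ' daysOpen ']
print([field.strip() for field in header])  # ['storeNum', 'Location', 'daysOpen']
```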

test/test_bulk_loader.py

Lines changed: 76 additions & 0 deletions
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-

 import os
+import sys
 import csv
 import redis
 import unittest
@@ -584,6 +585,81 @@ def test13_id_namespaces(self):
                            [1, 'Filipe', 'User', 1, 40, 'Post']]
         self.assertEqual(query_result.result_set, expected_result)

+    def test14_array_properties_inferred(self):
+        """Validate that array properties are correctly inserted."""
+
+        graphname = "arr_graph"
+        with open('/tmp/nodes.tmp', mode='w') as csv_file:
+            out = csv.writer(csv_file, delimiter='|')
+            out.writerow(['str_col', 'arr_col'])
+            out.writerow(['str1', """[1, 0.2, 'nested_str', False]"""])
+            out.writerow(['str2', """['prop1', ['nested_1', 'nested_2'], 5]"""])
+
+        runner = CliRunner()
+        res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp',
+                                          '--separator', '|',
+                                          graphname], catch_exceptions=False)
+
+        self.assertEqual(res.exit_code, 0)
+        self.assertIn('2 nodes created', res.output)
+
+        graph = Graph(graphname, self.redis_con)
+        query_result = graph.query('MATCH (a) RETURN a ORDER BY a.str_col')
+
+        node_1 = {'str_col': 'str1', 'arr_col': [1, 0.2, 'nested_str', False]}
+        node_2 = {'str_col': 'str2', 'arr_col': ['prop1', ['nested_1', 'nested_2'], 5]}
+        self.assertEqual(query_result.result_set[0][0].properties, node_1)
+        self.assertEqual(query_result.result_set[1][0].properties, node_2)
+
+    def test15_array_properties_schema_enforced(self):
+        """Validate that array properties are correctly inserted with an enforced schema."""
+
+        graphname = "arr_graph_with_schema"
+        with open('/tmp/nodes.tmp', mode='w') as csv_file:
+            out = csv.writer(csv_file, delimiter='|')
+            out.writerow(['str_col:STRING', 'arr_col:ARRAY'])
+            out.writerow(['str1', """[1, 0.2, 'nested_str', False]"""])
+            out.writerow(['str2', """['prop1', ['nested_1', 'nested_2'], 5]"""])
+
+        runner = CliRunner()
+        res = runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp',
+                                          '--separator', '|',
+                                          '--enforce-schema',
+                                          graphname], catch_exceptions=False)
+
+        self.assertEqual(res.exit_code, 0)
+        self.assertIn('2 nodes created', res.output)
+
+        graph = Graph(graphname, self.redis_con)
+        query_result = graph.query('MATCH (a) RETURN a ORDER BY a.str_col')
+
+        node_1 = {'str_col': 'str1', 'arr_col': [1, 0.2, 'nested_str', False]}
+        node_2 = {'str_col': 'str2', 'arr_col': ['prop1', ['nested_1', 'nested_2'], 5]}
+        self.assertEqual(query_result.result_set[0][0].properties, node_1)
+        self.assertEqual(query_result.result_set[1][0].properties, node_2)
+
+    def test16_error_on_schema_failure(self):
+        """Validate that the loader errors on processing non-conformant CSVs with an enforced schema."""
+
+        graphname = "schema_error"
+        with open('/tmp/nodes.tmp', mode='w') as csv_file:
+            out = csv.writer(csv_file, delimiter='|')
+            out.writerow(['str_col:STRING', 'arr_col:ARRAY'])
+            out.writerow(['str1', """[1, 0.2, 'nested_str', False]"""])
+            out.writerow(['str2', 'strval'])
+
+        try:
+            runner = CliRunner()
+            runner.invoke(bulk_insert, ['--nodes', '/tmp/nodes.tmp',
+                                        '--separator', '|',
+                                        '--enforce-schema',
+                                        graphname], catch_exceptions=False)
+            self.fail()  # Should be unreachable
+        except Exception as e:
+            # Verify that the correct exception is raised.
+            self.assertEqual(sys.exc_info()[0].__name__, 'SchemaError')
+            self.assertIn("Could not parse 'strval' as an array", e.args)
+

 if __name__ == '__main__':
     unittest.main()
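A usage note, not part of the diff: since the test module ends with `unittest.main()`, the suite can be run directly with `python test/test_bulk_loader.py`, assuming a local Redis server with the RedisGraph module loaded for the graph assertions.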
