Skip to content

Commit 12d39e3

Browse files
authored
Merge pull request #134 from dilini-muthumala/master
Introduce EOF back into other modes
2 parents 1898d0d + 19679e0 commit 12d39e3

File tree

4 files changed

+24
-42
lines changed

4 files changed

+24
-42
lines changed

component/src/main/java/io/siddhi/extension/io/file/FileSource.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,6 @@ public StateFactory<FileSourceState> init(SourceEventListener sourceEventListene
602602
createInitialSourceConf();
603603
updateSourceConf();
604604
getPattern();
605-
validateEOFConfigs();
606605
if (MetricsDataHolder.getInstance().getMetricService() != null &&
607606
MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
608607
try {
@@ -1058,21 +1057,4 @@ public void restore(Map<String, Object> map) {
10581057
(List<String>) map.get(Constants.PROCESSED_FILE_LIST));
10591058
}
10601059
}
1061-
1062-
private void validateEOFConfigs() {
1063-
for (String property: requiredProperties) {
1064-
/**
1065-
* EOF property will only be supported under REGEX mode with tailing disabled, going forward.
1066-
* TEXT_FULL mode is supported in this version to keep backward compatibility
1067-
*/
1068-
if (property.equalsIgnoreCase(Constants.EOF)) {
1069-
if (!((Constants.REGEX.equalsIgnoreCase(mode) && !isTailingEnabled)
1070-
|| Constants.TEXT_FULL.equalsIgnoreCase(mode))) {
1071-
throw new SiddhiAppCreationException("Transport property trp:eof can be provided only " +
1072-
"on regex mode when tailing disabled or in Text Full mode. Given mode: " + mode
1073-
+ ". Tailing enabled: " + isTailingEnabled);
1074-
}
1075-
}
1076-
}
1077-
}
10781060
}

component/src/main/java/io/siddhi/extension/io/file/processors/FileProcessor.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -323,21 +323,6 @@ public String getId() {
323323
return "file-message-processor";
324324
}
325325

326-
private String[] getRequiredPropertyValues(CarbonMessage carbonMessage) {
327-
String[] values = new String[requiredProperties.length];
328-
int i = 0;
329-
for (String propertyKey : requiredProperties) {
330-
Object value = carbonMessage.getProperty(propertyKey);
331-
if (value != null) {
332-
values[i++] = value.toString();
333-
} else {
334-
log.error("Failed to find required transport property '" + propertyKey + "'. Assigning null value");
335-
values[i++] = null;
336-
}
337-
}
338-
return values;
339-
}
340-
341326
private void increaseMetrics(int byteLength) {
342327
metrics.getTotalFileReadCount().inc();
343328
metrics.getTotalReadsMetrics().inc();
@@ -358,8 +343,23 @@ private void increaseTailingMetrics() {
358343
metrics.getTailEnabledFilesMap().replace(Utils.getShortFilePath(fileURI), System.currentTimeMillis());
359344
}
360345

346+
private String[] getRequiredPropertyValues(CarbonMessage carbonMessage) {
347+
String[] values = new String[requiredProperties.length];
348+
int i = 0;
349+
for (String propertyKey : requiredProperties) {
350+
Object value = carbonMessage.getProperty(propertyKey);
351+
if (value != null) {
352+
values[i++] = value.toString();
353+
} else {
354+
log.error("Failed to find required transport property '" + propertyKey + "'. Assigning null value");
355+
values[i++] = null;
356+
}
357+
}
358+
return values;
359+
}
360+
361361
/**
362-
* In Regex mode, the user may request for property trp:eol which is not extracted from the carbonMessage.
362+
* In Regex mode, the user may request for property trp:eof which is not extracted from the carbonMessage.
363363
* @param eof Whether EOF reached or not
364364
* @return required properties array
365365
*/

component/src/test/java/io/siddhi/extension/io/file/FileSourceBinaryChunkedModeTestCase.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,15 @@ public void siddhiIoFileTestForBinaryChunkedWithDirUri() throws InterruptedExcep
9292
"move.after.process='file:/" + moveAfterProcessDir + "', " +
9393
"tailing='false', " +
9494
"@map(type='binaryPassThrough', " +
95-
"@attributes(buffer='0', fileName = 'trp:file.name', " +
95+
"@attributes(buffer='0', eof = 'trp:eof', fileName = 'trp:file.name', " +
9696
"sequenceNumber = 'trp:sequence.number', length = 'trp:content.length')))\n" +
97-
"define stream FooStream (buffer object, fileName string, sequenceNumber int, " +
97+
"define stream FooStream (buffer object, eof bool, fileName string, sequenceNumber int, " +
9898
"length int);\n" +
9999
"@sink(type='log')\n" +
100-
"define stream BarStream (fileName string, sequenceNumber int, length int); ";
100+
"define stream BarStream (eof bool, fileName string, sequenceNumber int, length int); ";
101101
String query = "" +
102102
"from FooStream " +
103-
"select fileName, sequenceNumber, length " +
103+
"select eof, fileName, sequenceNumber, length " +
104104
"insert into BarStream; ";
105105
SiddhiManager siddhiManager = new SiddhiManager();
106106
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
@@ -127,7 +127,7 @@ public void receive(Event[] events) {
127127
siddhiAppRuntime.shutdown();
128128
}
129129

130-
@Test(expectedExceptions = SiddhiAppCreationException.class)
130+
@Test
131131
public void siddhiIoFileTestForBinaryChunkedWithFileUri() throws InterruptedException {
132132
log.info("Siddhi IO File Test with binary.chunked mode and binaryPassThrough Mapper with File Uri");
133133
File file = new File(dirUri + "/binary/apache.bin");

component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,7 @@ public void siddhiIoFileReadLineInBinaryFileTest() {
10031003
siddhiAppRuntime.start();
10041004
}
10051005

1006-
@Test(expectedExceptions = SiddhiAppCreationException.class)
1006+
@Test
10071007
public void siddhiIoFileTestForEOFAndFileName() throws InterruptedException {
10081008
log.info("test SiddhiIoFile [mode=line] Test for EOF and File Name");
10091009
String streams = "" +
@@ -1046,7 +1046,7 @@ public void receive(Event[] events) {
10461046
siddhiAppRuntime.shutdown();
10471047
}
10481048

1049-
@Test(expectedExceptions = SiddhiAppCreationException.class)
1049+
@Test
10501050
public void siddhiIoFileTestForSkipHeader() throws InterruptedException {
10511051
log.info("test SiddhiIoFile header.present parameter Test");
10521052
String streams = "" +
@@ -1082,7 +1082,7 @@ public void receive(Event[] events) {
10821082
siddhiAppRuntime.shutdown();
10831083
}
10841084

1085-
@Test(expectedExceptions = SiddhiAppCreationException.class)
1085+
@Test
10861086
public void siddhiIoFileTestWithoutSkipHeader() throws InterruptedException {
10871087
log.info("test SiddhiIoFile without header.present parameter Test");
10881088
String streams = "" +

0 commit comments

Comments
 (0)