Skip to content

Commit 19679e0

Browse files
ramindu90dilini-muthumala
authored andcommitted
Introduce EOF back into other modes
1 parent b60d554 commit 19679e0

File tree

4 files changed

+24
-42
lines changed

4 files changed

+24
-42
lines changed

component/src/main/java/io/siddhi/extension/io/file/FileSource.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,6 @@ public StateFactory<FileSourceState> init(SourceEventListener sourceEventListene
563563
createInitialSourceConf();
564564
updateSourceConf();
565565
getPattern();
566-
validateEOFConfigs();
567566
if (MetricsDataHolder.getInstance().getMetricService() != null &&
568567
MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
569568
try {
@@ -1013,21 +1012,4 @@ public void restore(Map<String, Object> map) {
10131012
(List<String>) map.get(Constants.PROCESSED_FILE_LIST));
10141013
}
10151014
}
1016-
1017-
private void validateEOFConfigs() {
1018-
for (String property: requiredProperties) {
1019-
/**
1020-
* EOF property will only be supported under REGEX mode with tailing disabled, going forward.
1021-
* TEXT_FULL mode is supported in this version to keep backward compatibility
1022-
*/
1023-
if (property.equalsIgnoreCase(Constants.EOF)) {
1024-
if (!((Constants.REGEX.equalsIgnoreCase(mode) && !isTailingEnabled)
1025-
|| Constants.TEXT_FULL.equalsIgnoreCase(mode))) {
1026-
throw new SiddhiAppCreationException("Transport property trp:eof can be provided only " +
1027-
"on regex mode when tailing disabled or in Text Full mode. Given mode: " + mode
1028-
+ ". Tailing enabled: " + isTailingEnabled);
1029-
}
1030-
}
1031-
}
1032-
}
10331015
}

component/src/main/java/io/siddhi/extension/io/file/processors/FileProcessor.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -323,21 +323,6 @@ public String getId() {
323323
return "file-message-processor";
324324
}
325325

326-
private String[] getRequiredPropertyValues(CarbonMessage carbonMessage) {
327-
String[] values = new String[requiredProperties.length];
328-
int i = 0;
329-
for (String propertyKey : requiredProperties) {
330-
Object value = carbonMessage.getProperty(propertyKey);
331-
if (value != null) {
332-
values[i++] = value.toString();
333-
} else {
334-
log.error("Failed to find required transport property '" + propertyKey + "'. Assigning null value");
335-
values[i++] = null;
336-
}
337-
}
338-
return values;
339-
}
340-
341326
private void increaseMetrics(int byteLength) {
342327
metrics.getTotalFileReadCount().inc();
343328
metrics.getTotalReadsMetrics().inc();
@@ -358,8 +343,23 @@ private void increaseTailingMetrics() {
358343
metrics.getTailEnabledFilesMap().replace(Utils.getShortFilePath(fileURI), System.currentTimeMillis());
359344
}
360345

346+
private String[] getRequiredPropertyValues(CarbonMessage carbonMessage) {
347+
String[] values = new String[requiredProperties.length];
348+
int i = 0;
349+
for (String propertyKey : requiredProperties) {
350+
Object value = carbonMessage.getProperty(propertyKey);
351+
if (value != null) {
352+
values[i++] = value.toString();
353+
} else {
354+
log.error("Failed to find required transport property '" + propertyKey + "'. Assigning null value");
355+
values[i++] = null;
356+
}
357+
}
358+
return values;
359+
}
360+
361361
/**
362-
* In Regex mode, the user may request for property trp:eol which is not extracted from the carbonMessage.
362+
* In Regex mode, the user may request for property trp:eof which is not extracted from the carbonMessage.
363363
* @param eof Whether EOF reached or not
364364
* @return required properties array
365365
*/

component/src/test/java/io/siddhi/extension/io/file/FileSourceBinaryChunkedModeTestCase.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,15 +92,15 @@ public void siddhiIoFileTestForBinaryChunkedWithDirUri() throws InterruptedExcep
9292
"move.after.process='file:/" + moveAfterProcessDir + "', " +
9393
"tailing='false', " +
9494
"@map(type='binaryPassThrough', " +
95-
"@attributes(buffer='0', fileName = 'trp:file.name', " +
95+
"@attributes(buffer='0', eof = 'trp:eof', fileName = 'trp:file.name', " +
9696
"sequenceNumber = 'trp:sequence.number', length = 'trp:content.length')))\n" +
97-
"define stream FooStream (buffer object, fileName string, sequenceNumber int, " +
97+
"define stream FooStream (buffer object, eof bool, fileName string, sequenceNumber int, " +
9898
"length int);\n" +
9999
"@sink(type='log')\n" +
100-
"define stream BarStream (fileName string, sequenceNumber int, length int); ";
100+
"define stream BarStream (eof bool, fileName string, sequenceNumber int, length int); ";
101101
String query = "" +
102102
"from FooStream " +
103-
"select fileName, sequenceNumber, length " +
103+
"select eof, fileName, sequenceNumber, length " +
104104
"insert into BarStream; ";
105105
SiddhiManager siddhiManager = new SiddhiManager();
106106
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query);
@@ -127,7 +127,7 @@ public void receive(Event[] events) {
127127
siddhiAppRuntime.shutdown();
128128
}
129129

130-
@Test(expectedExceptions = SiddhiAppCreationException.class)
130+
@Test
131131
public void siddhiIoFileTestForBinaryChunkedWithFileUri() throws InterruptedException {
132132
log.info("Siddhi IO File Test with binary.chunked mode and binaryPassThrough Mapper with File Uri");
133133
File file = new File(dirUri + "/binary/apache.bin");

component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,7 @@ public void siddhiIoFileReadLineInBinaryFileTest() {
10031003
siddhiAppRuntime.start();
10041004
}
10051005

1006-
@Test(expectedExceptions = SiddhiAppCreationException.class)
1006+
@Test
10071007
public void siddhiIoFileTestForEOFAndFileName() throws InterruptedException {
10081008
log.info("test SiddhiIoFile [mode=line] Test for EOF and File Name");
10091009
String streams = "" +
@@ -1046,7 +1046,7 @@ public void receive(Event[] events) {
10461046
siddhiAppRuntime.shutdown();
10471047
}
10481048

1049-
@Test(expectedExceptions = SiddhiAppCreationException.class)
1049+
@Test
10501050
public void siddhiIoFileTestForSkipHeader() throws InterruptedException {
10511051
log.info("test SiddhiIoFile header.present parameter Test");
10521052
String streams = "" +
@@ -1082,7 +1082,7 @@ public void receive(Event[] events) {
10821082
siddhiAppRuntime.shutdown();
10831083
}
10841084

1085-
@Test(expectedExceptions = SiddhiAppCreationException.class)
1085+
@Test
10861086
public void siddhiIoFileTestWithoutSkipHeader() throws InterruptedException {
10871087
log.info("test SiddhiIoFile without header.present parameter Test");
10881088
String streams = "" +

0 commit comments

Comments
 (0)