Skip to content

Commit d9f9147

Browse files
r1bpvillard31
authored andcommitted
NIFI-15474 Support timestamp truncation in RecordPath DSL
This closes #10796. Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
1 parent 072fb85 commit d9f9147

File tree

13 files changed

+788
-2
lines changed

13 files changed

+788
-2
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.record.path.functions;
18+
19+
import org.apache.nifi.record.path.FieldValue;
20+
import org.apache.nifi.record.path.RecordPathEvaluationContext;
21+
import org.apache.nifi.record.path.math.MathBinaryEvaluator;
22+
import org.apache.nifi.record.path.paths.RecordPathSegment;
23+
24+
import java.util.stream.Stream;
25+
26+
public class Divide extends RecordPathSegment {
27+
private final RecordPathSegment lhsPath;
28+
private final RecordPathSegment rhsPath;
29+
private final MathBinaryEvaluator divide = MathBinaryEvaluator.divide();
30+
31+
public Divide(final RecordPathSegment lhsPath, final RecordPathSegment rhsPath, final boolean absolute) {
32+
super("divide", null, absolute);
33+
this.lhsPath = lhsPath;
34+
this.rhsPath = rhsPath;
35+
}
36+
37+
@Override
38+
public Stream<FieldValue> evaluate(RecordPathEvaluationContext context) {
39+
final FieldValue lhs = lhsPath.evaluate(context).findFirst().orElseThrow(() -> new IllegalArgumentException("divide function requires a left-hand operand"));
40+
final FieldValue rhs = rhsPath.evaluate(context).findFirst().orElseThrow(() -> new IllegalArgumentException("divide function requires a right-hand operand"));
41+
42+
if (lhs.getValue() == null || rhs.getValue() == null) {
43+
return Stream.of();
44+
}
45+
46+
return Stream.of(divide.evaluate(lhs, rhs));
47+
}
48+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.record.path.functions;
18+
19+
import org.apache.nifi.record.path.FieldValue;
20+
import org.apache.nifi.record.path.RecordPathEvaluationContext;
21+
import org.apache.nifi.record.path.math.MathBinaryEvaluator;
22+
import org.apache.nifi.record.path.paths.RecordPathSegment;
23+
24+
import java.util.stream.Stream;
25+
26+
public class Multiply extends RecordPathSegment {
27+
private final RecordPathSegment lhsPath;
28+
private final RecordPathSegment rhsPath;
29+
private final MathBinaryEvaluator multiply = MathBinaryEvaluator.multiply();
30+
31+
public Multiply(final RecordPathSegment lhsPath, final RecordPathSegment rhsPath, final boolean absolute) {
32+
super("multiply", null, absolute);
33+
this.lhsPath = lhsPath;
34+
this.rhsPath = rhsPath;
35+
}
36+
37+
@Override
38+
public Stream<FieldValue> evaluate(RecordPathEvaluationContext context) {
39+
final FieldValue lhs = lhsPath.evaluate(context).findFirst().orElseThrow(() -> new IllegalArgumentException("multiply function requires a left-hand operand"));
40+
final FieldValue rhs = rhsPath.evaluate(context).findFirst().orElseThrow(() -> new IllegalArgumentException("multiply function requires a right-hand operand"));
41+
42+
if (lhs.getValue() == null || rhs.getValue() == null) {
43+
return Stream.of();
44+
}
45+
46+
return Stream.of(multiply.evaluate(lhs, rhs));
47+
}
48+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.record.path.functions;
18+
19+
import org.apache.nifi.record.path.FieldValue;
20+
import org.apache.nifi.record.path.RecordPathEvaluationContext;
21+
import org.apache.nifi.record.path.math.MathTypeUtils;
22+
import org.apache.nifi.record.path.paths.RecordPathSegment;
23+
24+
import java.util.stream.Stream;
25+
26+
public class ToNumber extends RecordPathSegment {
27+
private final RecordPathSegment valuePath;
28+
29+
public ToNumber(final RecordPathSegment valuePath, final boolean absolute) {
30+
super("toNumber", null, absolute);
31+
this.valuePath = valuePath;
32+
}
33+
34+
@Override
35+
public Stream<FieldValue> evaluate(RecordPathEvaluationContext context) {
36+
final FieldValue fieldValue = valuePath.evaluate(context).findFirst().orElseThrow(() -> new IllegalArgumentException("toNumber function requires an operand"));
37+
final Object value = fieldValue.getValue();
38+
39+
if (value == null || value instanceof Number) {
40+
return Stream.of(fieldValue);
41+
} else {
42+
return Stream.of(MathTypeUtils.toNumber(fieldValue));
43+
}
44+
}
45+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.record.path.math;
18+
19+
import org.apache.nifi.record.path.FieldValue;
20+
import org.apache.nifi.record.path.StandardFieldValue;
21+
import org.apache.nifi.serialization.record.DataType;
22+
import org.apache.nifi.serialization.record.RecordField;
23+
import org.apache.nifi.serialization.record.RecordFieldType;
24+
25+
public class MathBinaryEvaluator extends MathEvaluator<MathBinaryOperator> {
26+
public MathBinaryEvaluator(MathBinaryOperator op) {
27+
super(op);
28+
}
29+
30+
public static MathBinaryEvaluator divide() {
31+
return new MathBinaryEvaluator(new MathDivideOperator());
32+
}
33+
34+
public static MathBinaryEvaluator multiply() {
35+
return new MathBinaryEvaluator(new MathMultiplyOperator());
36+
}
37+
38+
public FieldValue evaluate(FieldValue lhs, FieldValue rhs) {
39+
final Number lhsValue = MathTypeUtils.coerceNumber(lhs);
40+
final Number rhsValue = MathTypeUtils.coerceNumber(rhs);
41+
42+
Number result;
43+
DataType resultType;
44+
45+
if (MathTypeUtils.isLongCompatible(lhsValue) && MathTypeUtils.isLongCompatible(rhsValue)) {
46+
result = op.operate(lhsValue.longValue(), rhsValue.longValue());
47+
resultType = RecordFieldType.LONG.getDataType();
48+
} else {
49+
result = op.operate(lhsValue.doubleValue(), rhsValue.doubleValue());
50+
resultType = RecordFieldType.DOUBLE.getDataType();
51+
}
52+
53+
return new StandardFieldValue(result, new RecordField(op.getFieldName(), resultType), null);
54+
}
55+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.record.path.math;
18+
19+
public interface MathBinaryOperator extends MathOperator {
20+
Long operate(Long n, Long m);
21+
Double operate(Double n, Double m);
22+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.record.path.math;
18+
19+
public class MathDivideOperator implements MathBinaryOperator {
20+
@Override
21+
public Long operate(Long n, Long m) {
22+
if (m == 0L) {
23+
throw new ArithmeticException("Division by zero in RecordPath divide function");
24+
}
25+
return n / m;
26+
}
27+
28+
@Override
29+
public Double operate(Double n, Double m) {
30+
if (m == 0.0) {
31+
throw new ArithmeticException("Division by zero in RecordPath divide function");
32+
}
33+
return n / m;
34+
}
35+
36+
@Override
37+
public String getFieldName() {
38+
return "divide";
39+
}
40+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.record.path.math;
18+
19+
public class MathEvaluator<T extends MathOperator> {
20+
protected final T op;
21+
22+
public MathEvaluator(T op) {
23+
this.op = op;
24+
}
25+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.record.path.math;
18+
19+
public class MathMultiplyOperator implements MathBinaryOperator {
20+
@Override
21+
public Long operate(Long n, Long m) {
22+
return n * m;
23+
}
24+
25+
@Override
26+
public Double operate(Double n, Double m) {
27+
return n * m;
28+
}
29+
30+
@Override
31+
public String getFieldName() {
32+
return "multiply";
33+
}
34+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.record.path.math;
18+
19+
public interface MathOperator {
20+
String getFieldName();
21+
}

0 commit comments

Comments
 (0)