diff --git a/flink-connector-rabbitmq/README.md b/flink-connector-rabbitmq/README.md index de8d1d89c69..c01cd60b11b 100644 --- a/flink-connector-rabbitmq/README.md +++ b/flink-connector-rabbitmq/README.md @@ -1,4 +1,4 @@ -# License of the Rabbit MQ Connector +# License of the RabbitMQ Connector Flink's RabbitMQ connector defines a Maven dependency on the "RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). diff --git a/flink-connector-rabbitmq/pom.xml b/flink-connector-rabbitmq/pom.xml index 22d0c3acbef..80152f7e52a 100644 --- a/flink-connector-rabbitmq/pom.xml +++ b/flink-connector-rabbitmq/pom.xml @@ -41,6 +41,14 @@ under the License. + + + + org.apache.flink + flink-connector-base + ${flink.version} + + org.apache.flink flink-streaming-java @@ -48,12 +56,16 @@ under the License. provided + + com.rabbitmq amqp-client ${rabbitmq.version} + + org.apache.flink flink-streaming-java @@ -62,6 +74,13 @@ under the License. test-jar + + org.apache.flink + flink-connector-test-utils + ${flink.version} + test + + org.apache.flink flink-runtime @@ -88,6 +107,12 @@ under the License. + + org.testcontainers + rabbitmq + test + + diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/ConsistencyMode.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/ConsistencyMode.java new file mode 100644 index 00000000000..2575cc5e46a --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/ConsistencyMode.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.common; + +/** + * The different consistency modes that can be defined for the sink and source individually. + * + *

The available consistency modes are as follows. + * + * <ul> + *   <li>{@code AT_MOST_ONCE}: messages are consumed by the output once or never. + *   <li>{@code AT_LEAST_ONCE}: messages are consumed by the output at least once. + *   <li>{@code EXACTLY_ONCE}: messages are consumed by the output exactly once. + * </ul> + * + *
Note that the higher the consistency guarantee gets, fewer messages can be processed by the + * system. At-least-once and exactly-once should only be used if necessary. + */ +public enum ConsistencyMode { + AT_MOST_ONCE, + AT_LEAST_ONCE, + EXACTLY_ONCE, +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java new file mode 100644 index 00000000000..7a634a80236 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/common/RabbitMQConnectionConfig.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.common; + +import org.apache.flink.connector.rabbitmq.source.RabbitMQSource; +import org.apache.flink.util.Preconditions; + +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +import static java.util.Objects.requireNonNull; + +/** + * This class is copied from the previous RabbitMQ connector. Connection Configuration for RMQ. If + * {@link Builder#setUri(String)} has been set then {@link + * RabbitMQConnectionConfig#RabbitMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, + * Integer, Integer, Integer, Integer)} will be used to initialize the RMQ connection or {@link + * RabbitMQConnectionConfig#RabbitMQConnectionConfig(String, Integer, String, String, String, + * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer, Integer)} will be used to + * initialize the RMQ connection. 
+ */ +public class RabbitMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQConnectionConfig.class); + + private String host; + private Integer port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private final @Nullable Integer networkRecoveryInterval; + private final @Nullable Boolean automaticRecovery; + private final @Nullable Boolean topologyRecovery; + + private final @Nullable Integer connectionTimeout; + private final @Nullable Integer requestedChannelMax; + private final @Nullable Integer requestedFrameMax; + private final @Nullable Integer requestedHeartbeat; + + private final @Nullable Integer prefetchCount; + + /** + * @param host host name + * @param port port + * @param virtualHost virtual host + * @param username username + * @param password password + * @param networkRecoveryInterval connection recovery interval in milliseconds + * @param automaticRecovery if automatic connection recovery + * @param topologyRecovery if topology recovery + * @param connectionTimeout connection timeout + * @param requestedChannelMax requested maximum channel number + * @param requestedFrameMax requested maximum frame size + * @param requestedHeartbeat requested heartbeat interval + * @throws NullPointerException if host or virtual host or username or password is null + */ + private RabbitMQConnectionConfig( + String host, + Integer port, + String virtualHost, + String username, + String password, + Integer networkRecoveryInterval, + Boolean automaticRecovery, + Boolean topologyRecovery, + Integer connectionTimeout, + Integer requestedChannelMax, + Integer requestedFrameMax, + Integer requestedHeartbeat, + Integer prefetchCount) { + this.host = requireNonNull(host); + this.port = requireNonNull(port); + this.virtualHost = requireNonNull(virtualHost); + this.username = requireNonNull(username); + this.password = requireNonNull(password); + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + this.prefetchCount = prefetchCount; + } + + /** + * @param uri the connection URI + * @param networkRecoveryInterval connection recovery interval in milliseconds + * @param automaticRecovery if automatic connection recovery + * @param topologyRecovery if topology recovery + * @param connectionTimeout connection timeout + * @param requestedChannelMax requested maximum channel number + * @param requestedFrameMax requested maximum frame size + * @param requestedHeartbeat requested heartbeat interval + * @throws NullPointerException if URI is null + */ + private RabbitMQConnectionConfig( + String uri, + Integer networkRecoveryInterval, + Boolean automaticRecovery, + Boolean topologyRecovery, + Integer connectionTimeout, + Integer requestedChannelMax, + Integer requestedFrameMax, + Integer requestedHeartbeat, + Integer prefetchCount) { + + this.uri = Preconditions.checkNotNull(uri, "Uri can not be null"); + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + 
this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + this.prefetchCount = prefetchCount; + } + + /** @return the host to use for connections */ + public String getHost() { + return host; + } + + /** @return the port to use for connections */ + public int getPort() { + return port; + } + + /** + * Retrieve the virtual host. + * + * @return the virtual host to use when connecting to the broker + */ + public String getVirtualHost() { + return virtualHost; + } + + /** + * Retrieve the user name. + * + * @return the AMQP user name to use when connecting to the broker + */ + public String getUsername() { + return username; + } + + /** + * Retrieve the password. + * + * @return the password to use when connecting to the broker + */ + public String getPassword() { + return password; + } + + /** + * Retrieve the URI. + * + * @return the connection URI when connecting to the broker + */ + public String getUri() { + return uri; + } + + /** + * Returns automatic connection recovery interval in milliseconds. + * + * @return how long will automatic recovery wait before attempting to reconnect, in ms; default + * is 5000 + */ + public @Nullable Integer getNetworkRecoveryInterval() { + return networkRecoveryInterval; + } + + /** + * Returns true if automatic connection recovery is enabled, false otherwise. + * + * @return true if automatic connection recovery is enabled, false otherwise + */ + public @Nullable Boolean isAutomaticRecovery() { + return automaticRecovery; + } + + /** + * Returns true if topology recovery is enabled, false otherwise. + * + * @return true if topology recovery is enabled, false otherwise + */ + public @Nullable Boolean isTopologyRecovery() { + return topologyRecovery; + } + + /** + * Retrieve the connection timeout. + * + * @return the connection timeout, in milliseconds; zero for infinite + */ + public @Nullable Integer getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Retrieve the requested maximum channel number. + * + * @return the initially requested maximum channel number; zero for unlimited + */ + public @Nullable Integer getRequestedChannelMax() { + return requestedChannelMax; + } + + /** + * Retrieve the requested maximum frame size. + * + * @return the initially requested maximum frame size, in octets; zero for unlimited + */ + public @Nullable Integer getRequestedFrameMax() { + return requestedFrameMax; + } + + /** + * Retrieve the requested heartbeat interval. + * + * @return the initially requested heartbeat interval, in seconds; zero for none + */ + public @Nullable Integer getRequestedHeartbeat() { + return requestedHeartbeat; + } + + /** + * Retrieve the the channel prefetch count. 
+ * + * @return an integer that represents the prefetch count, if not null, for the consumer channel + */ + public @Nullable Integer getPrefetchCount() { + return prefetchCount; + } + + /** + * @return Connection Factory for RMQ + * @throws URISyntaxException if Malformed URI has been passed + * @throws NoSuchAlgorithmException if the ssl factory could not be created + * @throws KeyManagementException if the ssl context could not be initialized + */ + public ConnectionFactory getConnectionFactory() + throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException { + ConnectionFactory factory = new ConnectionFactory(); + if (this.uri != null && !this.uri.isEmpty()) { + try { + factory.setUri(this.uri); + } catch (URISyntaxException e) { + LOG.error("Failed to parse uri", e); + throw e; + } catch (KeyManagementException e) { + // this should never happen + LOG.error("Failed to initialize ssl context.", e); + throw e; + } catch (NoSuchAlgorithmException e) { + // this should never happen + LOG.error("Failed to setup ssl factory.", e); + throw e; + } + } else { + factory.setHost(this.host); + factory.setPort(this.port); + factory.setVirtualHost(this.virtualHost); + factory.setUsername(this.username); + factory.setPassword(this.password); + } + + if (this.automaticRecovery != null) { + factory.setAutomaticRecoveryEnabled(this.automaticRecovery); + } + if (this.connectionTimeout != null) { + factory.setConnectionTimeout(this.connectionTimeout); + } + if (this.networkRecoveryInterval != null) { + factory.setNetworkRecoveryInterval(this.networkRecoveryInterval); + } + if (this.requestedHeartbeat != null) { + factory.setRequestedHeartbeat(this.requestedHeartbeat); + } + if (this.topologyRecovery != null) { + factory.setTopologyRecoveryEnabled(this.topologyRecovery); + } + if (this.requestedChannelMax != null) { + factory.setRequestedChannelMax(this.requestedChannelMax); + } + if (this.requestedFrameMax != null) { + factory.setRequestedFrameMax(this.requestedFrameMax); + } + + return factory; + } + + /** The Builder Class for {@link RabbitMQConnectionConfig}. */ + public static class Builder { + + private String host; + private Integer port; + private String virtualHost; + private String username; + private String password; + + private Integer networkRecoveryInterval; + private Boolean automaticRecovery; + private Boolean topologyRecovery; + + private Integer connectionTimeout; + private Integer requestedChannelMax; + private Integer requestedFrameMax; + private Integer requestedHeartbeat; + + // basicQos options for consumers + private Integer prefetchCount; + + private String uri; + + /** + * Set the target port. + * + * @param port the default port to use for connections + * @return the Builder + */ + public Builder setPort(int port) { + this.port = port; + return this; + } + + /** + * @param host the default host to use for connections + * @return the Builder + */ + public Builder setHost(String host) { + this.host = host; + return this; + } + + /** + * Set the virtual host. + * + * @param virtualHost the virtual host to use when connecting to the broker + * @return the Builder + */ + public Builder setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + return this; + } + + /** + * Set the user name. + * + * @param username the AMQP user name to use when connecting to the broker + * @return the Builder + */ + public Builder setUserName(String username) { + this.username = username; + return this; + } + + /** + * Set the password. 
+ * + * @param password the password to use when connecting to the broker + * @return the Builder + */ + public Builder setPassword(String password) { + this.password = password; + return this; + } + + /** + * Convenience method for setting the fields in an AMQP URI: host, port, username, password + * and virtual host. If any part of the URI is omitted, the ConnectionFactory's + * corresponding variable is left unchanged. + * + * @param uri is the AMQP URI containing the data + * @return the Builder + */ + public Builder setUri(String uri) { + this.uri = uri; + return this; + } + + /** + * Enables or disables topology recovery. + * + * @param topologyRecovery if true, enables topology recovery + * @return the Builder + */ + public Builder setTopologyRecoveryEnabled(boolean topologyRecovery) { + this.topologyRecovery = topologyRecovery; + return this; + } + + /** + * Set the requested heartbeat. + * + * @param requestedHeartbeat the initially requested heartbeat interval, in seconds; zero + * for none + * @return the Builder + */ + public Builder setRequestedHeartbeat(int requestedHeartbeat) { + this.requestedHeartbeat = requestedHeartbeat; + return this; + } + + /** + * Set the requested maximum frame size. + * + * @param requestedFrameMax initially requested maximum frame size, in octets; zero for + * unlimited + * @return the Builder + */ + public Builder setRequestedFrameMax(int requestedFrameMax) { + this.requestedFrameMax = requestedFrameMax; + return this; + } + + /** + * Set the requested maximum channel number. + * + * @param requestedChannelMax initially requested maximum channel number; zero for unlimited + */ + public Builder setRequestedChannelMax(int requestedChannelMax) { + this.requestedChannelMax = requestedChannelMax; + return this; + } + + /** + * Sets connection recovery interval. Default is 5000. + * + * @param networkRecoveryInterval how long will automatic recovery wait before attempting to + * reconnect, in ms + * @return the Builder + */ + public Builder setNetworkRecoveryInterval(int networkRecoveryInterval) { + this.networkRecoveryInterval = networkRecoveryInterval; + return this; + } + + /** + * Set the connection timeout. + * + * @param connectionTimeout connection establishment timeout in milliseconds; zero for + * infinite + * @return the Builder + */ + public Builder setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + return this; + } + + /** + * Enables or disables automatic connection recovery. + * + * @param automaticRecovery if true, enables connection recovery + * @return the Builder + */ + public Builder setAutomaticRecovery(boolean automaticRecovery) { + this.automaticRecovery = automaticRecovery; + return this; + } + + /** + * Enables setting basicQos for the consumer channel. Only applicable to the {@link + * RabbitMQSource}. Set to 0 for unlimited, which is the default. + * + * @see Consumer Prefetch + * @see Channel + * Prefetch (QoS) + * @param prefetchCount the max number of messages to receive without acknowledgement. + * @return the Builder + */ + public Builder setPrefetchCount(int prefetchCount) { + this.prefetchCount = prefetchCount; + return this; + } + + /** + * The Builder method. + * + *

If the URI is null, the host, port, virtual host, username and password combination is used to initialize the + * connection, using {@link RabbitMQConnectionConfig#RabbitMQConnectionConfig(String, + * Integer, String, String, String, Integer, Boolean, Boolean, Integer, Integer, Integer, + * Integer, Integer)}. + * + *

Otherwise the URI will be used to initialize the client connection {@link + * RabbitMQConnectionConfig#RabbitMQConnectionConfig(String, Integer, Boolean, Boolean, + * Integer, Integer, Integer, Integer, Integer)} + * + * @return RMQConnectionConfig + */ + public RabbitMQConnectionConfig build() { + if (this.uri != null) { + return new RabbitMQConnectionConfig( + this.uri, + this.networkRecoveryInterval, + this.automaticRecovery, + this.topologyRecovery, + this.connectionTimeout, + this.requestedChannelMax, + this.requestedFrameMax, + this.requestedHeartbeat, + this.prefetchCount); + } else { + return new RabbitMQConnectionConfig( + this.host, + this.port, + this.virtualHost, + this.username, + this.password, + this.networkRecoveryInterval, + this.automaticRecovery, + this.topologyRecovery, + this.connectionTimeout, + this.requestedChannelMax, + this.requestedFrameMax, + this.requestedHeartbeat, + this.prefetchCount); + } + } + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/README.md b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/README.md new file mode 100644 index 00000000000..eaee8d0ffea --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/README.md @@ -0,0 +1,14 @@ +# License of the RabbitMQ Connector + +Flink's RabbitMQ connector defines a Maven dependency on the +"RabbitMQ AMQP Java Client", is triple-licensed under the Mozilla Public License 1.1 ("MPL"), +the GNU General Public License version 2 ("GPL") and the Apache License version 2 ("ASL"). + +Flink itself neither reuses source code from the "RabbitMQ AMQP Java Client" +nor packages binaries from the "RabbitMQ AMQP Java Client". + +Users that create and publish derivative work based on Flink's +RabbitMQ connector (thereby re-distributing the "RabbitMQ AMQP Java Client") +must be aware that this may be subject to conditions declared in the +Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 ("GPL") +and the Apache License version 2 ("ASL"). diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/RabbitMQSource.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/RabbitMQSource.java new file mode 100644 index 00000000000..076790eef9b --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/RabbitMQSource.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq.source; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.rabbitmq.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.source.enumerator.RabbitMQSourceEnumState; +import org.apache.flink.connector.rabbitmq.source.enumerator.RabbitMQSourceEnumStateSerializer; +import org.apache.flink.connector.rabbitmq.source.enumerator.RabbitMQSourceEnumerator; +import org.apache.flink.connector.rabbitmq.source.reader.RabbitMQSourceReaderBase; +import org.apache.flink.connector.rabbitmq.source.reader.specialized.RabbitMQSourceReaderAtLeastOnce; +import org.apache.flink.connector.rabbitmq.source.reader.specialized.RabbitMQSourceReaderAtMostOnce; +import org.apache.flink.connector.rabbitmq.source.reader.specialized.RabbitMQSourceReaderExactlyOnce; +import org.apache.flink.connector.rabbitmq.source.split.RabbitMQSourceSplit; +import org.apache.flink.connector.rabbitmq.source.split.RabbitMQSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * RabbitMQ source (consumer) that consumes messages from a RabbitMQ queue. It provides + * at-most-once, at-least-once and exactly-once processing semantics. For at-least-once and + * exactly-once, checkpointing needs to be enabled. The source operates as a StreamingSource and + * thus works in a streaming fashion. Please use a {@link RabbitMQSourceBuilder} to construct the + * source. The following example shows how to create a RabbitMQSource emitting records of + * String type. + * + *

{@code
+ * RabbitMQSource source = RabbitMQSource
+ *     .builder()
+ *     .setConnectionConfig(MY_RMQ_CONNECTION_CONFIG)
+ *     .setQueueName("myQueue")
+ *     .setDeserializationSchema(new SimpleStringSchema())
+ *     .setConsistencyMode(MY_CONSISTENCY_MODE)
+ *     .build();
+ * }
+ * + *

When creating the source, a {@code connectionConfig} must be specified via {@link + * RabbitMQConnectionConfig}. It contains the information required by the RabbitMQ Java client to + * connect to the RabbitMQ server. A minimum configuration contains a (virtual) host, a username, a + * password and a port. Besides that, the {@code queueName} to consume from and a {@link + * DeserializationSchema} for the message deliveries must be provided. + * + *
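A minimal sketch of such a configuration, built with the {@code RabbitMQConnectionConfig.Builder} introduced in this change; the host, port and credentials below are placeholder values and the prefetch count is optional:

```java
// Hypothetical broker coordinates; replace with the values of your RabbitMQ deployment.
RabbitMQConnectionConfig connectionConfig =
        new RabbitMQConnectionConfig.Builder()
                .setHost("localhost")
                .setPort(5672)
                .setVirtualHost("/")
                .setUserName("guest")
                .setPassword("guest")
                // Optional: cap the number of unacknowledged deliveries per consumer (basicQos).
                .setPrefetchCount(100)
                .build();
```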

When using at-most-once consistency, messages are automatically acknowledged as soon as they are received + * from RabbitMQ and only later consumed by the output. In case of a failure, messages might therefore be lost. + * More details in {@link RabbitMQSourceReaderAtMostOnce}. + * + *

In case of at-least-once consistency, messages are buffered and later consumed by the output. + * Once a checkpoint is finished, the messages that were consumed by the output are acknowledged to + * RabbitMQ. This way, we ensure that the messages have been successfully received by the output. In case + * of a system failure, the messages that were not yet acknowledged to RabbitMQ will be resent by RabbitMQ. + * More details in {@link RabbitMQSourceReaderAtLeastOnce}. + * + *
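A sketch of how this interacts with checkpointing, assuming a {@code connectionConfig} built as in the earlier sketch and placeholder values for the queue name and checkpoint interval:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// At-least-once acknowledges messages only on completed checkpoints,
// so checkpointing must be enabled for the guarantee to hold.
env.enableCheckpointing(10_000L);

RabbitMQSource<String> source =
        RabbitMQSource.<String>builder()
                .setConnectionConfig(connectionConfig)
                .setQueueName("myQueue")
                .setDeserializationSchema(new SimpleStringSchema())
                .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE)
                .build();

// EXACTLY_ONCE would additionally require a parallelism of one (enforced by the enumerator).
DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "RabbitMQ Source");
```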

To ensure exactly-once consistency, messages are deduplicated through {@code correlationIds}. + * Similar to at-least-once consistency, we store the {@code deliveryTags} of the messages that are + * consumed by the output to acknowledge them later. A transactional RabbitMQ channel is used to + * ensure that all messages are successfully acknowledged to RabbitMQ. More details in {@link + * RabbitMQSourceReaderExactlyOnce}. + * + *
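Deduplication only works if the producer attaches a unique correlation id to every published message. A hedged sketch of the publisher side using the RabbitMQ Java client, where the channel and queue name are assumed to exist elsewhere:

```java
// Publisher side: a unique correlation id lets the exactly-once reader discard redeliveries.
AMQP.BasicProperties properties =
        new AMQP.BasicProperties.Builder()
                .correlationId(UUID.randomUUID().toString())
                .build();
channel.basicPublish("", "myQueue", properties, "some payload".getBytes(StandardCharsets.UTF_8));
```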

Keep in mind that the transactional channels are heavyweight and performance will drop. Under + * heavy load, checkpoints can be delayed if a transaction takes longer than the specified + * checkpointing interval. + * + * @param the output type of the source. + */ +public class RabbitMQSource + implements Source, ResultTypeQueryable { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSource.class); + + private final RabbitMQConnectionConfig connectionConfig; + private final String queueName; + private final DeserializationSchema deserializationSchema; + private final ConsistencyMode consistencyMode; + + private RabbitMQSource( + RabbitMQConnectionConfig connectionConfig, + String queueName, + DeserializationSchema deserializationSchema, + ConsistencyMode consistencyMode) { + this.connectionConfig = requireNonNull(connectionConfig); + this.queueName = requireNonNull(queueName); + this.deserializationSchema = requireNonNull(deserializationSchema); + this.consistencyMode = requireNonNull(consistencyMode); + + LOG.info("Create RabbitMQ source"); + } + + /** + * Get a {@link RabbitMQSourceBuilder} for the source. + * + * @param type of the source. + * @return a source builder + * @see RabbitMQSourceBuilder + */ + public static RabbitMQSourceBuilder builder() { + return new RabbitMQSourceBuilder<>(); + } + + /** + * The boundedness is always continuous unbounded as this is a streaming-only source. + * + * @return Boundedness continuous unbounded. + * @see Boundedness + */ + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + /** + * Returns a new initialized source reader of the source's consistency mode. + * + * @param sourceReaderContext context which the reader will be created in. + * @return RabbitMQSourceReader a source reader of the specified consistency type. 
+ * @see RabbitMQSourceReaderBase + */ + @Override + public SourceReader createReader( + SourceReaderContext sourceReaderContext) { + LOG.info("New Source Reader of type {} requested.", consistencyMode); + switch (consistencyMode) { + case AT_MOST_ONCE: + return new RabbitMQSourceReaderAtMostOnce<>( + sourceReaderContext, deserializationSchema); + case AT_LEAST_ONCE: + return new RabbitMQSourceReaderAtLeastOnce<>( + sourceReaderContext, deserializationSchema); + case EXACTLY_ONCE: + return new RabbitMQSourceReaderExactlyOnce<>( + sourceReaderContext, deserializationSchema); + default: + throw new IllegalStateException( + "Error in creating a SourceReader: No valid consistency mode (" + + consistencyMode + + ") was specified."); + } + } + + /** + * @param splitEnumeratorContext context which the enumerator will be created in + * @return a new split enumerator + * @see SplitEnumerator + */ + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext splitEnumeratorContext) { + return new RabbitMQSourceEnumerator( + splitEnumeratorContext, consistencyMode, connectionConfig, queueName); + } + + /** + * @param splitEnumeratorContext context which the enumerator will be created in + * @param enumState enum state the + * @return a new split enumerator + * @see SplitEnumerator + */ + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext splitEnumeratorContext, + RabbitMQSourceEnumState enumState) { + return new RabbitMQSourceEnumerator( + splitEnumeratorContext, consistencyMode, connectionConfig, queueName, enumState); + } + + /** + * @return a simple serializer for a RabbitMQPartitionSplit + * @see SimpleVersionedSerializer + */ + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return new RabbitMQSourceSplitSerializer(); + } + + /** + * @return a simple serializer for a RabbitMQSourceEnumState + * @see SimpleVersionedSerializer + */ + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return new RabbitMQSourceEnumStateSerializer(); + } + + /** + * @return type information + * @see TypeInformation + */ + @Override + public TypeInformation getProducedType() { + return deserializationSchema.getProducedType(); + } + + /** + * A @builder class to simplify the creation of a {@link RabbitMQSource}. + * + *

The following example shows the minimum setup to create a RabbitMQSource that reads String + * messages from a queue. + * + *

{@code
+     * RabbitMQSource source = RabbitMQSource
+     *     .builder()
+     *     .setConnectionConfig(MY_RMQ_CONNECTION_CONFIG)
+     *     .setQueueName("myQueue")
+     *     .setDeserializationSchema(new SimpleStringSchema())
+     *     .setConsistencyMode(MY_CONSISTENCY_MODE)
+     *     .build();
+     * }
+ * + *

For details about the connection config refer to {@link RabbitMQConnectionConfig}. For + * details about the available consistency modes refer to {@link ConsistencyMode}. + * + * @param the output type of the source. + */ + public static class RabbitMQSourceBuilder { + // The configuration for the RabbitMQ connection. + private RabbitMQConnectionConfig connectionConfig; + // Name of the queue to consume from. + private String queueName; + // The deserializer for the messages of RabbitMQ. + private DeserializationSchema deserializationSchema; + // The consistency mode for the source. + private ConsistencyMode consistencyMode; + + /** + * Build the {@link RabbitMQSource}. + * + * @return a RabbitMQSource with the configuration set for this builder. + */ + public RabbitMQSource build() { + return new RabbitMQSource<>( + connectionConfig, queueName, deserializationSchema, consistencyMode); + } + + /** + * Set the connection config for RabbitMQ. + * + * @param connectionConfig the connection configuration for RabbitMQ. + * @return this RabbitMQSourceBuilder + * @see RabbitMQConnectionConfig + */ + public RabbitMQSourceBuilder setConnectionConfig( + RabbitMQConnectionConfig connectionConfig) { + this.connectionConfig = connectionConfig; + return this; + } + + /** + * Set the name of the queue to consume from. + * + * @param queueName the name of the queue to consume from. + * @return this RabbitMQSourceBuilder + */ + public RabbitMQSourceBuilder setQueueName(String queueName) { + this.queueName = queueName; + return this; + } + + /** + * Set the deserializer for the message deliveries from RabbitMQ. + * + * @param deserializationSchema a deserializer for the message deliveries from RabbitMQ. + * @return this RabbitMQSourceBuilder + * @see DeserializationSchema + */ + public RabbitMQSourceBuilder setDeserializationSchema( + DeserializationSchema deserializationSchema) { + this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema); + return this; + } + + /** + * Set the consistency mode for the source. + * + * @param consistencyMode the consistency mode for the source. + * @return this RabbitMQSourceBuilder + * @see ConsistencyMode + */ + public RabbitMQSourceBuilder setConsistencyMode(ConsistencyMode consistencyMode) { + this.consistencyMode = consistencyMode; + return this; + } + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/common/RabbitMQSourceMessageWrapper.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/common/RabbitMQSourceMessageWrapper.java new file mode 100644 index 00000000000..0289baa72ac --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/common/RabbitMQSourceMessageWrapper.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.source.common; + +import javax.annotation.Nullable; + +/** + * A wrapper class for the message received from RabbitMQ that holds the deserialized message, the + * delivery tag and the correlation id. + * + * @param The type of the message to hold. + */ +public class RabbitMQSourceMessageWrapper { + private final long deliveryTag; + private final String correlationId; + private final T message; + + public RabbitMQSourceMessageWrapper( + long deliveryTag, @Nullable String correlationId, @Nullable T message) { + this.deliveryTag = deliveryTag; + this.correlationId = correlationId; + this.message = message; + } + + public RabbitMQSourceMessageWrapper(long deliveryTag, String correlationId) { + this(deliveryTag, correlationId, null); + } + + public long getDeliveryTag() { + return deliveryTag; + } + + public T getMessage() { + return message; + } + + public String getCorrelationId() { + return correlationId; + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumState.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumState.java new file mode 100644 index 00000000000..c7c7592d4ca --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumState.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.source.enumerator; + +/** + * The EnumState is empty because every reader gets assigned the same split. And therefore, no split + * assignment needs to be remembered. + * + * @see RabbitMQSourceEnumerator + */ +public class RabbitMQSourceEnumState {} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumStateSerializer.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumStateSerializer.java new file mode 100644 index 00000000000..b95e1cf734c --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumStateSerializer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.source.enumerator; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +/** + * The EnumStateSerializer does nothing particular because the EnumState does not contain data. + * + * @see RabbitMQSourceEnumState + */ +public class RabbitMQSourceEnumStateSerializer + implements SimpleVersionedSerializer { + + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(RabbitMQSourceEnumState rabbitMQSourceEnumState) { + if (getVersion() == 1) { + return new byte[0]; + } + throw new RuntimeException("Version " + getVersion() + " is not supported"); + } + + @Override + public RabbitMQSourceEnumState deserialize(int i, byte[] bytes) { + if (getVersion() == 1) { + return new RabbitMQSourceEnumState(); + } + throw new RuntimeException("Version " + getVersion() + " is not supported"); + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumerator.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumerator.java new file mode 100644 index 00000000000..5a708ae607a --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/enumerator/RabbitMQSourceEnumerator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.rabbitmq.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.source.split.RabbitMQSourceSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * The source enumerator provides the source readers with the split. 
All source readers receive the + * same split as it only contains information about the connection and in case of exactly-once, the + * seen correlation ids. But in this case, the enumerator makes sure that at maximum one source + * reader receives the split. During exactly-once if multiple reader should be assigned a split a + * {@link RuntimeException} is thrown. + */ +public class RabbitMQSourceEnumerator + implements SplitEnumerator { + private final SplitEnumeratorContext context; + private final ConsistencyMode consistencyMode; + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceEnumerator.class); + private RabbitMQSourceSplit split; + + public RabbitMQSourceEnumerator( + SplitEnumeratorContext context, + ConsistencyMode consistencyMode, + RabbitMQConnectionConfig connectionConfig, + String rmqQueueName, + RabbitMQSourceEnumState enumState) { + // The enumState is not used since the enumerator has no state in this architecture. + this(context, consistencyMode, connectionConfig, rmqQueueName); + } + + public RabbitMQSourceEnumerator( + SplitEnumeratorContext context, + ConsistencyMode consistencyMode, + RabbitMQConnectionConfig connectionConfig, + String rmqQueueName) { + this.context = requireNonNull(context); + this.consistencyMode = requireNonNull(consistencyMode); + this.split = new RabbitMQSourceSplit(connectionConfig, rmqQueueName); + } + + @Override + public void start() { + LOG.info("Start RabbitMQ source enumerator"); + } + + @Override + public void handleSplitRequest(int i, @Nullable String s) { + LOG.info("Split request from reader {}.", i); + assignSplitToReader(i, split); + } + + @Override + public void addSplitsBack(List list, int i) { + if (list == null || list.size() == 0) { + return; + } + + // Every Source Reader will only receive one split, thus we will never get back more. + if (list.size() != 1) { + throw new RuntimeException( + "There should only be one split added back at a time. per reader"); + } + + LOG.info("Split returned from reader {}.", i); + // In case of exactly-once (parallelism 1) the single split gets updated with the + // correlation ids and in case of a recovery we have to store this split until we can + // assign it to the recovered reader. + split = list.get(0); + } + + /** + * In the case of exactly-once multiple readers are not allowed. + * + * @see RabbitMQSourceEnumerator#assignSplitToReader(int, RabbitMQSourceSplit) + * @param i reader id + */ + @Override + public void addReader(int i) {} + + /** @return empty enum state object */ + @Override + public RabbitMQSourceEnumState snapshotState(long checkpointId) throws Exception { + return new RabbitMQSourceEnumState(); + } + + @Override + public void close() {} + + private void assignSplitToReader(int readerId, RabbitMQSourceSplit split) { + if (consistencyMode == ConsistencyMode.EXACTLY_ONCE && context.currentParallelism() > 1) { + throw new RuntimeException( + "The consistency mode is exactly-once and a parallelism higher than one was defined. 
" + + "For exactly once a parallelism higher than one is forbidden."); + } + + SplitsAssignment assignment = new SplitsAssignment<>(split, readerId); + context.assignSplits(assignment); + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQCollector.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQCollector.java new file mode 100644 index 00000000000..1886dc8f75e --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQCollector.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.source.reader; + +import org.apache.flink.connector.rabbitmq.source.common.RabbitMQSourceMessageWrapper; +import org.apache.flink.util.Collector; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * The collector for the messages received from RabbitMQ. Deserialized receive their identifiers + * through {@link #setMessageIdentifiers(String, long)} before they are collected through {@link + * #collect(Object)}. Messages can be polled in order to be processed by the output. + * + * @param The output type of the source. + * @see RabbitMQSourceMessageWrapper + */ +public class RabbitMQCollector implements Collector { + // Queue that holds the messages. + private final BlockingQueue> unpolledMessageQueue; + // Identifiers of the next message that will be collected. + private long deliveryTag; + private String correlationId; + + public RabbitMQCollector() { + this.unpolledMessageQueue = new LinkedBlockingQueue<>(); + } + + /** @return boolean true if there are messages remaining in the collector. */ + public boolean hasUnpolledMessages() { + return !unpolledMessageQueue.isEmpty(); + } + + /** + * Poll a message from the collector. + * + * @return Message the polled message. + */ + public RabbitMQSourceMessageWrapper pollMessage() { + return unpolledMessageQueue.poll(); + } + + /** + * Sets the correlation id and the delivery tag that corresponds to the records originating from + * the RMQ event. If the correlation id has been processed before, records will not be emitted + * downstream. + * + *

If not set explicitly, the {@link AMQP.BasicProperties#getCorrelationId()} and {@link + * Envelope#getDeliveryTag()} will be used. + */ + public void setMessageIdentifiers(String correlationId, long deliveryTag) { + this.correlationId = correlationId; + this.deliveryTag = deliveryTag; + } + + @Override + public void collect(T record) { + unpolledMessageQueue.add( + new RabbitMQSourceMessageWrapper<>(deliveryTag, correlationId, record)); + } + + @Override + public void close() {} +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQSourceReaderBase.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQSourceReaderBase.java new file mode 100644 index 00000000000..13ef746580c --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/RabbitMQSourceReaderBase.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.source.reader; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.rabbitmq.source.common.RabbitMQSourceMessageWrapper; +import org.apache.flink.connector.rabbitmq.source.enumerator.RabbitMQSourceEnumerator; +import org.apache.flink.connector.rabbitmq.source.split.RabbitMQSourceSplit; +import org.apache.flink.core.io.InputStatus; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; +import com.rabbitmq.client.Delivery; +import com.rabbitmq.client.Envelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; + +/** + * The source reader for RabbitMQ queues. This is the base class of the different consistency modes. + * + * @param The output type of the source. + */ +public abstract class RabbitMQSourceReaderBase implements SourceReader { + protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceReaderBase.class); + + // The assigned split from the enumerator. + private RabbitMQSourceSplit split; + + private Connection rmqConnection; + private Channel rmqChannel; + + private final SourceReaderContext sourceReaderContext; + // The deserialization schema for the messages of RabbitMQ. 
+ private final DeserializationSchema deliveryDeserializer; + // The collector keeps the messages received from RabbitMQ. + private final RabbitMQCollector collector; + + public RabbitMQSourceReaderBase( + SourceReaderContext sourceReaderContext, + DeserializationSchema deliveryDeserializer) { + this.sourceReaderContext = requireNonNull(sourceReaderContext); + this.deliveryDeserializer = requireNonNull(deliveryDeserializer); + this.collector = new RabbitMQCollector<>(); + } + + @Override + public void start() { + LOG.info("Starting source reader and send split request"); + sourceReaderContext.sendSplitRequest(); + } + + // ------------- start RabbitMQ methods -------------- + + private void setupRabbitMQ() throws Exception { + setupConnection(); + setupChannel(); + LOG.info( + "RabbitMQ Connection was successful: Waiting for messages from the queue. To exit press CTRL+C"); + } + + private ConnectionFactory setupConnectionFactory() throws Exception { + return split.getConnectionConfig().getConnectionFactory(); + } + + private void setupConnection() throws Exception { + rmqConnection = setupConnectionFactory().newConnection(); + } + + /** @return boolean whether messages should be automatically acknowledged to RabbitMQ. */ + protected abstract boolean isAutoAck(); + + /** + * This function will be called when a new message from RabbitMQ gets pushed to the source. The + * message will be deserialized and forwarded to our message collector where it is buffered + * until it can be processed. + * + * @param consumerTag The consumer tag of the message. + * @param delivery The delivery from RabbitMQ. + * @throws IOException if something fails during deserialization. + */ + protected void handleMessageReceivedCallback(String consumerTag, Delivery delivery) + throws IOException { + + AMQP.BasicProperties properties = delivery.getProperties(); + byte[] body = delivery.getBody(); + Envelope envelope = delivery.getEnvelope(); + collector.setMessageIdentifiers(properties.getCorrelationId(), envelope.getDeliveryTag()); + deliveryDeserializer.deserialize(body, collector); + } + + protected void setupChannel() throws IOException { + rmqChannel = rmqConnection.createChannel(); + rmqChannel.queueDeclare(split.getQueueName(), true, false, false, null); + + // Set maximum of unacknowledged messages + Integer prefetchCount = getSplit().getConnectionConfig().getPrefetchCount(); + if (prefetchCount != null) { + // global: false - the prefetch count is set per consumer, not per RabbitMQ channel + rmqChannel.basicQos(prefetchCount, false); + } + + final DeliverCallback deliverCallback = this::handleMessageReceivedCallback; + rmqChannel.basicConsume( + split.getQueueName(), isAutoAck(), deliverCallback, consumerTag -> {}); + } + + // ------------- end RabbitMQ methods -------------- + + /** + * This method provides a hook that is called when a message gets polled by the output. + * + * @param message the message that was polled by the output. + */ + protected void handleMessagePolled(RabbitMQSourceMessageWrapper message) {} + + @Override + public InputStatus pollNext(ReaderOutput output) { + RabbitMQSourceMessageWrapper message = collector.pollMessage(); + if (message == null) { + return InputStatus.NOTHING_AVAILABLE; + } + + output.collect(message.getMessage()); + handleMessagePolled(message); + + return collector.hasUnpolledMessages() + ? InputStatus.MORE_AVAILABLE + : InputStatus.NOTHING_AVAILABLE; + } + + @Override + public List snapshotState(long checkpointId) { + return split != null ? 
Collections.singletonList(split.copy()) : Collections.emptyList(); + } + + @Override + public CompletableFuture isAvailable() { + return CompletableFuture.runAsync( + () -> { + while (!collector.hasUnpolledMessages()) { + // supposed to be empty + } + }); + /* + * a) It runs in a non-specified thread pool, + * which means it runs in the JVMs common pool which may also be in use + * by other components. Use a dedicated executor. + * b) It hot-loops, which both blocks an entire thread from doing + * anything and blows through CPU cycles. Consider restructuring the + * collector to return a sort of availability future that is completed once a message + * was added, or use basic locking to at least prevent hot-looping. + */ + } + + /** + * Assign the split from the enumerator. If the source reader already has a split nothing + * happens. After the split is assigned, the connection to RabbitMQ can be setup. + * + * @param list RabbitMQSourceSplits with only one element. + * @see RabbitMQSourceEnumerator + * @see RabbitMQSourceSplit + */ + @Override + public void addSplits(List list) { + if (split != null) { + return; + } + if (list.size() != 1) { + throw new RuntimeException("The number of added splits should be exaclty one."); + } + split = list.get(0); + try { + setupRabbitMQ(); + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public void notifyNoMoreSplits() {} + + @Override + public void notifyCheckpointComplete(long checkpointId) throws IOException {} + + /** + * Acknowledge a list of message ids in the RabbitMQ channel. + * + * @param messageIds ids that will be acknowledged. + * @throws RuntimeException if an error occurs during the acknowledgement. + */ + protected void acknowledgeMessageIds(List messageIds) throws IOException { + for (long id : messageIds) { + rmqChannel.basicAck(id, false); + } + } + + @Override + public void notifyCheckpointAborted(long checkpointId) {} + + @Override + public void close() throws Exception { + LOG.info("Close source reader"); + if (getSplit() == null) { + return; + } + + if (rmqChannel != null) { + rmqChannel.close(); + } + + if (rmqConnection != null) { + rmqConnection.close(); + } + } + + protected Channel getRmqChannel() { + return rmqChannel; + } + + protected RabbitMQSourceSplit getSplit() { + return split; + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderAtLeastOnce.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderAtLeastOnce.java new file mode 100644 index 00000000000..966644cc969 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderAtLeastOnce.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rabbitmq.source.reader.specialized;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.rabbitmq.source.common.RabbitMQSourceMessageWrapper;
+import org.apache.flink.connector.rabbitmq.source.reader.RabbitMQSourceReaderBase;
+import org.apache.flink.connector.rabbitmq.source.split.RabbitMQSourceSplit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * The RabbitMQSourceReaderAtLeastOnce provides an at-least-once guarantee. The deliveryTags of the
+ * received messages are used to acknowledge the messages once it is assured that they are safely
+ * consumed by the output. This means that the deliveryTags of the messages that were polled by the
+ * output are stored separately. Once a snapshot is executed, the deliveryTags get associated with
+ * the checkpoint id. When the checkpoint is completed successfully, all messages from before are
+ * acknowledged. In the case of a system failure and a successful restart, the messages that are
+ * unacknowledged are resent by RabbitMQ. This way at-least-once is guaranteed.
+ *
+ *

In order for the at-least-once source reader to work, checkpointing needs to be enabled. + * + * @param The output type of the source. + * @see RabbitMQSourceReaderBase + */ +public class RabbitMQSourceReaderAtLeastOnce extends RabbitMQSourceReaderBase { + // DeliveryTags which corresponding messages were polled by the output since the last + // checkpoint. + private final List polledAndUnacknowledgedMessageIds; + // List of tuples of checkpoint id and deliveryTags that were polled by the output since the + // last checkpoint. + private final BlockingQueue>> + polledAndUnacknowledgedMessageIdsPerCheckpoint; + + public RabbitMQSourceReaderAtLeastOnce( + SourceReaderContext sourceReaderContext, + DeserializationSchema deliveryDeserializer) { + super(sourceReaderContext, deliveryDeserializer); + this.polledAndUnacknowledgedMessageIds = Collections.synchronizedList(new ArrayList<>()); + this.polledAndUnacknowledgedMessageIdsPerCheckpoint = new LinkedBlockingQueue<>(); + } + + @Override + protected boolean isAutoAck() { + return false; + } + + @Override + protected void handleMessagePolled(RabbitMQSourceMessageWrapper message) { + this.polledAndUnacknowledgedMessageIds.add(message.getDeliveryTag()); + } + + @Override + public List snapshotState(long checkpointId) { + Tuple2> tuple = + new Tuple2<>(checkpointId, polledAndUnacknowledgedMessageIds); + polledAndUnacknowledgedMessageIdsPerCheckpoint.add(tuple); + polledAndUnacknowledgedMessageIds.clear(); + + return super.snapshotState(checkpointId); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws IOException { + Iterator>> checkpointIterator = + polledAndUnacknowledgedMessageIdsPerCheckpoint.iterator(); + while (checkpointIterator.hasNext()) { + final Tuple2> nextCheckpoint = checkpointIterator.next(); + long nextCheckpointId = nextCheckpoint.f0; + if (nextCheckpointId <= checkpointId) { + acknowledgeMessageIds(nextCheckpoint.f1); + checkpointIterator.remove(); + } + } + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderAtMostOnce.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderAtMostOnce.java new file mode 100644 index 00000000000..73073a336bb --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderAtMostOnce.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.rabbitmq.source.reader.specialized; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.rabbitmq.source.reader.RabbitMQSourceReaderBase; + +/** + * The RabbitMQSourceReaderAtMostOnce provides at-most-once guarantee. Messages are automatically + * acknowledged when received from RabbitMQ and afterwards consumed by the output. In case of a + * failure in Flink messages might be lost. + * + * @param The output type of the source. + * @see RabbitMQSourceReaderBase + */ +public class RabbitMQSourceReaderAtMostOnce extends RabbitMQSourceReaderBase { + + public RabbitMQSourceReaderAtMostOnce( + SourceReaderContext sourceReaderContext, + DeserializationSchema deliveryDeserializer) { + super(sourceReaderContext, deliveryDeserializer); + } + + @Override + protected boolean isAutoAck() { + return true; + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderExactlyOnce.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderExactlyOnce.java new file mode 100644 index 00000000000..0228376e5da --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/reader/specialized/RabbitMQSourceReaderExactlyOnce.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.source.reader.specialized; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.connector.rabbitmq.source.common.RabbitMQSourceMessageWrapper; +import org.apache.flink.connector.rabbitmq.source.reader.RabbitMQSourceReaderBase; +import org.apache.flink.connector.rabbitmq.source.split.RabbitMQSourceSplit; +import org.apache.flink.util.Preconditions; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Delivery; +import com.rabbitmq.client.Envelope; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +/** + * The RabbitMQSourceReaderExactlyOnce provides exactly-once guarantee. The deliveryTag from the + * received messages are used to acknowledge the messages once it is assured that they are safely + * consumed by the output. 
In addition, correlation ids are used to deduplicate messages. Messages + * polled by the output are stored so they can be later acknowledged. During a checkpoint the + * messages that were polled since the last checkpoint are associated with the id of the current + * checkpoint. Once the checkpoint is completed, the messages for the checkpoint are acknowledged in + * a transaction to assure that RabbitMQ successfully receives the acknowledgements. + * + *

In order for the exactly-once source reader to work, checkpointing needs to be enabled and the
+ * messages from RabbitMQ need to have a correlation id.
+ *
+ * @param The output type of the source.
+ * @see RabbitMQSourceReaderBase
+ */
+public class RabbitMQSourceReaderExactlyOnce extends RabbitMQSourceReaderBase {
+ // Messages that were polled by the output since the last checkpoint was created.
+ // These messages were already forwarded but are not yet acknowledged to RabbitMQ.
+ // It needs to be ensured that they are persisted before they can be acknowledged and thus be
+ // deleted in RabbitMQ.
+ private final List>
+ polledAndUnacknowledgedMessagesSinceLastCheckpoint;
+
+ // All messages in polledAndUnacknowledgedMessagesSinceLastCheckpoint move to here when
+ // a new checkpoint is created and therefore the messages can be mapped to it. This mapping is
+ // necessary to ensure we acknowledge only messages which belong to a completed checkpoint.
+ private final BlockingQueue>>>
+ polledAndUnacknowledgedMessagesPerCheckpoint;
+
+ // Set of correlation ids that have been seen and are not acknowledged yet.
+ // The message publisher (who pushes the messages to RabbitMQ) is obligated to set the
+ // correlation id per message and ensure their uniqueness.
+ private final ConcurrentHashMap.KeySetView correlationIds;
+
+ public RabbitMQSourceReaderExactlyOnce(
+ SourceReaderContext sourceReaderContext,
+ DeserializationSchema deliveryDeserializer) {
+ super(sourceReaderContext, deliveryDeserializer);
+ this.polledAndUnacknowledgedMessagesSinceLastCheckpoint =
+ Collections.synchronizedList(new ArrayList<>());
+ this.polledAndUnacknowledgedMessagesPerCheckpoint = new LinkedBlockingQueue<>();
+ this.correlationIds = ConcurrentHashMap.newKeySet();
+ }
+
+ @Override
+ protected boolean isAutoAck() {
+ return false;
+ }
+
+ @Override
+ protected void handleMessagePolled(RabbitMQSourceMessageWrapper message) {
+ this.polledAndUnacknowledgedMessagesSinceLastCheckpoint.add(message);
+ }
+
+ @Override
+ protected void handleMessageReceivedCallback(String consumerTag, Delivery delivery)
+ throws IOException {
+ AMQP.BasicProperties properties = delivery.getProperties();
+ String correlationId = properties.getCorrelationId();
+ Preconditions.checkNotNull(
+ correlationId,
+ "RabbitMQ source was instantiated "
+ + "with consistency mode set to EXACTLY_ONCE, but the correlation id could not be extracted from the received message!");
+
+ Envelope envelope = delivery.getEnvelope();
+ long deliveryTag = envelope.getDeliveryTag();
+
+ if (correlationIds.add(correlationId)) {
+ // Handle the message only if the correlation id hasn't been seen before.
+ // The message will follow the normal process and be acknowledged once it has been polled.
+ super.handleMessageReceivedCallback(consumerTag, delivery);
+ } else {
+ // Otherwise, store the new delivery-tag for later acknowledgments. The correlation id
+ // was seen before and therefore this is a duplicate received from RabbitMQ.
+ // Instead of letting the message be polled again, the message is directly marked
+ // to be acknowledged in the next wave of acknowledgments under its new deliveryTag.
+ polledAndUnacknowledgedMessagesSinceLastCheckpoint.add( + new RabbitMQSourceMessageWrapper<>(deliveryTag, correlationId)); + } + } + + @Override + public List snapshotState(long checkpointId) { + Tuple2>> tuple = + new Tuple2<>( + checkpointId, + new ArrayList<>(polledAndUnacknowledgedMessagesSinceLastCheckpoint)); + polledAndUnacknowledgedMessagesPerCheckpoint.add(tuple); + polledAndUnacknowledgedMessagesSinceLastCheckpoint.clear(); + + if (getSplit() != null) { + getSplit().setCorrelationIds(correlationIds); + } + return super.snapshotState(checkpointId); + } + + @Override + public void addSplits(List list) { + super.addSplits(list); + correlationIds.addAll(getSplit().getCorrelationIds()); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws IOException { + Iterator>>> checkpointIterator = + polledAndUnacknowledgedMessagesPerCheckpoint.iterator(); + while (checkpointIterator.hasNext()) { + final Tuple2>> nextCheckpoint = + checkpointIterator.next(); + long nextCheckpointId = nextCheckpoint.f0; + if (nextCheckpointId <= checkpointId) { + acknowledgeMessages(nextCheckpoint.f1); + checkpointIterator.remove(); + } + } + } + + @Override + protected void setupChannel() throws IOException { + super.setupChannel(); + // enable channel transactional mode + getRmqChannel().txSelect(); + } + + private void acknowledgeMessages(List> messages) + throws IOException { + List correlationIds = + messages.stream() + .map(RabbitMQSourceMessageWrapper::getCorrelationId) + .collect(Collectors.toList()); + this.correlationIds.removeAll(correlationIds); + try { + List deliveryTags = + messages.stream() + .map(RabbitMQSourceMessageWrapper::getDeliveryTag) + .collect(Collectors.toList()); + acknowledgeMessageIds(deliveryTags); + getRmqChannel().txCommit(); + LOG.info("Successfully acknowledged " + deliveryTags.size() + " messages."); + } catch (IOException e) { + LOG.error( + "Error during acknowledgement of " + + correlationIds.size() + + " messages. CorrelationIds will be rolled back. Error: " + + e.getMessage()); + this.correlationIds.addAll(correlationIds); + throw e; + } + } +} diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplit.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplit.java new file mode 100644 index 00000000000..975f978d732 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplit.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.connector.rabbitmq.source.split;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig;
+import org.apache.flink.connector.rabbitmq.source.enumerator.RabbitMQSourceEnumerator;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This split is passed by the {@link RabbitMQSourceEnumerator} to the SourceReader. It contains the
+ * configuration for the connection and the name of the queue to connect to. In case of exactly-once,
+ * it also carries the correlation ids used for deduplication of messages. These might contain data
+ * if a single reader fails and a new reader needs to be created.
+ */
+public class RabbitMQSourceSplit implements SourceSplit {
+
+ private final RabbitMQConnectionConfig connectionConfig;
+ private final String rmqQueueName;
+ private Set correlationIds;
+
+ public RabbitMQSourceSplit(RabbitMQConnectionConfig connectionConfig, String rmqQueueName) {
+ this(connectionConfig, rmqQueueName, new HashSet<>());
+ }
+
+ public RabbitMQSourceSplit(
+ RabbitMQConnectionConfig connectionConfig,
+ String rmqQueueName,
+ Set correlationIds) {
+ this.connectionConfig = requireNonNull(connectionConfig);
+ this.rmqQueueName = requireNonNull(rmqQueueName);
+ this.correlationIds = requireNonNull(correlationIds);
+ }
+
+ /**
+ * Create a copy of the split.
+ *
+ * @return RabbitMQSourceSplit which is a copy of this split.
+ */
+ public RabbitMQSourceSplit copy() {
+ return new RabbitMQSourceSplit(
+ connectionConfig, rmqQueueName, new HashSet<>(correlationIds));
+ }
+
+ /**
+ * Get the correlation ids specified in the split.
+ *
+ * @return Set of all correlation ids.
+ */
+ public Set getCorrelationIds() {
+ return correlationIds;
+ }
+
+ /**
+ * Get the name of the queue to consume from defined in the split.
+ *
+ * @return String name of the queue.
+ */
+ public String getQueueName() {
+ return rmqQueueName;
+ }
+
+ /**
+ * Get the connection configuration of RabbitMQ defined in the split.
+ *
+ * @return RMQConnectionConfig connection configuration of RabbitMQ.
+ * @see RabbitMQConnectionConfig
+ */
+ public RabbitMQConnectionConfig getConnectionConfig() {
+ return connectionConfig;
+ }
+
+ /**
+ * Set the correlation ids specified in this split.
+ *
+ * @param newCorrelationIds the correlation ids that will be set.
+ */
+ public void setCorrelationIds(Set newCorrelationIds) {
+ correlationIds = newCorrelationIds;
+ }
+
+ @Override
+ public String splitId() {
+ // Is fixed as there will be only one split that is relevant for the enumerator.
+ return "0";
+ }
+}
diff --git a/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplitSerializer.java b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplitSerializer.java new file mode 100644 index 00000000000..b129caee962 --- /dev/null +++ b/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplitSerializer.java @@ -0,0 +1,106 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.source.split; + +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.HashSet; +import java.util.Set; + +/** + * The {@link org.apache.flink.core.io.SimpleVersionedSerializer serializer} for {@link + * RabbitMQSourceSplit}. + * + * @see RabbitMQSourceSplit + */ +public class RabbitMQSourceSplitSerializer + implements SimpleVersionedSerializer { + + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(RabbitMQSourceSplit rabbitMQSourceSplit) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + serializeV1(out, rabbitMQSourceSplit); + return baos.toByteArray(); + } + + public void serializeV1(DataOutputStream out, RabbitMQSourceSplit rabbitMQSourceSplit) + throws IOException { + ObjectOutputStream objectOutputStream = new ObjectOutputStream(out); + objectOutputStream.writeObject(rabbitMQSourceSplit.getConnectionConfig()); + out.writeUTF(rabbitMQSourceSplit.getQueueName()); + writeStringSet(out, rabbitMQSourceSplit.getCorrelationIds()); + out.flush(); + } + + @Override + public RabbitMQSourceSplit deserialize(int version, byte[] bytes) throws IOException { + switch (version) { + case 1: + return deserializeV1(bytes); + default: + throw new IOException("Unrecognized version or corrupt state: " + version); + } + } + + public RabbitMQSourceSplit deserializeV1(byte[] bytes) throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + DataInputStream in = new DataInputStream(bais); + ObjectInputStream objectInputStream = new ObjectInputStream(in)) { + RabbitMQConnectionConfig config = + (RabbitMQConnectionConfig) objectInputStream.readObject(); + // Queue names may be up to 255 bytes of UTF-8 characters. 
+ String queueName = in.readUTF(); + Set correlationIds = readStringSet(in); + return new RabbitMQSourceSplit(config, queueName, correlationIds); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e.getException()); + } + } + + private static void writeStringSet(DataOutputStream out, Set strings) + throws IOException { + out.writeInt(strings.size()); + for (String string : strings) { + out.writeUTF(string); + } + } + + private static Set readStringSet(DataInputStream in) throws IOException { + final int len = in.readInt(); + final Set strings = new HashSet<>(); + for (int i = 0; i < len; i++) { + strings.add(in.readUTF()); + } + return strings; + } +} diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQContainerClient.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQContainerClient.java new file mode 100644 index 00000000000..3485e0bdab7 --- /dev/null +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/common/RabbitMQContainerClient.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.common; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; +import com.rabbitmq.client.Delivery; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; + +/** + * This class provides a RabbitMQ container client which allows creating queues, sending messages to + * RabbitMQ and get the messages received by RabbitMQ. 
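+ *
+ * A minimal usage sketch (hypothetical values; it assumes an already started RabbitMQContainer
+ * named {@code container} and uses {@code SimpleStringSchema} for both serialization directions):
+ *
+ * <pre>{@code
+ * RabbitMQContainerClient<String> client =
+ *         new RabbitMQContainerClient<>(container, new SimpleStringSchema(), 1);
+ * String queue = client.createQueue();
+ * client.sendMessages(new SimpleStringSchema(), Collections.singletonList("hello"));
+ * client.await();
+ * List<String> received = client.getConsumedMessages();
+ * }</pre>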
+ */
+public class RabbitMQContainerClient {
+
+ private final RabbitMQContainer container;
+ private Channel channel;
+ private final Queue messages;
+ private String queueName;
+ private final CountDownLatch latch;
+ private final DeserializationSchema valueDeserializer;
+
+ public RabbitMQContainerClient(
+ RabbitMQContainer container,
+ DeserializationSchema valueDeserializer,
+ int countDownLatchSize) {
+ container.withExposedPorts(5672).waitingFor(Wait.forListeningPort());
+ this.container = container;
+ this.messages = new LinkedList<>();
+ this.latch = new CountDownLatch(countDownLatchSize);
+ this.valueDeserializer = valueDeserializer;
+ }
+
+ public RabbitMQContainerClient(RabbitMQContainer container) {
+ this(container, null, 0);
+ }
+
+ public String createQueue(String queueName, boolean withConsumer)
+ throws IOException, TimeoutException {
+ this.queueName = queueName;
+ Connection connection = getRabbitMQConnection();
+ this.channel = connection.createChannel();
+ channel.queueDeclare(queueName, true, false, false, null);
+ if (withConsumer) {
+ final DeliverCallback deliverCallback = this::handleMessageReceivedCallback;
+ channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
+ }
+ return this.queueName;
+ }
+
+ public String createQueue() throws IOException, TimeoutException {
+ return createQueue(UUID.randomUUID().toString(), true);
+ }
+
+ public String createQueue(boolean withConsumer) throws IOException, TimeoutException {
+ return createQueue(UUID.randomUUID().toString(), withConsumer);
+ }
+
+ public void sendMessages(SerializationSchema valueSerializer, List messages)
+ throws IOException {
+ for (T message : messages) {
+ channel.basicPublish("", queueName, null, valueSerializer.serialize(message));
+ }
+ }
+
+ public void sendMessage(SerializationSchema valueSerializer, T message, String correlationId)
+ throws IOException {
+ AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
+ builder.correlationId(correlationId);
+ AMQP.BasicProperties properties = builder.build();
+ channel.basicPublish("", queueName, properties, valueSerializer.serialize(message));
+ }
+
+ public List getConsumedMessages() throws IOException {
+ List deserializedMessages = new ArrayList<>();
+ while (!messages.isEmpty()) {
+ T message = valueDeserializer.deserialize(messages.poll());
+ deserializedMessages.add(message);
+ }
+ return deserializedMessages;
+ }
+
+ public void await() throws InterruptedException {
+ latch.await();
+ }
+
+ public void close() throws IOException, TimeoutException {
+ this.channel.close();
+ }
+
+ private void handleMessageReceivedCallback(String consumerTag, Delivery delivery) {
+ byte[] body = delivery.getBody();
+ messages.add(body);
+ if (latch != null) {
+ latch.countDown();
+ }
+ }
+
+ private Connection getRabbitMQConnection() throws TimeoutException, IOException {
+ ConnectionFactory factory = new ConnectionFactory();
+
+ factory.setUsername(container.getAdminUsername());
+ factory.setPassword(container.getAdminPassword());
+ factory.setVirtualHost("/");
+ factory.setHost(container.getHost());
+ factory.setPort(container.getAmqpPort());
+
+ return factory.newConnection();
+ }
+}
diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/source/RabbitMQSourceITest.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/source/RabbitMQSourceITest.java new file mode 100644 index 00000000000..39d4c2e46ac --- /dev/null +++
b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/source/RabbitMQSourceITest.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.source; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.rabbitmq.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq.common.RabbitMQContainerClient; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.core.testutils.TestContainerExtension; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * The tests for the RabbitMQ source with different consistency modes. 
As the tests are working a + * lot with timeouts to uphold stream it is possible that tests might fail. + */ +class RabbitMQSourceITest { + static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceITest.class); + private static final List sinkedMessages = + Collections.synchronizedList(new ArrayList<>()); + private static CountDownLatch messageLatch; + private static CountDownLatch checkpointLatch; + private static int failAtNthMessage = 0; + private static final int RABBITMQ_PORT = 5672; + private static RabbitMQContainerClient client; + + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER_EXTENSION = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(4) + .setNumberSlotsPerTaskManager(4) + .setConfiguration(new Configuration()) + .build()); + + @RegisterExtension + static AllCallbackWrapper> rabbitMqCallbackWrapper = + new AllCallbackWrapper<>( + new TestContainerExtension<>( + () -> + new RabbitMQContainer( + DockerImageName.parse("rabbitmq") + .withTag("3.7.25-management-alpine")) + .withExposedPorts(RABBITMQ_PORT) + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .waitingFor(Wait.forListeningPort()))); + + @BeforeEach + void setUpContainerClient() { + client = + new RabbitMQContainerClient<>( + rabbitMqCallbackWrapper.getCustomExtension().getTestContainer()); + sinkedMessages.clear(); + failAtNthMessage = -1; + messageLatch = null; + } + + protected void sendToRabbit(List messages) throws IOException { + client.sendMessages(new SimpleStringSchema(), messages); + } + + protected void sendToRabbit(List messages, List correlationIds) + throws IOException { + for (int i = 0; i < messages.size(); i++) { + client.sendMessage(new SimpleStringSchema(), messages.get(i), correlationIds.get(i)); + } + } + + protected List getRandomMessages(int numberOfMessages) { + List messages = new ArrayList<>(); + for (int i = 0; i < numberOfMessages; i++) { + messages.add(UUID.randomUUID().toString()); + } + return messages; + } + + protected List getSequentialMessages(int numberOfMessages) { + List messages = new ArrayList<>(); + for (int i = 0; i < numberOfMessages; i++) { + messages.add("Message " + i); + } + return messages; + } + + protected DataStream addSourceOn( + StreamExecutionEnvironment env, ConsistencyMode consistencyMode) + throws IOException, TimeoutException { + String queueName = client.createQueue(false); + + final RabbitMQConnectionConfig connectionConfig = + new RabbitMQConnectionConfig.Builder() + .setHost( + rabbitMqCallbackWrapper + .getCustomExtension() + .getTestContainer() + .getHost()) + .setVirtualHost("/") + .setUserName( + rabbitMqCallbackWrapper + .getCustomExtension() + .getTestContainer() + .getAdminUsername()) + .setPassword( + rabbitMqCallbackWrapper + .getCustomExtension() + .getTestContainer() + .getAdminPassword()) + .setPort( + rabbitMqCallbackWrapper + .getCustomExtension() + .getTestContainer() + .getMappedPort(RABBITMQ_PORT)) + .build(); + + RabbitMQSource rabbitMQSource = + RabbitMQSource.builder() + .setConnectionConfig(connectionConfig) + .setQueueName(queueName) + .setDeserializationSchema(new SimpleStringSchema()) + .setConsistencyMode(consistencyMode) + .build(); + + return env.fromSource(rabbitMQSource, WatermarkStrategy.noWatermarks(), "RabbitMQSource") + .setParallelism(1); + } + + /** CollectSink to access the messages from the stream. 
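+ * It stores every record it receives in the static {@code sinkedMessages} list and counts down
+ * {@code messageLatch}; if {@code failAtNthMessage} is positive, it throws an exception once that
+ * many records have arrived in order to simulate a failure.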
*/ + private static class CollectSink implements SinkFunction, CheckpointListener { + + public static void addOnStream(DataStream stream) { + stream.addSink(new CollectSink()).setParallelism(1); + } + + @Override + public void invoke(String value, Context context) throws Exception { + + if (failAtNthMessage > 0) { + failAtNthMessage -= 1; + if (failAtNthMessage == 0) { + throw new Exception("This is supposed to be thrown."); + } + } + sinkedMessages.add(value); + messageLatch.countDown(); + } + + @Override + public void notifyCheckpointComplete(long l) { + if (checkpointLatch != null) { + checkpointLatch.countDown(); + } + } + } + + private StreamExecutionEnvironment getEnv(CheckpointingMode checkpointingMode) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(MINIMAL_CHECKPOINT_TIME, checkpointingMode); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000)); + return env; + } + + // --------------- at most once --------------- + @Test + void atMostOnceTest(@InjectClusterClient ClusterClient clusterClient) throws Exception { + StreamExecutionEnvironment env = getEnv(CheckpointingMode.EXACTLY_ONCE); + + List messages = getRandomMessages(100); + messageLatch = new CountDownLatch(messages.size()); + + DataStream stream = addSourceOn(env, ConsistencyMode.AT_MOST_ONCE); + + CollectSink.addOnStream(stream); + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + CompletableFuture jobIdFuture = clusterClient.submitJob(jobGraph); + + sendToRabbit(messages); + messageLatch.await(10, TimeUnit.MINUTES); + assertThat(messages).containsAll(sinkedMessages); + clusterClient.cancel(jobIdFuture.get()).get(); + CommonTestUtils.waitUntilCondition( + () -> clusterClient.getJobStatus(jobGraph.getJobID()).get() == JobStatus.CANCELED, + 1000); + } + + // --------------- at least once --------------- + @Test + void atLeastOnceTest(@InjectClusterClient MiniClusterClient miniClusterClient) + throws Exception { + StreamExecutionEnvironment env = getEnv(CheckpointingMode.AT_LEAST_ONCE); + + List messages = getRandomMessages(100); + messageLatch = new CountDownLatch(messages.size()); + + DataStream stream = addSourceOn(env, ConsistencyMode.AT_LEAST_ONCE); + CollectSink.addOnStream(stream); + CompletableFuture jobIdFuture = + miniClusterClient.submitJob(env.getStreamGraph().getJobGraph()); + + sendToRabbit(messages); + messageLatch.await(); + assertThat(sinkedMessages).containsAll(messages); + miniClusterClient.cancel(jobIdFuture.get()).get(); + } + + @Test + void atLeastOnceFailureTest(@InjectClusterClient ClusterClient clusterClient) + throws Exception { + // An exception is thrown in the MapFunction in order to trigger a restart of Flink and it + // is assured that the source receives the messages again. 
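+ // The failure is injected by the CollectSink, which throws after failAtNthMessage records.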
+ StreamExecutionEnvironment env = getEnv(CheckpointingMode.AT_LEAST_ONCE); + + DataStream stream = addSourceOn(env, ConsistencyMode.AT_LEAST_ONCE); + List messages = getSequentialMessages(100); + failAtNthMessage = 30; + messageLatch = new CountDownLatch(messages.size() + failAtNthMessage - 1); + CollectSink.addOnStream(stream); + sendToRabbit(messages); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + + CompletableFuture jobIdFuture = clusterClient.submitJob(jobGraph); + + messageLatch.await(); + + assertThat(sinkedMessages).containsAll(messages); + clusterClient.cancel(jobIdFuture.get()).get(); + + CommonTestUtils.waitUntilCondition( + () -> clusterClient.getJobStatus(jobGraph.getJobID()).get() == JobStatus.CANCELED, + 1000); + } + + // --------------- exactly once --------------- + @Test + void exactlyOnceTest(@InjectClusterClient MiniClusterClient miniClusterClient) + throws Exception { + StreamExecutionEnvironment env = getEnv(CheckpointingMode.EXACTLY_ONCE); + + List messages = getRandomMessages(1000); + messageLatch = new CountDownLatch(messages.size()); + + DataStream stream = addSourceOn(env, ConsistencyMode.EXACTLY_ONCE); + CollectSink.addOnStream(stream); + + CompletableFuture jobIdFuture = + miniClusterClient.submitJob(env.getStreamGraph().getJobGraph()); + + // use messages as correlation ids here + sendToRabbit(messages, messages); + messageLatch.await(); + + assertThat(messages).contains(sinkedMessages.toArray(new String[0])); + miniClusterClient.cancel(jobIdFuture.get()).get(); + } + + @Test + void exactlyOnceFilterCorrelationIdsTest( + @InjectClusterClient MiniClusterClient miniClusterClient) throws Exception { + StreamExecutionEnvironment env = getEnv(CheckpointingMode.EXACTLY_ONCE); + + List messages = getRandomMessages(5); + List correlationIds = Arrays.asList("1", "2", "3", "3", "3"); + messageLatch = new CountDownLatch(3); + + DataStream stream = addSourceOn(env, ConsistencyMode.EXACTLY_ONCE); + CollectSink.addOnStream(stream); + CompletableFuture jobIdFuture = + miniClusterClient.submitJob(env.getStreamGraph().getJobGraph()); + + sendToRabbit(messages, correlationIds); + + messageLatch.await(); + + List expectedMessages = messages.subList(0, 3); + assertThat(sinkedMessages).isEqualTo(expectedMessages); + miniClusterClient.cancel(jobIdFuture.get()).get(); + } + + /** + * This test is supposed to check if we receive all messages once again which were polled after + * the checkpoint and before the exception thrown by the test. Thus, these messages were not yet + * acknowledged to RabbitMQ and therefore will be consumed once again after the recovery. This + * checks that messages will not be lost on failures. + * + *

The CollectSink has no checkpoint logic and will collect message twice. The test expect + * that all messages before the checkpoint are received twice by the CollectSink. + * + * @throws Exception something not supposed failed + */ + @Test + void exactlyOnceWithFailureAndMessageDuplicationTest( + @InjectClusterClient MiniClusterClient miniClusterClient) throws Exception { + // An exception is thrown in order to trigger a restart of Flink and it + // is assured that the system receives the messages only once. We disable + // (by setting the interval higher than the test duration) checkpoint to + // expect receiving all pre-exception messages once again. + StreamExecutionEnvironment env = getEnv(CheckpointingMode.EXACTLY_ONCE); + + // env.enableCheckpointing(500000); + DataStream stream = addSourceOn(env, ConsistencyMode.EXACTLY_ONCE); + + List messages = getRandomMessages(100); + + int originalFailAthNthMessage = 30; + failAtNthMessage = originalFailAthNthMessage; + messageLatch = new CountDownLatch(messages.size() + failAtNthMessage - 1); + CollectSink.addOnStream(stream); + CompletableFuture jobIdFuture = + miniClusterClient.submitJob(env.getStreamGraph().getJobGraph()); + + sendToRabbit(messages, messages); + messageLatch.await(); + + List expectedMessage = + sinkedMessages.subList(originalFailAthNthMessage - 1, sinkedMessages.size()); + assertThat(messages).isEqualTo(expectedMessage); + + miniClusterClient.cancel(jobIdFuture.get()).get(); + } + + /** + * This test checks that messages which were consumed and polled before a successful and + * completed checkpoint will not be consumed from RabbitMQ a second time if a failure happens. + * This mean that these messages will not be polled a second time from Flink (after recovery) as + * well and therefore no duplicated are expected in the CollectSink. + * + * @throws Exception something not supposed failed + */ + @Test + void exactlyOnceWithFailureWithNoMessageDuplicationTest( + @InjectClusterClient MiniClusterClient miniClusterClient) throws Exception { + StreamExecutionEnvironment env = getEnv(CheckpointingMode.EXACTLY_ONCE); + // env.enableCheckpointing(1000); + DataStream stream = addSourceOn(env, ConsistencyMode.EXACTLY_ONCE); + + List messages = getSequentialMessages(60); + List messagesA = messages.subList(0, 30); + List messagesB = messages.subList(30, messages.size()); + + failAtNthMessage = messagesA.size() + 1; + messageLatch = new CountDownLatch(messagesA.size() + messagesB.size()); + + CollectSink.addOnStream(stream); + CompletableFuture jobIdFuture = + miniClusterClient.submitJob(env.getStreamGraph().getJobGraph()); + + // Send first batch of messages + sendToRabbit(messagesA, messagesA); + + // Wait for successful checkpoints to ensure the previous message are acknowledged and + // thus will not be polled a second . 
+ checkpointLatch = new CountDownLatch(2); + checkpointLatch.await(); + + // Send second batch of messages + sendToRabbit(messagesB, messagesB); + + messageLatch.await(); + + // Expect all message to be received without duplications + assertThat(messages).isEqualTo(sinkedMessages); + + miniClusterClient.cancel(jobIdFuture.get()).get(); + } +} diff --git a/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplitSerializerTest.java b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplitSerializerTest.java new file mode 100644 index 00000000000..4b70bc5b425 --- /dev/null +++ b/flink-connector-rabbitmq/src/test/java/org/apache/flink/connector/rabbitmq/source/split/RabbitMQSourceSplitSerializerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq.source.split; + +import org.apache.flink.connector.rabbitmq.common.RabbitMQConnectionConfig; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** Test the source split serializer. 
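+ * It serializes a split and deserializes the resulting bytes again, asserting that the split id,
+ * correlation ids, queue name, and connection host survive the round trip.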
*/ +public class RabbitMQSourceSplitSerializerTest { + + private RabbitMQSourceSplit getSourceSplit() { + + String queueName = "exampleQueueName"; + List ids = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + ids.add(Integer.toString(i)); + } + Set correlationIds = new HashSet<>(ids); + final RabbitMQConnectionConfig connectionConfig = + new RabbitMQConnectionConfig.Builder() + .setHost("Host") + .setVirtualHost("/") + .setUserName("Username") + .setPassword("Password") + .setPort(3000) + .build(); + return new RabbitMQSourceSplit(connectionConfig, queueName, correlationIds); + } + + @Test + public void testSplitSerializer() throws IOException { + RabbitMQSourceSplit split = getSourceSplit(); + RabbitMQSourceSplitSerializer serializer = new RabbitMQSourceSplitSerializer(); + + byte[] serializedSplit = serializer.serialize(split); + RabbitMQSourceSplit deserializedSplit = serializer.deserialize(1, serializedSplit); + + assertNotNull(deserializedSplit); + assertEquals(split.splitId(), deserializedSplit.splitId()); + assertEquals(split.getCorrelationIds(), deserializedSplit.getCorrelationIds()); + assertEquals(split.getQueueName(), deserializedSplit.getQueueName()); + assertEquals( + split.getConnectionConfig().getHost(), + deserializedSplit.getConnectionConfig().getHost()); + } +} diff --git a/flink-connector-rabbitmq/src/test/resources/log4j2-test.properties b/flink-connector-rabbitmq/src/test/resources/log4j2.properties similarity index 98% rename from flink-connector-rabbitmq/src/test/resources/log4j2-test.properties rename to flink-connector-rabbitmq/src/test/resources/log4j2.properties index 835c2ec9a3d..dbc4282f06e 100644 --- a/flink-connector-rabbitmq/src/test/resources/log4j2-test.properties +++ b/flink-connector-rabbitmq/src/test/resources/log4j2.properties @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = OFF +rootLogger.level = DEBUG rootLogger.appenderRef.test.ref = TestLogger appender.testlogger.name = TestLogger diff --git a/pom.xml b/pom.xml index 23c5fd7a3e5..c079716b3ef 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,6 @@ under the License. 4.0.0 - org.apache.flink flink-connector-rabbitmq-parent 3.1-SNAPSHOT Flink : Connectors : RabbitMQ : Parent