Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.testkit.javadsl.TestKit;

import com.typesafe.config.Config;


/**
* Manage creation and termination of actor systems for tests.
Expand All @@ -27,6 +29,24 @@ private DittoTestSystem(final ActorSystem actorSystem) {
this.actorSystem = actorSystem;
}

/**
* Run a unit test in its own actor system and test kit.
*
* @param test the JUnit test object.
* @param config the config to load into the actorSystem created for the test.
* @param assertions the assertions to run.
*/
public static void run(final Object test, final Config config, final TestConsumer assertions) {
final String actorSystemName = test.getClass().getSimpleName();
try (final DittoTestSystem testSystem = new DittoTestSystem(ActorSystem.create(actorSystemName, config))) {
new TestKit(testSystem.actorSystem) {{
assertions.accept(this);
}};
} catch (final Exception e) {
throw new AssertionError(e);
}
}

/**
* Run a unit test in its own actor system and test kit.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ public class DefaultLocalAskTimeoutConfig implements LocalAskTimeoutConfig {
private static final String CONFIG_PATH = "local-ask";
private final Duration askTimeout;
private final Duration askTimeoutDuringRecovery;
private final Duration enforcerAskTimeout;

private DefaultLocalAskTimeoutConfig(final ScopedConfig config) {
askTimeout = config.getNonNegativeAndNonZeroDurationOrThrow(LocalAskTimeoutConfigValue.ASK_TIMEOUT);
askTimeoutDuringRecovery =
config.getNonNegativeAndNonZeroDurationOrThrow(LocalAskTimeoutConfigValue.ASK_TIMEOUT_DURING_RECOVERY);
enforcerAskTimeout = config
.getNonNegativeAndNonZeroDurationOrThrow(LocalAskTimeoutConfigValue.ENFORCER_ASK_TIMEOUT);
}

/**
Expand All @@ -61,6 +64,11 @@ public Duration getLocalAskTimeoutDuringRecovery() {
return askTimeoutDuringRecovery;
}

@Override
public Duration getLocalEnforcerAskTimeout() {
return enforcerAskTimeout;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -71,19 +79,21 @@ public boolean equals(final Object o) {
}
final DefaultLocalAskTimeoutConfig that = (DefaultLocalAskTimeoutConfig) o;
return Objects.equals(askTimeout, that.askTimeout) &&
Objects.equals(askTimeoutDuringRecovery, that.askTimeoutDuringRecovery);
Objects.equals(askTimeoutDuringRecovery, that.askTimeoutDuringRecovery) &&
Objects.equals(enforcerAskTimeout, that.enforcerAskTimeout);
}

@Override
public int hashCode() {
return Objects.hash(askTimeout, askTimeoutDuringRecovery);
return Objects.hash(askTimeout, askTimeoutDuringRecovery, enforcerAskTimeout);
}

@Override
public String toString() {
return getClass().getSimpleName() + "[" +
"askTimeout=" + askTimeout +
", askTimeoutDuringRecovery=" + askTimeoutDuringRecovery +
", enforcerAskTimeout=" + enforcerAskTimeout +
']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

/**
* Provides configuration settings for the local ACK timeout.
* Provides configuration settings for the local ask timeout.
*/
@Immutable
public interface LocalAskTimeoutConfig {
Expand All @@ -40,6 +40,15 @@ public interface LocalAskTimeoutConfig {
*/
Duration getLocalAskTimeoutDuringRecovery();

/**
* Timeout for local actor invocation of "enforcer" child actor. Normally, a small timeout is sufficient (local actor
* communication is only a method call) - however if the policy enforcer is not yet cached, a remote call to load
* the policy is required - which can (especially under load, e.g. during a restart) take longer time.
*
* @return the duration for local ask timeout calls when interacting with "enforcer" actor.
*/
Duration getLocalEnforcerAskTimeout();


/**
* An enumeration of the known config path expressions and their associated default values for
Expand All @@ -55,7 +64,12 @@ enum LocalAskTimeoutConfigValue implements KnownConfigValue {
/**
* The local ask timeout duration during persistence actor recovery.
*/
ASK_TIMEOUT_DURING_RECOVERY("timeout-during-recovery", Duration.ofSeconds(45L));
ASK_TIMEOUT_DURING_RECOVERY("timeout-during-recovery", Duration.ofSeconds(45L)),

/**
* The local ask timeout duration for enforcer actor.
*/
ENFORCER_ASK_TIMEOUT("enforcer-timeout", Duration.ofSeconds(10L));

private final String path;
private final Duration defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
softly.assertThat(underTest.getLocalAskTimeoutDuringRecovery())
.as(LocalAskTimeoutConfig.LocalAskTimeoutConfigValue.ASK_TIMEOUT_DURING_RECOVERY.getConfigPath())
.isEqualTo(LocalAskTimeoutConfig.LocalAskTimeoutConfigValue.ASK_TIMEOUT_DURING_RECOVERY.getDefaultValue());

softly.assertThat(underTest.getLocalEnforcerAskTimeout())
.as(LocalAskTimeoutConfig.LocalAskTimeoutConfigValue.ENFORCER_ASK_TIMEOUT.getConfigPath())
.isEqualTo(LocalAskTimeoutConfig.LocalAskTimeoutConfigValue.ENFORCER_ASK_TIMEOUT.getDefaultValue());
}

@Test
Expand All @@ -69,5 +73,8 @@ public void underTestReturnsValuesOfConfigFile() {
softly.assertThat(underTest.getLocalAskTimeoutDuringRecovery())
.as(LocalAskTimeoutConfig.LocalAskTimeoutConfigValue.ASK_TIMEOUT_DURING_RECOVERY.getConfigPath())
.isEqualTo(Duration.ofSeconds(25L));
softly.assertThat(underTest.getLocalEnforcerAskTimeout())
.as(LocalAskTimeoutConfig.LocalAskTimeoutConfigValue.ENFORCER_ASK_TIMEOUT.getConfigPath())
.isEqualTo(Duration.ofSeconds(7L));
}
}
1 change: 1 addition & 0 deletions base/service/src/test/resources/local-ask-timout-test.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
local-ask {
timeout = 10s
timeout-during-recovery = 25s
enforcer-timeout = 7s
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.connectivity.service.mapping;

import org.apache.pekko.actor.ActorSystem;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.edge.service.dispatching.EdgeCommandForwarderActor;
import org.eclipse.ditto.internal.models.signalenrichment.ByRoundTripSignalEnrichmentFacade;
Expand All @@ -21,8 +22,6 @@

import com.typesafe.config.Config;

import org.apache.pekko.actor.ActorSystem;

/**
* Provider for Connectivity-service of signal-enriching facades that uses an async Caffeine cache in order to load
* extra data to enrich.
Expand All @@ -43,7 +42,7 @@ public final class DefaultConnectivitySignalEnrichmentProvider implements Connec
public DefaultConnectivitySignalEnrichmentProvider(final ActorSystem actorSystem, final Config config) {
final var commandHandler = actorSystem.actorSelection(COMMAND_FORWARDER_ACTOR_PATH);
final var providerConfig = DefaultSignalEnrichmentProviderConfig.of(config);
final var delegate = ByRoundTripSignalEnrichmentFacade.of(commandHandler, providerConfig.getAskTimeout());
final var delegate = ByRoundTripSignalEnrichmentFacade.of(actorSystem, commandHandler);
if (providerConfig.isCachingEnabled()) {
final var cacheLoaderExecutor = actorSystem.dispatchers().lookup(CACHE_DISPATCHER);
facade = DittoCachingSignalEnrichmentFacade.newInstance(
Expand Down
3 changes: 0 additions & 3 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ ditto {
expire-after-create = 2m
expire-after-create = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_CACHE_EXPIRE_AFTER_CREATE}
}
# timeout for all facades
ask-timeout = 10s
ask-timeout = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_ASK_TIMEOUT}
}
}
connection-enforcer-actor-props-factory = org.eclipse.ditto.connectivity.service.enforcement.NoOpEnforcerActorPropsFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionTimeoutException;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfigUnavailableException;
import org.eclipse.ditto.connectivity.service.messaging.kafka.MessageRejectedException;
import org.eclipse.ditto.edge.service.EdgeServiceTimeoutException;
import org.eclipse.ditto.internal.utils.cacheloaders.ServiceTimeoutException;
import org.eclipse.ditto.internal.utils.test.GlobalErrorRegistryTestCases;
import org.eclipse.ditto.jwt.model.JwtInvalidException;
import org.eclipse.ditto.messages.model.AuthorizationSubjectBlockedException;
Expand Down Expand Up @@ -87,12 +87,12 @@ public ConnectivityServiceGlobalErrorRegistryTest() {
JwtInvalidException.class,
IllegalAdaptableException.class,
WotThingModelInvalidException.class,
EdgeServiceTimeoutException.class,
WotThingModelPayloadValidationException.class,
WotValidationConfigErrorException.class,
WotValidationConfigHistoryNotAccessibleException.class,
WotValidationConfigInvalidException.class,
WotValidationConfigNotAccessibleException.class
WotValidationConfigNotAccessibleException.class,
ServiceTimeoutException.class
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import java.time.Duration;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.internal.models.signalenrichment.ByRoundTripSignalEnrichmentFacade;
import org.eclipse.ditto.internal.models.signalenrichment.DittoCachingSignalEnrichmentFacade;
Expand All @@ -26,13 +26,9 @@
import org.junit.Test;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.testkit.javadsl.TestKit;

/**
* Tests {@link DefaultConnectivitySignalEnrichmentProvider}.
*/
Expand All @@ -41,8 +37,6 @@ public final class DefaultConnectivitySignalEnrichmentProviderTest {
private static final Config CONFIG = ConfigFactory.empty()
.withValue("ditto.extensions.signal-enrichment-provider.extension-class",
ConfigValueFactory.fromAnyRef(DefaultConnectivitySignalEnrichmentProvider.class.getCanonicalName()))
.withValue("ditto.extensions.signal-enrichment-provider.extension-config.ask-timeout",
ConfigValueFactory.fromAnyRef(Duration.ofDays(1L)))
.withValue("ditto.extensions.signal-enrichment-provider.extension-config.cache.enabled",
ConfigValueFactory.fromAnyRef(false));

Expand Down Expand Up @@ -115,15 +109,6 @@ public void loadProviderWithIncorrectClass() {
.isThrownBy(() -> ConnectivitySignalEnrichmentProvider.get(actorSystem, dittoExtensionsConfig));
}

@Test
public void loadProviderWithIncorrectConfig() {
createActorSystem(withValue("ditto.extensions.signal-enrichment-provider.extension-config.ask-timeout",
"This is not a duration"));
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
assertThatExceptionOfType(ConfigException.class)
.isThrownBy(() -> ConnectivitySignalEnrichmentProvider.get(actorSystem, dittoExtensionsConfig));
}

private Config withValue(final String key, final String value) {
return CONFIG.withValue(key, ConfigValueFactory.fromAnyRef(value));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ The configuration for an extension consists of two parts:
ditto.extensions.signal-enrichment-provider {
extension-class = org.eclipse.ditto.gateway.service.endpoints.utils.DefaultGatewaySignalEnrichmentProvider
extension-config = {
ask-timeout = 10s

cache {
enabled = true
maximum-size = 20000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.eclipse.ditto.connectivity.model.ConnectivityConstants;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveAllConnectionIds;
import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetryCommandForwarder;
import org.eclipse.ditto.internal.utils.cacheloaders.config.DefaultAskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.RemoteStreamRefActorTerminatedException;
import org.apache.pekko.stream.SourceRef;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
Expand Down Expand Up @@ -196,14 +197,15 @@ private void handleSourceRef(final SourceRef<?> sourceRef, final List<ThingId> t
);

final CompletionStage<List<PlainJson>> o =
sourceRef.getSource()
.<Jsonifiable<?>>map(Jsonifiable.class::cast)
.orElse(thingNotAccessibleExceptionSource)
.filterNot(DittoRuntimeException.class::isInstance)
.map(thingPlainJsonSupplier::apply)
provideSource(sourceRef, thingNotAccessibleExceptionSource, thingPlainJsonSupplier)
.log("retrieve-thing-response", log)
.recoverWithRetries(1, new PFBuilder<Throwable, Source<PlainJson, NotUsed>>()
.recoverWithRetries(5, new PFBuilder<Throwable, Source<PlainJson, NotUsed>>()
.match(NoSuchElementException.class, nsee -> Source.single(PlainJson.empty()))
.match(RemoteStreamRefActorTerminatedException.class, rsrate -> {
log.warning("Remote stream terminated, retrying...");
return provideSource(sourceRef, thingNotAccessibleExceptionSource,
thingPlainJsonSupplier);
})
.build()
)
.runWith(Sink.seq(), materializer);
Expand All @@ -222,6 +224,17 @@ private void handleSourceRef(final SourceRef<?> sourceRef, final List<ThingId> t
);
}

private static Source<PlainJson, NotUsed> provideSource(final SourceRef<?> sourceRef,
final Source<Jsonifiable<?>, NotUsed> thingNotAccessibleExceptionSource,
final Function<Jsonifiable<?>, PlainJson> thingPlainJsonSupplier
) {
return sourceRef.getSource()
.<Jsonifiable<?>>map(Jsonifiable.class::cast)
.orElse(thingNotAccessibleExceptionSource)
.filterNot(DittoRuntimeException.class::isInstance)
.map(thingPlainJsonSupplier::apply);
}

private Function<Jsonifiable<?>, PlainJson> supplyPlainJsonFromRetrieveThingResponse() {
return jsonifiable -> {
if (jsonifiable instanceof RetrieveThingResponse response) {
Expand Down
8 changes: 4 additions & 4 deletions edge/service/src/main/resources/ditto-edge-service.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ ditto {
ask-with-retry {
# maximum duration to wait for answers from entity shard regions
ask-timeout = 5s
ask-timeout = ${?CONCIERGE_CACHES_ASK_TIMEOUT}
ask-timeout = ${?EDGE_ASK_WITH_RETRY_ASK_TIMEOUT}

# one of: OFF, NO_DELAY, FIXED_DELAY, BACKOFF_DELAY
retry-strategy = BACKOFF_DELAY
retry-strategy = ${?CONCIERGE_CACHES_ASK_RETRY_STRATEGY}
retry-strategy = ${?EDGE_ASK_WITH_RETRY_ASK_RETRY_STRATEGY}

retry-attempts = 3
retry-attempts = ${?CONCIERGE_CACHES_ASK_TIMEOUT_RETRIES}
retry-attempts = ${?EDGE_ASK_WITH_RETRY_ASK_TIMEOUT_RETRIES}

fixed-delay = 5s
fixed-delay = ${?CONCIERGE_CACHES_ASK_FIXED_DELAY}
fixed-delay = ${?EDGE_ASK_WITH_RETRY_ASK_FIXED_DELAY}

backoff-delay {
min = 100ms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public final class DefaultGatewaySignalEnrichmentProvider implements GatewaySign
public DefaultGatewaySignalEnrichmentProvider(final ActorSystem actorSystem, final Config config) {
final var commandHandler = actorSystem.actorSelection(COMMAND_FORWARDER);
final var providerConfig = DefaultSignalEnrichmentProviderConfig.of(config);
final var delegate = ByRoundTripSignalEnrichmentFacade.of(commandHandler, providerConfig.getAskTimeout());
final var delegate = ByRoundTripSignalEnrichmentFacade.of(actorSystem, commandHandler);
if (providerConfig.isCachingEnabled()) {
final Executor cacheLoaderExecutor = actorSystem.dispatchers().lookup(CACHE_LOADER_DISPATCHER);
facade = DittoCachingSignalEnrichmentFacade.newInstance(
Expand Down
4 changes: 0 additions & 4 deletions gateway/service/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ ditto {
signal-enrichment-provider {
extension-class = org.eclipse.ditto.gateway.service.endpoints.utils.DefaultGatewaySignalEnrichmentProvider
extension-config = {
# timeout for all facades
ask-timeout = 10s
ask-timeout = ${?GATEWAY_SIGNAL_ENRICHMENT_ASK_TIMEOUT}

cache {
# enable/disable caching
enabled = true
Expand Down
Loading
Loading