Skip to content

Commit

Permalink
fixup: fixing inprocess and eventing
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Schrottner <[email protected]>
  • Loading branch information
aepfli committed Jan 15, 2025
1 parent 82db146 commit 5678da0
Show file tree
Hide file tree
Showing 21 changed files with 229 additions and 254 deletions.
1 change: 1 addition & 0 deletions providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.16</version>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
Expand All @@ -12,12 +13,17 @@
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.Value;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -31,10 +37,29 @@ public class FlagdProvider extends EventProvider {
private static final String FLAGD_PROVIDER = "flagd";
private final Resolver flagResolver;
private volatile boolean initialized = false;
private volatile boolean connected = false;
private volatile Structure syncMetadata = new ImmutableStructure();
private volatile EvaluationContext enrichedContext = new ImmutableContext();
private final List<Hook> hooks = new ArrayList<>();
private volatile ProviderEvent previousEvent = null;

/**
* An executor service responsible for scheduling reconnection attempts.
*/
private final ScheduledExecutorService reconnectExecutor;

/**
* A scheduled task for managing reconnection attempts.
*/
private ScheduledFuture<?> reconnectTask;

/**
* The grace period in milliseconds to wait for reconnection before emitting an error event.
*/
private final long gracePeriod;
/**
* The deadline in milliseconds for GRPC operations.
*/
private final long deadline;

protected final void finalize() {
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
Expand All @@ -55,18 +80,21 @@ public FlagdProvider() {
public FlagdProvider(final FlagdOptions options) {
switch (options.getResolverType().asString()) {
case Config.RESOLVER_IN_PROCESS:
this.flagResolver = new InProcessResolver(options, this::isConnected, this::onConnectionEvent);
this.flagResolver = new InProcessResolver(options, this::onProviderEvent);
break;
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onConnectionEvent);
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onProviderEvent);
break;
default:
throw new IllegalStateException(
String.format("Requested unsupported resolver type of %s", options.getResolverType()));
}
hooks.add(new SyncMetadataHook(this::getEnrichedContext));
contextEnricher = options.getContextEnricher();
this.reconnectExecutor = Executors.newSingleThreadScheduledExecutor();
this.gracePeriod = options.getRetryGracePeriod();
this.deadline = options.getDeadline();
}

@Override
Expand All @@ -81,17 +109,22 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws
}

this.flagResolver.init();
this.initialized = this.connected = true;
// block till ready - this works with deadline fine for rpc, but with in_process we also need to take parsing into the equation
Util.busyWaitAndCheck(this.deadline + 200, () -> initialized);

}

@Override
public synchronized void shutdown() {
if (!this.initialized) {
return;
}

try {
this.flagResolver.shutdown();
if (reconnectExecutor != null) {
reconnectExecutor.shutdownNow();
reconnectExecutor.awaitTermination(deadline, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
Expand Down Expand Up @@ -151,47 +184,77 @@ EvaluationContext getEnrichedContext() {
return enrichedContext;
}

private boolean isConnected() {
return this.connected;
}
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {

private void onConnectionEvent(ConnectionEvent connectionEvent) {
final boolean wasConnected = connected;
final boolean isConnected = connected = connectionEvent.isConnected();
syncMetadata = flagdProviderEvent.getSyncMetadata();
if (flagdProviderEvent.getSyncMetadata() != null) {
enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata());
}

syncMetadata = connectionEvent.getSyncMetadata();
enrichedContext = contextEnricher.apply(connectionEvent.getSyncMetadata());
switch (flagdProviderEvent.getEvent()) {
case PROVIDER_CONFIGURATION_CHANGED:
if (previousEvent == ProviderEvent.PROVIDER_READY) {
this.emitProviderConfigurationChanged(
ProviderEventDetails
.builder()
.flagsChanged(flagdProviderEvent.getFlagsChanged())
.message("configuration changed")
.build());
break;
}
case PROVIDER_READY:
onReady();
previousEvent = ProviderEvent.PROVIDER_READY;
break;

if (!initialized) {
return;
case PROVIDER_ERROR:
if (previousEvent != ProviderEvent.PROVIDER_ERROR) {
onError();
}
previousEvent = ProviderEvent.PROVIDER_ERROR;
break;
}
}

if (!wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("connected to flagd")
.build();
this.emitProviderReady(details);
return;
private void onReady() {
if(!initialized) {
initialized = true;
log.info("initialized FlagdProvider");
}
if (reconnectTask != null && !reconnectTask.isCancelled()) {
reconnectTask.cancel(false);
log.debug("Reconnection task cancelled as connection became READY.");
}
this.emitProviderReady(
ProviderEventDetails
.builder()
.message("connected to flagd")
.build());
}

if (wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderConfigurationChanged(details);
return;
private void onError() {
log.info("Connection lost. Emit STALE event...");
log.debug("Waiting {}s for connection to become available...", gracePeriod);
this.emitProviderStale(ProviderEventDetails.builder().message("there has been an error").build());

if (reconnectTask != null && !reconnectTask.isCancelled()) {
reconnectTask.cancel(false);
}

if (connectionEvent.isStale()) {
this.emitProviderStale(ProviderEventDetails.builder()
.message("there has been an error")
.build());
} else {
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
if (!reconnectExecutor.isShutdown()) {
reconnectTask = reconnectExecutor.schedule(
() -> {
log.debug(
"Provider did not reconnect successfully within {}s. Emit ERROR event...", gracePeriod);
flagResolver.onError();
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
;
},
gracePeriod,
TimeUnit.SECONDS);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public interface Resolver {

void shutdown() throws Exception;

default void onError() {}

ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx);

ProviderEvaluation<String> stringEvaluation(String key, String defaultValue, EvaluationContext ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static void monitorChannelState(
Runnable onConnectionLost) {
channel.notifyWhenStateChanged(expectedState, () -> {
ConnectivityState currentState = channel.getState(true);
log.info("Channel state changed to: {}", currentState);
//log.info("Channel state changed to: {}", currentState);
if (currentState == ConnectivityState.READY) {
if (onConnectionReady != null) {
onConnectionReady.run();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.openfeature.contrib.providers.flagd.resolver.common;

import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.Structure;
import lombok.Getter;
import java.util.Collections;
Expand All @@ -12,13 +13,13 @@
* The event includes information about the connection status, any flags that have changed,
* and metadata associated with the synchronization process.
*/
public class ConnectionEvent {
public class FlagdProviderEvent {

/**
* The current state of the connection.
*/
@Getter
private final ConnectionState connected;
private final ProviderEvent event;

/**
* A list of flags that have changed due to this connection event.
Expand All @@ -30,57 +31,45 @@ public class ConnectionEvent {
*/
private final Structure syncMetadata;

/**
* Constructs a new {@code ConnectionEvent} with the connection status only.
*
* @param connected {@code true} if the connection is established, otherwise {@code false}.
*/
public ConnectionEvent(boolean connected) {
this(
connected ? ConnectionState.CONNECTED : ConnectionState.DISCONNECTED,
Collections.emptyList(),
new ImmutableStructure());
}

/**
* Constructs a new {@code ConnectionEvent} with the specified connection state.
*
* @param connected the connection state indicating if the connection is established or not.
* @param event the event indicating the provider state.
*/
public ConnectionEvent(ConnectionState connected) {
this(connected, Collections.emptyList(), new ImmutableStructure());
public FlagdProviderEvent(ProviderEvent event) {
this(event, Collections.emptyList(), new ImmutableStructure());
}

/**
* Constructs a new {@code ConnectionEvent} with the specified connection state and changed flags.
*
* @param connected the connection state indicating if the connection is established or not.
* @param event the event indicating the provider state.
* @param flagsChanged a list of flags that have changed due to this connection event.
*/
public ConnectionEvent(ConnectionState connected, List<String> flagsChanged) {
this(connected, flagsChanged, new ImmutableStructure());
public FlagdProviderEvent(ProviderEvent event, List<String> flagsChanged) {
this(event, flagsChanged, new ImmutableStructure());
}

/**
* Constructs a new {@code ConnectionEvent} with the specified connection state and synchronization metadata.
*
* @param connected the connection state indicating if the connection is established or not.
* @param event the event indicating the provider state.
* @param syncMetadata metadata related to the synchronization process of this event.
*/
public ConnectionEvent(ConnectionState connected, Structure syncMetadata) {
this(connected, Collections.emptyList(), new ImmutableStructure(syncMetadata.asMap()));
public FlagdProviderEvent(ProviderEvent event, Structure syncMetadata) {
this(event, Collections.emptyList(), new ImmutableStructure(syncMetadata.asMap()));
}

/**
* Constructs a new {@code ConnectionEvent} with the specified connection state, changed flags, and
* synchronization metadata.
*
* @param connectionState the state of the connection.
* @param event the event.
* @param flagsChanged a list of flags that have changed due to this connection event.
* @param syncMetadata metadata related to the synchronization process of this event.
*/
public ConnectionEvent(ConnectionState connectionState, List<String> flagsChanged, Structure syncMetadata) {
this.connected = connectionState;
public FlagdProviderEvent(ProviderEvent event, List<String> flagsChanged, Structure syncMetadata) {
this.event = event;
this.flagsChanged = flagsChanged != null ? flagsChanged : Collections.emptyList(); // Ensure non-null list
this.syncMetadata = syncMetadata != null
? new ImmutableStructure(syncMetadata.asMap())
Expand All @@ -105,32 +94,7 @@ public Structure getSyncMetadata() {
return new ImmutableStructure(syncMetadata.asMap());
}

/**
* Indicates whether the current connection state is connected.
*
* @return {@code true} if connected, otherwise {@code false}.
*/
public boolean isConnected() {
return this.connected == ConnectionState.CONNECTED;
}

/**
* Indicates
* whether the current connection state is disconnected.
*
* @return {@code true} if disconnected, otherwise {@code false}.
*/
public boolean isDisconnected() {
return this.connected == ConnectionState.DISCONNECTED;
return event == ProviderEvent.PROVIDER_ERROR || event == ProviderEvent.PROVIDER_STALE;
}
/**
* Indicates
* whether the current connection state is stale.
*
* @return {@code true} if stale, otherwise {@code false}.
*/
public boolean isStale() {
return this.connected == ConnectionState.STALE;
}

}
Loading

0 comments on commit 5678da0

Please sign in to comment.