diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java index dd2844d9f8..672b48e540 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java @@ -24,9 +24,22 @@ @SuppressWarnings("unchecked") public interface ResourceCache extends Cache { + /** + * Lists all resources in the given namespace. + * + * @param namespace the namespace to list resources from + * @return a stream of all cached resources in the namespace + */ default Stream list(String namespace) { return list(namespace, TRUE); } + /** + * Lists resources in the given namespace that match the provided predicate. + * + * @param namespace the namespace to list resources from + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(String namespace, Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java index ffcfd2df58..b2c2d2692d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java @@ -25,17 +25,45 @@ public interface Cache { Predicate TRUE = (a) -> true; + /** + * Retrieves a resource from the cache by its {@link ResourceID}. + * + * @param resourceID the identifier of the resource + * @return an Optional containing the resource if present in the cache + */ Optional get(ResourceID resourceID); + /** + * Checks whether a resource with the given {@link ResourceID} exists in the cache. + * + * @param resourceID the identifier of the resource + * @return {@code true} if the resource is present in the cache + */ default boolean contains(ResourceID resourceID) { return get(resourceID).isPresent(); } + /** + * Returns a stream of all {@link ResourceID}s currently in the cache. + * + * @return a stream of resource identifiers + */ Stream keys(); + /** + * Lists all resources in the cache. + * + * @return a stream of all cached resources + */ default Stream list() { return list(TRUE); } + /** + * Lists resources in the cache that match the provided predicate. + * + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 69a5f36bf4..d33f514f78 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -234,134 +235,150 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override public Stream list(String namespace, Predicate predicate) { - return manager().list(namespace, predicate); + return mergeWithTempCacheForList(manager().list(namespace, predicate), namespace, predicate); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override public Stream list(Predicate predicate) { - return cache.list(predicate); - } - - @Override - public List byIndex(String indexName, String indexKey) { - return manager().byIndex(indexName, indexKey); - } - - public Stream byIndexStream(String indexName, String indexKey) { - return manager().byIndexStream(indexName, indexKey); + return mergeWithTempCacheForList(cache.list(predicate), null, predicate); } /** - * Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is - * useful when resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. */ - public Stream listWithStrongConsistency(String namespace, Predicate predicate) { - return mergeWithWithTempCacheResources( - manager().list(namespace, predicate), namespace, predicate); + @Override + public Stream byIndexStream(String indexName, String indexKey) { + return mergeWithTempCacheForIndex( + manager().byIndexStream(indexName, indexKey), indexName, indexKey); } /** - * Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when - * resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. */ - public Stream listWithStrongConsistency(Predicate predicate) { - return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); + @Override + public List byIndex(String indexName, String indexKey) { + return mergeWithTempCacheForIndex( + manager().byIndexStream(indexName, indexKey), indexName, indexKey) + .collect(Collectors.toList()); } - /** - * Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is - * useful when resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. - */ - public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) { - return mergeWithWithTempCacheResources( - manager().byIndexStream(indexName, indexKey), indexName, indexKey); - } + private Stream mergeWithTempCacheForList( + Stream stream, String namespace, Predicate predicate) { + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate)); + } + var tempResources = new HashMap<>(temporaryResourceCache.getResources()); + if (tempResources.isEmpty()) { + return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate)); + } - private Stream mergeWithWithTempCacheResources( - Stream stream, String indexName, String indexKey) { - return mergeWithWithTempCacheResources(stream, null, null, indexName, indexKey); - } + var upToDateList = + stream + .map( + r -> { + var resourceID = ResourceID.fromResource(r); + var tempResource = tempResources.remove(resourceID); + if (tempResource != null + && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { + return tempResource; + } + return r; + }) + .filter(filterResourceByNamespaceAndPredicate(namespace, predicate)) + .toList(); - private Stream mergeWithWithTempCacheResources( - Stream stream, String namespace, Predicate predicate) { - return mergeWithWithTempCacheResources(stream, namespace, predicate, null, null); + return Stream.concat( + tempResources.values().stream() + .filter(filterResourceByNamespaceAndPredicate(namespace, predicate)), + upToDateList.stream()); } - private Stream mergeWithWithTempCacheResources( - Stream stream, - String namespace, - Predicate predicate, - String indexName, - String indexKey) { + private Stream mergeWithTempCacheForIndex( + Stream stream, String indexName, String indexKey) { if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { return stream; } - var allTempResources = temporaryResourceCache.getResources(); - Map tempResources; - if (namespace == null && predicate == null) { - tempResources = new HashMap<>(allTempResources); - } else { - // filtering the temp cache according the user input (predicate, namespace) - tempResources = - allTempResources.entrySet().stream() - .filter( - e -> { - if (namespace != null) { - var res = - e.getKey().getNamespace().map(ns -> ns.equals(namespace)).orElse(false); - if (!res) return false; - } - if (predicate != null) { - return predicate.test(e.getValue()); - } - return true; - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } + var tempResources = new HashMap<>(temporaryResourceCache.getResources()); if (tempResources.isEmpty()) { return stream; } + + var indexer = indexers.get(indexName); + if (indexer == null) { + throw new IllegalArgumentException("Indexer not found for: " + indexName); + } + var upToDateList = stream .map( r -> { var resourceID = ResourceID.fromResource(r); - // removing the id from the related temp resources - // this is important so we can detect ghost resources: - // all that remains is ghost resource var tempResource = tempResources.remove(resourceID); - // using the latest version if (tempResource != null && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { + if (!indexer.apply(tempResource).contains(indexKey)) { + return null; + } return tempResource; } return r; }) + .filter(Objects::nonNull) .toList(); - Stream tempResourceStream; - // ghost resource handling - if (indexName != null && indexKey != null) { - var indexer = indexers.get(indexName); - if (indexer == null) { - throw new IllegalArgumentException("Indexer not found for: " + indexName); + + // remaining temp resources are ghost resources — include only those matching the index + return Stream.concat( + tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)), + upToDateList.stream()); + } + + private static Predicate filterResourceByNamespaceAndPredicate( + String namespace, Predicate predicate) { + return r -> { + if (namespace != null) { + var res = Optional.of(r).map(ns -> ns.equals(namespace)).orElse(false); + if (!res) return false; } - // we check if the ghost resource is part of the index - tempResourceStream = - tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)); - } else { - tempResourceStream = tempResources.values().stream(); - } - return Stream.concat(tempResourceStream, upToDateList.stream()); + if (predicate != null) { + return predicate.test(r); + } + return true; + }; } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Keys from the temporary resource + * cache (ghost resources) are included in the result. + */ @Override public Stream keys() { - return cache.keys(); + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + return manager().keys(); + } + var tempKeys = temporaryResourceCache.getResources().keySet(); + return Stream.concat(manager().keys(), tempKeys.stream().filter(k -> !manager().contains(k))); } @Override diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 7313cc3a48..2a68101984 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -16,6 +16,7 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -541,40 +542,62 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() { } @Test - void listWithStrongConsistencyReplacesResourceFromTempCache() { + void listReplacesResourceFromTempCache() { var original = testDeployment(); var newer = testDeployment(); newer.getMetadata().setResourceVersion("5"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(original), newer)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); var mim = mock(InformerManager.class); when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactly(newer); } @Test - void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() { + void listExcludesResourceWhenTempCacheContainsNewerVersionThatNoLongerMatchesPredicate() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("4"); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); + doReturn(mim).when(informerEventSource).manager(); + + var result = + informerEventSource + .list(null, r -> !"5".equals(r.getMetadata().getResourceVersion())) + .toList(); + + assertThat(result).isEmpty(); + } + + @Test + void listKeepsResourceWhenNotInTempCache() { var original = testDeployment(); - when(temporaryResourceCache.getResources()).thenReturn(Map.of()); + when(temporaryResourceCache.getResources()).thenReturn(new HashMap<>()); var mim = mock(InformerManager.class); when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactly(original); } @Test - void listWithStrongConsistencyReplacesOnlyMatchingResources() { + void listReplacesOnlyMatchingResources() { var dep1 = testDeployment(); var dep2 = testDeployment(); dep2.getMetadata().setName("other"); @@ -582,93 +605,158 @@ void listWithStrongConsistencyReplacesOnlyMatchingResources() { newerDep1.getMetadata().setResourceVersion("5"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(dep1), newerDep1)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(dep1), newerDep1))); var informerManager = mock(InformerManager.class); when(informerManager.list(nullable(String.class), any())).thenReturn(Stream.of(dep1, dep2)); doReturn(informerManager).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactlyInAnyOrder(newerDep1, dep2); } @Test - void byIndexStreamWithStrongConsistencyReplacesFromTempCache() { + void byIndexStreamReplacesFromTempCache() { var original = testDeployment(); var newer = testDeployment(); newer.getMetadata().setResourceVersion("5"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(original), newer)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); var informerManager = mock(InformerManager.class); when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); doReturn(informerManager).when(informerEventSource).manager(); informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); - var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + var result = informerEventSource.byIndexStream("idx", "key").toList(); assertThat(result).containsExactly(newer); } @Test - void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + void byIndexStreamSkipsNewerTempCacheResourceWhenIndexedValueChanged() { + var original = testDeployment(); + original.getMetadata().setLabels(Map.of("app", "key")); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + newer.getMetadata().setLabels(Map.of("app", "other")); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var informerManager = mock(InformerManager.class); + when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + doReturn(informerManager).when(informerEventSource).manager(); + informerEventSource.addIndexers( + Map.of("idx", d -> List.of(d.getMetadata().getLabels().get("app")))); + + var result = informerEventSource.byIndexStream("idx", "key").toList(); + + assertThat(result).isEmpty(); + } + + @Test + void listKeepsResourceWhenTempCacheHasOlderVersion() { var original = testDeployment(); original.getMetadata().setResourceVersion("5"); var olderTemp = testDeployment(); olderTemp.getMetadata().setResourceVersion("3"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(original), olderTemp)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), olderTemp))); var mim = mock(InformerManager.class); when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactly(original); } @Test - void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + void byIndexStreamKeepsResourceWhenTempCacheHasOlderVersion() { var original = testDeployment(); original.getMetadata().setResourceVersion("5"); var olderTemp = testDeployment(); olderTemp.getMetadata().setResourceVersion("3"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(original), olderTemp)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), olderTemp))); var mim = mock(InformerManager.class); when(mim.byIndexStream(any(), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); - var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + var result = informerEventSource.byIndexStream("idx", "key").toList(); assertThat(result).containsExactly(original); } @Test - void listWithStrongConsistencyAddsGhostResources() { + void listAddsGhostResources() { var resource = testDeployment(); var ghostResource = testDeployment(); ghostResource.getMetadata().setName("ghost"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(ghostResource), ghostResource)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(ghostResource), ghostResource))); var mim = mock(InformerManager.class); when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(resource)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactlyInAnyOrder(resource, ghostResource); } + @Test + void keysIncludesGhostResourceKeys() { + var resource = testDeployment(); + var ghostResource = testDeployment(); + ghostResource.getMetadata().setName("ghost"); + + var resourceId = ResourceID.fromResource(resource); + var ghostResourceId = ResourceID.fromResource(ghostResource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(ghostResourceId, ghostResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(ghostResourceId)).thenReturn(false); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactlyInAnyOrder(resourceId, ghostResourceId); + } + + @Test + void keysDoesNotDuplicateExistingKeys() { + var resource = testDeployment(); + var newerResource = testDeployment(); + newerResource.getMetadata().setResourceVersion("5"); + + var resourceId = ResourceID.fromResource(resource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(resourceId, newerResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(resourceId)).thenReturn(true); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactly(resourceId); + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta());