Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,22 @@
@SuppressWarnings("unchecked")
public interface ResourceCache<T extends HasMetadata> extends Cache<T> {

/**
* Lists all resources in the given namespace.
*
* @param namespace the namespace to list resources from
* @return a stream of all cached resources in the namespace
*/
default Stream<T> list(String namespace) {
return list(namespace, TRUE);
}

/**
* Lists resources in the given namespace that match the provided predicate.
*
* @param namespace the namespace to list resources from
* @param predicate filter to apply on the resources
* @return a stream of cached resources matching the predicate
*/
Stream<T> list(String namespace, Predicate<T> predicate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,45 @@
public interface Cache<T> {
Predicate TRUE = (a) -> true;

/**
* Retrieves a resource from the cache by its {@link ResourceID}.
*
* @param resourceID the identifier of the resource
* @return an Optional containing the resource if present in the cache
*/
Optional<T> get(ResourceID resourceID);

/**
* Checks whether a resource with the given {@link ResourceID} exists in the cache.
*
* @param resourceID the identifier of the resource
* @return {@code true} if the resource is present in the cache
*/
default boolean contains(ResourceID resourceID) {
return get(resourceID).isPresent();
}

/**
* Returns a stream of all {@link ResourceID}s currently in the cache.
*
* @return a stream of resource identifiers
*/
Stream<ResourceID> keys();

/**
* Lists all resources in the cache.
*
* @return a stream of all cached resources
*/
default Stream<T> list() {
return list(TRUE);
}

/**
* Lists resources in the cache that match the provided predicate.
*
* @param predicate filter to apply on the resources
* @return a stream of cached resources matching the predicate
*/
Stream<T> list(Predicate<T> predicate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -234,134 +235,150 @@ public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
this.indexers.putAll(indexers);
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
Comment on lines +238 to +243
@Override
public Stream<R> list(String namespace, Predicate<R> predicate) {
return manager().list(namespace, predicate);
return mergeWithTempCacheForList(manager().list(namespace, predicate), namespace, predicate);
}
Comment on lines 245 to 247
Comment on lines 245 to 247

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
@Override
public Stream<R> list(Predicate<R> predicate) {
return cache.list(predicate);
}

@Override
public List<R> byIndex(String indexName, String indexKey) {
return manager().byIndex(indexName, indexKey);
}

public Stream<R> byIndexStream(String indexName, String indexKey) {
return manager().byIndexStream(indexName, indexKey);
return mergeWithTempCacheForList(cache.list(predicate), null, predicate);
}

/**
* Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is
* useful when resources are updated using {@link
* io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
public Stream<R> listWithStrongConsistency(String namespace, Predicate<R> predicate) {
return mergeWithWithTempCacheResources(
manager().list(namespace, predicate), namespace, predicate);
@Override
public Stream<R> byIndexStream(String indexName, String indexKey) {
return mergeWithTempCacheForIndex(
manager().byIndexStream(indexName, indexKey), indexName, indexKey);
}
Comment on lines +266 to 270
Comment on lines 245 to 270

/**
* Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when
* resources are updated using {@link
* io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
public Stream<R> listWithStrongConsistency(Predicate<R> predicate) {
return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate);
@Override
public List<R> byIndex(String indexName, String indexKey) {
return mergeWithTempCacheForIndex(
manager().byIndexStream(indexName, indexKey), indexName, indexKey)
.collect(Collectors.toList());
}

/**
* Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is
* useful when resources are updated using {@link
* io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
*/
public Stream<R> byIndexStreamWithStrongConsistency(String indexName, String indexKey) {
return mergeWithWithTempCacheResources(
manager().byIndexStream(indexName, indexKey), indexName, indexKey);
}
private Stream<R> mergeWithTempCacheForList(
Stream<R> stream, String namespace, Predicate<R> predicate) {
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate));
}
Comment on lines +285 to +289
var tempResources = new HashMap<>(temporaryResourceCache.getResources());
if (tempResources.isEmpty()) {
return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate));
}

private Stream<R> mergeWithWithTempCacheResources(
Stream<R> stream, String indexName, String indexKey) {
return mergeWithWithTempCacheResources(stream, null, null, indexName, indexKey);
}
var upToDateList =
stream
.map(
r -> {
var resourceID = ResourceID.fromResource(r);
var tempResource = tempResources.remove(resourceID);
if (tempResource != null
&& ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) {
return tempResource;
}
return r;
})
.filter(filterResourceByNamespaceAndPredicate(namespace, predicate))
.toList();
Comment on lines +307 to +308

private Stream<R> mergeWithWithTempCacheResources(
Stream<R> stream, String namespace, Predicate<R> predicate) {
return mergeWithWithTempCacheResources(stream, namespace, predicate, null, null);
return Stream.concat(
tempResources.values().stream()
.filter(filterResourceByNamespaceAndPredicate(namespace, predicate)),
upToDateList.stream());
}

private Stream<R> mergeWithWithTempCacheResources(
Stream<R> stream,
String namespace,
Predicate<R> predicate,
String indexName,
String indexKey) {
private Stream<R> mergeWithTempCacheForIndex(
Stream<R> stream, String indexName, String indexKey) {
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
return stream;
}
var allTempResources = temporaryResourceCache.getResources();
Map<ResourceID, R> tempResources;
if (namespace == null && predicate == null) {
tempResources = new HashMap<>(allTempResources);
} else {
// filtering the temp cache according the user input (predicate, namespace)
tempResources =
allTempResources.entrySet().stream()
.filter(
e -> {
if (namespace != null) {
var res =
e.getKey().getNamespace().map(ns -> ns.equals(namespace)).orElse(false);
if (!res) return false;
}
if (predicate != null) {
return predicate.test(e.getValue());
}
return true;
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
var tempResources = new HashMap<>(temporaryResourceCache.getResources());
if (tempResources.isEmpty()) {
return stream;
}

var indexer = indexers.get(indexName);
if (indexer == null) {
throw new IllegalArgumentException("Indexer not found for: " + indexName);
}

var upToDateList =
stream
.map(
r -> {
var resourceID = ResourceID.fromResource(r);
// removing the id from the related temp resources
// this is important so we can detect ghost resources:
// all that remains is ghost resource
var tempResource = tempResources.remove(resourceID);
// using the latest version
if (tempResource != null
&& ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) {
if (!indexer.apply(tempResource).contains(indexKey)) {
return null;
}
return tempResource;
}
return r;
})
.filter(Objects::nonNull)
.toList();
Stream<R> tempResourceStream;
// ghost resource handling
if (indexName != null && indexKey != null) {
var indexer = indexers.get(indexName);
if (indexer == null) {
throw new IllegalArgumentException("Indexer not found for: " + indexName);

// remaining temp resources are ghost resources — include only those matching the index
return Stream.concat(
tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)),
upToDateList.stream());
}

private static <R extends HasMetadata> Predicate<R> filterResourceByNamespaceAndPredicate(
String namespace, Predicate<R> predicate) {
return r -> {
if (namespace != null) {
var res = Optional.of(r).map(ns -> ns.equals(namespace)).orElse(false);
if (!res) return false;
}
// we check if the ghost resource is part of the index
tempResourceStream =
tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey));
} else {
tempResourceStream = tempResources.values().stream();
}
return Stream.concat(tempResourceStream, upToDateList.stream());
if (predicate != null) {
return predicate.test(r);
}
return true;
};
}
Comment on lines +355 to 367

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Keys from the temporary resource
* cache (ghost resources) are included in the result.
*/
@Override
public Stream<ResourceID> keys() {
return cache.keys();
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
return manager().keys();
}
var tempKeys = temporaryResourceCache.getResources().keySet();
return Stream.concat(manager().keys(), tempKeys.stream().filter(k -> !manager().contains(k)));
}

@Override
Expand Down
Loading
Loading