Skip to content

Commit 4ee32cd

Browse files
authored
[Java Harness] Reapply optimization to use Caches.weigh on objects within cached blocks (#37964)
* Reapply "Reduce weighing overhead for caching blocks (#36897)" This reverts commit 63177cb. * Remove use of ImmutableList where values are not known to be non-null
1 parent 2e1edb0 commit 4ee32cd

5 files changed

Lines changed: 212 additions & 96 deletions

File tree

sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/WeightedList.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import java.util.concurrent.atomic.AtomicLong;
2222
import org.apache.beam.sdk.util.Weighted;
23+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.LongMath;
2324

2425
/** Facade for a {@link List<T>} that keeps track of weight, for cache limit reasons. */
2526
public class WeightedList<T> implements Weighted {
@@ -71,14 +72,6 @@ public void addAll(List<T> values, long weight) {
7172
}
7273

7374
public void accumulateWeight(long weight) {
74-
this.weight.accumulateAndGet(
75-
weight,
76-
(first, second) -> {
77-
try {
78-
return Math.addExact(first, second);
79-
} catch (ArithmeticException e) {
80-
return Long.MAX_VALUE;
81-
}
82-
});
75+
this.weight.accumulateAndGet(weight, LongMath::saturatedAdd);
8376
}
8477
}

sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,16 @@ public void log(LogEntry entry) {
198198
@TearDown
199199
public void tearDown() {
200200
try {
201+
// Shutting down the control server should terminate the sdk client.
202+
// We do this before shutting down logging server in particular as that can
203+
// trigger exceptions if the client was not yet shutdown.
201204
controlServer.close();
205+
sdkHarnessExecutorFuture.get();
206+
202207
stateServer.close();
203208
dataServer.close();
204209
loggingServer.close();
205210
controlClient.close();
206-
sdkHarnessExecutorFuture.get();
207211
} catch (InterruptedException ignored) {
208212
Thread.currentThread().interrupt();
209213
} catch (Exception e) {

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.fn.harness;
1919

20+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
21+
2022
import java.util.Arrays;
2123
import java.util.Collections;
2224
import java.util.Objects;
@@ -25,6 +27,7 @@
2527
import java.util.concurrent.ExecutionException;
2628
import java.util.concurrent.atomic.LongAdder;
2729
import java.util.function.Function;
30+
import javax.annotation.Nullable;
2831
import org.apache.beam.fn.harness.Cache.Shrinkable;
2932
import org.apache.beam.sdk.options.PipelineOptions;
3033
import org.apache.beam.sdk.options.SdkHarnessOptions;
@@ -41,7 +44,6 @@
4144
import org.slf4j.LoggerFactory;
4245

4346
/** Utility methods used to instantiate and operate over cache instances. */
44-
@SuppressWarnings("nullness")
4547
public final class Caches {
4648
private static final Logger LOG = LoggerFactory.getLogger(Caches.class);
4749

@@ -70,7 +72,7 @@ public final class Caches {
7072
public static final long REFERENCE_SIZE = 8;
7173

7274
/** Returns the amount of memory in bytes the provided object consumes. */
73-
public static long weigh(Object o) {
75+
public static long weigh(@Nullable Object o) {
7476
if (o == null) {
7577
return REFERENCE_SIZE;
7678
}
@@ -115,6 +117,7 @@ static class ShrinkOnEviction implements RemovalListener<CompositeKey, WeightedV
115117
cache;
116118
private final LongAdder weightInBytes;
117119

120+
@SuppressWarnings("argument")
118121
ShrinkOnEviction(
119122
CacheBuilder<CompositeKey, WeightedValue<Object>> cacheBuilder, LongAdder weightInBytes) {
120123
this.cache = cacheBuilder.removalListener(this).build();
@@ -130,18 +133,19 @@ static class ShrinkOnEviction implements RemovalListener<CompositeKey, WeightedV
130133
@Override
131134
public void onRemoval(
132135
RemovalNotification<CompositeKey, WeightedValue<Object>> removalNotification) {
133-
weightInBytes.add(
134-
-(removalNotification.getKey().getWeight() + removalNotification.getValue().getWeight()));
135-
if (removalNotification.wasEvicted()) {
136-
if (!(removalNotification.getValue().getValue() instanceof Cache.Shrinkable)) {
137-
return;
138-
}
139-
Object updatedEntry = ((Shrinkable<?>) removalNotification.getValue().getValue()).shrink();
140-
if (updatedEntry != null) {
141-
cache.put(
142-
removalNotification.getKey(),
143-
addWeightedValue(removalNotification.getKey(), updatedEntry, weightInBytes));
144-
}
136+
CompositeKey key = checkNotNull(removalNotification.getKey());
137+
WeightedValue<Object> value = checkNotNull(removalNotification.getValue());
138+
weightInBytes.add(-(key.getWeight() + value.getWeight()));
139+
if (!removalNotification.wasEvicted()) {
140+
return;
141+
}
142+
@Nullable Object v = value.getValue();
143+
if (!(v instanceof Cache.Shrinkable)) {
144+
return;
145+
}
146+
@Nullable Object updatedEntry = ((Shrinkable<?>) v).shrink();
147+
if (updatedEntry != null) {
148+
cache.put(key, addWeightedValue(key, updatedEntry, weightInBytes));
145149
}
146150
}
147151
}
@@ -282,8 +286,8 @@ private static class SubCache<K, V> implements Cache<K, V> {
282286
}
283287

284288
@Override
285-
public V peek(K key) {
286-
WeightedValue<Object> value = cache.getIfPresent(keyPrefix.valueKey(key));
289+
public @Nullable V peek(K key) {
290+
@Nullable WeightedValue<Object> value = cache.getIfPresent(keyPrefix.valueKey(key));
287291
if (value == null) {
288292
return null;
289293
}
@@ -298,7 +302,9 @@ public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
298302
cache
299303
.get(
300304
compositeKey,
301-
() -> addWeightedValue(compositeKey, loadingFunction.apply(key), weightInBytes))
305+
() ->
306+
addWeightedValue(
307+
compositeKey, checkNotNull(loadingFunction.apply(key)), weightInBytes))
302308
.getValue();
303309
} catch (ExecutionException e) {
304310
throw new RuntimeException(e);
@@ -308,7 +314,7 @@ public V computeIfAbsent(K key, Function<K, V> loadingFunction) {
308314
@Override
309315
public void put(K key, V value) {
310316
CompositeKey compositeKey = keyPrefix.valueKey(key);
311-
cache.put(compositeKey, addWeightedValue(compositeKey, value, weightInBytes));
317+
cache.put(compositeKey, addWeightedValue(compositeKey, checkNotNull(value), weightInBytes));
312318
}
313319

314320
@Override
@@ -356,7 +362,7 @@ CompositeKeyPrefix subKey(Object suffix, Object... additionalSuffixes) {
356362
return new CompositeKeyPrefix(subKey, subKeyWeight);
357363
}
358364

359-
<K> CompositeKey valueKey(K k) {
365+
<K> CompositeKey valueKey(@Nullable K k) {
360366
return new CompositeKey(namespace, weight, k);
361367
}
362368

@@ -391,22 +397,26 @@ boolean isEquivalentNamespace(CompositeKey otherKey) {
391397
@VisibleForTesting
392398
static class CompositeKey implements Weighted {
393399
private final Object[] namespace;
394-
private final Object key;
400+
private final @Nullable Object key;
395401
private final long weight;
396402

397-
private CompositeKey(Object[] namespace, long namespaceWeight, Object key) {
403+
private CompositeKey(Object[] namespace, long namespaceWeight, @Nullable Object key) {
398404
this.namespace = namespace;
399405
this.key = key;
400406
this.weight = namespaceWeight + weigh(key);
401407
}
402408

403409
@Override
404410
public String toString() {
405-
return "CompositeKey{namespace=" + Arrays.toString(namespace) + ", key=" + key + "}";
411+
return "CompositeKey{namespace="
412+
+ Arrays.toString(namespace)
413+
+ ", key="
414+
+ String.valueOf(key)
415+
+ "}";
406416
}
407417

408418
@Override
409-
public boolean equals(Object o) {
419+
public boolean equals(@Nullable Object o) {
410420
if (this == o) {
411421
return true;
412422
}
@@ -434,6 +444,7 @@ public long getWeight() {
434444
* <p>The set of keys that are tracked are only those provided to {@link #peek} and {@link
435445
* #computeIfAbsent}.
436446
*/
447+
@SuppressWarnings("nullness")
437448
public static class ClearableCache<K, V> extends SubCache<K, V> {
438449
private final Set<K> weakHashSet;
439450

0 commit comments

Comments
 (0)