Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,23 @@

package org.cloudfoundry.reactor.logcache.v1;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.cloudfoundry.logcache.v1.Envelope;
import org.cloudfoundry.logcache.v1.EnvelopeType;
import org.cloudfoundry.logcache.v1.InfoRequest;
import org.cloudfoundry.logcache.v1.InfoResponse;
import org.cloudfoundry.logcache.v1.MetaRequest;
import org.cloudfoundry.logcache.v1.MetaResponse;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.logcache.v1.ReadResponse;
import org.cloudfoundry.logcache.v1.TailLogsRequest;
import org.cloudfoundry.reactor.ConnectionContext;
import org.cloudfoundry.reactor.TokenProvider;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ReactorLogCacheEndpoints extends AbstractLogCacheOperations {
Expand All @@ -48,4 +56,111 @@ Mono<MetaResponse> meta(MetaRequest request) {
Mono<ReadResponse> read(ReadRequest request) {
return get(request, ReadResponse.class, "read", request.getSourceId()).checkpoint();
}

Mono<ReadResponse> recentLogs(ReadRequest request) {
return read(request);
}

/**
* Continuously polls Log Cache and emits new {@link Envelope}s as they arrive.
*
* <p>Mirrors the Go {@code logcache.Walk()} / {@code cf tail --follow} semantics:
* <ol>
* <li>Start the cursor at {@code startTime} (defaults to now&nbsp;&minus;&nbsp;5&nbsp;s in
* nanoseconds).</li>
* <li>Issue {@code GET /api/v1/read/{sourceId}?start_time=cursor}.</li>
* <li>Emit every returned envelope in ascending timestamp order and advance
* the cursor to {@code lastTimestamp + 1}.</li>
* <li>When the batch is empty, wait {@code pollInterval} before the next poll.</li>
* <li>Repeat forever – the caller cancels the subscription to stop.</li>
* </ol>
* Fully non-blocking: no {@code Thread.sleep}.
*/
Flux<Envelope> logsTail(TailLogsRequest request) {
long defaultStartNanos = (System.currentTimeMillis() - 5_000L) * 1_000_000L;
AtomicLong cursor =
new AtomicLong(
request.getStartTime() != null
? request.getStartTime()
: defaultStartNanos);

List<EnvelopeType> envelopeTypes =
request.getEnvelopeTypes() != null
? request.getEnvelopeTypes()
: Collections.emptyList();
String nameFilter = request.getNameFilter();

/*
* Strategy (mirrors Go's logcache.Walk):
* – Mono.defer builds a fresh ReadRequest from the mutable cursor on every repetition.
* – The Mono returns either the sorted batch (non-empty) or an empty list.
* – flatMapMany turns each batch into a stream of individual Envelope items.
* – repeat() subscribes again after each completion.
* – When the batch was empty we insert a delay via Mono.delay before the next
* repetition so we do not hammer the server. We signal "empty" by returning
* a sentinel Mono<Boolean> (false = was empty, true = had data) and use
* repeatWhen to conditionally delay.
*/
return Flux.defer(
() -> {
// Build the read request from the current cursor position.
ReadRequest.Builder builder =
ReadRequest.builder()
.sourceId(request.getSourceId())
.startTime(cursor.get());
if (!envelopeTypes.isEmpty()) {
builder.envelopeTypes(envelopeTypes);
}
if (nameFilter != null && !nameFilter.isEmpty()) {
builder.nameFilter(nameFilter);
}

return read(builder.build())
.onErrorReturn(ReadResponse.builder().build())
.flatMapMany(
resp -> {
List<Envelope> raw =
resp.getEnvelopes() != null
? resp.getEnvelopes().getBatch()
: Collections.emptyList();

if (raw.isEmpty()) {
// Signal "no data" so repeatWhen can insert the
// back-off delay.
return Flux.empty();
}

// Sort ascending by timestamp and advance the
// cursor.
List<Envelope> sorted = new ArrayList<>(raw);
sorted.sort(
(a, b) ->
Long.compare(
a.getTimestamp() != null
? a.getTimestamp()
: 0L,
b.getTimestamp() != null
? b.getTimestamp()
: 0L));

Envelope last = sorted.get(sorted.size() - 1);
cursor.set(
(last.getTimestamp() != null
? last.getTimestamp()
: cursor.get())
+ 1);

return Flux.fromIterable(sorted);
});
})
// repeatWhen receives a Flux<Long> where each element is the count of items
// emitted in the previous cycle (0 = empty batch → insert delay).
.repeatWhen(
companion ->
companion.flatMap(
count ->
count == 0
? Mono.delay(request.getPollInterval())
: Mono.just(count)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@

package org.cloudfoundry.reactor.logcache.v1;

import org.cloudfoundry.logcache.v1.Envelope;
import org.cloudfoundry.logcache.v1.InfoRequest;
import org.cloudfoundry.logcache.v1.InfoResponse;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.logcache.v1.MetaRequest;
import org.cloudfoundry.logcache.v1.MetaResponse;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.logcache.v1.ReadResponse;
import org.cloudfoundry.logcache.v1.TailLogsRequest;
import org.cloudfoundry.reactor.ConnectionContext;
import org.cloudfoundry.reactor.TokenProvider;
import org.immutables.value.Value;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.URI;
Expand Down Expand Up @@ -53,6 +56,16 @@ public Mono<ReadResponse> read(ReadRequest request) {
return getReactorLogCacheEndpoints().read(request);
}

@Override
public Mono<ReadResponse> recentLogs(ReadRequest request) {
return getReactorLogCacheEndpoints().recentLogs(request);
}

@Override
public Flux<Envelope> logsTail(TailLogsRequest request) {
return getReactorLogCacheEndpoints().logsTail(request);
}

/**
* The connection context
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.cloudfoundry.logcache.v1;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -46,4 +47,25 @@ public interface LogCacheClient {
* @return the read response
*/
Mono<ReadResponse> read(ReadRequest request);

/**
* Makes the Log Cache RecentLogs /api/v1/read request
*
* @param request the Recent Logs request
* @return the events from the recent logs
*/
Mono<ReadResponse> recentLogs(ReadRequest request);

/**
* Continuously polls the Log Cache /api/v1/read endpoint and streams new {@link Envelope}s
* as they appear. This is the Java equivalent of the Go {@code logcache.Walk()} API and
* {@code cf tail --follow}.
* <p>
* The returned {@link Flux} will never complete on its own – unsubscribe (or cancel) it to
* stop streaming.
*
* @param request the tail request (source id, optional filters, poll interval)
* @return an infinite stream of envelopes
*/
Flux<Envelope> logsTail(TailLogsRequest request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.cloudfoundry.logcache.v1;

import org.cloudfoundry.Nullable;
import org.immutables.value.Value;

import java.time.Duration;
import java.util.List;

/**
* The request options for the Log Cache tail (streaming follow) operation.
* This continuously polls the Log Cache /api/v1/read endpoint, emitting new envelopes
* as they appear – equivalent to {@code cf tail --follow} or the Go {@code logcache.Walk()} API.
*/
@Value.Immutable
abstract class _TailLogsRequest {

/**
* The source id (application guid or service guid) to stream logs for.
*/
abstract String getSourceId();

/**
* Optional start time (UNIX nanoseconds). Defaults to "now – 5 seconds" when not set.
*/
@Nullable
abstract Long getStartTime();

/**
* Optional envelope type filter.
*/
@Nullable
abstract List<EnvelopeType> getEnvelopeTypes();

/**
* Optional regex name filter (requires Log Cache ≥ 2.1.0).
*/
@Nullable
abstract String getNameFilter();

/**
* How long to wait between successive polls when no new envelopes are available.
* Defaults to 250 ms (matching the Go client's {@code AlwaysRetryBackoff}).
*/
@Value.Default
Duration getPollInterval() {
return Duration.ofMillis(250);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package org.cloudfoundry.operations.applications;

import org.cloudfoundry.doppler.LogMessage;
import org.cloudfoundry.logcache.v1.Envelope;
import org.cloudfoundry.logcache.v1.Log;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.logcache.v1.TailLogsRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -126,11 +130,29 @@ public interface Applications {
@Deprecated
Flux<LogMessage> logs(LogsRequest request);

/**
* List the applications logs from logCacheClient.
* If no messages are available, an empty Flux is returned.
*
* @param request the application logs request
* @return the applications logs
*/
Flux<Log> logsRecent(ReadRequest request);

/**
* Continuously streams application log envelopes from Log Cache by repeatedly polling
* the {@code /api/v1/read} endpoint. The returned {@link Flux} is infinite – cancel it
* to stop streaming. This is the Java equivalent of {@code cf tail --follow}.
*
* @param request the tail request (source id, optional filters, poll interval)
* @return an infinite stream of envelopes
*/
Flux<Envelope> logsTail(TailLogsRequest request);

/**
* List the applications logs.
* Uses Log Cache under the hood when {@link ApplicationLogsRequest#getRecent()} is {@code true}.
* Log streaming still uses Doppler, which is not available in CF deployments following
* <a href="https://docs.cloudfoundry.org/loggregator/architecture.html#shared-nothing-architecture">shared-nothing architecture</a>.
* Only works with {@code Loggregator < 107.0}, shipped in {@code CFD < 24.3}
* and {@code TAS < 4.0}.
*
* @param request the application logs request
* @return the applications logs
Expand Down
Loading