Skip to content

Commit 7ba50db

Browse files
authored
Merge pull request #169 from GetStream/feature/improve-tests-concurrent-stability
Feature/improve tests concurrent stability
2 parents 3cb9e1d + 89ace30 commit 7ba50db

23 files changed

Lines changed: 741 additions & 386 deletions

Assets/Plugins/StreamChat/Core/LowLevelClient/API/Internal/InternalApiClientBase.cs

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Linq;
23
using System.Text;
34
using System.Threading.Tasks;
45
using StreamChat.Core.Exceptions;
@@ -83,7 +84,7 @@ private object TrySerializeRequestBodyContent(object content, out string seriali
8384
}
8485

8586
private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod, string endpoint,
86-
object requestBody = default, QueryParameters queryParameters = null)
87+
object requestBody = default, QueryParameters queryParameters = null, int attempt = 0)
8788
{
8889
//StreamTodo: perhaps remove this requirement, sometimes we send empty body without any properties
8990
if (requestBody == null && IsRequestBodyRequiredByHttpMethod(httpMethod))
@@ -101,7 +102,14 @@ private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod,
101102
queryParameters = QueryParameters.Default;
102103
}
103104

104-
queryParameters.Append("payload", serializedContent);
105+
if (queryParameters.ContainsKey("paload"))
106+
{
107+
queryParameters["payload"] = serializedContent;
108+
}
109+
else
110+
{
111+
queryParameters.Append("payload", serializedContent);
112+
}
105113
}
106114

107115
var uri = _requestUriFactory.CreateEndpointUri(endpoint, queryParameters);
@@ -129,7 +137,19 @@ private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod,
129137
Message = responseContent,
130138
Code = 504,
131139
};
140+
141+
#if !STREAM_TESTS_ENABLED
132142
throw new StreamApiException(apiError);
143+
#else
144+
if (attempt >= 20)
145+
{
146+
throw new StreamApiException(apiError);
147+
}
148+
149+
_logs.Warning($"API CLIENT, TESTS MODE, Upstream Request Timeout - Make another attempt");
150+
return await HttpRequest<TResponse>(httpMethod, endpoint,
151+
requestBody, queryParameters, ++attempt);
152+
#endif
133153
}
134154

135155
LogRestCall(uri, endpoint, httpMethod, responseContent, success: false, logContent);
@@ -146,6 +166,14 @@ private async Task<TResponse> HttpRequest<TResponse>(HttpMethodType httpMethod,
146166
throw new StreamDeserializationException(responseContent, typeof(TResponse), e);
147167
}
148168

169+
#if STREAM_TESTS_ENABLED
170+
if (apiError.StatusCode == StreamApiException.RateLimitErrorHttpStatusCode && attempt < 50)
171+
{
172+
return await HandleRateLimit<TResponse>(httpMethod, endpoint, requestBody, queryParameters, attempt,
173+
httpResponse);
174+
}
175+
#endif
176+
149177
if (apiError.Code != InvalidAuthTokenErrorCode)
150178
{
151179
LogRestCall(uri, endpoint, httpMethod, responseContent, success: false, logContent);
@@ -273,5 +301,45 @@ private void LogRestCall(Uri uri, string endpoint, HttpMethodType httpMethod, st
273301

274302
_logs.Info(_sb.ToString());
275303
}
304+
305+
private async Task<TResponse> HandleRateLimit<TResponse>(HttpMethodType httpMethod, string endpoint,
306+
object requestBody, QueryParameters queryParameters, int attempt, HttpResponse httpResponse)
307+
{
308+
if (attempt >= 50)
309+
{
310+
throw new StreamApiException(new APIErrorInternalDTO
311+
{ Code = StreamApiException.RateLimitErrorHttpStatusCode });
312+
}
313+
314+
var delaySeconds = GetBackoffDelay(attempt, httpResponse, out var resetHeaderTimestamp);
315+
var now = (int)new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds();
316+
_logs.Warning($"API CLIENT, TESTS MODE, Rate Limit API Error - Wait for {delaySeconds} seconds. " +
317+
$"Timestamp reset header: {resetHeaderTimestamp}, Current timestamp: {now}, Dif: {resetHeaderTimestamp - now}");
318+
await Task.Delay(delaySeconds * 1000);
319+
return await HttpRequest<TResponse>(httpMethod, endpoint, requestBody, queryParameters, ++attempt);
320+
}
321+
322+
private int GetBackoffDelay(int attempt, HttpResponse httpResponse, out int resetHeaderTimestamp)
323+
{
324+
resetHeaderTimestamp = -1;
325+
// StreamTodo: Backoff based on the header doesn't seem to work. Perhaps concurrency is conflicting with this approach
326+
if (httpResponse.TryGetHeader("x-ratelimit-reset", out var values))
327+
{
328+
var resetTimestamp = values.FirstOrDefault();
329+
330+
if (int.TryParse(resetTimestamp, out var rateLimitTimestamp))
331+
{
332+
resetHeaderTimestamp = rateLimitTimestamp;
333+
var now = (int)new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds();
334+
var secondsLeft = rateLimitTimestamp - now;
335+
// if (secondsLeft > 0)
336+
// {
337+
// return secondsLeft + 5;
338+
// }
339+
}
340+
}
341+
342+
return 61 + attempt * 20;
343+
}
276344
}
277345
}

0 commit comments

Comments
 (0)