Skip to content

Commit 1c53255

Browse files
authored
Feature/improve performance (#123)
* Set LogLevel to FailureOnly by default * Refactor the WebsocketClient background tasks to use Threading Timer and SemaphoreSlim instead of the infinite async/await in order to have less overhead + reduce the interval of each to 100ms + offset both tasks to not run simultaneously in order to spread the overhead across more frames * Remove Unity dependency from the WebsocketClient.cs * move period and offset config to consts + increase update interval from 10x per second to 20x per second
1 parent 528a73c commit 1c53255

3 files changed

Lines changed: 79 additions & 41 deletions

File tree

Assets/Plugins/StreamChat/Core/Configs/IStreamClientConfig.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ namespace StreamChat.Core.Configs
88
public interface IStreamClientConfig
99
{
1010
/// <summary>
11-
/// What type of logs are being emitted.
11+
/// What type of logs are being emitted. Available options:
12+
/// FailureOnly - only errors will be logged. This option is recommended for production
13+
/// All - all errors will be logged. This can be useful during development
14+
/// Debug - This included All logs + some additional that can be useful for debugging
15+
/// Disabled - no logs will be emitted. Not recommended in general - this could be only viable if you're capturing all of the thrown exceptions and handling the logging on your own.
1216
/// </summary>
1317
StreamLogLevel LogLevel { get; set; }
1418
}

Assets/Plugins/StreamChat/Core/Configs/StreamClientConfig.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ public class StreamClientConfig : IStreamClientConfig
77
{
88
public static IStreamClientConfig Default { get; set; } = new StreamClientConfig();
99

10-
public StreamLogLevel LogLevel { get; set; } = StreamLogLevel.All;
10+
public StreamLogLevel LogLevel { get; set; } = StreamLogLevel.FailureOnly;
1111
}
1212
}

Assets/Plugins/StreamChat/Libs/Websockets/WebsocketClient.cs

Lines changed: 73 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ public WebsocketClient(ILogs logs, Encoding encoding = default, bool isDebugMode
3131
_logs = logs ?? throw new ArgumentNullException(nameof(logs));
3232
_encoding = encoding ?? DefaultEncoding;
3333
_isDebugMode = isDebugMode;
34-
35-
var readBuffer = new byte[4 * 1024];
36-
_bufferSegment = new ArraySegment<byte>(readBuffer);
3734
}
3835

3936
public bool TryDequeueMessage(out string message) => _receiveQueue.TryDequeue(out message);
@@ -80,12 +77,8 @@ await TryDisposeResourcesAsync(WebSocketCloseStatus.NormalClosure,
8077
return;
8178
}
8279

83-
#pragma warning disable 4014
84-
Task.Factory.StartNew(SendMessagesLoopAsync, _connectionCts.Token, TaskCreationOptions.LongRunning,
85-
TaskScheduler.Default);
86-
Task.Factory.StartNew(ReceiveMessagesLoopAsync, _connectionCts.Token, TaskCreationOptions.LongRunning,
87-
TaskScheduler.Default);
88-
#pragma warning restore 4014
80+
_backgroundSendTimer = new Timer(SendMessagesCallback, null, 0, UpdatePeriod);
81+
_backgroundReceiveTimer = new Timer(ReceiveMessagesCallback, null, UpdatePeriodOffset, UpdatePeriod);
8982

9083
Connected?.Invoke();
9184
}
@@ -135,6 +128,10 @@ public void Dispose()
135128
DisconnectAsync(WebSocketCloseStatus.NormalClosure, "WebSocket client is disposed")
136129
.ContinueWith(_ => LogExceptionIfDebugMode(_.Exception), TaskContinuationOptions.OnlyOnFaulted);
137130
}
131+
132+
private const int UpdatesPerSecond = 20;
133+
private const int UpdatePeriod = 1000 / UpdatesPerSecond;
134+
private const int UpdatePeriodOffset = UpdatePeriod / 2;
138135

139136
private static Encoding DefaultEncoding { get; } = Encoding.UTF8;
140137

@@ -150,20 +147,35 @@ public void Dispose()
150147
private readonly BlockingCollection<ArraySegment<byte>> _sendQueue =
151148
new BlockingCollection<ArraySegment<byte>>();
152149

153-
private readonly ArraySegment<byte> _bufferSegment;
150+
private readonly ArraySegment<byte> _bufferSegment = new ArraySegment<byte>(new byte[4 * 1024]);
154151

155152
private readonly ILogs _logs;
156153
private readonly Encoding _encoding;
157154
private readonly bool _isDebugMode;
158155

156+
private readonly SemaphoreSlim _backgroundSendSemaphore = new SemaphoreSlim(1);
157+
private readonly SemaphoreSlim _backgroundReceiveSemaphore = new SemaphoreSlim(1);
158+
159+
private Timer _backgroundSendTimer;
160+
private Timer _backgroundReceiveTimer;
161+
159162
private Uri _uri;
160163
private ClientWebSocket _internalClient;
161164
private CancellationTokenSource _connectionCts;
162165

163-
// Runs on a background thread
164-
private async Task SendMessagesLoopAsync()
166+
private async void SendMessagesCallback(object state)
165167
{
166-
while (IsConnected && _connectionCts != null && !_connectionCts.IsCancellationRequested)
168+
if (!IsConnected || _connectionCts == null || _connectionCts.IsCancellationRequested)
169+
{
170+
return;
171+
}
172+
173+
if (!_backgroundSendSemaphore.Wait(0))
174+
{
175+
return;
176+
}
177+
178+
try
167179
{
168180
while (_sendQueue.TryTake(out var msg))
169181
{
@@ -185,45 +197,67 @@ private async Task SendMessagesLoopAsync()
185197
_threadExceptionsLog.Enqueue(e);
186198
}
187199
}
188-
189-
await Task.Delay(1);
200+
}
201+
finally
202+
{
203+
_backgroundSendSemaphore.Release();
190204
}
191205
}
192206

193207
// Runs on a background thread
194-
private async Task ReceiveMessagesLoopAsync()
208+
private async void ReceiveMessagesCallback(object state)
195209
{
196-
while (IsConnected && _connectionCts != null && !_connectionCts.IsCancellationRequested)
210+
if (!IsConnected || _connectionCts == null || _connectionCts.IsCancellationRequested)
197211
{
198-
try
199-
{
200-
var result = await TryReceiveSingleMessageAsync();
201-
if (!string.IsNullOrEmpty(result))
202-
{
203-
_receiveQueue.Enqueue(result);
204-
continue;
205-
}
206-
}
207-
catch (OperationCanceledException)
208-
{
209-
return;
210-
}
211-
catch (WebSocketException webSocketException)
212-
{
213-
_threadWebsocketExceptionsLog.Enqueue(webSocketException);
214-
return;
215-
}
216-
catch (Exception e)
212+
return;
213+
}
214+
215+
if (!_backgroundReceiveSemaphore.Wait(0))
216+
{
217+
return;
218+
}
219+
220+
try
221+
{
222+
var result = await TryReceiveSingleMessageAsync();
223+
if (!string.IsNullOrEmpty(result))
217224
{
218-
_threadExceptionsLog.Enqueue(e);
225+
_receiveQueue.Enqueue(result);
219226
}
220-
221-
await Task.Delay(1);
227+
}
228+
catch (OperationCanceledException)
229+
{
230+
return;
231+
}
232+
catch (WebSocketException webSocketException)
233+
{
234+
_threadWebsocketExceptionsLog.Enqueue(webSocketException);
235+
return;
236+
}
237+
catch (Exception e)
238+
{
239+
_threadExceptionsLog.Enqueue(e);
240+
}
241+
finally
242+
{
243+
_backgroundReceiveSemaphore.Release();
222244
}
223245
}
224246

225247
private async Task TryDisposeResourcesAsync(WebSocketCloseStatus closeStatus, string closeMessage)
226248
{
249+
try
250+
{
251+
_backgroundReceiveTimer?.Dispose();
252+
_backgroundReceiveTimer = null;
253+
_backgroundSendTimer?.Dispose();
254+
_backgroundSendTimer = null;
255+
}
256+
catch (Exception e)
257+
{
258+
LogExceptionIfDebugMode(e);
259+
}
260+
227261
try
228262
{
229263
if (_connectionCts != null)

0 commit comments

Comments
 (0)