44using System . Threading ;
55using System . Threading . Tasks ;
66using StreamChat . Core . Configs ;
7+ using StreamChat . Core . Exceptions ;
78using StreamChat . Core . Helpers ;
89using StreamChat . Core . InternalDTO . Events ;
910using StreamChat . Core . InternalDTO . Models ;
@@ -53,17 +54,17 @@ namespace StreamChat.Core
5354 //StreamTodo: Handle restoring state after lost connection
5455
5556 public delegate void ChannelInviteHandler ( IStreamChannel channel , IStreamUser invitee ) ;
56-
57+
5758 /// <summary>
5859 /// Member added to the channel handler
5960 /// </summary>
6061 public delegate void ChannelMemberAddedHandler ( IStreamChannel channel , IStreamChannelMember member ) ;
61-
62+
6263 /// <summary>
6364 /// Member removed from the channel handler
6465 /// </summary>
6566 public delegate void ChannelMemberRemovedHandler ( IStreamChannel channel , IStreamChannelMember member ) ;
66-
67+
6768 public sealed class StreamChatClient : IStreamChatClient
6869 {
6970 public event ConnectionMadeHandler Connected ;
@@ -79,7 +80,7 @@ public sealed class StreamChatClient : IStreamChatClient
7980 public event ChannelInviteHandler ChannelInviteReceived ;
8081 public event ChannelInviteHandler ChannelInviteAccepted ;
8182 public event ChannelInviteHandler ChannelInviteRejected ;
82-
83+
8384 public event ChannelMemberAddedHandler AddedToChannelAsMember ;
8485 public event ChannelMemberRemovedHandler RemovedFromChannelAsMember ;
8586
@@ -115,7 +116,7 @@ public static IStreamChatClient CreateDefaultClient(IStreamClientConfig config =
115116 {
116117 config = StreamClientConfig . Default ;
117118 }
118-
119+
119120 var logs = StreamDependenciesFactory . CreateLogger ( config . LogLevel . ToLogLevel ( ) ) ;
120121 var websocketClient
121122 = StreamDependenciesFactory . CreateWebsocketClient ( logs , config . LogLevel . IsDebugEnabled ( ) ) ;
@@ -206,6 +207,7 @@ var ownUserDto
206207 }
207208
208209 //StreamTodo: test scenario: ConnectUserAsync and immediately call DisconnectUserAsync
210+ //StreamTodo: this should cancel token that would be globally passed to all async tasks so the moment we disconnect all async tasks are cancelled
209211 public Task DisconnectUserAsync ( )
210212 {
211213 TryCancelWaitingForUserConnection ( ) ;
@@ -216,7 +218,8 @@ public Task DisconnectUserAsync()
216218
217219 public Task < IStreamChannel > GetOrCreateChannelWithIdAsync ( ChannelType channelType , string channelId ,
218220 string name = null , IDictionary < string , object > optionalCustomData = null )
219- => InternalGetOrCreateChannelWithIdAsync ( channelType , channelId , name , presence : true , state : true , watch : true , optionalCustomData ) ;
221+ => InternalGetOrCreateChannelWithIdAsync ( channelType , channelId , name , presence : true , state : true ,
222+ watch : true , optionalCustomData ) ;
220223
221224 public async Task < IStreamChannel > GetOrCreateChannelWithMembersAsync ( ChannelType channelType ,
222225 IEnumerable < IStreamUser > members , IDictionary < string , object > optionalCustomData = null )
@@ -572,10 +575,11 @@ void IStreamChatClientEventsListener.Destroy()
572575 void IStreamChatClientEventsListener . Update ( ) => InternalLowLevelClient . Update ( _timeService . DeltaTime ) ;
573576
574577 internal StreamChatLowLevelClient InternalLowLevelClient { get ; }
575-
578+
576579 // We probably don't want to expose the presence, state, watch params to the public API
577- internal async Task < IStreamChannel > InternalGetOrCreateChannelWithIdAsync ( ChannelType channelType , string channelId ,
578- string name = null , bool presence = true , bool state = true , bool watch = true ,
580+ internal async Task < IStreamChannel > InternalGetOrCreateChannelWithIdAsync ( ChannelType channelType ,
581+ string channelId ,
582+ string name = null , bool presence = true , bool state = true , bool watch = true ,
579583 IDictionary < string , object > optionalCustomData = null )
580584 {
581585 StreamAsserts . AssertChannelTypeIsValid ( channelType ) ;
@@ -607,13 +611,13 @@ internal IStreamLocalUserData UpdateLocalUser(OwnUserInternalDTO ownUserInternal
607611 {
608612 _localUserData = _cache . TryCreateOrUpdate ( ownUserInternalDto ) ;
609613
610- if ( LocalUserData == null )
614+ if ( LocalUserData == null )
611615 {
612616 _logs . Error ( "Local User Data is null" ) ;
613617 return _localUserData ;
614618 }
615619
616- if ( LocalUserData . ChannelMutes != null )
620+ if ( LocalUserData . ChannelMutes != null )
617621 {
618622 //StreamTodo: Can we not rely on whoever called TryCreateOrUpdate to update this but make it more reliable? Better to react to some event
619623 // This could be solved if ChannelMutes would be an observable collection
@@ -705,6 +709,47 @@ private void TryCancelWaitingForUserConnection()
705709 _connectUserTaskSource . TrySetCanceled ( ) ;
706710 }
707711 }
712+
713+ private async Task InternalGetOrCreateChannelAsync ( ChannelType channelType , string channelId )
714+ {
715+ #if STREAM_TESTS_ENABLED
716+ const int maxAttempts = 10 ;
717+ #else
718+ const int maxAttempts = 1 ;
719+ #endif
720+
721+ for ( int i = 1 ; i <= maxAttempts ; i ++ )
722+ {
723+ try
724+ {
725+ await GetOrCreateChannelWithIdAsync ( channelType , channelId ) ;
726+ }
727+ catch ( StreamApiException streamException )
728+ {
729+ if ( ! streamException . IsRateLimitExceededError ( ) || i == maxAttempts )
730+ {
731+ throw ;
732+ }
733+
734+ if ( ConnectionState != ConnectionState . Connected )
735+ {
736+ break ;
737+ }
738+
739+ var delay = 4 * i ;
740+ #if STREAM_TESTS_ENABLED
741+ _logs . Warning ( $ "InternalGetOrCreateChannelAsync attempt failed due to rate limit. Wait { delay } seconds and try again") ;
742+ #endif
743+ //StreamTodo: pass CancellationToken
744+ await Task . Delay ( delay * 1000 ) ;
745+
746+ if ( ConnectionState != ConnectionState . Connected )
747+ {
748+ break ;
749+ }
750+ }
751+ }
752+ }
708753
709754 #region Events
710755
@@ -716,13 +761,14 @@ private void OnConnected(HealthCheckEventInternalDTO dto)
716761
717762 // This can sometimes be null. I think it's when the client lost network and believes he's reconnecting
718763 // but the healthcheck timeout didn't pass on server and from the server perspective the client never disconnected
719- if ( localUserDto != null )
764+ if ( localUserDto != null )
720765 {
721766 UpdateLocalUser ( localUserDto ) ;
722767 }
723768 else
724769 {
725- _logs . Warning ( "OnConnected localUserDto was NULL and current LocalUserData is " + ( LocalUserData != null ) + " value " + LocalUserData ) ;
770+ _logs . Warning ( "OnConnected localUserDto was NULL and current LocalUserData is " +
771+ ( LocalUserData != null ) + " value " + LocalUserData ) ;
726772 }
727773
728774 Connected ? . Invoke ( LocalUserData ) ;
@@ -904,13 +950,26 @@ private void OnMarkReadNotification(NotificationMarkReadEventInternalDTO eventDt
904950
905951 private void OnAddedToChannelNotification ( NotificationAddedToChannelEventInternalDTO eventDto )
906952 {
907- //StreamTodo: sometimes when I run all tests the eventDto.Channel.Type is null. Inspect how different is this DTO from the channel kept in cache. If its incomplete we shouldn't update the cached value
908- if ( eventDto . Channel . Type == null && eventDto . ChannelType != null )
909- {
910- eventDto . Channel . Type = eventDto . ChannelType ;
911- }
953+ #if STREAM_TESTS_ENABLED
954+ var sb = new StringBuilder ( ) ;
955+ sb . AppendLine ( "OnAddedToChannelNotification" ) ;
956+ sb . AppendLine ( $ "{ nameof ( eventDto . ChannelType ) } : { eventDto . ChannelType } ") ;
957+ sb . AppendLine ( $ "{ nameof ( eventDto . Channel . Type ) } : { eventDto . Channel . Type } ") ;
958+ sb . AppendLine ( $ "{ nameof ( eventDto . Channel . Id ) } : { eventDto . Channel . Id } ") ;
959+ sb . AppendLine ( $ "{ nameof ( eventDto . Channel . Cid ) } : { eventDto . Channel . Cid } ") ;
960+ #endif
912961
913962 var channel = _cache . TryCreateOrUpdate ( eventDto . Channel , out var wasCreated ) ;
963+
964+ #if STREAM_TESTS_ENABLED
965+ sb . Length = 0 ;
966+ sb . AppendLine ( "Channel returned from cache:" ) ;
967+ sb . AppendLine ( $ "{ nameof ( channel . Type ) } : { channel . Type } ") ;
968+ sb . AppendLine ( $ "{ nameof ( channel . Id ) } : { channel . Id } ") ;
969+ sb . AppendLine ( $ "{ nameof ( channel . Cid ) } : { channel . Cid } ") ;
970+ _logs . Info ( sb . ToString ( ) ) ;
971+ #endif
972+
914973 var member = _cache . TryCreateOrUpdate ( eventDto . Member ) ;
915974 _cache . TryCreateOrUpdate ( eventDto . Member . User ) ;
916975
@@ -919,14 +978,15 @@ private void OnAddedToChannelNotification(NotificationAddedToChannelEventInterna
919978 AddedToChannelAsMember ? . Invoke ( channel , member ) ;
920979 return ;
921980 }
922-
981+
923982 // Watch channel, otherwise WS events won't be received
924- GetOrCreateChannelWithIdAsync ( channel . Type , channel . Id ) . ContinueWith ( t =>
983+ InternalGetOrCreateChannelAsync ( channel . Type , channel . Id ) . ContinueWith ( t =>
925984 {
926985 if ( t . IsFaulted )
927986 {
928987 _logs . Error ( $ "Failed to watch channel with type: { channel . Type } & id: { channel . Id } " +
929- $ "before triggering the { nameof ( AddedToChannelAsMember ) } event. Inspect the following exception.") ;
988+ $ "before triggering the { nameof ( AddedToChannelAsMember ) } event. Inspect the following exception: " +
989+ t . Exception ) ;
930990 _logs . Exception ( t . Exception ) ;
931991 return ;
932992 }
@@ -938,23 +998,43 @@ private void OnAddedToChannelNotification(NotificationAddedToChannelEventInterna
938998 private void OnRemovedFromChannelNotification (
939999 NotificationRemovedFromChannelEventInternalDTO eventDto )
9401000 {
1001+ #if STREAM_TESTS_ENABLED
1002+ var sb = new StringBuilder ( ) ;
1003+ sb . AppendLine ( "OnRemovedFromChannelNotification BEFORE CACHE" ) ;
1004+ sb . AppendLine ( $ "{ nameof ( eventDto . ChannelType ) } : { eventDto . ChannelType } ") ;
1005+ sb . AppendLine ( $ "{ nameof ( eventDto . Channel . Type ) } : { eventDto . Channel . Type } ") ;
1006+ sb . AppendLine ( $ "{ nameof ( eventDto . Channel . Id ) } : { eventDto . Channel . Id } ") ;
1007+ sb . AppendLine ( $ "{ nameof ( eventDto . Channel . Cid ) } : { eventDto . Channel . Cid } ") ;
1008+ _logs . Info ( sb . ToString ( ) ) ;
1009+ #endif
9411010 var channel = _cache . TryCreateOrUpdate ( eventDto . Channel , out var wasCreated ) ;
1011+
1012+ #if STREAM_TESTS_ENABLED
1013+ sb . Length = 0 ;
1014+ sb . AppendLine ( "Channel returned FROM CACHE:" ) ;
1015+ sb . AppendLine ( $ "{ nameof ( channel . Type ) } : { channel . Type } ") ;
1016+ sb . AppendLine ( $ "{ nameof ( channel . Id ) } : { channel . Id } ") ;
1017+ sb . AppendLine ( $ "{ nameof ( channel . Cid ) } : { channel . Cid } ") ;
1018+ _logs . Info ( sb . ToString ( ) ) ;
1019+ #endif
1020+
9421021 var member = _cache . TryCreateOrUpdate ( eventDto . Member ) ;
9431022 _cache . TryCreateOrUpdate ( eventDto . Member . User ) ;
944-
1023+
9451024 if ( ! wasCreated )
9461025 {
9471026 RemovedFromChannelAsMember ? . Invoke ( channel , member ) ;
9481027 return ;
9491028 }
950-
1029+
9511030 // Watch channel, otherwise WS events won't be received
952- GetOrCreateChannelWithIdAsync ( channel . Type , channel . Id ) . ContinueWith ( t =>
1031+ InternalGetOrCreateChannelAsync ( channel . Type , channel . Id ) . ContinueWith ( t =>
9531032 {
9541033 if ( t . IsFaulted )
9551034 {
9561035 _logs . Error ( $ "Failed to watch channel with type: { channel . Type } & id: { channel . Id } " +
957- $ "before triggering the { nameof ( RemovedFromChannelAsMember ) } event. Inspect the following exception.") ;
1036+ $ "before triggering the { nameof ( RemovedFromChannelAsMember ) } event. Inspect the following exception: " +
1037+ t . Exception ) ;
9581038 _logs . Exception ( t . Exception ) ;
9591039 return ;
9601040 }
@@ -967,20 +1047,21 @@ private void OnInvitedNotification(NotificationInvitedEventInternalDTO eventDto)
9671047 {
9681048 var channel = _cache . TryCreateOrUpdate ( eventDto . Channel , out var wasCreated ) ;
9691049 var user = _cache . TryCreateOrUpdate ( eventDto . User ) ;
970-
1050+
9711051 if ( ! wasCreated )
9721052 {
9731053 ChannelInviteReceived ? . Invoke ( channel , user ) ;
9741054 return ;
9751055 }
976-
1056+
9771057 // Watch channel, otherwise WS events won't be received
978- GetOrCreateChannelWithIdAsync ( channel . Type , channel . Id ) . ContinueWith ( t =>
1058+ InternalGetOrCreateChannelAsync ( channel . Type , channel . Id ) . ContinueWith ( t =>
9791059 {
9801060 if ( t . IsFaulted )
9811061 {
9821062 _logs . Error ( $ "Failed to watch channel with type: { channel . Type } & id: { channel . Id } " +
983- $ "before triggering the { nameof ( ChannelInviteReceived ) } event. Inspect the following exception.") ;
1063+ $ "before triggering the { nameof ( ChannelInviteReceived ) } event. Inspect the following exception: " +
1064+ t . Exception ) ;
9841065 _logs . Exception ( t . Exception ) ;
9851066 return ;
9861067 }
@@ -993,20 +1074,21 @@ private void OnInviteAcceptedNotification(NotificationInviteAcceptedEventInterna
9931074 {
9941075 var channel = _cache . TryCreateOrUpdate ( eventDto . Channel , out var wasCreated ) ;
9951076 var user = _cache . TryCreateOrUpdate ( eventDto . User ) ;
996-
1077+
9971078 if ( ! wasCreated )
9981079 {
9991080 ChannelInviteAccepted ? . Invoke ( channel , user ) ;
10001081 return ;
10011082 }
1002-
1083+
10031084 // Watch channel, otherwise WS events won't be received
1004- GetOrCreateChannelWithIdAsync ( channel . Type , channel . Id ) . ContinueWith ( t =>
1085+ InternalGetOrCreateChannelAsync ( channel . Type , channel . Id ) . ContinueWith ( t =>
10051086 {
10061087 if ( t . IsFaulted )
10071088 {
10081089 _logs . Error ( $ "Failed to watch channel with type: { channel . Type } & id: { channel . Id } " +
1009- $ "before triggering the { nameof ( ChannelInviteAccepted ) } event. Inspect the following exception.") ;
1090+ $ "before triggering the { nameof ( ChannelInviteAccepted ) } event. Inspect the following exception: " +
1091+ t . Exception ) ;
10101092 _logs . Exception ( t . Exception ) ;
10111093 return ;
10121094 }
@@ -1019,20 +1101,21 @@ private void OnInviteRejectedNotification(NotificationInviteRejectedEventInterna
10191101 {
10201102 var channel = _cache . TryCreateOrUpdate ( eventDto . Channel , out var wasCreated ) ;
10211103 var user = _cache . TryCreateOrUpdate ( eventDto . User ) ;
1022-
1104+
10231105 if ( ! wasCreated )
10241106 {
10251107 ChannelInviteRejected ? . Invoke ( channel , user ) ;
10261108 return ;
10271109 }
1028-
1110+
10291111 // Watch channel, otherwise WS events won't be received
1030- GetOrCreateChannelWithIdAsync ( channel . Type , channel . Id ) . ContinueWith ( t =>
1112+ InternalGetOrCreateChannelAsync ( channel . Type , channel . Id ) . ContinueWith ( t =>
10311113 {
10321114 if ( t . IsFaulted )
10331115 {
10341116 _logs . Error ( $ "Failed to watch channel with type: { channel . Type } & id: { channel . Id } " +
1035- $ "before triggering the { nameof ( ChannelInviteRejected ) } event. Inspect the following exception.") ;
1117+ $ "before triggering the { nameof ( ChannelInviteRejected ) } event. Inspect the following exception: " +
1118+ t . Exception ) ;
10361119 _logs . Exception ( t . Exception ) ;
10371120 return ;
10381121 }
0 commit comments