Skip to content

Commit 45627bb

Browse files
committed
added ResilientStreamingClient and clean p StreamingClient
1 parent e936f08 commit 45627bb

8 files changed

Lines changed: 334 additions & 46 deletions

File tree

build/dependecies.props

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
<PropertyGroup>
44
<NetCoreCommonVersion>2.2.*</NetCoreCommonVersion>
5-
<BetCommonVersion>1.1.18</BetCommonVersion>
5+
<BetCommonVersion>1.1.19</BetCommonVersion>
66
</PropertyGroup>
77

88
<PropertyGroup Condition="'$(TargetFramework)' == 'netcoreapp2.1' Or '$(TargetFramework)' == 'netstandard2.0'">
@@ -18,26 +18,22 @@
1818
</PropertyGroup>
1919

2020
<ItemGroup Label="NetCore">
21-
<PackageReference Update="Microsoft.Extensions.Logging.Abstractions" Version="$(NetCoreCommonVersion)" />
22-
<PackageReference Update="Microsoft.Extensions.Logging" Version="$(NetCoreCommonVersion)" />
23-
<PackageReference Update="Microsoft.Extensions.Logging.Configuration" Version="$(NetCoreCommonVersion)" />
24-
25-
<PackageReference Update="Microsoft.Extensions.Logging.Console" Version="$(NetCoreCommonVersion)" />
26-
<PackageReference Update="Microsoft.Extensions.Logging.Debug" Version="$(NetCoreCommonVersion)" />
27-
21+
<PackageReference Update="Microsoft.AspNetCore.Hosting.Server.Abstractions" Version="$(NetCoreCommonVersion)" />
22+
<PackageReference Update="Microsoft.AspNetCore.Http" Version="$(NetCoreCommonVersion)" />
2823
<PackageReference Update="Microsoft.Extensions.Configuration.Binder" Version="$(NetCoreCommonVersion)" />
29-
<PackageReference Update="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="$(NetCoreCommonVersion)" />
3024
<PackageReference Update="Microsoft.Extensions.Configuration.CommandLine" Version="$(NetCoreCommonVersion)" />
25+
<PackageReference Update="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="$(NetCoreCommonVersion)" />
3126
<PackageReference Update="Microsoft.Extensions.Configuration.Json" Version="$(NetCoreCommonVersion)" />
3227
<PackageReference Update="Microsoft.Extensions.Configuration.UserSecrets" Version="$(NetCoreCommonVersion)" />
33-
28+
<PackageReference Update="Microsoft.Extensions.DependencyInjection" Version="$(NetCoreCommonVersion)" />
3429
<PackageReference Update="Microsoft.Extensions.Hosting" Version="$(NetCoreCommonVersion)" />
3530
<PackageReference Update="Microsoft.Extensions.Hosting.Abstractions" Version="$(NetCoreCommonVersion)" />
36-
37-
<PackageReference Update="Microsoft.Extensions.DependencyInjection" Version="$(NetCoreCommonVersion)" />
31+
<PackageReference Update="Microsoft.Extensions.Logging" Version="$(NetCoreCommonVersion)" />
32+
<PackageReference Update="Microsoft.Extensions.Logging.Abstractions" Version="$(NetCoreCommonVersion)" />
33+
<PackageReference Update="Microsoft.Extensions.Logging.Configuration" Version="$(NetCoreCommonVersion)" />
34+
<PackageReference Update="Microsoft.Extensions.Logging.Console" Version="$(NetCoreCommonVersion)" />
35+
<PackageReference Update="Microsoft.Extensions.Logging.Debug" Version="$(NetCoreCommonVersion)" />
3836
<PackageReference Update="Microsoft.Extensions.Options" Version="$(NetCoreCommonVersion)" />
39-
<PackageReference Update="Microsoft.AspNetCore.Hosting.Server.Abstractions" Version="$(NetCoreCommonVersion)" />
40-
<PackageReference Update="Microsoft.AspNetCore.Http" Version="$(NetCoreCommonVersion)" />
4137
</ItemGroup>
4238

4339
<ItemGroup Label="Bet">

build/settings.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
<PropertyGroup Label="Basic Settings">
44
<IsPackable>true</IsPackable>
5-
<VersionPrefix>2.1.0-pre</VersionPrefix>
5+
<VersionPrefix>2.2.0-pre</VersionPrefix>
66
<LangVersion>latest</LangVersion>
77
<TreatWarningsAsErrors>false</TreatWarningsAsErrors>
88
<SuppressNETCoreSdkPreviewMessage>true</SuppressNETCoreSdkPreviewMessage>

src/CometD.NetCore.Salesforce/Resilience/IResilientForceClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ Task<SearchResult<T>> SearchAsync<T>(
209209
/// <param name="currentInstanceUrl"></param>
210210
/// <param name="cancellationToken"></param>
211211
/// <returns> True or false. Does not throw exceptions, only false in case of any errors.</returns>
212-
bool TestConnectionAsync(
212+
Task<bool> TestConnectionAsync(
213213
string currentInstanceUrl = null,
214214
CancellationToken cancellationToken = default);
215215

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Collections.Specialized;
4+
using System.Net;
5+
using System.Threading;
6+
using CometD.NetCore.Bayeux.Client;
7+
using CometD.NetCore.Client;
8+
using CometD.NetCore.Client.Extension;
9+
using CometD.NetCore.Client.Transport;
10+
using Microsoft.Extensions.Logging;
11+
using Microsoft.Extensions.Options;
12+
using NetCoreForce.Client.Models;
13+
14+
namespace CometD.NetCore.Salesforce
15+
{
16+
public class ResilientStreamingClient : IStreamingClient
17+
{
18+
private BayeuxClient _bayeuxClient = null;
19+
private bool _isDisposed = false;
20+
private ErrorExtension _errorExtension;
21+
private LongPollingTransport _clientTransport;
22+
private ReplayExtension _replayIdExtension;
23+
24+
private readonly ILogger<ResilientStreamingClient> _logger;
25+
private readonly SalesforceConfiguration _options;
26+
private readonly AsyncExpiringLazy<AccessTokenResponse> _tokenResponse;
27+
28+
// long polling duration
29+
private const int ReadTimeOut = 120 * 1000;
30+
31+
public bool IsConnected => _bayeuxClient.Connected;
32+
33+
public event EventHandler<bool> Reconnect;
34+
35+
public ResilientStreamingClient(
36+
Func<AsyncExpiringLazy<AccessTokenResponse>> tokenResponse,
37+
IOptions<SalesforceConfiguration> options,
38+
ILogger<ResilientStreamingClient> logger)
39+
{
40+
_logger = logger;
41+
_options = options.Value;
42+
_tokenResponse = tokenResponse();
43+
44+
CreateBayeuxClient();
45+
}
46+
47+
///<inheritdoc/>
48+
public void Disconnect()
49+
{
50+
Disconnect(1000);
51+
}
52+
53+
///<inheritdoc/>
54+
public void Disconnect(int timeout)
55+
{
56+
if (_isDisposed)
57+
{
58+
throw new ObjectDisposedException("Cannot disconnect when disposed");
59+
}
60+
61+
_bayeuxClient?.ResetSubscriptions();
62+
63+
_logger.LogDebug("Disconnecting...");
64+
_bayeuxClient?.Disconnect();
65+
_bayeuxClient?.WaitFor(timeout, new[] { BayeuxClient.State.DISCONNECTED });
66+
67+
_errorExtension.ConnectionError -= ErrorExtension_ConnectionError;
68+
_errorExtension.ConnectionException -= ErrorExtension_ConnectionException;
69+
_errorExtension.ConnectionMessage -= ErrorExtension_ConnectionMessage;
70+
71+
_logger.LogDebug("Disconnected...");
72+
}
73+
74+
/// <summary>
75+
/// The Handshake uses the default value of 1000 ms.
76+
/// </summary>
77+
public void Handshake()
78+
{
79+
Handshake(1000);
80+
}
81+
82+
///<inheritdoc/>
83+
public void Handshake(int timeout)
84+
{
85+
if (_isDisposed)
86+
{
87+
throw new ObjectDisposedException("Cannot connect when disposed");
88+
}
89+
90+
_logger.LogDebug("Handshaking...");
91+
_bayeuxClient.Handshake();
92+
_bayeuxClient.WaitFor(timeout, new[] { BayeuxClient.State.CONNECTED });
93+
_logger.LogDebug("Connected");
94+
}
95+
96+
///<inheritdoc/>
97+
public void SubscribeTopic(
98+
string topicName,
99+
IMessageListener listener,
100+
long replayId = -1)
101+
{
102+
if (topicName == null || (topicName = topicName.Trim()).Length == 0)
103+
{
104+
throw new ArgumentNullException(nameof(topicName));
105+
}
106+
107+
if (listener == null)
108+
{
109+
throw new ArgumentNullException(nameof(listener));
110+
}
111+
112+
var channel = _bayeuxClient.GetChannel(topicName, replayId);
113+
channel?.Subscribe(listener);
114+
}
115+
116+
///<inheritdoc/>
117+
public bool UnsubscribeTopic(
118+
string topicName,
119+
IMessageListener listener = null,
120+
long replayId = -1)
121+
{
122+
if (topicName == null || (topicName = topicName.Trim()).Length == 0)
123+
{
124+
throw new ArgumentNullException(nameof(topicName));
125+
}
126+
127+
var channel = _bayeuxClient.GetChannel(topicName, replayId);
128+
if (channel != null)
129+
{
130+
if (listener != null)
131+
{
132+
channel.Unsubscribe(listener);
133+
}
134+
else
135+
{
136+
channel.Unsubscribe();
137+
}
138+
return true;
139+
}
140+
return false;
141+
}
142+
143+
/// <summary>
144+
/// Disposing of the resources
145+
/// </summary>
146+
public void Dispose()
147+
{
148+
Dispose(true);
149+
GC.SuppressFinalize(this);
150+
}
151+
152+
/// <summary>
153+
/// Disposing of the resources
154+
/// </summary>
155+
/// <param name="disposing"></param>
156+
protected virtual void Dispose(bool disposing)
157+
{
158+
if (disposing && !_isDisposed)
159+
{
160+
Disconnect();
161+
_isDisposed = true;
162+
}
163+
}
164+
165+
~ResilientStreamingClient()
166+
{
167+
Dispose(false);
168+
}
169+
170+
private void CreateBayeuxClient()
171+
{
172+
if (_isDisposed)
173+
{
174+
throw new ObjectDisposedException("Cannot create connection when disposed");
175+
}
176+
177+
_logger.LogDebug("Creating {name} ...", nameof(BayeuxClient));
178+
179+
var accessToken = _tokenResponse.Value().Result;
180+
181+
// only need the scheme and host, strip out the rest
182+
var serverUri = new Uri(accessToken.InstanceUrl);
183+
var endpoint = $"{serverUri.Scheme}://{serverUri.Host}{_options.CometDUri}";
184+
185+
var headers = new NameValueCollection { { HttpRequestHeader.Authorization.ToString(), $"OAuth {accessToken.AccessToken}" } };
186+
187+
// Salesforce socket timeout during connection(CometD session) = 110 seconds
188+
var options = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
189+
{
190+
{ClientTransport.TIMEOUT_OPTION, _options.ReadTimeOut ?? ReadTimeOut },
191+
{ClientTransport.MAX_NETWORK_DELAY_OPTION, _options.ReadTimeOut ?? ReadTimeOut }
192+
};
193+
194+
_clientTransport = new LongPollingTransport(options, headers);
195+
196+
_bayeuxClient = new BayeuxClient(endpoint, _clientTransport);
197+
198+
// adds logging and also raises an event to process reconnection to the server.
199+
_errorExtension = new ErrorExtension();
200+
_errorExtension.ConnectionError += ErrorExtension_ConnectionError;
201+
_errorExtension.ConnectionException += ErrorExtension_ConnectionException;
202+
_errorExtension.ConnectionMessage += ErrorExtension_ConnectionMessage;
203+
204+
_replayIdExtension = new ReplayExtension();
205+
_bayeuxClient.AddExtension(_replayIdExtension);
206+
207+
_logger.LogDebug("{name} was created...", nameof(BayeuxClient));
208+
}
209+
210+
private void ErrorExtension_ConnectionError(
211+
object sender,
212+
string e)
213+
{
214+
// authentication failure
215+
if (string.Equals(e, "403::Handshake denied", StringComparison.OrdinalIgnoreCase)
216+
|| string.Equals(e, "403:denied_by_security_policy:create_denied", StringComparison.OrdinalIgnoreCase)
217+
|| string.Equals(e, "403::unknown client", StringComparison.OrdinalIgnoreCase)
218+
)
219+
{
220+
_logger.LogWarning("Handled CometD Exception: {message}", e);
221+
222+
// 1. Disconnect existing client.
223+
Disconnect();
224+
225+
// 2. Invalidate the access token.
226+
_tokenResponse.Invalidate();
227+
228+
_logger.LogDebug("Invalidate token for {name} ...", nameof(BayeuxClient));
229+
230+
// 3. Recreate BayeuxClient and populate it with a new transport with new security headers.
231+
CreateBayeuxClient();
232+
233+
// 4. Invoke the Reconnect Event
234+
Reconnect?.Invoke(this, true);
235+
}
236+
else
237+
{
238+
_logger.LogError("{name} failed with the following message: {message}", nameof(StreamingClient), e);
239+
}
240+
}
241+
242+
private void ErrorExtension_ConnectionException(
243+
object sender,
244+
Exception ex)
245+
{
246+
// ongoing time out issue not to be considered as error in the log.
247+
if (ex?.Message == "The operation has timed out.")
248+
{
249+
_logger.LogDebug(ex.Message);
250+
}
251+
else
252+
{
253+
_logger.LogError(ex.ToString());
254+
}
255+
}
256+
257+
private void ErrorExtension_ConnectionMessage(
258+
object sender,
259+
string meaage)
260+
{
261+
_logger.LogDebug(meaage);
262+
}
263+
}
264+
}

0 commit comments

Comments
 (0)