Skip to content

System.NullReferenceException: Object reference not set to an instance of an object in CometD.NetCore.Salesforce.ResilientStreamingClient.ErrorExtension_ConnectionError function. #41

@KwizaKaneria

Description

@KwizaKaneria

We are currently using this library to listen to push topics so that we are notified whenever a record changes in Salesforce. Our setup is shown below.

Code for registering an event and configuring the Salesforce Streaming Client in Program.cs is:

/// <summary>
/// Application entry point: registers the resilient Salesforce streaming client, the event bus,
/// the hosted service that manages the subscriptions, and the message listeners, then runs the host.
/// </summary>
/// <param name="args">Command-line arguments forwarded to <c>WebApplication.CreateBuilder</c>.</param>
public static void Main(string[] args)
{
    var builder = WebApplication.CreateBuilder(args);
    builder.Host.ConfigureServices((hostContext, services) =>
    {
        // NOTE(review): the type argument of IOptions<> appears to have been lost when this snippet
        // was pasted (a bare IOptions does not compile); restore the actual options type here.
        // NOTE(review): calling BuildServiceProvider() while still registering services builds a
        // second container with its own singleton instances (analyzer warning ASP0000). Prefer
        // binding these values from configuration instead.
        var salesforceConfiguration = services.BuildServiceProvider().GetRequiredService<IOptions>().Value;

        services.AddResilientStreamingClient("", "", (conf) =>
        {
            conf.ClientId = salesforceConfiguration.ClientId;
            conf.ClientSecret = salesforceConfiguration.ClientSecret;
            conf.RefreshToken = salesforceConfiguration.RefreshToken;
            conf.LoginUrl = salesforceConfiguration.BaseUrl;
            conf.OAuthUri = Constants.OAuthUriConfig;
            conf.EventOrTopicUri = Constants.EventOrTopicConfig;
            conf.CometDUri = salesforceConfiguration.CometdUrl;
        });

        services.AddSingleton<IEventBus, EventBus>();

        // AddHostedService needs an explicit type argument; SalesforceEventBusHostedService is the
        // IHostedService implementation that subscribes to the push topics.
        services.AddHostedService<SalesforceEventBusHostedService>();

        services.AddTransient<IMessageListener, UpdatedListener>();
        services.AddTransient<IMessageListener, DeletedListener>();
    });

    var app = builder.Build();
    app.Run();
}

Code for subscribing to events is:

/// <summary>
/// Hosted service that subscribes the configured Salesforce push topics on the event bus when the
/// host starts and unsubscribes them again when the host stops.
/// </summary>
public class SalesforceEventBusHostedService : IHostedService
{
    private readonly ILogger _logger;
    private readonly IEventBus _eventBus;

    // Typed to match the constructor parameter; StartAsync reads ReplayId from the cached item.
    private readonly ICacheService<ReplayIdDto> _cacheService;

    // Push topics that were created in Salesforce, as (channel name, replay id, listener type).
    // StartAsync overwrites each entry's replay id with the cached value when one is available.
    private readonly List<Tuple<string, int, Type>> _eventsMapping = new()
    {
        new Tuple<string, int, Type>("topic/Updated", -1, typeof(UpdatedListener)),
        new Tuple<string, int, Type>("topic/Deleted", -1, typeof(DeletedListener))
    };

    /// <summary>
    /// Stores the injected dependencies and points the cache service at the message-listener region.
    /// </summary>
    /// <param name="logger">Logger used for the start/stop messages.</param>
    /// <param name="eventBus">Event bus on which the topics are subscribed and unsubscribed.</param>
    /// <param name="cacheService">Cache queried for the replay id of each topic.</param>
    public SalesforceEventBusHostedService(ILogger<SalesforceEventBusHostedService> logger, IEventBus eventBus, ICacheService<ReplayIdDto> cacheService)
    {
        _logger = logger;
        _eventBus = eventBus;
        _cacheService = cacheService;
        _cacheService.CacheRegion = $"{Constants.MessageListenerCacheRegion}";
    }

    /// <summary>
    /// Creates a <c>PlatformEvent&lt;eventType&gt;</c> via reflection, sets its <c>Name</c> and
    /// <c>ReplayId</c> properties when they exist, and wraps it in a one-element argument array.
    /// </summary>
    /// <param name="eventName">Value assigned to the <c>Name</c> property.</param>
    /// <param name="replayId">Value assigned to the <c>ReplayId</c> property.</param>
    /// <param name="eventType">Type argument used to close <c>PlatformEvent&lt;&gt;</c>.</param>
    /// <returns>An argument array suitable for <c>MethodInfo.Invoke</c>.</returns>
    private static object[] GetPlatformEventObject(string eventName, int replayId, Type eventType)
    {
        var platformEvent = Activator.CreateInstance(typeof(PlatformEvent<>).MakeGenericType(eventType));
        var platformEventType = platformEvent?.GetType();
        platformEventType?.GetProperty("Name")?.SetValue(platformEvent, eventName);
        platformEventType?.GetProperty("ReplayId")?.SetValue(platformEvent, replayId);
        return new[] { platformEvent };
    }

    /// <summary>
    /// Invokes the generic <see cref="IEventBus"/> method named <paramref name="methodName"/>,
    /// closed over <paramref name="eventType"/>, and awaits the task it returns.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The method cannot be found or does not return a <see cref="Task"/>. Without this check the
    /// failure would surface as an uninformative NullReferenceException when awaiting null.
    /// </exception>
    private async Task InvokeEventBusMethodAsync(string methodName, string eventName, int replayId, Type eventType)
    {
        var arguments = GetPlatformEventObject(eventName, replayId, eventType);

        var method = typeof(IEventBus).GetMethod(methodName)
            ?? throw new InvalidOperationException(
                $"{nameof(IEventBus)} does not declare a public method named '{methodName}'.");

        if (method.MakeGenericMethod(eventType).Invoke(_eventBus, arguments) is not Task pending)
        {
            throw new InvalidOperationException(
                $"{nameof(IEventBus)}.{methodName} did not return a Task for event '{eventName}'.");
        }

        await pending;
    }

    /// <summary>Subscribes <paramref name="eventType"/> to the given event on the event bus.</summary>
    private Task SubscribeToEvent(string eventName, int replayId, Type eventType)
        => InvokeEventBusMethodAsync("Subscribe", eventName, replayId, eventType);

    /// <summary>Unsubscribes <paramref name="eventType"/> from the given event on the event bus.</summary>
    private Task UnSubscribeToEvent(string eventName, int replayId, Type eventType)
        => InvokeEventBusMethodAsync("Unsubscribe", eventName, replayId, eventType);

    /// <summary>
    /// Resolves the replay id of every mapped topic (cached value if the lookup yields one,
    /// otherwise the configured default), stores it back in the mapping, and subscribes.
    /// </summary>
    /// <param name="cancellationToken">Start-up cancellation token supplied by the host (not observed here).</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation($"{nameof(SalesforceEventBusHostedService)} starting.");

        // Indexed loop because each entry is replaced while the list is being walked.
        for (var i = 0; i < _eventsMapping.Count; i++)
        {
            var (eventName, defaultReplayId, eventType) = _eventsMapping[i];

            var cached = await _cacheService.GetItemAsync(StreamingApiHelper.RemoveEventOrTopicPrefix(eventName));
            var replayId = cached?.ReplayId ?? defaultReplayId;

            // Remember the effective replay id so StopAsync unsubscribes with the same values.
            _eventsMapping[i] = new Tuple<string, int, Type>(eventName, replayId, eventType);

            await SubscribeToEvent(eventName, replayId, eventType);
        }
    }

    /// <summary>Unsubscribes every mapped topic from the event bus.</summary>
    /// <param name="cancellationToken">Shutdown cancellation token supplied by the host (not observed here).</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation($"{nameof(SalesforceEventBusHostedService)} stopped.");

        foreach (var (eventName, replayId, eventType) in _eventsMapping)
        {
            await UnSubscribeToEvent(eventName, replayId, eventType);
        }
    }
}

However, at runtime we get the following exception:

System.NullReferenceException: Object reference not set to an instance of an object.
at CometD.NetCore.Salesforce.ResilientStreamingClient.ErrorExtension_ConnectionError(Object sender, String e) in C:\projects\cometd-netcore-salesforce\src\CometD.NetCore.Salesforce\ResilientStreamingClient.cs:line 210
at CometD.NetCore.Client.Extension.ErrorExtension.ReceiveMeta(IClientSession session, IMutableMessage message) in C:\projects\cometd-netcore\src\CometD.NetCore\Client\Extension\ErrorExtension.cs:line 74
at CometD.NetCore.Common.AbstractClientSession.ExtendReceive(IMutableMessage message) in C:\projects\cometd-netcore\src\CometD.NetCore\Common\AbstractClientSession.cs:line 188
at CometD.NetCore.Common.AbstractClientSession.Receive(IMutableMessage message) in C:\projects\cometd-netcore\src\CometD.NetCore\Common\AbstractClientSession.cs:line 155
at CometD.NetCore.Client.BayeuxClient.PublishTransportListener.OnMessages(IList`1 messages) in C:\projects\cometd-netcore\src\CometD.NetCore\Client\BayeuxClient.cs:line 769
at CometD.NetCore.Client.Transport.LongPollingTransport.GetResponseCallback(IAsyncResult asynchronousResult) in C:\projects\cometd-netcore\src\CometD.NetCore\Client\Transport\LongPollingTransport.cs:line 310

Can anyone please explain why this error occurs and whether there is something we should change on our end?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions