I’m currently working on a project where we use Azure Event Hubs to collect telemetry data from connected field devices such as gas and electricity meters. This data gets processed in a variety of ways. We use Azure Stream Analytics to transform and forward the data to various subscribers. For more complex scenarios where the functionality provided by Stream Analytics isn’t sufficient, we use the Event Processor Host SDK (hosted in a stateless Service Fabric service) to process the events.

The Event Processor Host SDK works by registering event processors with the Event Processor Host. An event processor has a ProcessEventsAsync method which is called for each batch of messages received from the Event Hub. In the code sample below, I select the interactive events (such as alerts from the electricity meters) and forward them to a Service Bus queue for further processing.

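using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

public class InteractiveEventProcessor : IEventProcessor
{
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        // Select the events that must be forwarded to the Service Bus queue.
        // Property values are of type object, so compare with Equals instead of ==.
        var interactiveEvents = messages
            .Where(msg => msg.Properties.TryGetValue("IsInteractive", out var flag) && true.Equals(flag))
            .ToList();

        // Publish the selected events to the queue.
        await PublishToServiceBus(interactiveEvents);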

        // Set a checkpoint to mark our progress.
        await context.CheckpointAsync();
    }
    ...

While running some integration tests, I noticed that not all interactive events were being forwarded to the Service Bus queue. Further investigation showed that the InteractiveEventProcessor encountered some exceptions when the service started. These were transient errors caused by the fact that I’m deleting and re-creating Service Bus queues on each test run. I didn’t think this would cause any problems because when the event processor is registered, I specify an exception handler by passing a configured EventProcessorOptions instance.

var processorOptions = new EventProcessorOptions();
processorOptions.ExceptionReceived += EventProcessorOptions_ExceptionReceived;

await _processorHost.RegisterEventProcessorAsync<InteractiveEventProcessor>(processorOptions);

The handler simply logs the exception. I’m using NLog here:

private void EventProcessorOptions_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
{
    _logger.Error()
        .Exception(e.Exception)
        .Message("Error occurred in event processor")
        .Property("Action", e.Action)
        .Write();
}

After the exception is logged, the Event Processor Host will keep the InteractiveEventProcessor running. Since the exception occurs before the CheckpointAsync method has been called, the checkpoint is still at its original location and the ProcessEventsAsync method will receive the same batch of messages again to retry. At least I thought so…

As it turns out, that last batch of messages is lost. The Event Processor Host will continue with the next batch of messages as if nothing happened. The fact that the checkpoint is still at its original location is irrelevant because the checkpoint is only read when a processor acquires the lease for a partition, for example after the lease for the InteractiveEventProcessor was lost, which isn’t the case here.

So if you don’t want to lose any messages, you need to add some retry logic to the ProcessEventsAsync method to handle any transient errors. One way to accomplish this is to use a specialized library such as Polly:

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
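    // Select the events that must be forwarded to the Service Bus queue.
    // Property values are of type object, so compare with Equals instead of ==.
    var interactiveEvents = messages
        .Where(msg => msg.Properties.TryGetValue("IsInteractive", out var flag) && true.Equals(flag))
        .ToList();

    // Publish the selected events to the queue, wrapped in a Polly retry policy.
    // The await matters: without it the checkpoint below would be set
    // before the publish (and its retries) have completed.
    await Policy
        .Handle<MessagingException>()
        .WaitAndRetryForeverAsync(_ => TimeSpan.FromSeconds(1))
        .ExecuteAsync(() => PublishToServiceBus(interactiveEvents));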

    // Set a checkpoint to mark our progress.
    await context.CheckpointAsync();
}

I don’t think it’s a bad design decision that the Event Processor Host skips the messages when an exception is thrown. It was just a bit unexpected because the exception occurred before the checkpoint call. Event Hubs and the Event Processor Host are mainly used to receive and process very large numbers of messages. In those scenarios it’s usually more important to keep the messages flowing than to block the stream because some messages encountered an error. If you’re concerned about losing even a single message, Service Bus may be an even better option with its message-level Complete and Abandon operations.

Update 2017-02-20:

Some additional remarks:

  • Make sure your retry code correctly handles LeaseLostException. This exception is thrown when the specific event processor loses its lease on the Event Hub partition. The correct way to handle this is to either not catch it or rethrow it. Another event processor will get the lease and start processing from the last checkpointed position.

  • While retrying, the main program may want to cancel the event processor, which the code snippet above does not take into account. Since the ProcessEventsAsync method does not take a CancellationToken parameter, you’ll need to inject your own cancellation token through the constructor and check it periodically while retrying.

    public class InteractiveEventProcessor : IEventProcessor
    {
        private readonly CancellationToken _cancellationToken;

        public InteractiveEventProcessor(CancellationToken cancellationToken)
        {
            _cancellationToken = cancellationToken;
        }

        ...

    When an event processor does not have a default constructor, it needs an IEventProcessorFactory to be instantiated:

    public class InteractiveEventProcessorFactory : IEventProcessorFactory
    {
        private readonly CancellationToken _cancellationToken;

        public InteractiveEventProcessorFactory(CancellationToken cancellationToken)
        {
            _cancellationToken = cancellationToken;
        }

        public IEventProcessor CreateEventProcessor(PartitionContext context)
        {
            return new InteractiveEventProcessor(_cancellationToken);
        }
    }

    The last step is to register the factory with the Event Processor Host:

    CancellationTokenSource cts = new CancellationTokenSource();
    IEventProcessorFactory processorFactory = new InteractiveEventProcessorFactory(cts.Token);

    await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory);

  • Remember that the goal of the Event Processor Host is to process a lot of messages fast. Whether it’s worth taking the time to perform retries when message processing fails depends on your specific scenario. An alternative to retrying the same message processing operation is to push the message onto a dead-letter queue, which may be another Event Hub or some other queueing mechanism such as a Service Bus queue.
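
To make the remarks above a bit more concrete, here is a minimal sketch of a retry loop that combines them: it retries only the exceptions you mark as transient (so a LeaseLostException can simply propagate), it observes a cancellation token between attempts, and it hands the batch to a fallback after a bounded number of attempts. RetryHelper and all of its parameter names are hypothetical; this is not part of the Event Processor Host SDK or Polly, just one way it could be written with nothing but the base class library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of any SDK.
public static class RetryHelper
{
    // Runs 'action' until it succeeds. Only exceptions accepted by
    // 'isTransient' are retried; anything else propagates to the caller.
    // After 'maxAttempts' failed attempts, 'onGiveUp' runs instead
    // (for example: push the batch onto a dead-letter queue).
    public static async Task ExecuteAsync(
        Func<Task> action,
        Func<Exception, bool> isTransient,
        Func<Exception, Task> onGiveUp,
        int maxAttempts,
        TimeSpan delay,
        CancellationToken cancellationToken)
    {
        for (var attempt = 1; ; attempt++)
        {
            // Stop retrying as soon as the host asks us to shut down.
            cancellationToken.ThrowIfCancellationRequested();

            try
            {
                await action();
                return;
            }
            catch (Exception ex) when (isTransient(ex))
            {
                if (attempt >= maxAttempts)
                {
                    await onGiveUp(ex);
                    return;
                }
            }

            // Wait before the next attempt; the delay is cancelled
            // together with the token.
            await Task.Delay(delay, cancellationToken);
        }
    }
}
```

In ProcessEventsAsync, the action would be the PublishToServiceBus call, the predicate something like ex => ex is MessagingException && !(ex is LeaseLostException), and the token the one injected through the constructor. The fallback could forward the batch to whatever dead-letter mechanism you choose; that part is left open here on purpose.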