Class PluggableCheckpointStoreEventProcessor<TPartition>
Provides a base for creating an event processor with custom processing logic which consumes events across all partitions of a given Event Hub for a specific consumer group. The processor is capable of collaborating with other instances for the same Event Hub and consumer group pairing to share work by using a common storage platform to communicate. Fault tolerance is also built-in, allowing the processor to be resilient in the face of errors.
Inheritance
Inherited Members
Namespace: System.Dynamic.ExpandoObject
Assembly: Azure.Messaging.EventHubs.dll
Syntax
public abstract class PluggableCheckpointStoreEventProcessor<TPartition> : Azure.Messaging.EventHubs.Primitives.EventProcessor<TPartition> where TPartition : EventProcessorPartitionnew()
Type Parameters
TPartition
The context of the partition for which an operation is being performed. |
Remarks
To enable coordination for sharing of partitions between PluggableCheckpointStoreEventProcessor<TPartition> instances, they will assert exclusive read access to partitions for the consumer group. No other readers should be active in the consumer group other than processors intending to collaborate. Non-exclusive readers will be denied access; exclusive readers, including processors using a different storage locations, will interfere with the processor's operation and performance.
The PluggableCheckpointStoreEventProcessor<TPartition> is safe to cache and use for the lifetime of an application, and that is best practice when the application processes events regularly or semi-regularly. The processor holds responsibility for efficient resource management, working to keep resource usage low during periods of inactivity and manage health during periods of higher use. Calling either the StopProcessingAsync(CancellationToken) or StopProcessing(CancellationToken) method when processing is complete or as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up.
Constructors
PluggableCheckpointStoreEventProcessor<TPartition>()
Initializes a new instance of the PluggableCheckpointStoreEventProcessor<TPartition> class.
Declaration
protected PluggableCheckpointStoreEventProcessor ();
PluggableCheckpointStoreEventProcessor<TPartition>(CheckpointStore, Int32, String, String, EventProcessorOptions)
Initializes a new instance of the PluggableCheckpointStoreEventProcessor<TPartition> class.
Declaration
protected PluggableCheckpointStoreEventProcessor (Azure.Messaging.EventHubs.Primitives.CheckpointStore checkpointStore, int eventBatchMaximumCount, string consumerGroup, string connectionString, Azure.Messaging.EventHubs.Primitives.EventProcessorOptions options = null);
Parameters
CheckpointStore
checkpointStore
Responsible for creation of checkpoints and for ownership claim. Processor instances sharing this storage will attempt to coordinate and share work. |
System.Int32
eventBatchMaximumCount
The desired number of events to include in a batch to be processed. This size is the maximum count in a batch; the actual count may be smaller, depending on whether events are available in the Event Hub. |
System.String
consumerGroup
The name of the consumer group this processor is associated with. The processor will assert exclusive read access to partitions for this group. |
System.String
connectionString
The connection string to use for connecting to the Event Hubs namespace; it is expected that the Event Hub name and the shared key properties are contained in this connection string. |
EventProcessorOptions
options
The set of options to use for the processor. |
Remarks
If the connection string is copied from the Event Hubs namespace, it will likely not contain the name of the desired Event Hub,
which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]]" to the end of the
connection string. For example, ";EntityPath=telemetry-hub".
If you have defined a shared access policy directly on the Event Hub itself, then copying the connection string from that
Event Hub will result in a connection string that contains the name.
Exceptions
System.ArgumentOutOfRangeException
Occurs when the requested |
PluggableCheckpointStoreEventProcessor<TPartition>(CheckpointStore, Int32, String, String, String, EventProcessorOptions)
Initializes a new instance of the PluggableCheckpointStoreEventProcessor<TPartition> class.
Declaration
protected PluggableCheckpointStoreEventProcessor (Azure.Messaging.EventHubs.Primitives.CheckpointStore checkpointStore, int eventBatchMaximumCount, string consumerGroup, string connectionString, string eventHubName, Azure.Messaging.EventHubs.Primitives.EventProcessorOptions options = null);
Parameters
CheckpointStore
checkpointStore
The provider of checkpoint and ownership data for the processor. Processor instances sharing this storage will attempt to coordinate and share work. |
System.Int32
eventBatchMaximumCount
The desired number of events to include in a batch to be processed. This size is the maximum count in a batch; the actual count may be smaller, depending on whether events are available in the Event Hub. |
System.String
consumerGroup
The name of the consumer group this processor is associated with. The processor will assert exclusive read access to partitions for this group. |
System.String
connectionString
The connection string to use for connecting to the Event Hubs namespace; it is expected that the shared key properties are contained in this connection string, but not the Event Hub name. |
System.String
eventHubName
The name of the specific Event Hub to associate the processor with. |
EventProcessorOptions
options
The set of options to use for the processor. |
Remarks
If the connection string is copied from the Event Hub itself, it will contain the name of the desired Event Hub,
and can be used directly without passing the eventHubName
. The name of the Event Hub should be
passed only once, either as part of the connection string or separately.
Exceptions
System.ArgumentOutOfRangeException
Occurs when the requested |
PluggableCheckpointStoreEventProcessor<TPartition>(CheckpointStore, Int32, String, String, String, AzureNamedKeyCredential, EventProcessorOptions)
Initializes a new instance of the PluggableCheckpointStoreEventProcessor<TPartition> class.
Declaration
protected PluggableCheckpointStoreEventProcessor (Azure.Messaging.EventHubs.Primitives.CheckpointStore checkpointStore, int eventBatchMaximumCount, string consumerGroup, string fullyQualifiedNamespace, string eventHubName, Azure.AzureNamedKeyCredential credential, Azure.Messaging.EventHubs.Primitives.EventProcessorOptions options = null);
Parameters
CheckpointStore
checkpointStore
The provider of checkpoint and ownership data for the processor. Processor instances sharing this storage will attempt to coordinate and share work. |
System.Int32
eventBatchMaximumCount
The desired number of events to include in a batch to be processed. This size is the maximum count in a batch; the actual count may be smaller, depending on whether events are available in the Event Hub. |
System.String
consumerGroup
The name of the consumer group this processor is associated with. The processor will assert exclusive read access to partitions for this group. |
System.String
fullyQualifiedNamespace
The fully qualified Event Hubs namespace to connect to. This is likely to be similar to |
System.String
eventHubName
The name of the specific Event Hub to associate the processor with. |
Azure.AzureNamedKeyCredential
credential
The shared access key credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration. |
EventProcessorOptions
options
The set of options to use for the processor. |
Exceptions
System.ArgumentOutOfRangeException
Occurs when the requested |
PluggableCheckpointStoreEventProcessor<TPartition>(CheckpointStore, Int32, String, String, String, AzureSasCredential, EventProcessorOptions)
Initializes a new instance of the PluggableCheckpointStoreEventProcessor<TPartition> class.
Declaration
protected PluggableCheckpointStoreEventProcessor (Azure.Messaging.EventHubs.Primitives.CheckpointStore checkpointStore, int eventBatchMaximumCount, string consumerGroup, string fullyQualifiedNamespace, string eventHubName, Azure.AzureSasCredential credential, Azure.Messaging.EventHubs.Primitives.EventProcessorOptions options = null);
Parameters
CheckpointStore
checkpointStore
The provider of checkpoint and ownership data for the processor. Processor instances sharing this storage will attempt to coordinate and share work. |
System.Int32
eventBatchMaximumCount
The desired number of events to include in a batch to be processed. This size is the maximum count in a batch; the actual count may be smaller, depending on whether events are available in the Event Hub. |
System.String
consumerGroup
The name of the consumer group this processor is associated with. The processor will assert exclusive read access to partitions for this group. |
System.String
fullyQualifiedNamespace
The fully qualified Event Hubs namespace to connect to. This is likely to be similar to |
System.String
eventHubName
The name of the specific Event Hub to associate the processor with. |
Azure.AzureSasCredential
credential
The shared signature credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration. |
EventProcessorOptions
options
The set of options to use for the processor. |
Exceptions
System.ArgumentOutOfRangeException
Occurs when the requested |
PluggableCheckpointStoreEventProcessor<TPartition>(CheckpointStore, Int32, String, String, String, TokenCredential, EventProcessorOptions)
Initializes a new instance of the PluggableCheckpointStoreEventProcessor<TPartition> class.
Declaration
protected PluggableCheckpointStoreEventProcessor (Azure.Messaging.EventHubs.Primitives.CheckpointStore checkpointStore, int eventBatchMaximumCount, string consumerGroup, string fullyQualifiedNamespace, string eventHubName, Azure.Core.TokenCredential credential, Azure.Messaging.EventHubs.Primitives.EventProcessorOptions options = null);
Parameters
CheckpointStore
checkpointStore
The provider of checkpoint and ownership data for the processor. Processor instances sharing this storage will attempt to coordinate and share work. |
System.Int32
eventBatchMaximumCount
The desired number of events to include in a batch to be processed. This size is the maximum count in a batch; the actual count may be smaller, depending on whether events are available in the Event Hub. |
System.String
consumerGroup
The name of the consumer group this processor is associated with. The processor will assert exclusive read access to partitions for this group. |
System.String
fullyQualifiedNamespace
The fully qualified Event Hubs namespace to connect to. This is likely to be similar to |
System.String
eventHubName
The name of the specific Event Hub to associate the processor with. |
Azure.Core.TokenCredential
credential
The Azure managed identity credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration. |
EventProcessorOptions
options
The set of options to use for the processor. |
Exceptions
System.ArgumentOutOfRangeException
Occurs when the requested |
Methods
ClaimOwnershipAsync(IEnumerable<EventProcessorPartitionOwnership>, CancellationToken)
Attempts to claim ownership of the specified partitions for processing. This operation is used by load balancing to enable distributing the responsibility for processing partitions for an Event Hub and consumer group pairing amongst the active event processors.
Declaration
protected override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ClaimOwnershipAsync (System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership> desiredOwnership, System.Threading.CancellationToken cancellationToken);
Parameters
System.Collections.Generic.IEnumerable<EventProcessorPartitionOwnership>
desiredOwnership
The set of partition ownership desired by the event processor instance; this is the set of partitions that it will attempt to request responsibility for processing. |
System.Threading.CancellationToken
cancellationToken
A System.Threading.CancellationToken instance to signal the request to cancel the processing. This is most likely to occur when the processor is shutting down. |
Returns
System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<EventProcessorPartitionOwnership>>
The set of ownership records for the partitions that were successfully claimed; this is expected to be the |
GetCheckpointAsync(String, CancellationToken)
Returns a checkpoint for the Event Hub, consumer group, and identifier of the partition associated with the event processor instance, so that processing for a given partition can be properly initialized.
Declaration
protected override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync (string partitionId, System.Threading.CancellationToken cancellationToken);
Parameters
System.String
partitionId
The identifier of the partition for which to retrieve the checkpoint. |
System.Threading.CancellationToken
cancellationToken
A System.Threading.CancellationToken instance to signal the request to cancel the processing. This is most likely to occur when the processor is shutting down. |
Returns
System.Threading.Tasks.Task<EventProcessorCheckpoint>
The checkpoint for the processor to take into account when initializing partition. |
Remarks
Should a partition not have a corresponding checkpoint, the DefaultStartingPosition will be used to initialize the partition for processing.
In the event that a custom starting point is desired for a single partition, or each partition should start at a unique place, it is recommended that this method express that intent by returning checkpoints for those partitions with the desired custom starting location set.
ListOwnershipAsync(CancellationToken)
Requests a list of the ownership assignments for partitions between each of the cooperating event processor instances for a given Event Hub and consumer group pairing. This method is used during load balancing to allow the processor to discover other active collaborators and to make decisions about how to best balance work between them.
Declaration
protected override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync (System.Threading.CancellationToken cancellationToken);
Parameters
System.Threading.CancellationToken
cancellationToken
A System.Threading.CancellationToken instance to signal the request to cancel the processing. This is most likely to occur when the processor is shutting down. |
Returns
System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<EventProcessorPartitionOwnership>>
The set of ownership data to take into account when making load balancing decisions. |
UpdateCheckpointAsync(String, CheckpointPosition, CancellationToken)
Creates or updates a checkpoint for a specific partition, identifying a position in the partition's event stream that an event processor should begin reading from.
Declaration
protected override System.Threading.Tasks.Task UpdateCheckpointAsync (string partitionId, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken);
Parameters
System.String
partitionId
The identifier of the partition the checkpoint is for. |
CheckpointPosition
startingPosition
The starting position to associate with the checkpoint, indicating that a processor should begin reading from the next event in the stream. |
System.Threading.CancellationToken
cancellationToken
A System.Threading.CancellationToken instance to signal a request to cancel the operation |
Returns
System.Threading.Tasks.Task
|
UpdateCheckpointAsync(String, Int64, Nullable<Int64>, CancellationToken)
Obsolete.
Creates or updates a checkpoint for a specific partition, identifying a position in the partition's event stream that an event processor should begin reading from.
Declaration
[System.ComponentModel.EditorBrowsable]
[System.Obsolete("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
protected override System.Threading.Tasks.Task UpdateCheckpointAsync (string partitionId, long offset, Nullable<long> sequenceNumber, System.Threading.CancellationToken cancellationToken);
Parameters
System.String
partitionId
The identifier of the partition the checkpoint is for. |
System.Int64
offset
The offset to associate with the checkpoint, intended as informational metadata. This will only be used for positioning if there is no value provided for |
System.Nullable<System.Int64>
sequenceNumber
The sequence number to associate with the checkpoint, indicating that a processor should begin reading from the next event in the stream. |
System.Threading.CancellationToken
cancellationToken
A System.Threading.CancellationToken instance to signal a request to cancel the operation. |
Returns
System.Threading.Tasks.Task
|
Remarks
This method is obsolete and should no longer be used. Please use UpdateCheckpointAsync(String, CheckpointPosition, CancellationToken) instead.