Package com.azure.messaging.servicebus
Class ServiceBusProcessorClient
java.lang.Object
com.azure.messaging.servicebus.ServiceBusProcessorClient
- All Implemented Interfaces:
AutoCloseable
The processor client for processing Service Bus messages.
ServiceBusProcessorClient
provides a push-based
mechanism that invokes the message processing callback when a message is received or the error handler when an error
occurs when receiving messages. A ServiceBusProcessorClient
can be created to process messages for a
session-enabled or non session-enabled Service Bus entity. It supports auto-settlement of messages by default.
Sample code to instantiate a processor client and receive in PeekLock mode
// Function that gets called whenever a message is received. Consumer<ServiceBusReceivedMessageContext> processMessage = context -> { final ServiceBusReceivedMessage message = context.getMessage(); // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic // handling message reaches desired state such that it doesn't require Service Bus to redeliver // the same message, then context.complete() should be called otherwise context.abandon(). final boolean success = Math.random() < 0.5; if (success) { try { context.complete(); } catch (RuntimeException error) { System.out.printf("Completion of the message %s failed.%n Error: %s%n", message.getMessageId(), error); } } else { try { context.abandon(); } catch (RuntimeException error) { System.out.printf("Abandoning of the message %s failed.%nError: %s%n", message.getMessageId(), error); } } }; // Sample code that gets called if there's an error Consumer<ServiceBusErrorContext> processError = errorContext -> { if (errorContext.getException() instanceof ServiceBusException) { ServiceBusException exception = (ServiceBusException) errorContext.getException(); System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(), exception.getReason()); } else { System.out.printf("Error occurred: %s%n", errorContext.getException()); } }; TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build(); // Create the processor client via the builder and its sub-builder // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net" ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .credential(fullyQualifiedNamespace, tokenCredential) .processor() .queueName(queueName) .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon). .processMessage(processMessage) .processError(processError) .disableAutoComplete() .buildProcessorClient(); // Starts the processor in the background. Control returns immediately. processorClient.start(); // Stop processor and dispose when done processing messages. processorClient.stop(); processorClient.close();
Sample code to instantiate a processor client and receive in ReceiveAndDelete mode
// Function that gets called whenever a message is received. Consumer<ServiceBusReceivedMessageContext> processMessage = context -> { final ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getSessionId(), message.getSequenceNumber(), message.getBody()); }; // Sample code that gets called if there's an error Consumer<ServiceBusErrorContext> processError = errorContext -> { if (errorContext.getException() instanceof ServiceBusException) { ServiceBusException exception = (ServiceBusException) errorContext.getException(); System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(), exception.getReason()); } else { System.out.printf("Error occurred: %s%n", errorContext.getException()); } }; TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build(); // Create the processor client via the builder and its sub-builder // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net" // 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon). ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .credential(fullyQualifiedNamespace, tokenCredential) .processor() .queueName(queueName) .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) .processMessage(processMessage) .processError(processError) .disableAutoComplete() .buildProcessorClient(); // Starts the processor in the background. Control returns immediately. processorClient.start(); // Stop processor and dispose when done processing messages. processorClient.stop(); processorClient.close();
Create and run a session-enabled processor
// Function that gets called whenever a message is received. Consumer<ServiceBusReceivedMessageContext> onMessage = context -> { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getSessionId(), message.getSequenceNumber(), message.getBody()); }; Consumer<ServiceBusErrorContext> onError = context -> { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (context.getException() instanceof ServiceBusException) { ServiceBusException exception = (ServiceBusException) context.getException(); System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(), exception.getReason()); } else { System.out.printf("Error occurred: %s%n", context.getException()); } }; TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build(); // Create the processor client via the builder and its sub-builder // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net" ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder() .credential(fullyQualifiedNamespace, tokenCredential) .sessionProcessor() .queueName(sessionEnabledQueueName) .receiveMode(ServiceBusReceiveMode.PEEK_LOCK) .disableAutoComplete() .maxConcurrentSessions(2) .processMessage(onMessage) .processError(onError) .buildProcessorClient(); // Starts the processor in the background. Control returns immediately. sessionProcessor.start(); // Stop processor and dispose when done processing messages. sessionProcessor.stop(); sessionProcessor.close();
- See Also:
-
Method Summary
Modifier and TypeMethodDescriptionvoid
close()
Stops message processing and closes the processor.Gets the identifier of the instance ofServiceBusProcessorClient
.Returns the queue name associated with this instance ofServiceBusProcessorClient
.Returns the subscription name associated with this instance ofServiceBusProcessorClient
.Returns the topic name associated with this instance ofServiceBusProcessorClient
.boolean
Returnstrue
if the processor is running.void
start()
Starts the processor in the background.void
stop()
Stops the message processing for this processor.
-
Method Details
-
start
public void start()Starts the processor in the background. When this method is called, the processor will initiate a message receiver that will invoke the message handler when new messages are available. This method is idempotent (ie. callingstart()
again after the processor is already running is a no-op).Calling
start()
after callingstop()
will resume processing messages using the same underlying connection.Calling
start()
after callingclose()
will start the processor with a new connection. -
stop
public void stop()Stops the message processing for this processor. The receiving links and sessions are kept active and this processor can resume processing messages by callingstart()
again. -
close
public void close()Stops message processing and closes the processor. The receiving links and sessions are closed and callingstart()
will create a new processing cycle with new links and new sessions.- Specified by:
close
in interfaceAutoCloseable
-
isRunning
public boolean isRunning()Returnstrue
if the processor is running. If the processor is stopped or closed, this method returnsfalse
.- Returns:
true
if the processor is running;false
otherwise.
-
getQueueName
Returns the queue name associated with this instance ofServiceBusProcessorClient
.- Returns:
- the queue name associated with this instance of
ServiceBusProcessorClient
ornull
if the processor instance is for a topic and subscription.
-
getTopicName
Returns the topic name associated with this instance ofServiceBusProcessorClient
.- Returns:
- the topic name associated with this instance of
ServiceBusProcessorClient
ornull
if the processor instance is for a queue.
-
getSubscriptionName
Returns the subscription name associated with this instance ofServiceBusProcessorClient
.- Returns:
- the subscription name associated with this instance of
ServiceBusProcessorClient
ornull
if the processor instance is for a queue.
-
getIdentifier
Gets the identifier of the instance ofServiceBusProcessorClient
.- Returns:
- The identifier that can identify the instance of
ServiceBusProcessorClient
.
-