Class ServiceBusClientBuilder.ServiceBusProcessorClientBuilder
- Enclosing class:
- ServiceBusClientBuilder
Builder for creating a ServiceBusProcessorClient to consume messages from a Service Bus entity.
 A ServiceBusProcessorClient provides a push-based mechanism that notifies
 the message processing callback when a message is received, or the error handler when an error is observed.
 Creating an instance therefore requires configuring both callbacks: processMessage(Consumer) and
 processError(Consumer). By default, a ServiceBusProcessorClient is configured
 with auto-completion and auto-lock renewal capabilities.
 Sample code to instantiate a processor client and receive in PeekLock mode
 // Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message. In real-world scenarios, call context.complete()
     // once the business logic has handled the message and Service Bus does not need to redeliver it;
     // otherwise call context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };
 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();
         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };
 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .buildProcessorClient();
 // Starts the processor in the background. Control returns immediately.
 processorClient.start();
 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();
 
 
 Sample code to instantiate a processor client and receive in ReceiveAndDelete mode
 // Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };
 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();
         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };
 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();
 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete()' turns off automatic settlement, which does not apply in RECEIVE_AND_DELETE mode
 // because the message is removed from the entity as soon as it is received.
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
     .processMessage(processMessage)
     .processError(processError)
     .disableAutoComplete()
     .buildProcessorClient();
 // Starts the processor in the background. Control returns immediately.
 processorClient.start();
 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();
 
Method Summary
- buildProcessorClient() - Creates a Service Bus message processor responsible for reading messages from a specific queue or subscription.
- disableAutoComplete() - Disables auto-complete and auto-abandon of received messages.
- maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration) - Sets the amount of time to continue auto-renewing the lock.
- maxConcurrentCalls(int maxConcurrentCalls) - Max concurrent messages that this processor should process.
- prefetchCount(int prefetchCount) - Sets the prefetch count of the processor.
- processError(Consumer<ServiceBusErrorContext> processError) - The error handler for the processor which will be invoked in the event of an error while receiving messages.
- processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage) - The message processing callback for the processor which will be executed when a message is received.
- queueName(String queueName) - Sets the name of the queue to create a processor for.
- receiveMode(ServiceBusReceiveMode receiveMode) - Sets the receive mode for the processor.
- subQueue(SubQueue subQueue) - Sets the type of the SubQueue to connect to.
- subscriptionName(String subscriptionName) - Sets the name of the subscription in the topic to listen to.
- topicName(String topicName) - Sets the name of the topic.
Method Details
prefetchCount
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder prefetchCount(int prefetchCount)
Sets the prefetch count of the processor. For both PEEK_LOCK and RECEIVE_AND_DELETE modes the default value is 0. Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when, and before, the application starts the processor. Setting a non-zero value will prefetch that number of messages. Setting the value to zero turns prefetch off.
- Parameters:
- prefetchCount - The prefetch count.
- Returns:
- The modified ServiceBusClientBuilder.ServiceBusProcessorClientBuilder object.
 
queueName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder queueName(String queueName)
Sets the name of the queue to create a processor for.
- Parameters:
- queueName - Name of the queue.
- Returns:
- The modified ServiceBusClientBuilder.ServiceBusProcessorClientBuilder object.
 
receiveMode
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)
Sets the receive mode for the processor.
- Parameters:
- receiveMode - Mode for receiving messages.
- Returns:
- The modified ServiceBusClientBuilder.ServiceBusProcessorClientBuilder object.
 
subQueue
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subQueue(SubQueue subQueue)
Sets the type of the SubQueue to connect to. Azure Service Bus queues and subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ).
- Parameters:
- subQueue - The type of the sub queue.
- Returns:
- The modified ServiceBusClientBuilder.ServiceBusProcessorClientBuilder object.
 
subscriptionName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder subscriptionName(String subscriptionName)
Sets the name of the subscription in the topic to listen to. topicName(String) must also be set.
- Parameters:
- subscriptionName - Name of the subscription.
- Returns:
- The modified ServiceBusClientBuilder.ServiceBusProcessorClientBuilder object.
 
topicName
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder topicName(String topicName)
Sets the name of the topic. subscriptionName(String) must also be set.
- Parameters:
- topicName - Name of the topic.
- Returns:
- The modified ServiceBusClientBuilder.ServiceBusProcessorClientBuilder object.
 
processMessage
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage)
The message processing callback for the processor which will be executed when a message is received.
- Parameters:
- processMessage - The message processing consumer that will be executed when a message is received.
- Returns:
- The updated ServiceBusClientBuilder.ServiceBusProcessorClientBuilder object.
 
processError
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processError(Consumer<ServiceBusErrorContext> processError)
The error handler for the processor which will be invoked in the event of an error while receiving messages.
- Parameters:
- processError - The error handler which will be executed when an error occurs.
- Returns:
- The updated ServiceBusClientBuilder.ServiceBusProcessorClientBuilder object.
 
maxAutoLockRenewDuration
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)
Sets the amount of time to continue auto-renewing the lock. Setting Duration.ZERO or null disables auto-renewal. For RECEIVE_AND_DELETE mode, auto-renewal is disabled.
A Service Bus queue or subscription in a topic will have a lock duration set at the resource level. When the processor client pulls a message from the resource, the broker will apply an initial lock to the message. This initial lock lasts for the lock duration set at the resource level. If the client does not renew the initial lock before it expires, the message will be released and become available to other receivers. Each time the client renews the lock, the broker will extend the lock for the lock duration set at the resource level. To keep the message locked, the client has to keep renewing the message lock before it expires.
maxAutoLockRenewDuration controls how long the background renewal task runs. Consequently, the most recently renewed lock can still be valid after the renewal task is disposed.
By default, the message lock renewal task will run for 5 minutes.
- Parameters:
- maxAutoLockRenewDuration - the amount of time to continue auto-renewing the lock. Duration.ZERO or null indicates that auto-renewal is disabled.
- Returns:
- The updated ServiceBusClientBuilder.ServiceBusProcessorClientBuilder object.
- Throws:
- IllegalArgumentException - If maxAutoLockRenewDuration is negative.
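The relationship between the resource-level lock duration and maxAutoLockRenewDuration can be made concrete with a little arithmetic. The following is a minimal sketch using only java.time; the class LockRenewalSketch and its method latestLockExpiry are hypothetical names introduced for illustration and are not part of the SDK. It assumes the worst case in which the renewal task renews the lock just before it stops, so the lock can outlive the task by up to one lock duration.

```java
import java.time.Duration;

/** Hypothetical model of lock lifetime; this is not the SDK's renewal implementation. */
final class LockRenewalSketch {

    /**
     * Returns an upper bound on how long a message can stay locked after it is received.
     * Assumption: the last renewal may happen right before the renewal task stops, and
     * each renewal extends the lock by the resource-level lock duration.
     */
    static Duration latestLockExpiry(Duration lockDuration, Duration maxAutoLockRenewDuration) {
        if (maxAutoLockRenewDuration != null && maxAutoLockRenewDuration.isNegative()) {
            throw new IllegalArgumentException("'maxAutoLockRenewDuration' cannot be negative.");
        }
        if (maxAutoLockRenewDuration == null || maxAutoLockRenewDuration.isZero()) {
            // Auto-renewal is disabled, so only the initial lock applies.
            return lockDuration;
        }
        // The renewal task runs for maxAutoLockRenewDuration; the final renewal
        // can keep the lock valid for one more lock duration after that.
        return maxAutoLockRenewDuration.plus(lockDuration);
    }

    public static void main(String[] args) {
        Duration lockDuration = Duration.ofSeconds(30);   // assumed resource-level setting
        Duration renewFor = Duration.ofMinutes(5);        // the documented default
        System.out.println("Latest possible lock expiry: " + latestLockExpiry(lockDuration, renewFor));
        System.out.println("With renewal disabled: " + latestLockExpiry(lockDuration, Duration.ZERO));
    }
}
```

If message handling can take longer than this bound, the lock may be lost before the message is settled, which is one reason to size maxAutoLockRenewDuration against the expected processing time.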
 
maxConcurrentCalls
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)
Max concurrent messages that this processor should process. By default, this is set to 1.
This setting allows the application to configure the number of concurrent calls to the message processing callback processMessage(Consumer), enabling the processing of multiple messages in parallel.
- Parameters:
- maxConcurrentCalls - max concurrent messages that this processor should process.
- Returns:
- The updated ServiceBusClientBuilder.ServiceBusProcessorClientBuilder object.
- Throws:
- IllegalArgumentException - if maxConcurrentCalls is set to a value less than 1.
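To illustrate what bounding the number of concurrent callback invocations means in practice, here is a minimal sketch built only on java.util.concurrent. It is a model of the idea, not the processor's actual dispatch mechanism; the class MaxConcurrentCallsSketch and the method runAndMeasurePeak are hypothetical names, and messages are represented by plain integers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical model of bounded callback concurrency; not the SDK implementation. */
final class MaxConcurrentCallsSketch {

    /**
     * Dispatches messageCount messages to the callback with at most maxConcurrentCalls
     * invocations in flight, and returns the peak number of simultaneous invocations seen.
     */
    static int runAndMeasurePeak(int messageCount, int maxConcurrentCalls, Consumer<Integer> processMessage) {
        if (maxConcurrentCalls < 1) {
            throw new IllegalArgumentException("'maxConcurrentCalls' cannot be less than 1");
        }
        // A fixed pool caps how many callbacks can run at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrentCalls);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int messageId = i;
            pool.execute(() -> {
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    processMessage.accept(messageId);
                } finally {
                    inFlight.decrementAndGet();
                    done.countDown();
                }
            });
        }
        try {
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // A slow callback makes overlapping invocations likely.
        Consumer<Integer> slow = id -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println("Peak concurrent callbacks: " + runAndMeasurePeak(20, 3, slow));
    }
}
```

Because callbacks can overlap whenever the limit is greater than 1, any state shared by the message processing callback needs to be thread-safe.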
 
disableAutoComplete
public ServiceBusClientBuilder.ServiceBusProcessorClientBuilder disableAutoComplete()
Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is completed. If an error happens when the message is processed, it is abandoned.
- Returns:
- The modified ServiceBusClientBuilder.ServiceBusProcessorClientBuilder object.
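The default settlement rule described above (complete on success, abandon on error) can be sketched as a small decision function. This is a hypothetical model for illustration only: AutoSettleSketch, Outcome, and dispatch are invented names, messages are plain strings, and an "error" is assumed to mean that the callback throws a RuntimeException.

```java
import java.util.function.Consumer;

/** Hypothetical model of automatic settlement; not the SDK implementation. */
final class AutoSettleSketch {

    enum Outcome { COMPLETED, ABANDONED, LEFT_TO_CALLBACK }

    /**
     * Runs the callback for one message and reports what would happen to the
     * message afterwards, depending on whether auto-complete is enabled.
     */
    static Outcome dispatch(String message, Consumer<String> processMessage, boolean autoComplete) {
        try {
            processMessage.accept(message);
        } catch (RuntimeException error) {
            // With auto-settlement on, a failing callback leads to abandon.
            return autoComplete ? Outcome.ABANDONED : Outcome.LEFT_TO_CALLBACK;
        }
        // With auto-settlement on, a callback that returns normally leads to complete.
        return autoComplete ? Outcome.COMPLETED : Outcome.LEFT_TO_CALLBACK;
    }

    public static void main(String[] args) {
        Consumer<String> succeeding = body -> { };
        Consumer<String> failing = body -> {
            throw new IllegalStateException("simulated processing failure");
        };
        System.out.println(dispatch("m1", succeeding, true));
        System.out.println(dispatch("m2", failing, true));
        System.out.println(dispatch("m3", failing, false));
    }
}
```

With auto-complete disabled, the callback itself is responsible for settling the message, as the PeekLock sample does with context.complete() and context.abandon().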
 
buildProcessorClient
public ServiceBusProcessorClient buildProcessorClient()
Creates a Service Bus message processor responsible for reading messages from a specific queue or subscription.
- Returns:
- A new ServiceBusProcessorClient that processes messages from a queue or subscription.
- Throws:
- IllegalStateException - if neither queueName nor topicName is set, or if both are set. It is also thrown if the Service Bus connectionString contains an EntityPath that does not match the one set in queueName or topicName, or if a topicName is set but subscriptionName is not.
- IllegalArgumentException - if the queue or topic name is not set via queueName() or topicName(), respectively.
- NullPointerException - if the processMessage(Consumer) or processError(Consumer) callbacks are not set.
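The preconditions listed above can be summarized as a pre-flight check. The sketch below models only a subset of them (callbacks present, exactly one of queue or topic, subscription required with a topic) and does not cover the connection-string EntityPath check. The class ProcessorConfigCheckSketch and its validate method are hypothetical, the callback types are simplified stand-ins, and the order of the checks is an assumption rather than the SDK's documented behavior.

```java
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical pre-flight check mirroring the documented exceptions; not the SDK code. */
final class ProcessorConfigCheckSketch {

    static void validate(String queueName, String topicName, String subscriptionName,
            Consumer<String> processMessage, Consumer<Throwable> processError) {
        // Both callbacks are mandatory.
        Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
        Objects.requireNonNull(processError, "'processError' cannot be null");

        boolean hasQueue = queueName != null && !queueName.isEmpty();
        boolean hasTopic = topicName != null && !topicName.isEmpty();
        // Exactly one of queueName or topicName must be configured.
        if (hasQueue == hasTopic) {
            throw new IllegalStateException("Set exactly one of 'queueName' or 'topicName'.");
        }
        // A topic can only be read through one of its subscriptions.
        if (hasTopic && (subscriptionName == null || subscriptionName.isEmpty())) {
            throw new IllegalStateException("'subscriptionName' is required when 'topicName' is set.");
        }
    }

    public static void main(String[] args) {
        validate("orders", null, null, body -> { }, error -> { });
        validate(null, "events", "audit", body -> { }, error -> { });
        try {
            validate(null, "events", null, body -> { }, error -> { });
        } catch (IllegalStateException expected) {
            System.out.println("Rejected: " + expected.getMessage());
        }
    }
}
```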
 
 