Class ServiceBusClientBuilder.ServiceBusProcessorClientBuilder

java.lang.Object
com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusProcessorClientBuilder
Enclosing class:
ServiceBusClientBuilder

public final class ServiceBusClientBuilder.ServiceBusProcessorClientBuilder extends Object
Builder for creating ServiceBusProcessorClient to consume messages from a Service Bus entity. ServiceBusProcessorClients provides a push-based mechanism that notifies the message processing callback when a message is received or the error handle when an error is observed. To create an instance, therefore, configuring the two callbacks - processMessage(Consumer) and processError(Consumer) are necessary. By default, a ServiceBusProcessorClient is configured with auto-completion and auto-lock renewal capabilities.

Sample code to instantiate a processor client and receive in PeekLock mode

// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
    // handling message reaches desired state such that it doesn't require Service Bus to redeliver
    // the same message, then context.complete() should be called otherwise context.abandon().
    final boolean success = Math.random() < 0.5;
    if (success) {
        try {
            context.complete();
        } catch (RuntimeException error) {
            System.out.printf("Completion of the message %s failed.%n Error: %s%n",
                message.getMessageId(), error);
        }
    } else {
        try {
            context.abandon();
        } catch (RuntimeException error) {
            System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                message.getMessageId(), error);
        }
    }
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    if (errorContext.getException() instanceof ServiceBusException) {
        ServiceBusException exception = (ServiceBusException) errorContext.getException();

        System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
            exception.getReason());
    } else {
        System.out.printf("Error occurred: %s%n", errorContext.getException());
    }
};

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, tokenCredential)
    .processor()
    .queueName(queueName)
    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
    .disableAutoComplete()  // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
    .processMessage(processMessage)
    .processError(processError)
    .disableAutoComplete()
    .buildProcessorClient();

// Starts the processor in the background. Control returns immediately.
processorClient.start();

// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();

Sample code to instantiate a processor client and receive in ReceiveAndDelete mode

// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
        message.getSessionId(), message.getSequenceNumber(), message.getBody());
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    if (errorContext.getException() instanceof ServiceBusException) {
        ServiceBusException exception = (ServiceBusException) errorContext.getException();

        System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
            exception.getReason());
    } else {
        System.out.printf("Error occurred: %s%n", errorContext.getException());
    }
};

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete()' will opt in to manual settlement (e.g. complete, abandon).
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, tokenCredential)
    .processor()
    .queueName(queueName)
    .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
    .processMessage(processMessage)
    .processError(processError)
    .disableAutoComplete()
    .buildProcessorClient();

// Starts the processor in the background. Control returns immediately.
processorClient.start();

// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();
See Also: