Class ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder

java.lang.Object
com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder
Enclosing class:
ServiceBusClientBuilder

public final class ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder extends Object
Builder for creating ServiceBusProcessorClient to consume messages from a session-based Service Bus entity. ServiceBusProcessorClient processes messages and errors via processMessage(Consumer) and processError(Consumer). When the processor finishes processing a session, it tries to fetch the next session to process.

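By default, the processor:

- Automatically settles messages. Disabled via disableAutoComplete().
- Processes 1 session concurrently. Configured via maxConcurrentSessions(int).
- Invokes 1 instance of the processMessage consumer. Configured via maxConcurrentCalls(int).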

Instantiate a session-enabled processor client

// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
    ServiceBusReceivedMessage message = context.getMessage();
    System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
        message.getSessionId(), message.getSequenceNumber(), message.getBody());

    // Auto-complete is disabled on the builder below, so settle the message explicitly.
    context.complete();
};

// Function that gets called whenever an error occurs while receiving or processing messages.
Consumer<ServiceBusErrorContext> onError = context -> {
    System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
        context.getFullyQualifiedNamespace(), context.getEntityPath());

    if (context.getException() instanceof ServiceBusException) {
        ServiceBusException exception = (ServiceBusException) context.getException();

        System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
            exception.getReason());
    } else {
        System.out.printf("Error occurred: %s%n", context.getException());
    }
};

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, tokenCredential)
    .sessionProcessor()
    .queueName(sessionEnabledQueueName)
    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
    .disableAutoComplete()
    .maxConcurrentSessions(2)
    .processMessage(onMessage)
    .processError(onError)
    .buildProcessorClient();

// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();

// Once the application is done processing messages, stop the processor and release its resources.
sessionProcessor.stop();
sessionProcessor.close();
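
Because the sample above calls disableAutoComplete(), the message handler is responsible for settling each message. The following is a minimal sketch of one way to structure that, not the SDK's own implementation: it completes a message when the business logic succeeds and abandons it when the logic throws. To stay runnable without the Azure SDK on the classpath, it uses a hypothetical SettleableContext interface and a RecordingContext fake; these names are assumptions made for illustration. In real code the handler would receive a ServiceBusReceivedMessageContext, which exposes complete() and abandon() methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ServiceBusReceivedMessageContext (assumption for this sketch).
interface SettleableContext {
    String getBody();
    void complete();
    void abandon();
}

class ManualSettlementSketch {

    // Wraps business logic so that each message is settled exactly once:
    // completed on success, abandoned (released for redelivery) on failure.
    static Consumer<SettleableContext> settling(Consumer<String> businessLogic) {
        return context -> {
            try {
                businessLogic.accept(context.getBody());
            } catch (RuntimeException e) {
                context.abandon();
                return;
            }
            // Called outside the try block so that a failure in complete()
            // is not followed by an abandon() on the same message.
            context.complete();
        };
    }

    // In-memory fake that records which settlement call was made.
    static final class RecordingContext implements SettleableContext {
        final String body;
        final List<String> calls = new ArrayList<>();

        RecordingContext(String body) {
            this.body = body;
        }

        public String getBody() {
            return body;
        }

        public void complete() {
            calls.add("complete");
        }

        public void abandon() {
            calls.add("abandon");
        }
    }

    public static void main(String[] args) {
        Consumer<SettleableContext> handler = settling(body -> {
            if (body.isEmpty()) {
                throw new IllegalArgumentException("empty body");
            }
            System.out.println("Processed: " + body);
        });

        RecordingContext ok = new RecordingContext("order-1");
        RecordingContext bad = new RecordingContext("");
        handler.accept(ok);
        handler.accept(bad);
        System.out.println("ok: " + ok.calls + ", bad: " + bad.calls);
    }
}
```

Keeping the settlement decision in one wrapper means individual handlers only contain business logic, and every code path ends in exactly one settlement call.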
See Also: