Class ServiceBusProcessorClient

java.lang.Object
com.azure.messaging.servicebus.ServiceBusProcessorClient
All Implemented Interfaces:
AutoCloseable

public final class ServiceBusProcessorClient extends Object implements AutoCloseable
The processor client for processing Service Bus messages. ServiceBusProcessorClient provides a push-based mechanism that invokes the message-processing callback when a message is received, or the error handler when an error occurs while receiving messages. A ServiceBusProcessorClient can be created to process messages for a session-enabled or non-session-enabled Service Bus entity. Auto-settlement of messages is enabled by default.

Sample code to instantiate a processor client and receive in PeekLock mode

// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    // This sample randomly completes or abandons each message. In real code, call context.complete()
    // once the business logic has handled the message and Service Bus does not need to redeliver it;
    // otherwise call context.abandon().
    final boolean success = Math.random() < 0.5;
    if (success) {
        try {
            context.complete();
        } catch (RuntimeException error) {
            System.out.printf("Completion of the message %s failed.%nError: %s%n",
                message.getMessageId(), error);
        }
    } else {
        try {
            context.abandon();
        } catch (RuntimeException error) {
            System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                message.getMessageId(), error);
        }
    }
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    if (errorContext.getException() instanceof ServiceBusException) {
        ServiceBusException exception = (ServiceBusException) errorContext.getException();

        System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
            exception.getReason());
    } else {
        System.out.printf("Error occurred: %s%n", errorContext.getException());
    }
};

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, tokenCredential)
    .processor()
    .queueName(queueName)
    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
    .disableAutoComplete()  // Explicitly opt in to manual settlement (e.g. complete, abandon).
    .processMessage(processMessage)
    .processError(processError)
    .buildProcessorClient();

// Starts the processor in the background. Control returns immediately.
processorClient.start();

// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();

Sample code to instantiate a processor client and receive in ReceiveAndDelete mode

// Function that gets called whenever a message is received.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
        message.getSessionId(), message.getSequenceNumber(), message.getBody());
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    if (errorContext.getException() instanceof ServiceBusException) {
        ServiceBusException exception = (ServiceBusException) errorContext.getException();

        System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
            exception.getReason());
    } else {
        System.out.printf("Error occurred: %s%n", errorContext.getException());
    }
};

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
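// In RECEIVE_AND_DELETE mode a message is removed from the entity as soon as it is delivered, so there
// is nothing to settle; 'disableAutoComplete()' turns off auto-settlement, which does not apply here.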
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, tokenCredential)
    .processor()
    .queueName(queueName)
    .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
    .processMessage(processMessage)
    .processError(processError)
    .disableAutoComplete()
    .buildProcessorClient();

// Starts the processor in the background. Control returns immediately.
processorClient.start();

// Stop processor and dispose when done processing messages.
processorClient.stop();
processorClient.close();

Create and run a session-enabled processor

// Function that gets called whenever a message is received.
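Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
    ServiceBusReceivedMessage message = context.getMessage();
    System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
        message.getSessionId(), message.getSequenceNumber(), message.getBody());
    // Auto-complete is disabled on the builder below, so settle the message explicitly;
    // otherwise its lock expires and Service Bus redelivers it.
    context.complete();
};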

Consumer<ServiceBusErrorContext> onError = context -> {
    System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
        context.getFullyQualifiedNamespace(), context.getEntityPath());

    if (context.getException() instanceof ServiceBusException) {
        ServiceBusException exception = (ServiceBusException) context.getException();

        System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
            exception.getReason());
    } else {
        System.out.printf("Error occurred: %s%n", context.getException());
    }
};

TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

// Create the processor client via the builder and its sub-builder
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, tokenCredential)
    .sessionProcessor()
    .queueName(sessionEnabledQueueName)
    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
    .disableAutoComplete()
    .maxConcurrentSessions(2)
    .processMessage(onMessage)
    .processError(onError)
    .buildProcessorClient();

// Starts the processor in the background. Control returns immediately.
sessionProcessor.start();

// Stop processor and dispose when done processing messages.
sessionProcessor.stop();
sessionProcessor.close();

  • Method Details

    • start

      public void start()
      Starts the processor in the background. When this method is called, the processor will initiate a message receiver that will invoke the message handler when new messages are available. This method is idempotent (i.e., calling start() again after the processor is already running is a no-op).

      Calling start() after calling stop() will resume processing messages using the same underlying connection.

      Calling start() after calling close() will start the processor with a new connection.

    • stop

      public void stop()
      Stops the message processing for this processor. The receiving links and sessions are kept active and this processor can resume processing messages by calling start() again.
    • close

      public void close()
      Stops message processing and closes the processor. The receiving links and sessions are closed and calling start() will create a new processing cycle with new links and new sessions.
      Specified by:
      close in interface AutoCloseable
    • isRunning

      public boolean isRunning()
      Returns true if the processor is running. If the processor is stopped or closed, this method returns false.
      Returns:
      true if the processor is running; false otherwise.
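
The lifecycle rules described for start(), stop(), close() and isRunning() can be pictured with a small in-memory model. This is only a sketch of the documented behaviour, not the SDK's implementation: the class LifecycleModel, its fields, and the connectionsOpened() counter are hypothetical names introduced for illustration, and no Service Bus connection is involved.

```java
/** Toy model of the documented processor lifecycle; NOT the SDK's implementation. */
final class LifecycleModel implements AutoCloseable {
    private boolean running;        // what isRunning() would report
    private boolean linksOpen;      // whether receiving links currently exist
    private int connectionsOpened;  // how many times new links had to be created

    /** Idempotent: calling start() while already running changes nothing. */
    public synchronized void start() {
        if (running) {
            return;
        }
        if (!linksOpen) {           // first start, or a start after close()
            linksOpen = true;
            connectionsOpened++;
        }
        running = true;
    }

    /** Pauses processing but keeps the links, so start() can resume on them. */
    public synchronized void stop() {
        running = false;
    }

    /** Stops processing and discards the links; a later start() opens new ones. */
    @Override
    public synchronized void close() {
        running = false;
        linksOpen = false;
    }

    public synchronized boolean isRunning() {
        return running;
    }

    public synchronized int connectionsOpened() {
        return connectionsOpened;
    }

    public static void main(String[] args) {
        LifecycleModel model = new LifecycleModel();
        model.start();
        model.start();              // no-op: already running
        model.stop();
        model.start();              // resumes on the existing links
        model.close();
        model.start();              // opens a fresh set of links
        System.out.printf("running=%b, connectionsOpened=%d%n",
            model.isRunning(), model.connectionsOpened());
    }
}
```

In the model, only a start() that follows close() (or the very first start()) increments the counter, which mirrors the distinction the method descriptions draw between resuming after stop() and restarting after close().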
    • getQueueName

      public String getQueueName()
      Returns the queue name associated with this instance of ServiceBusProcessorClient.
      Returns:
      the queue name associated with this instance of ServiceBusProcessorClient or null if the processor instance is for a topic and subscription.
    • getTopicName

      public String getTopicName()
      Returns the topic name associated with this instance of ServiceBusProcessorClient.
      Returns:
      the topic name associated with this instance of ServiceBusProcessorClient or null if the processor instance is for a queue.
    • getSubscriptionName

      public String getSubscriptionName()
      Returns the subscription name associated with this instance of ServiceBusProcessorClient.
      Returns:
      the subscription name associated with this instance of ServiceBusProcessorClient or null if the processor instance is for a queue.
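
Because getQueueName() returns null for a topic and subscription processor, while getTopicName() and getSubscriptionName() return null for a queue processor, code that logs the entity has to branch on which value is present. The following is a minimal sketch of that branching; EntityDescriber and describe are hypothetical helper names, not part of the SDK, and the helper takes plain strings so it runs without a Service Bus dependency.

```java
/** Hypothetical logging helper; not part of the Service Bus SDK. */
final class EntityDescriber {
    private EntityDescriber() {
    }

    /**
     * Builds a readable label from the three getter results.
     * A non-null queue name means the processor targets a queue;
     * otherwise the topic and subscription names are used.
     */
    static String describe(String queueName, String topicName, String subscriptionName) {
        if (queueName != null) {
            return "queue '" + queueName + "'";
        }
        return "topic '" + topicName + "', subscription '" + subscriptionName + "'";
    }

    public static void main(String[] args) {
        // Values as a queue processor would report them.
        System.out.println(describe("orders", null, null));
        // Values as a topic/subscription processor would report them.
        System.out.println(describe(null, "events", "audit"));
    }
}
```

With a real client, the call would presumably look like EntityDescriber.describe(processorClient.getQueueName(), processorClient.getTopicName(), processorClient.getSubscriptionName()).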
    • getIdentifier

      public String getIdentifier()
      Gets the identifier of this ServiceBusProcessorClient instance.
      Returns:
      the identifier of this ServiceBusProcessorClient instance.