Package com.azure.messaging.servicebus


The Azure Service Bus client library allows Java developers to interact with Azure Service Bus entities by publishing to and/or subscribing from queues and topics/subscriptions. Microsoft Azure Service Bus is a fully managed enterprise integration message broker. Service Bus can decouple applications and services. Service Bus offers a reliable and secure platform for asynchronous transfer of data and state. Data is transferred between different applications and services using messages.

Key Concepts

  • Queue: Allows for the sending and receiving of messages, ordered first-in, first-out (FIFO). It is often used for point-to-point communication.
  • Topic: Allows for sending messages to multiple receivers simultaneously. This is suited for publisher and subscriber scenarios.
  • Subscription: Receives messages from a topic. Each subscription is independent and receives its own copy of each message sent to the topic that passes its filter. Filters, also known as rules, are applied to each message to determine whether it will be published to the subscription.
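
The fan-out behavior described above can be illustrated with a small in-memory model. This is only a sketch in plain Java and does not use the Service Bus client library: the class TopicSketch, its methods, and the string-prefix filters are hypothetical names chosen for illustration. It assumes that a filter is a simple predicate over the message text.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy in-memory model of topic fan-out; NOT the Service Bus client API.
final class TopicSketch {
    // Subscription name -> filter deciding which messages the subscription accepts.
    private final Map<String, Predicate<String>> filters = new LinkedHashMap<>();
    // Subscription name -> messages delivered to it, in arrival order.
    private final Map<String, List<String>> inboxes = new LinkedHashMap<>();

    void subscribe(String name, Predicate<String> filter) {
        filters.put(name, filter);
        inboxes.put(name, new ArrayList<>());
    }

    // Every subscription whose filter matches gets its own copy of the message.
    void publish(String message) {
        filters.forEach((name, filter) -> {
            if (filter.test(message)) {
                inboxes.get(name).add(message);
            }
        });
    }

    List<String> received(String name) {
        return inboxes.get(name);
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        topic.subscribe("all", message -> true);
        topic.subscribe("orders", message -> message.startsWith("order:"));

        topic.publish("order:1");
        topic.publish("invoice:7");

        System.out.println("all: " + topic.received("all"));
        System.out.println("orders: " + topic.received("orders"));
    }
}
```

In this model the "all" subscription sees both messages while "orders" sees only the one that passes its filter, and neither subscription affects what the other receives.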

Getting Started

Service clients are the point of interaction for developers to use Azure Service Bus. ServiceBusSenderClient and ServiceBusSenderAsyncClient are the sync and async clients for publishing messages to a Service Bus queue or topic. Similarly, ServiceBusReceiverClient and ServiceBusReceiverAsyncClient are the sync and async clients for consuming messages from a Service Bus queue or subscription. In production scenarios, we recommend that customers use ServiceBusProcessorClient for consuming messages because it recovers from transient failures.

The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation.

Publishing Service Bus messages

This library provides two sender clients to publish messages to Azure Service Bus: the async client, ServiceBusSenderAsyncClient, and its sync version, ServiceBusSenderClient. The samples below demonstrate basic scenarios. Additional snippets can be found in the class documentation for ServiceBusClientBuilder and any of the clients.

Sample: Construct a synchronous sender and send messages

The following code sample demonstrates the creation and use of the synchronous client ServiceBusSenderClient to send messages to a queue. When performance is important, consider using ServiceBusMessageBatch to publish multiple messages at once.

 TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusSenderClient sender = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sender()
     .queueName(queueName)
     .buildClient();

 List<ServiceBusMessage> messages = Arrays.asList(
     new ServiceBusMessage("test-1"),
     new ServiceBusMessage("test-2"));

 // Creating a batch without options set.
 ServiceBusMessageBatch batch = sender.createMessageBatch();
 for (ServiceBusMessage message : messages) {
     if (batch.tryAddMessage(message)) {
         continue;
     }

     // The batch is full. Send the current batch and create a new one.
     sender.sendMessages(batch);

     batch = sender.createMessageBatch();

     // Add the message we couldn't before.
     if (!batch.tryAddMessage(message)) {
         throw new IllegalArgumentException("Message is too large for an empty batch.");
     }
 }

 // Send the final batch if there are any messages in it.
 if (batch.getCount() > 0) {
     sender.sendMessages(batch);
 }

 // Continue using the sender and finally, dispose of the sender.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 sender.close();
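
The control flow of the batching loop above can be traced without a Service Bus namespace by using a stand-in for the batch. The sketch below is plain Java and does not call the client library: BatchingSketch, partition, and the character-count limit maxSize are hypothetical, and the real batch size limit is determined by the service rather than by a character count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for the batching loop; NOT the Service Bus client API.
final class BatchingSketch {
    // Splits messages into batches whose total character count stays within maxSize
    // (a hypothetical limit standing in for the real batch size limit).
    static List<List<String>> partition(List<String> messages, int maxSize) {
        List<List<String>> sent = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        int used = 0;

        for (String message : messages) {
            if (message.length() > maxSize) {
                throw new IllegalArgumentException("Message is too large for an empty batch.");
            }
            if (used + message.length() > maxSize) {
                // The batch is full: "send" it and start a new one.
                sent.add(batch);
                batch = new ArrayList<>();
                used = 0;
            }
            // The message fits, either in the current batch or in the fresh one.
            batch.add(message);
            used += message.length();
        }

        // "Send" the final batch if it holds any messages.
        if (!batch.isEmpty()) {
            sent.add(batch);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(partition(Arrays.asList("aaaa", "bbbb", "cc", "dddddd"), 8));
    }
}
```

As in the sample, a message that does not fit triggers a send of the current batch, and a message that cannot fit even into an empty batch is reported as an error instead of being dropped silently.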
 

Receiving Service Bus messages

This library provides several clients to receive messages from Azure Service Bus: the async client, ServiceBusReceiverAsyncClient, and its sync version, ServiceBusReceiverClient. For session-enabled entities, there are ServiceBusSessionReceiverAsyncClient and ServiceBusSessionReceiverClient. In production scenarios, ServiceBusProcessorClient is recommended because it recovers from transient errors such as temporary network failures.

The samples below demonstrate basic scenarios. Additional snippets can be found in the class documentation.

Sample: Create a ServiceBusProcessorClient and receive messages

The following code sample demonstrates the creation and use of the synchronous client ServiceBusProcessorClient to receive messages from a Service Bus queue. By default, messages are received using ServiceBusReceiveMode.PEEK_LOCK and customers must settle their messages using one of the settlement methods on the receiver client. "Settling receive operations" provides additional information about message settlement. ServiceBusProcessorClient continues fetching messages from the queue until the processor is stopped. If it encounters a transient error, it will try to recover, then continue processing messages.

 // Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
     final ServiceBusReceivedMessage message = context.getMessage();
     // Randomly complete or abandon each message. In real-world scenarios, call context.complete() once
     // the business logic has handled the message and Service Bus does not need to redeliver it;
     // otherwise, call context.abandon().
     final boolean success = Math.random() < 0.5;
     if (success) {
         try {
             context.complete();
         } catch (RuntimeException error) {
             System.out.printf("Completion of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     } else {
         try {
             context.abandon();
         } catch (RuntimeException error) {
             System.out.printf("Abandoning of the message %s failed.%nError: %s%n",
                 message.getMessageId(), error);
         }
     }
 };

 // Sample code that gets called if there's an error
 Consumer<ServiceBusErrorContext> processError = errorContext -> {
     if (errorContext.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) errorContext.getException();

         System.out.printf("Error source: %s, reason %s%n", errorContext.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", errorContext.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .processor()
     .queueName(queueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()  // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
     .processMessage(processMessage)
     .processError(processError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 processorClient.start();

 // Stop processor and dispose when done processing messages.
 processorClient.stop();
 processorClient.close();
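
What completing and abandoning mean under PEEK_LOCK can be sketched with a toy in-memory queue. This is an illustration in plain Java, not the client library: PeekLockSketch, its Message type, and the deliveries counter are hypothetical names, and the model ignores lock expiry, dead-lettering, and concurrency.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Toy in-memory model of PEEK_LOCK settlement; NOT the Service Bus client API.
final class PeekLockSketch {
    // Hypothetical message type: a body plus the number of times it was handed out.
    static final class Message {
        final String body;
        int deliveries;

        Message(String body) {
            this.body = body;
        }
    }

    private final Deque<Message> available = new ArrayDeque<>();
    private final Set<Message> locked = new HashSet<>();

    void send(String body) {
        available.addLast(new Message(body));
    }

    // Locks and returns the next message without deleting it; null if none is available.
    Message receive() {
        Message message = available.pollFirst();
        if (message != null) {
            message.deliveries++;
            locked.add(message);
        }
        return message;
    }

    // Settle as complete: the message is removed for good.
    void complete(Message message) {
        locked.remove(message);
    }

    // Settle as abandoned: the lock is released and the message can be received again.
    void abandon(Message message) {
        if (locked.remove(message)) {
            available.addFirst(message);
        }
    }

    int availableCount() {
        return available.size();
    }

    public static void main(String[] args) {
        PeekLockSketch queue = new PeekLockSketch();
        queue.send("test-1");

        Message first = queue.receive();
        queue.abandon(first);              // back in the queue
        Message second = queue.receive();  // the same message, delivered again
        queue.complete(second);            // now gone

        System.out.printf("deliveries=%d, remaining=%d%n", second.deliveries, queue.availableCount());
    }
}
```

The point of the model is that receiving a message does not remove it. Only a settlement decides its fate, which is why a handler that neither completes nor abandons leaves the message unsettled.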
 

Sample: Create a receiver and receive messages

The following code sample demonstrates the creation and use of the synchronous client ServiceBusReceiverClient to receive messages from a Service Bus subscription. The receive operation returns when either 10 messages are received or 30 seconds have elapsed. By default, messages are received using ServiceBusReceiveMode.PEEK_LOCK and customers must settle their messages using one of the settlement methods on the receiver client. "Settling receive operations" provides additional information about message settlement.

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .receiver()
     .topicName(topicName)
     .subscriptionName(subscriptionName)
     .buildClient();

 // Receives a batch of messages when 10 messages are received or until 30 seconds have elapsed, whichever
 // happens first.
 IterableStream<ServiceBusReceivedMessage> messages = receiver.receiveMessages(10, Duration.ofSeconds(30));
 messages.forEach(message -> {
     System.out.printf("Id: %s. Contents: %s%n", message.getMessageId(), message.getBody());

     // If able to process message, complete it. Otherwise, abandon it and allow it to be
     // redelivered. 'isMessageProcessed' stands in for the outcome of your own processing logic.
     if (isMessageProcessed) {
         receiver.complete(message);
     } else {
         receiver.abandon(message);
     }
 });

 // When program ends, or you're done receiving all messages, dispose of the receiver.
 // Clients should be long-lived objects as they
 // require resources and time to establish a connection to the service.
 receiver.close();
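
The "10 messages or 30 seconds, whichever happens first" rule used by the receive call above can be modeled with a standard BlockingQueue. This sketch is plain Java and only mirrors the rule as stated in this document: ReceiveWindowSketch and receiveUpTo are hypothetical names, and the actual client may apply additional timing logic that is not modeled here.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of "receive up to N messages or until a timeout"; NOT the Service Bus client API.
final class ReceiveWindowSketch {
    // Returns as soon as maxMessages were taken from source or maxWait has elapsed,
    // whichever happens first.
    static <T> List<T> receiveUpTo(BlockingQueue<T> source, int maxMessages, Duration maxWait) {
        List<T> received = new ArrayList<>();
        long deadline = System.nanoTime() + maxWait.toNanos();

        while (received.size() < maxMessages) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // The time window is over.
            }
            try {
                T next = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // Nothing arrived before the deadline.
                }
                received.add(next);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and return what was received so far.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));

        // Returns right away because two messages are already available.
        System.out.println(receiveUpTo(source, 2, Duration.ofSeconds(30)));
        // Returns the remaining message after the short window elapses.
        System.out.println(receiveUpTo(source, 10, Duration.ofMillis(200)));
    }
}
```

A caller of such a method has to handle a result that holds fewer messages than requested, including an empty one, since the timeout can end the call at any point.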