Class ServiceBusSessionReceiverClient
- All Implemented Interfaces:
AutoCloseable
This synchronous session receiver client acquires session locks from a queue or topic/subscription and creates
ServiceBusReceiverClient instances that are tied to the locked sessions. Sessions can be used for
first in, first out (FIFO) processing of messages. Queues and topics/subscriptions support Service Bus sessions;
however, sessions must be enabled at the time of entity creation.
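To make the FIFO guarantee concrete, here is a minimal conceptual sketch using only the Java standard library. It is not how Service Bus is implemented; the class SessionFifoSketch, its methods, and the session ids are hypothetical names chosen for illustration. It shows that messages sharing a session id are handed out in arrival order, even when messages from other sessions are interleaved.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Conceptual model only (hypothetical): keeps one FIFO queue per session id. */
final class SessionFifoSketch {
    private final Map<String, Queue<String>> sessions = new LinkedHashMap<>();

    /** Appends a message body to the queue that belongs to its session. */
    void send(String sessionId, String body) {
        sessions.computeIfAbsent(sessionId, id -> new ArrayDeque<>()).add(body);
    }

    /** Returns the oldest undelivered message of the session, or null if none is left. */
    String receiveNext(String sessionId) {
        Queue<String> queue = sessions.get(sessionId);
        return queue == null ? null : queue.poll();
    }

    /** Interleaves two sessions, drains "order-1", and joins the received bodies with commas. */
    static String demo() {
        SessionFifoSketch entity = new SessionFifoSketch();
        entity.send("order-1", "created");
        entity.send("order-2", "created");
        entity.send("order-1", "paid");
        entity.send("order-1", "shipped");

        StringBuilder received = new StringBuilder();
        String body;
        while ((body = entity.receiveNext("order-1")) != null) {
            if (received.length() > 0) {
                received.append(',');
            }
            received.append(body);
        }
        return received.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the sketch, draining "order-1" yields its messages in the order they were sent, while the "order-2" message stays queued for whichever receiver handles that session.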
The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on the different ways of authenticating and their corresponding credential types in the Azure Identity documentation.
Sample: Receive messages from a specific session
Use acceptSession(String) to acquire the lock of a session if you know the session id.
ServiceBusReceiveMode.PEEK_LOCK is strongly recommended so users have control over message
settlement.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their messages.
ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, credential)
    .sessionReceiver()
    .queueName(sessionEnabledQueueName)
    .disableAutoComplete()
    .buildClient();

ServiceBusReceiverClient receiver = sessionReceiver.acceptSession("<<my-session-id>>");

// Keep fetching messages from the session until there are no more messages.
// The receiveMessages operation returns when either 10 messages have been received or 30 seconds have elapsed.
boolean hasMoreMessages = true;
while (hasMoreMessages) {
    IterableStream<ServiceBusReceivedMessage> messages =
        receiver.receiveMessages(10, Duration.ofSeconds(30));
    Iterator<ServiceBusReceivedMessage> iterator = messages.iterator();
    hasMoreMessages = iterator.hasNext();

    while (iterator.hasNext()) {
        ServiceBusReceivedMessage message = iterator.next();
        System.out.printf("Session Id: %s. Contents: %s%n", message.getSessionId(), message.getBody());

        // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
        if (isMessageProcessed) {
            receiver.complete(message);
        } else {
            receiver.abandon(message);
        }
    }
}

// Use the receiver and finally close it along with the sessionReceiver.
receiver.close();
sessionReceiver.close();
Sample: Receive messages from the first available session
Use acceptNextSession() to acquire the lock of the next available session without specifying the session
id. ServiceBusReceiveMode.PEEK_LOCK is strongly recommended so users have control over
message settlement.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their messages.
ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder()
    .credential(fullyQualifiedNamespace, credential)
    .sessionReceiver()
    .disableAutoComplete()
    .queueName(sessionEnabledQueueName)
    .buildClient();

// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// throws a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
ServiceBusReceiverClient receiver = sessionReceiver.acceptNextSession();

// Use the receiver and finally close it along with the sessionReceiver.
try {
    // Keep fetching messages from the session until there are no more messages.
    // The receiveMessages operation returns when either 10 messages have been received or 30 seconds have elapsed.
    boolean hasMoreMessages = true;
    while (hasMoreMessages) {
        IterableStream<ServiceBusReceivedMessage> messages =
            receiver.receiveMessages(10, Duration.ofSeconds(30));
        Iterator<ServiceBusReceivedMessage> iterator = messages.iterator();
        hasMoreMessages = iterator.hasNext();

        while (iterator.hasNext()) {
            ServiceBusReceivedMessage message = iterator.next();
            System.out.printf("Session Id: %s. Message: %s%n", message.getSessionId(), message.getBody());

            // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
            if (isMessageProcessed) {
                receiver.complete(message);
            } else {
                receiver.abandon(message);
            }
        }
    }
} finally {
    receiver.close();
    sessionReceiver.close();
}
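Because this client implements AutoCloseable, the explicit close() calls in a finally block can also be expressed with try-with-resources. The sketch below illustrates only the resulting close order, using the standard library; FakeClient is a hypothetical stand-in for the real clients, not part of the Azure SDK. Resources are closed in the reverse order of their declaration, so the receiver is closed before the session receiver that created it.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration of try-with-resources close ordering with hypothetical stand-in clients. */
final class CloseOrderSketch {

    /** Hypothetical stand-in for a client that implements AutoCloseable. */
    static final class FakeClient implements AutoCloseable {
        private final String name;
        private final List<String> log;

        FakeClient(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            // Record the call so the close order can be inspected afterwards.
            log.add("closed " + name);
        }
    }

    /** Opens two resources, does some work, and returns the recorded sequence of events. */
    static String demo() {
        List<String> log = new ArrayList<>();
        try (FakeClient sessionReceiver = new FakeClient("sessionReceiver", log);
             FakeClient receiver = new FakeClient("receiver", log)) {
            log.add("receiving");
        }
        return String.join(" -> ", log);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The same shape applies to the real clients under the assumption that both are declared in the resource list: declare the session receiver first and the receiver obtained from it second, so the receiver is released first when the block exits, including on an exception.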
Method Summary
Modifier and Type | Method | Description
ServiceBusReceiverClient | acceptNextSession() | Acquires a session lock for the next available session and creates a ServiceBusReceiverClient to receive messages from the session.
ServiceBusReceiverClient | acceptSession(String sessionId) | Acquires a session lock for sessionId and creates a ServiceBusReceiverClient to receive messages from the session.
void | close() |
Method Details
acceptNextSession
Acquires a session lock for the next available session and creates a ServiceBusReceiverClient to receive messages from the session. If no session is available immediately, it waits until one becomes available.
- Returns:
A ServiceBusReceiverClient that is tied to the available session.
- Throws:
UnsupportedOperationException - if the queue or topic subscription is not session-enabled.
com.azure.core.amqp.exception.AmqpException - if the operation times out. The timeout duration is the tryTimeout configured when building this client with ServiceBusClientBuilder.retryOptions(AmqpRetryOptions).
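Since acceptNextSession can time out when no session is available, and the sample above describes that error as retriable, a caller may want to try again a bounded number of times. The following is a minimal sketch of that idea using only the standard library. The helper retryOnTimeout is hypothetical, and java.util.concurrent.TimeoutException is used as a stand-in for the AmqpException the real client throws; real code would catch the SDK exception instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

/** Sketch of a bounded retry around an operation that may time out (hypothetical helper). */
final class AcceptRetrySketch {

    /** Calls the action up to maxAttempts times, retrying only when it times out. */
    static <T> T retryOnTimeout(Callable<T> action, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        TimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (TimeoutException e) {
                // Stand-in for the timeout error; remember it and try again.
                last = e;
            }
        }
        // Every attempt timed out, so surface the most recent timeout.
        throw last;
    }

    /** Simulates an accept call that times out twice before a session becomes available. */
    static String demo() {
        int[] calls = {0};
        try {
            String session = retryOnTimeout(() -> {
                if (++calls[0] < 3) {
                    throw new TimeoutException("no session available yet");
                }
                return "session-A";
            }, 5);
            return session + " after " + calls[0] + " attempts";
        } catch (Exception e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only timeouts are retried in this sketch; any other exception propagates on the first occurrence, which matches the intent of not retrying errors such as a queue that is not session-enabled.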
acceptSession
Acquires a session lock for sessionId and creates a ServiceBusReceiverClient to receive messages from the session. If the session is already locked by another client, an AmqpException is thrown immediately.
- Parameters:
sessionId - The session id.
- Returns:
A ServiceBusReceiverClient that is tied to the specified session.
- Throws:
NullPointerException - if sessionId is null.
IllegalArgumentException - if sessionId is empty.
UnsupportedOperationException - if the queue or topic subscription is not session-enabled.
ServiceBusException - if the lock cannot be acquired.
com.azure.core.amqp.exception.AmqpException - if the operation times out. The timeout duration is the tryTimeout configured when building this client with ServiceBusClientBuilder.retryOptions(AmqpRetryOptions).
close
public void close()
- Specified by:
close in interface AutoCloseable