Add optional message limit for receive command

This commit is contained in:
AsamK 2022-10-31 11:17:52 +01:00
parent 5ed9db4f08
commit de2bfc7f79
6 changed files with 59 additions and 43 deletions

View File

@ -202,12 +202,9 @@ public interface Manager extends Closeable {
/** /**
* Receive new messages from server, returns if no new message arrive in a timespan of timeout. * Receive new messages from server, returns if no new message arrive in a timespan of timeout.
*/ */
void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException; public void receiveMessages(
Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
/** ) throws IOException;
* Receive new messages from server, returns only if the thread is interrupted.
*/
void receiveMessages(ReceiveMessageHandler handler) throws IOException;
void setReceiveConfig(ReceiveConfig receiveConfig); void setReceiveConfig(ReceiveConfig receiveConfig);

View File

@ -961,17 +961,16 @@ class ManagerImpl implements Manager {
} }
@Override @Override
public void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException { public void receiveMessages(
receiveMessages(timeout, true, handler); Optional<Duration> timeout,
} Optional<Integer> maxMessages,
ReceiveMessageHandler handler
@Override ) throws IOException {
public void receiveMessages(ReceiveMessageHandler handler) throws IOException { receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler);
receiveMessages(Duration.ofMinutes(1), false, handler);
} }
private void receiveMessages( private void receiveMessages(
Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler Duration timeout, boolean returnOnTimeout, Integer maxMessages, ReceiveMessageHandler handler
) throws IOException { ) throws IOException {
if (isReceiving()) { if (isReceiving()) {
throw new IllegalStateException("Already receiving message."); throw new IllegalStateException("Already receiving message.");
@ -979,7 +978,7 @@ class ManagerImpl implements Manager {
isReceivingSynchronous = true; isReceivingSynchronous = true;
receiveThread = Thread.currentThread(); receiveThread = Thread.currentThread();
try { try {
context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, handler); context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, maxMessages, handler);
} finally { } finally {
receiveThread = null; receiveThread = null;
isReceivingSynchronous = false; isReceivingSynchronous = false;

View File

@ -80,7 +80,7 @@ public class ReceiveHelper {
public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) { public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
while (!shouldStop) { while (!shouldStop) {
try { try {
receiveMessages(Duration.ofMinutes(1), false, handler); receiveMessages(Duration.ofMinutes(1), false, null, handler);
break; break;
} catch (IOException e) { } catch (IOException e) {
logger.warn("Receiving messages failed, retrying", e); logger.warn("Receiving messages failed, retrying", e);
@ -89,7 +89,7 @@ public class ReceiveHelper {
} }
public void receiveMessages( public void receiveMessages(
Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
) throws IOException { ) throws IOException {
needsToRetryFailedMessages = true; needsToRetryFailedMessages = true;
hasCaughtUpWithOldMessages = false; hasCaughtUpWithOldMessages = false;
@ -107,7 +107,7 @@ public class ReceiveHelper {
signalWebSocket.connect(); signalWebSocket.connect();
try { try {
receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, handler, queuedActions); receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
} finally { } finally {
hasCaughtUpWithOldMessages = false; hasCaughtUpWithOldMessages = false;
handleQueuedActions(queuedActions.keySet()); handleQueuedActions(queuedActions.keySet());
@ -122,13 +122,15 @@ public class ReceiveHelper {
final SignalWebSocket signalWebSocket, final SignalWebSocket signalWebSocket,
Duration timeout, Duration timeout,
boolean returnOnTimeout, boolean returnOnTimeout,
Integer maxMessages,
Manager.ReceiveMessageHandler handler, Manager.ReceiveMessageHandler handler,
final Map<HandleAction, HandleAction> queuedActions final Map<HandleAction, HandleAction> queuedActions
) throws IOException { ) throws IOException {
int remainingMessages = maxMessages == null ? -1 : maxMessages;
var backOffCounter = 0; var backOffCounter = 0;
isWaitingForMessage = false; isWaitingForMessage = false;
while (!shouldStop) { while (!shouldStop && remainingMessages != 0) {
if (needsToRetryFailedMessages) { if (needsToRetryFailedMessages) {
retryFailedReceivedMessages(handler); retryFailedReceivedMessages(handler);
needsToRetryFailedMessages = false; needsToRetryFailedMessages = false;
@ -154,6 +156,9 @@ public class ReceiveHelper {
backOffCounter = 0; backOffCounter = 0;
if (result.isPresent()) { if (result.isPresent()) {
if (remainingMessages > 0) {
remainingMessages -= 1;
}
envelope = result.get(); envelope = result.get();
logger.debug("New message received from server"); logger.debug("New message received from server");
} else { } else {

View File

@ -372,6 +372,9 @@ In json mode this is outputted as one json object per line.
Number of seconds to wait for new messages (negative values disable timeout). Number of seconds to wait for new messages (negative values disable timeout).
Default is 5 seconds. Default is 5 seconds.
*--max-messages*::
Maximum number of messages to receive, before returning.
*--ignore-attachments*:: *--ignore-attachments*::
Dont download attachments of received messages. Dont download attachments of received messages.

View File

@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.Optional;
public class ReceiveCommand implements LocalCommand { public class ReceiveCommand implements LocalCommand {
@ -37,6 +38,10 @@ public class ReceiveCommand implements LocalCommand {
.type(double.class) .type(double.class)
.setDefault(3.0) .setDefault(3.0)
.help("Number of seconds to wait for new messages (negative values disable timeout)"); .help("Number of seconds to wait for new messages (negative values disable timeout)");
subparser.addArgument("--max-messages")
.type(int.class)
.setDefault(-1)
.help("Maximum number of messages to receive, before returning.");
subparser.addArgument("--ignore-attachments") subparser.addArgument("--ignore-attachments")
.help("Dont download attachments of received messages.") .help("Dont download attachments of received messages.")
.action(Arguments.storeTrue()); .action(Arguments.storeTrue());
@ -58,6 +63,7 @@ public class ReceiveCommand implements LocalCommand {
final Namespace ns, final Manager m, final OutputWriter outputWriter final Namespace ns, final Manager m, final OutputWriter outputWriter
) throws CommandException { ) throws CommandException {
final var timeout = ns.getDouble("timeout"); final var timeout = ns.getDouble("timeout");
final var maxMessagesRaw = ns.getInt("max-messages");
final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments")); final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
final var ignoreStories = Boolean.TRUE.equals(ns.getBoolean("ignore-stories")); final var ignoreStories = Boolean.TRUE.equals(ns.getBoolean("ignore-stories"));
final var sendReadReceipts = Boolean.TRUE.equals(ns.getBoolean("send-read-receipts")); final var sendReadReceipts = Boolean.TRUE.equals(ns.getBoolean("send-read-receipts"));
@ -65,11 +71,9 @@ public class ReceiveCommand implements LocalCommand {
try { try {
final var handler = outputWriter instanceof JsonWriter ? new JsonReceiveMessageHandler(m, final var handler = outputWriter instanceof JsonWriter ? new JsonReceiveMessageHandler(m,
(JsonWriter) outputWriter) : new ReceiveMessageHandler(m, (PlainTextWriter) outputWriter); (JsonWriter) outputWriter) : new ReceiveMessageHandler(m, (PlainTextWriter) outputWriter);
if (timeout < 0) { final var duration = timeout < 0 ? null : Duration.ofMillis((long) (timeout * 1000));
m.receiveMessages(handler); final var maxMessages = maxMessagesRaw < 0 ? null : maxMessagesRaw;
} else { m.receiveMessages(Optional.ofNullable(duration), Optional.ofNullable(maxMessages), handler);
m.receiveMessages(Duration.ofMillis((long) (timeout * 1000)), handler);
}
} catch (IOException e) { } catch (IOException e) {
throw new IOErrorException("Error while receiving messages: " + e.getMessage(), e); throw new IOErrorException("Error while receiving messages: " + e.getMessage(), e);
} }

View File

@ -58,6 +58,7 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -497,32 +498,30 @@ public class DbusManagerImpl implements Manager {
} }
} }
@Override
public void receiveMessages(final ReceiveMessageHandler handler) throws IOException {
addReceiveHandler(handler);
try {
synchronized (this) {
this.wait();
}
} catch (InterruptedException ignored) {
}
removeReceiveHandler(handler);
}
@Override @Override
public void receiveMessages( public void receiveMessages(
final Duration timeout, final ReceiveMessageHandler handler Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
) throws IOException { ) throws IOException {
final var remainingMessages = new AtomicInteger(maxMessages.orElse(-1));
final var lastMessage = new AtomicLong(System.currentTimeMillis()); final var lastMessage = new AtomicLong(System.currentTimeMillis());
final var thread = Thread.currentThread();
final ReceiveMessageHandler receiveHandler = (envelope, e) -> { final ReceiveMessageHandler receiveHandler = (envelope, e) -> {
lastMessage.set(System.currentTimeMillis()); lastMessage.set(System.currentTimeMillis());
handler.handleMessage(envelope, e); handler.handleMessage(envelope, e);
if (remainingMessages.get() > 0) {
if (remainingMessages.decrementAndGet() <= 0) {
remainingMessages.set(0);
thread.interrupt();
}
}
}; };
addReceiveHandler(receiveHandler); addReceiveHandler(receiveHandler);
while (true) { if (timeout.isPresent()) {
while (remainingMessages.get() != 0) {
try { try {
final var sleepTimeRemaining = timeout.toMillis() - (System.currentTimeMillis() - lastMessage.get()); final var passedTime = System.currentTimeMillis() - lastMessage.get();
final var sleepTimeRemaining = timeout.get().toMillis() - passedTime;
if (sleepTimeRemaining < 0) { if (sleepTimeRemaining < 0) {
break; break;
} }
@ -530,6 +529,15 @@ public class DbusManagerImpl implements Manager {
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
} }
} else {
try {
synchronized (this) {
this.wait();
}
} catch (InterruptedException ignored) {
}
}
removeReceiveHandler(receiveHandler); removeReceiveHandler(receiveHandler);
} }