diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java index e3d6ce3d4..f464af894 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/handler/MQTTSessionHandler.java @@ -1063,7 +1063,6 @@ protected final void sendQoS0SubMessage(RoutedMessage msg) { write(pubMsg).addListener(f -> { memUsage.addAndGet(-msgSize); if (f.isSuccess()) { - lastActiveAtNanos = sessionCtx.nanoTime(); if (settings.debugMode) { eventCollector.report(getLocal(QoS0Pushed.class) .isRetain(msg.isRetain()) diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java index 756450015..552adc4bf 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java @@ -24,6 +24,7 @@ import static org.apache.bifromq.mqtt.handler.MQTTSessionIdUtil.userSessionId; import static org.apache.bifromq.plugin.eventcollector.EventType.DISCARD; import static org.apache.bifromq.plugin.eventcollector.EventType.EXCEED_RECEIVING_LIMIT; +import static org.apache.bifromq.plugin.eventcollector.EventType.IDLE; import static org.apache.bifromq.plugin.eventcollector.EventType.INVALID_TOPIC; import static org.apache.bifromq.plugin.eventcollector.EventType.INVALID_TOPIC_FILTER; import static org.apache.bifromq.plugin.eventcollector.EventType.MALFORMED_TOPIC; @@ -728,6 +729,35 @@ public void qos0Pub() { verifyEvent(MQTT_SESSION_START, QOS0_PUSHED, QOS0_PUSHED, QOS0_PUSHED, QOS0_PUSHED, QOS0_PUSHED); } + @Test + public void qos0PubDoesNotRefreshSessionKeepAlive() { + mockCheckPermission(true); + mockDistMatch(true); + transientSessionHandler.subscribe(System.nanoTime(), topicFilter, QoS.AT_MOST_ONCE); + channel.runPendingTasks(); + ArgumentCaptor longCaptor = ArgumentCaptor.forClass(Long.class); + verify(localDistService).match(anyLong(), eq(topicFilter), longCaptor.capture(), any()); + + testTicker.advanceTimeBy(100, TimeUnit.SECONDS); + channel.advanceTimeBy(100, TimeUnit.SECONDS); + channel.runScheduledPendingTasks(); + assertTrue(channel.isActive()); + + transientSessionHandler.publish(s2cMessageList(topic, 1, QoS.AT_MOST_ONCE), + Collections.singleton(new IMQTTTransientSession.MatchedTopicFilter(topicFilter, longCaptor.getValue()))); + channel.runPendingTasks(); + MqttPublishMessage message = channel.readOutbound(); + assertEquals(message.fixedHeader().qosLevel().value(), QoS.AT_MOST_ONCE_VALUE); + assertEquals(message.variableHeader().topicName(), topic); + + testTicker.advanceTimeBy(81, TimeUnit.SECONDS); + channel.advanceTimeBy(81, TimeUnit.SECONDS); + channel.runScheduledPendingTasks(); + + assertFalse(channel.isActive()); + verifyEvent(MQTT_SESSION_START, QOS0_PUSHED, IDLE); + } + @Test public void qos0PubExceedBufferCapacity() { mockCheckPermission(true);