# Slixmpp: The Slick XMPP Library
# Copyright (C) 2025 Mathieu Pasquet
# This file is part of Slixmpp.
# See the file LICENSE for copying permission
import logging
from asyncio import Event, Task, FIRST_COMPLETED, wait, CancelledError
from enum import Enum
from slixmpp.stanza import Message, Presence
from slixmpp.exceptions import IqError, IqTimeout
from slixmpp.jid import JID
from slixmpp.plugins import BasePlugin
from slixmpp.types import ErrorConditions
log = logging.getLogger(__name__)
class PingStatus(Enum):
    """
    Enum representing the status of a ping.

    - `UNTRIED`: the ping was not sent yet
    - `JOINED`: we were considered still in the MUC at the last ping
    - `JOINED_NICKCHANGED`: we were joined but another client just changed
      the nick.
    - `DISCONNECTED`: we got disconnected
    - `TIMEOUT`: slixmpp was unable to reach the server
    """
    UNTRIED = 0
    JOINED = 1
    DISCONNECTED = 2
    TIMEOUT = 3
    # Needs its own value: an Enum member that reuses an existing value
    # becomes a mere alias of the first one, which would make
    # JOINED_NICKCHANGED indistinguishable from JOINED.
    JOINED_NICKCHANGED = 4
class PingTask:
    """
    Class representing a current muc self-ping task.

    Creating an instance immediately schedules :meth:`run` on the plugin's
    event loop. The task then alternates between waiting for `interval`
    seconds and sending a self-ping, unless :meth:`reset_timer` is called
    during the wait, in which case the wait simply starts over.
    """
    # Set by reset_timer() to restart the current waiting period
    _event: Event
    # The asyncio task executing run()
    _current_task: Task
    # Timeout handed to each ping request
    timeout: float
    # Delay without a timer reset before a ping is sent
    interval: float
    # Plugin used to send the pings and to record their results
    _plugin: 'XEP_0410'

    def __init__(self, muc_resource: JID, orig_jid: JID,
                 plugin: 'XEP_0410', interval: float, timeout: float) -> None:
        """
        Store the settings and start the ping loop right away.

        :param muc_resource: The MUC resource to send the pings to.
        :param orig_jid: The "from" JID of the pings.
        :param plugin: The plugin instance owning this task.
        :param interval: Delay to wait (without reset) before each ping.
        :param timeout: Timeout passed to each ping request.
        """
        self._event = Event()
        self._plugin = plugin
        self.interval = interval
        self.timeout = timeout
        self._current_task = plugin.xmpp.loop.create_task(self.run(
            muc_resource,
            orig_jid,
        ))

    def reset_timer(self) -> None:
        """
        Triggers the internal event to reset the internal timer.
        """
        self._event.set()

    def cancel(self) -> None:
        """Cancel the task before dying"""
        self._current_task.cancel()

    async def run(self, muc_resource: JID, orig_jid: JID) -> None:
        """
        Loop that goes on forever and sends pings at the desired interval.

        The loop ends silently when the task is cancelled, and ends after
        logging the traceback if any other error occurs.

        :param muc_resource: The MUC resource to send the pings to.
        :param orig_jid: The "from" JID of the pings.
        """
        key = (muc_resource, orig_jid)
        while True:
            waiter = None
            try:
                # Wait on the event or timeout, whichever comes first
                waiter = self._plugin.xmpp.loop.create_task(
                    self._event.wait()
                )
                await wait(
                    [waiter],
                    return_when=FIRST_COMPLETED,
                    timeout=self.interval,
                )
                # If the event is set, then the timer was reset and we clear
                # it before going back to waiting
                if self._event.is_set():
                    self._event.clear()
                    continue
                result = await self._plugin.send_self_ping(
                    muc_resource,
                    orig_jid,
                    timeout=self.timeout,
                )
                self._plugin._update_ping_results(key, result)
            except CancelledError:
                return
            except Exception:
                # Exception (not a bare except) so that KeyboardInterrupt
                # and SystemExit are never swallowed here.
                log.exception("Error while trying to ping a muc resource")
                return
            finally:
                # asyncio.wait() does not cancel what it waits on when it
                # times out or is itself cancelled: without this, one pending
                # waiter task would be leaked per iteration. Cancelling a
                # task that already finished is a no-op.
                if waiter is not None:
                    waiter.cancel()
class XEP_0410(BasePlugin):
    """
    XEP-0410: MUC Self-Ping (Schrödinger's Chat)

    This plugin provides features for clients (or non-MUC components) to
    enable or disable self-ping.

    Configuration options:

    - `ping_interval`: the number of seconds between the last activity and a ping.
    - `ping_timeout`: the default timeout given to each scheduled ping.

    Ping status is represented using the :class:`~slixmpp.plugins.XEP_0410.PingStatus` class.

    Users of this plugin can either schedule the pings manually and send them
    using `send_self_ping`, or they can use the provided scheduler with
    `enable_self_ping` and `disable_self_ping`.

    Doing so requires calling `update_nick` if your nickname in a MUC changes,
    because the plugin is not aware of this. The scheduled ping will trigger
    a `muc_ping_changed` event in case the ping status changes, which should
    be handled properly.
    """
    name = 'xep_0410'
    description = 'XEP-0410: MUC Self-Ping (Schrödinger\'s Chat)'
    dependencies = {'xep_0045', 'xep_0199'}
    default_config = {
        "ping_interval": 900,
        "ping_timeout": 30,
    }
    ping_interval: int | float
    ping_timeout: int | float

    # Cache of the last bound JID, to be able to recover if we bind to
    # another resource while running
    boundjid: JID | None = None
    # Dictionary mapping a (muc resource, from jid) to an asyncio task in
    # the process of being executed
    ping_timers: dict[tuple[JID, JID], PingTask]
    # Cache of the latest ping results
    last_ping_results: dict[tuple[JID, JID], PingStatus]

    def plugin_init(self):
        """Reset the plugin state and listen for MUC activity."""
        self.ping_timers = dict()
        self.boundjid = None
        self.last_ping_results = dict()
        self.xmpp.add_event_handler(
            'groupchat_message',
            self._on_muc_activity
        )
        self.xmpp.add_event_handler(
            'groupchat_presence',
            self._on_muc_activity,
        )

    def plugin_end(self):
        """Undo `plugin_init`: stop listening and stop every scheduled ping."""
        self.xmpp.del_event_handler(
            'groupchat_message',
            self._on_muc_activity,
        )
        self.xmpp.del_event_handler(
            'groupchat_presence',
            self._on_muc_activity,
        )
        # Dropping the dict alone would leave the tasks running (and
        # pinging) in the background, so cancel them explicitly first.
        for timer in self.ping_timers.values():
            timer.cancel()
        self.ping_timers = dict()
        self.last_ping_results = dict()

    def session_bind(self, jid: JID):
        """
        Record the JID bound for the session, migrating the timers if it
        differs from the previous one.

        Presumably invoked by the plugin framework on session bind.

        :param jid: The newly bound JID.
        """
        self.__update_boundjid(jid)

    def _move_timer(self, old_key: tuple[JID, JID],
                    new_key: tuple[JID, JID]) -> None:
        """
        Internal use only: replace the timer stored under `old_key` with a
        fresh one stored under `new_key`, keeping its interval and timeout.

        :param old_key: Existing (muc resource, from jid) key.
        :param new_key: The (muc resource, from jid) key to ping from now on.
        """
        old_timer = self.ping_timers.pop(old_key)
        old_timer.cancel()
        # Do not orphan a running task that may already use the new key
        stale = self.ping_timers.pop(new_key, None)
        if stale is not None:
            stale.cancel()
        self.ping_timers[new_key] = PingTask(
            new_key[0],
            new_key[1],
            self,
            timeout=old_timer.timeout,
            interval=old_timer.interval,
        )

    def __update_boundjid(self, jid: JID):
        """
        If the bound JID has changed, we need to update the timers so that
        the ping stanzas have the correct from value.

        The cached ping results are discarded in that case.

        This is a bit painful.
        """
        previous = self.boundjid
        if previous and previous != jid:
            self.last_ping_results = dict()
            # Iterate over a snapshot since the dict is modified in the loop
            for key in list(self.ping_timers):
                if key[1] == previous:
                    self._move_timer(key, (key[0], jid))
        self.boundjid = jid

    def update_nick(self, previous_jid: JID, new_jid: JID) -> None:
        """
        Update the self-ping targets when a nickname changes.

        :param previous_jid: full JID of the previous MUC resource.
        :param new_jid: full JID of the new MUC resource.
        """
        # Iterate over a snapshot since the dict is modified in the loop
        for key in list(self.ping_timers):
            if key[0] == previous_jid:
                self._move_timer(key, (new_jid, key[1]))

    def enable_self_ping(self, muc_resource: JID,
                         orig_jid: JID | None = None,
                         interval: float | None = None,
                         timeout: float | None = None) -> None:
        """
        Enable client self-ping.

        The given MUC resource will be pinged periodically if the MUC is inactive,
        and an event will be generated in case of changes.

        Nothing happens if self-ping is already enabled for this
        (muc_resource, orig_jid) pair.

        :param muc_resource: The MUC resource to send the ping to.
        :param orig_jid: The "from" resource to send the ping from (for components).
                         Defaults to the bound JID of the connection.
        :param interval: Delay before each ping, defaults to the
                         `ping_interval` option.
        :param timeout: Timeout of each ping, defaults to the `ping_timeout`
                        option.
        """
        if orig_jid is None:
            orig_jid = self.xmpp.boundjid
        if timeout is None:
            timeout = self.ping_timeout
        if interval is None:
            interval = self.ping_interval
        key = (muc_resource, orig_jid)
        if key not in self.ping_timers:
            self.ping_timers[key] = PingTask(
                muc_resource=muc_resource,
                orig_jid=orig_jid,
                plugin=self,
                timeout=timeout,
                interval=interval,
            )

    def disable_self_ping(self, muc_resource: JID,
                          orig_jid: JID | None = None) -> None:
        """
        Disable client self-ping. Cancels the scheduled pings for the given
        MUC resource and forgets its last ping result.

        :param muc_resource: The MUC resource to send the ping to.
        :param orig_jid: The "from" resource the ping was sent from (for components).
                         Defaults to the bound JID of the connection.
        """
        if orig_jid is None:
            orig_jid = self.xmpp.boundjid
        key = (muc_resource, orig_jid)
        timer = self.ping_timers.pop(key, None)
        if timer is not None:
            timer.cancel()
        self.last_ping_results.pop(key, None)

    def get_ping_status(self, muc_resource: JID,
                        orig_jid: JID | None = None) -> PingStatus:
        """
        Return the last pinged status for a specific muc resource.

        :param muc_resource: The MUC resource to send the ping to.
        :param orig_jid: The "from" resource the ping was sent from (for components).
                         Defaults to the bound JID of the connection.
        :return: The status, or `PingStatus.UNTRIED` if no result has been
                 recorded yet.
        """
        if orig_jid is None:
            orig_jid = self.xmpp.boundjid
        key = (muc_resource, orig_jid)
        return self.last_ping_results.get(key, PingStatus.UNTRIED)

    def _on_muc_activity(self, event: Presence | Message):
        """Handle both messages and presences from mucs to see if we need to
        reset the timer"""
        if event['type'] == 'error':
            return
        # NOTE(review): the lookup uses the exact sender JID, so only stanzas
        # whose "from" equals a pinged MUC resource reset its timer; confirm
        # whether activity from other occupants of the room should count too.
        key = (event['from'], event['to'])
        if key in self.ping_timers:
            self.ping_timers[key].reset_timer()

    def _handle_condition(self, condition: ErrorConditions) -> PingStatus:
        """
        Interpret the error conditions as defined in XEP-0410.

        :param condition: The error condition of the ping reply.
        :return: The matching status; any condition not listed below is
                 treated as `PingStatus.DISCONNECTED`.
        """
        if condition in ('service-unavailable', 'feature-not-implemented'):
            return PingStatus.JOINED
        elif condition == 'item-not-found':
            return PingStatus.JOINED_NICKCHANGED
        elif condition in ('remote-server-not-found',
                           'remote-server-timeout'):
            return PingStatus.TIMEOUT
        else:
            return PingStatus.DISCONNECTED

    async def send_self_ping(self, muc_resource: JID,
                             orig_jid: JID | None = None,
                             timeout: float | None = None) -> PingStatus:
        """
        Send a single self-ping to a MUC, and return the result.

        :param muc_resource: The MUC resource to send the ping to.
        :param orig_jid: The "from" resource the ping was sent from (for components).
                         Defaults to the bound JID of the connection.
        :param timeout: Timeout passed to the ping request.
        :return: The ping status
        """
        if orig_jid is None:
            orig_jid = self.xmpp.boundjid
        result = PingStatus.UNTRIED
        try:
            await self.xmpp.plugin['xep_0199'].send_ping(
                muc_resource,
                ifrom=orig_jid,
                timeout=timeout,
            )
            result = PingStatus.JOINED
        except IqTimeout:
            result = PingStatus.TIMEOUT
        except IqError as exc:
            result = self._handle_condition(exc.condition)
        return result

    def _update_ping_results(self, key: tuple[JID, JID], result: PingStatus) -> None:
        """
        Internal use only: used to update the ping results dict from the timer.

        Triggers the `muc_ping_changed` event when the result differs from
        the previously recorded one (`PingStatus.UNTRIED` if there was none).

        :param key: The (muc resource, from jid) pair that was pinged.
        :param result: The status obtained from the ping.
        """
        previous = self.last_ping_results.get(key, PingStatus.UNTRIED)
        self.last_ping_results[key] = result
        if result != previous:
            self.xmpp.event('muc_ping_changed', {
                "key": key,
                "previous": previous,
                "result": result
            })