Source code for slixmpp.plugins.xep_0490.mds

from asyncio import Future

from slixmpp import Iq
from slixmpp.plugins import BasePlugin
from slixmpp.types import JidStr

from . import stanza


[docs] class XEP_0490(BasePlugin): """ XEP-0490: Message Displayed Synchronization """ name = "xep_0490" description = "XEP-0490: Message Displayed Synchronization" dependencies = {"xep_0060", "xep_0163", "xep_0223", "xep_0359"} stanza = stanza def plugin_init(self): stanza.register_plugin() self.xmpp.plugin["xep_0163"].register_pep( "message_displayed_synchronization", stanza.Displayed, ) self.xmpp.plugin["xep_0223"].node_profiles[self.stanza.NS] = { "pubsub#max_items": "max", "pubsub#send_last_published_item": "never", }
[docs] def flag_chat(self, chat: JidStr, stanza_id: str, **kwargs) -> Future[Iq]: """ Flag a chat as displayed. :param chat: JID of the chat to set as displayed. :param stanza_id: stanza-id of which to set the display marker. """ displayed = stanza.Displayed() displayed["stanza_id"]["id"] = stanza_id return self.xmpp.plugin["xep_0223"].store( displayed, node=stanza.NS, id=str(chat), **kwargs )
[docs] def catch_up(self, **kwargs) -> Future[Iq]: """ Get all displayed status. Can take any keyword parameters from XEP-0060’s ``get_items()``. """ return self.xmpp.plugin["xep_0060"].get_items( self.xmpp.boundjid.bare, stanza.NS, **kwargs )