From c80587f0db05083d9768c43ebba5a1779d7e710c Mon Sep 17 00:00:00 2001 From: NeonKirill <74428618+NeonKirill@users.noreply.github.com> Date: Mon, 8 Apr 2024 23:58:08 +0200 Subject: [PATCH] Fixing Chat Messages Overflow Issue (#84) * Removed dependency on chat_flow property to track messages in conversation --- chat_client/static/js/chat_utils.js | 17 +-- chat_client/static/js/db.js | 52 +++++++- chat_client/static/js/message_utils.js | 33 ++++-- chat_server/blueprints/chat.py | 53 ++++----- chat_server/server_utils/models/chats.py | 43 +++++++ chat_server/services/popularity_counter.py | 53 ++++++--- .../mongo_utils/queries/dao/abc.py | 10 ++ .../mongo_utils/queries/dao/chats.py | 19 +-- .../mongo_utils/queries/mongo_queries.py | 112 ++++++++---------- 9 files changed, 260 insertions(+), 132 deletions(-) create mode 100644 chat_server/server_utils/models/chats.py diff --git a/chat_client/static/js/chat_utils.js b/chat_client/static/js/chat_utils.js index 2c4d9cb1..d331796a 100644 --- a/chat_client/static/js/chat_utils.js +++ b/chat_client/static/js/chat_utils.js @@ -262,7 +262,7 @@ async function buildConversation(conversationData={}, skin = CONVERSATION_SKINS. const newConversationHTML = await buildConversationHTML(conversationData, skin); const conversationsBody = document.getElementById(conversationParentID); conversationsBody.insertAdjacentHTML('afterbegin', newConversationHTML); - initMessages(conversationData, skin); + await initMessages(conversationData, skin); const messageListContainer = getMessageListContainer(cid); const currentConversation = document.getElementById(cid); @@ -363,18 +363,18 @@ async function buildConversation(conversationData={}, skin = CONVERSATION_SKINS. /** * Gets conversation data based on input string * @param input: input string text - * @param firstMessageID: id of the the most recent message + * @param oldestMessageTS: creation timestamp of the oldest displayed message * @param skin: resolves by server for which data to return * @param maxResults: max number of messages to fetch * @param alertParent: parent of error alert (optional) * @returns {Promise<{}>} promise resolving conversation data returned */ -async function getConversationDataByInput(input="", skin=CONVERSATION_SKINS.BASE, firstMessageID=null, maxResults=20, alertParent=null){ +async function getConversationDataByInput(input="", skin=CONVERSATION_SKINS.BASE, oldestMessageTS=null, maxResults=20, alertParent=null){ let conversationData = {}; - if(input && typeof input === "string"){ - let query_url = `chat_api/search/${input}?limit_chat_history=${maxResults}&skin=${skin}`; - if(firstMessageID){ - query_url += `&first_message_id=${firstMessageID}`; + if(input){ + let query_url = `chat_api/search/${input.toString()}?limit_chat_history=${maxResults}&skin=${skin}`; + if(oldestMessageTS){ + query_url += `&creation_time_from=${oldestMessageTS}`; } await fetchServer(query_url) .then(response => { @@ -443,7 +443,8 @@ async function addNewCID(cid, skin){ * @param cid: conversation id to remove */ async function removeConversation(cid){ - return await getChatAlignmentTable().where({cid: cid}).delete(); + return await Promise.all([DBGateway.getInstance(DB_TABLES.CHAT_ALIGNMENT).deleteItem(cid), + DBGateway.getInstance(DB_TABLES.CHAT_MESSAGES_PAGINATION).deleteItem(cid)]); } /** diff --git a/chat_client/static/js/db.js b/chat_client/static/js/db.js index bb654592..e60adbd3 100644 --- a/chat_client/static/js/db.js +++ b/chat_client/static/js/db.js @@ -3,12 +3,14 @@ const DATABASES = { } const DB_TABLES = { CHAT_ALIGNMENT: 'chat_alignment', - MINIFY_SETTINGS: 'minify_settings' + MINIFY_SETTINGS: 'minify_settings', + CHAT_MESSAGES_PAGINATION: 'chat_messages_pagination' } const __db_instances = {} const __db_definitions = { - "chats": { - "chat_alignment": `cid, added_on, skin` + [DATABASES.CHATS]: { + [DB_TABLES.CHAT_ALIGNMENT]: `cid, added_on, skin`, + [DB_TABLES.CHAT_MESSAGES_PAGINATION]: `cid, oldest_created_on` } } @@ -30,4 +32,46 @@ const getDb = (db, table) => { _instance = __db_instances[db]; } return _instance[table]; -} \ No newline at end of file +} + + +class DBGateway { + constructor(db, table) { + this.db = db; + this.table = table; + + this._db_instance = getDb(this.db, this.table); + this._db_columns_definitions = __db_definitions[this.db][this.table] + this._db_key = this._db_columns_definitions.split(',')[0] + } + + async getItem(key = "") { + return await this._db_instance.where( {[this._db_key]: key} ).first(); + } + + async listItems(orderBy="") { + let expression = this._db_instance; + if (orderBy !== ""){ + expression = expression.orderBy(orderBy) + } + return await expression.toArray(); + } + + async putItem(data = {}){ + return await this._db_instance.put(data, [data[this._db_key]]) + } + + updateItem(data = {}) { + const key = data[this._db_key] + delete data[this._db_key] + return this._db_instance.update(key, data); + } + + async deleteItem(key = "") { + return await this._db_instance.where({[this._db_key]: key}).delete(); + } + + static getInstance(table){ + return new DBGateway(DATABASES.CHATS, table); + } +} diff --git a/chat_client/static/js/message_utils.js b/chat_client/static/js/message_utils.js index c80774ab..937e6581 100644 --- a/chat_client/static/js/message_utils.js +++ b/chat_client/static/js/message_utils.js @@ -165,10 +165,10 @@ async function addOldMessages(cid, skin=CONVERSATION_SKINS.BASE) { if (messageContainer.children.length > 0) { for (let i = 0; i < messageContainer.children.length; i++) { const firstMessageItem = messageContainer.children[i]; - const firstMessageID = getFirstMessageFromCID( firstMessageItem ); - if (firstMessageID) { + const oldestMessageTS = await DBGateway.getInstance(DB_TABLES.CHAT_MESSAGES_PAGINATION).getItem(cid).then(res=> res?.oldest_created_on || null); + if (oldestMessageTS) { const numMessages = await getCurrentSkin(cid) === CONVERSATION_SKINS.PROMPTS? 50: 20; - await getConversationDataByInput( cid, skin, firstMessageID, numMessages, null ).then( async conversationData => { + await getConversationDataByInput( cid, skin, oldestMessageTS, numMessages, null ).then( async conversationData => { if (messageContainer) { const userMessageList = getUserMessages( conversationData, null ); userMessageList.sort( (a, b) => { @@ -183,7 +183,7 @@ async function addOldMessages(cid, skin=CONVERSATION_SKINS.BASE) { console.debug( `!!message_id=${message["message_id"]} is already displayed` ) } } - initMessages( conversationData, skin ); + await initMessages( conversationData, skin ); } } ).then( _ => { firstMessageItem.scrollIntoView( {behavior: "smooth"} ); @@ -293,7 +293,7 @@ function addProfileDisplay(cid, messageId, messageType='plain'){ /** * Inits addProfileDisplay() on each message of provided conversation - * @param conversationData: target conversation data + * @param conversationData - target conversation data */ function initProfileDisplay(conversationData){ getUserMessages(conversationData, null).forEach(message => { @@ -302,9 +302,25 @@ function initProfileDisplay(conversationData){ } +/** + * Inits pagination based on the oldest message creation timestamp + * @param conversationData - target conversation data + */ +async function initPagination(conversationData) { + const userMessages = getUserMessages(conversationData, null); + if (userMessages.length > 0){ + const oldestMessage = Math.min(...userMessages.map(msg => parseInt(msg.created_on))); + await DBGateway + .getInstance(DB_TABLES.CHAT_MESSAGES_PAGINATION) + .putItem({cid: conversationData['_id'], + oldest_created_on: oldestMessage}) + } +} + + /** * Initializes messages based on provided conversation aata - * @param conversationData: JS Object containing conversation data of type: + * @param conversationData - JS Object containing conversation data of type: * { * '_id': 'id of conversation', * 'conversation_name': 'title of the conversation', @@ -318,14 +334,15 @@ function initProfileDisplay(conversationData){ * 'created_on': 'creation time of the message' * }, ... (num of user messages returned)] * } - * @param skin: target conversation skin to consider + * @param skin - target conversation skin to consider */ -function initMessages(conversationData, skin = CONVERSATION_SKINS.BASE){ +async function initMessages(conversationData, skin = CONVERSATION_SKINS.BASE){ initProfileDisplay(conversationData); attachReplies(conversationData); addAttachments(conversationData); addCommunicationChannelTransformCallback(conversationData); initLoadOldMessages(conversationData, skin); + await initPagination(conversationData); } /** diff --git a/chat_server/blueprints/chat.py b/chat_server/blueprints/chat.py index ba854ee5..6d81a57b 100644 --- a/chat_server/blueprints/chat.py +++ b/chat_server/blueprints/chat.py @@ -26,17 +26,18 @@ # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import warnings -from typing import Optional from time import time -from fastapi import APIRouter, Request, Form +from fastapi import APIRouter, Request, Form, Depends from fastapi.responses import JSONResponse -from chat_server.constants.conversations import ConversationSkins from chat_server.server_utils.auth import login_required from chat_server.server_utils.conversation_utils import build_message_json +from chat_server.server_utils.dependencies import CurrentUserDependency +from chat_server.server_utils.models.chats import GetConversationModel from chat_server.services.popularity_counter import PopularityCounter from utils.common import generate_uuid +from utils.database_utils.mongo_utils import MongoFilter, MongoLogicalOperators from utils.database_utils.mongo_utils.queries.mongo_queries import fetch_message_data from utils.database_utils.mongo_utils.queries.wrapper import MongoDocumentsAPI from utils.http_utils import respond @@ -86,53 +87,51 @@ async def new_conversation( "created_on": int(time()), } MongoDocumentsAPI.CHATS.add_item(data=request_data_dict) - PopularityCounter.add_new_chat(cid=cid, name=conversation_name) + PopularityCounter.add_new_chat(cid=cid) return JSONResponse(content=request_data_dict) @router.get("/search/{search_str}") -# @login_required async def get_matching_conversation( - request: Request, - search_str: str, - chat_history_from: int = 0, - first_message_id: Optional[str] = None, - limit_chat_history: int = 100, - skin: str = ConversationSkins.BASE, + current_user: CurrentUserDependency, model: GetConversationModel = Depends() ): """ Gets conversation data matching search string - :param request: Starlette Request object - :param search_str: provided search string - :param chat_history_from: upper time bound for messages - :param first_message_id: id of the first message to start from - :param limit_chat_history: lower time bound for messages - :param skin: conversation skin type from ConversationSkins + :param current_user: current user data + :param model: request data model described in GetConversationModel :returns conversation data if found, 401 error code otherwise """ conversation_data = MongoDocumentsAPI.CHATS.get_conversation_data( - search_str=search_str + search_str=model.search_str, requested_user_id=current_user.user_id ) if not conversation_data: - return respond(f'No conversation matching = "{search_str}"', 404) + return respond(f'No conversation matching = "{model.search_str}"', 404) + + if model.creation_time_from: + query_filter = MongoFilter( + key="created_on", + logical_operator=MongoLogicalOperators.LT, + value=int(model.creation_time_from), + ) + else: + query_filter = None message_data = ( fetch_message_data( - skin=skin, + skin=model.skin, conversation_data=conversation_data, - start_idx=chat_history_from, - limit=limit_chat_history, - start_message_id=first_message_id, + limit=model.limit_chat_history, + creation_time_filter=query_filter, ) or [] ) - conversation_data["chat_flow"] = [] - for i in range(len(message_data)): - message_record = build_message_json(raw_message=message_data[i], skin=skin) - conversation_data["chat_flow"].append(message_record) + conversation_data["chat_flow"] = [ + build_message_json(raw_message=message_data[i], skin=model.skin) + for i in range(len(message_data)) + ] return conversation_data diff --git a/chat_server/server_utils/models/chats.py b/chat_server/server_utils/models/chats.py new file mode 100644 index 00000000..222455a0 --- /dev/null +++ b/chat_server/server_utils/models/chats.py @@ -0,0 +1,43 @@ +# NEON AI (TM) SOFTWARE, Software Development Kit & Application Framework +# All trademark and other rights reserved by their respective owners +# Copyright 2008-2022 Neongecko.com Inc. +# Contributors: Daniel McKnight, Guy Daniels, Elon Gasper, Richard Leeds, +# Regina Bloomstine, Casimiro Ferreira, Andrii Pernatii, Kirill Hrymailo +# BSD-3 License +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, +# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from time import time + +from fastapi import Query, Path +from pydantic import BaseModel, Field + +from chat_server.constants.conversations import ConversationSkins + + +class GetConversationModel(BaseModel): + search_str: str = Field(Path(), examples=["1"]) + limit_chat_history: int = (Field(Query(default=100), examples=[100]),) + creation_time_from: str | None = Field(Query(default=None), examples=[int(time())]) + skin: str = Field( + Query(default=ConversationSkins.BASE), examples=[ConversationSkins.BASE] + ) diff --git a/chat_server/services/popularity_counter.py b/chat_server/services/popularity_counter.py index 7d8b8585..77ec66bb 100644 --- a/chat_server/services/popularity_counter.py +++ b/chat_server/services/popularity_counter.py @@ -25,6 +25,7 @@ # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from collections import Counter from dataclasses import dataclass from time import time from typing import List @@ -63,8 +64,9 @@ def get_data(cls): return cls.__DATA @classmethod - def add_new_chat(cls, cid, name, popularity: int = 0): + def add_new_chat(cls, cid, popularity: int = 0): """Adds new chat to the tracked chat popularity records""" + name = MongoDocumentsAPI.CHATS.get_item(item_id=cid).get("conversation_name") cls.__DATA.append( ChatPopularityRecord(cid=cid, name=name, popularity=popularity) ) @@ -78,29 +80,47 @@ def init_data(cls, actuality_days: int = 7): :param actuality_days: number of days for message to affect the chat popularity """ curr_time = int(time()) - chats = MongoDocumentsAPI.CHATS.list_items(include_private=False) + oldest_timestamp = curr_time - 3600 * 24 * actuality_days + chats = MongoDocumentsAPI.CHATS.list_items( + filters=[ + MongoFilter( + key="last_shout_ts", + logical_operator=MongoLogicalOperators.GTE, + value=oldest_timestamp, + ) + ], + include_private=False, + result_as_cursor=False, + ) relevant_shouts = MongoDocumentsAPI.SHOUTS.list_items( filters=[ MongoFilter( key="created_on", logical_operator=MongoLogicalOperators.GTE, - value=curr_time - 3600 * 24 * actuality_days, - ) + value=oldest_timestamp, + ), + MongoFilter( + key="cid", + value=[chat["_id"] for chat in chats], + logical_operator=MongoLogicalOperators.IN, + ), ] ) - relevant_shouts = set(x["_id"] for x in relevant_shouts) + cids_popularity_counter = Counter() + for shout in relevant_shouts: + cids_popularity_counter[str(shout["cid"])] += 1 formatted_chats = [] - for chat in chats: - chat_flow = set(chat.get("chat_flow", [])) - popularity = len(chat_flow.intersection(relevant_shouts)) - if chat["_id"] is not None: - formatted_chats.append( - ChatPopularityRecord( - cid=str(chat["_id"]), - name=chat["conversation_name"], - popularity=popularity, - ) + for cid in cids_popularity_counter: + relevant_chat = [ + chat for chat in chats if str(chat.get("_id", "")) == str(cid) + ][0] + formatted_chats.append( + ChatPopularityRecord( + cid=cid, + name=relevant_chat["conversation_name"], + popularity=cids_popularity_counter[cid], ) + ) cls.last_updated_ts = int(time()) cls.__DATA = sorted(formatted_chats, key=lambda x: x.popularity, reverse=True) @@ -111,7 +131,8 @@ def increment_cid_popularity(cls, cid): matching_item = [item for item in cls.get_data() if item.cid == cid][0] matching_item.popularity += 1 except IndexError: - LOG.error(f"No cid matching = {cid}") + LOG.debug(f"No cid matching = {cid}") + cls.add_new_chat(cid=cid, popularity=1) @classmethod def get_first_n_items(cls, search_str, exclude_items: list = None, limit: int = 10): diff --git a/utils/database_utils/mongo_utils/queries/dao/abc.py b/utils/database_utils/mongo_utils/queries/dao/abc.py index afb55e00..ded0bcf9 100644 --- a/utils/database_utils/mongo_utils/queries/dao/abc.py +++ b/utils/database_utils/mongo_utils/queries/dao/abc.py @@ -28,6 +28,7 @@ from abc import ABC, abstractmethod +import pymongo from neon_sftp import NeonSFTPConnector from utils.database_utils import DatabaseController @@ -81,6 +82,7 @@ def list_items( self, filters: list[MongoFilter] = None, limit: int = None, + ordering_expression: dict[str, int] | None = None, result_as_cursor: bool = True, ) -> dict: """ @@ -88,12 +90,20 @@ def list_items( :param filters: filters to consider (optional) :param limit: limit number of returned attributes (optional) + :param ordering_expression: items ordering expression (optional) :param result_as_cursor: to return result as cursor (defaults to True) :returns results of FIND operation over the desired document according to applied filters """ result_filters = {} if limit: result_filters["limit"] = limit + if ordering_expression: + result_filters["sort"] = [] + for attr, order in ordering_expression.items(): + if order == -1: + result_filters["sort"].append((attr, pymongo.DESCENDING)) + else: + result_filters["sort"].append((attr, pymongo.ASCENDING)) items = self._execute_query( command=MongoCommands.FIND_ALL, filters=filters, diff --git a/utils/database_utils/mongo_utils/queries/dao/chats.py b/utils/database_utils/mongo_utils/queries/dao/chats.py index 9b71eb3d..12d69d4c 100644 --- a/utils/database_utils/mongo_utils/queries/dao/chats.py +++ b/utils/database_utils/mongo_utils/queries/dao/chats.py @@ -53,6 +53,7 @@ def get_conversation_data( limit: int = 1, allow_regex_search: bool = False, include_private: bool = False, + requested_user_id: str = None, ) -> Union[None, dict]: """ Gets matching conversation data @@ -61,6 +62,7 @@ def get_conversation_data( :param limit: limit found conversations :param allow_regex_search: to allow search for matching entries that CONTAIN :param search_str :param include_private: to include private conversations (defaults to False) + :param requested_user_id: id of the requested user (defaults to None) - used to find owned private conversations """ if isinstance(search_str, str): search_str = [search_str] @@ -98,24 +100,23 @@ def get_conversation_data( chats = chats[0] return chats - def add_shout(self, cid: str, shout_id: str): - return self._execute_query( - command=MongoCommands.UPDATE_MANY, - filters=MongoFilter(key="_id", value=cid), - data={"chat_flow": shout_id}, - data_action="push", - ) - def list_items( self, filters: list[MongoFilter] = None, limit: int = None, result_as_cursor: bool = True, include_private: bool = False, + requested_user_id: str = None, ) -> dict: filters = filters or [] if not include_private: - filters.append(MongoFilter(key="is_private", value=False)) + expression = {"is_private": False} + if requested_user_id: + expression["user_id"] = requested_user_id + expression = MongoFilter( + value=expression, logical_operator=MongoLogicalOperators.OR + ) + filters.append(expression) return super().list_items( filters=filters, limit=limit, diff --git a/utils/database_utils/mongo_utils/queries/mongo_queries.py b/utils/database_utils/mongo_utils/queries/mongo_queries.py index 6bb4bc29..0cf94b9b 100644 --- a/utils/database_utils/mongo_utils/queries/mongo_queries.py +++ b/utils/database_utils/mongo_utils/queries/mongo_queries.py @@ -25,10 +25,10 @@ # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - +from time import time from typing import List, Tuple +from ..structures import MongoFilter from .constants import UserPatterns, ConversationSkins from .wrapper import MongoDocumentsAPI from utils.logging_utils import LOG @@ -84,18 +84,16 @@ def get_translations(translation_mapping: dict) -> Tuple[dict, dict]: def fetch_message_data( skin: ConversationSkins, conversation_data: dict, - start_idx: int = 0, limit: int = 100, fetch_senders: bool = True, - start_message_id: str = None, + creation_time_filter: MongoFilter = None, ) -> list[dict]: """Fetches message data based on provided conversation skin""" message_data = fetch_shout_data( conversation_data=conversation_data, fetch_senders=fetch_senders, - start_idx=start_idx, - id_from=start_message_id, limit=limit, + creation_time_filter=creation_time_filter, ) for message in message_data: message["message_type"] = "plain" @@ -122,63 +120,56 @@ def fetch_message_data( def fetch_shout_data( conversation_data: dict, - start_idx: int = 0, limit: int = 100, fetch_senders: bool = True, - id_from: str = None, - shout_ids: List[str] = None, -) -> List[dict]: - """ - Fetches shout data out of conversation data - - :param conversation_data: input conversation data - :param start_idx: message index to start from (sorted by recency) - :param limit: number of shouts to fetch - :param fetch_senders: to fetch shout senders data - :param id_from: message id to start from - :param shout_ids: list of shout ids to fetch - """ - if not shout_ids and conversation_data.get("chat_flow", None): - if id_from: - try: - start_idx = len(conversation_data["chat_flow"]) - conversation_data[ - "chat_flow" - ].index(id_from) - except ValueError: - LOG.warning("Matching start message id not found") - return [] - if start_idx == 0: - conversation_data["chat_flow"] = conversation_data["chat_flow"][ - start_idx - limit : - ] - else: - conversation_data["chat_flow"] = conversation_data["chat_flow"][ - -start_idx - limit : -start_idx - ] - shout_ids = [str(msg_id) for msg_id in conversation_data["chat_flow"]] - shouts = MongoDocumentsAPI.SHOUTS.fetch_shouts(shout_ids=shout_ids) - result = list() - if shouts and fetch_senders: - users_from_shouts = MongoDocumentsAPI.USERS.list_contains( - source_set=[shout["user_id"] for shout in shouts] + creation_time_filter: MongoFilter = None, + shout_ids: list = None, +): + query_filters = [MongoFilter(key="cid", value=conversation_data["_id"])] + if creation_time_filter: + query_filters.append(creation_time_filter) + if shout_ids: + shouts = MongoDocumentsAPI.SHOUTS.list_contains( + source_set=shout_ids, + aggregate_result=False, + result_as_cursor=False, + filters=query_filters, + limit=limit, + ordering_expression={"created_on": -1}, ) - for shout in shouts: - matching_user = users_from_shouts.get(shout["user_id"], {}) - if not matching_user: - matching_user = MongoDocumentsAPI.USERS.create_from_pattern( - UserPatterns.UNRECOGNIZED_USER - ) - else: - matching_user = matching_user[0] - matching_user.pop("password", None) - matching_user.pop("is_tmp", None) - shout["message_id"] = shout["_id"] - shout_data = {**shout, **matching_user} - result.append(shout_data) - shouts = result + else: + shouts = MongoDocumentsAPI.SHOUTS.list_items( + filters=query_filters, + limit=limit, + ordering_expression={"created_on": -1}, + result_as_cursor=False, + ) + if shouts and fetch_senders: + shouts = _attach_senders_data(shouts=shouts) return sorted(shouts, key=lambda user_shout: int(user_shout["created_on"])) +def _attach_senders_data(shouts: list[dict]): + result = list() + users_from_shouts = MongoDocumentsAPI.USERS.list_contains( + source_set=[shout["user_id"] for shout in shouts] + ) + for shout in shouts: + matching_user = users_from_shouts.get(shout["user_id"], {}) + if not matching_user: + matching_user = MongoDocumentsAPI.USERS.create_from_pattern( + UserPatterns.UNRECOGNIZED_USER + ) + else: + matching_user = matching_user[0] + matching_user.pop("password", None) + matching_user.pop("is_tmp", None) + shout["message_id"] = shout["_id"] + shout_data = {**shout, **matching_user} + result.append(shout_data) + return result + + def fetch_prompt_data( cid: str, limit: int = 100, @@ -243,6 +234,7 @@ def fetch_prompt_data( def add_shout(data: dict): """Records shout data and pushes its id to the relevant conversation flow""" MongoDocumentsAPI.SHOUTS.add_item(data=data) - if cid := data.get("cid"): - shout_id = data["_id"] - MongoDocumentsAPI.CHATS.add_shout(cid=cid, shout_id=shout_id) + MongoDocumentsAPI.CHATS.update_item( + filters=MongoFilter(key="_id", value=data["cid"]), + data={"last_shout_ts": int(time())}, + )