diff --git a/chat_server/blueprints/admin.py b/chat_server/blueprints/admin.py index 1fc394f8..b0de348d 100644 --- a/chat_server/blueprints/admin.py +++ b/chat_server/blueprints/admin.py @@ -28,11 +28,13 @@ from fastapi import APIRouter from starlette.requests import Request +from starlette.responses import JSONResponse +from chat_server.server_utils.db_utils import DbUtils from utils.logging_utils import LOG from utils.http_utils import respond -from chat_server.server_config import k8s_config +from chat_server.server_config import k8s_config, db_controller from chat_server.server_utils.auth import login_required from chat_server.server_utils.k8s_utils import restart_deployment from chat_server.server_utils.admin_utils import run_mq_validation @@ -72,3 +74,27 @@ async def refresh_state( else: return respond(f"Unknown refresh type: {service_name!r}", 404) return respond("OK") + + +@router.get("/chats/list") +@login_required(tmp_allowed=False, required_roles=["admin"]) +async def chats_overview(request: Request, search_str: str = ""): + conversations_data = DbUtils.get_conversation_data( + search_str=search_str, + limit=100, + allow_regex_search=True, + ) + result_data = [] + + for conversation_data in conversations_data: + + result_data.append( + { + "cid": conversation_data["_id"], + "conversation_name": conversation_data["conversation_name"], + "bound_service": conversation_data.get("bound_service", ""), + } + ) + # TODO: sort it based on PopularityCounter.get_first_n_items + + return JSONResponse(content=dict(data=result_data)) diff --git a/chat_server/server_utils/db_utils.py b/chat_server/server_utils/db_utils.py index 45e29e9a..bf319811 100644 --- a/chat_server/server_utils/db_utils.py +++ b/chat_server/server_utils/db_utils.py @@ -25,7 +25,7 @@ # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - +import re from typing import List, Tuple, Union, Dict import pymongo @@ -129,12 +129,18 @@ def list_items( @classmethod def get_conversation_data( - cls, search_str: Union[list, str], column_identifiers: List[str] = None + cls, + search_str: Union[list, str], + column_identifiers: List[str] = None, + limit: int = 1, + allow_regex_search: bool = False, ) -> Union[None, dict]: """ Gets matching conversation data :param search_str: search string to lookup - :param column_identifiers: desired column identifiers to lookup + :param column_identifiers: desired column identifiers to look up + :param limit: limit found conversations + :param allow_regex_search: to allow search for matching entries that CONTAIN :param search_str """ if isinstance(search_str, str): search_str = [search_str] @@ -148,21 +154,31 @@ def get_conversation_data( or_expression.append({identifier: ObjectId(_keyword)}) except: pass + if allow_regex_search: + if not _keyword: + expression = ".*" + else: + expression = f".*{_keyword}.*" + _keyword = re.compile(expression, re.IGNORECASE) or_expression.append({identifier: _keyword}) - conversation_data = cls.db_controller.exec_query( - MongoQuery( - command=MongoCommands.FIND_ONE, - document=MongoDocuments.CHATS, - filters=MongoFilter( - value=or_expression, logical_operator=MongoLogicalOperators.OR + conversations_data = list( + cls.db_controller.exec_query( + MongoQuery( + command=MongoCommands.FIND_ALL, + document=MongoDocuments.CHATS, + filters=MongoFilter( + value=or_expression, logical_operator=MongoLogicalOperators.OR + ), + result_filters={"limit": limit}, ), ) ) - if not conversation_data: - return - conversation_data["_id"] = str(conversation_data["_id"]) - return conversation_data + for conversation_data in conversations_data: + conversation_data["_id"] = str(conversation_data["_id"]) + if conversations_data and limit == 1: + conversations_data = conversations_data[0] + return conversations_data @classmethod def fetch_shout_data( diff --git a/chat_server/sio.py b/chat_server/sio.py index c8cd17bb..42d55504 100644 --- a/chat_server/sio.py +++ b/chat_server/sio.py @@ -738,14 +738,14 @@ async def broadcast(sid, data): # TODO: introduce certification mechanism to forward messages only from trusted entities msg_type = data.pop("msg_type", None) msg_receivers = data.pop("to", None) - if not msg_type: - LOG.error(f'data={data} skipped - no "msg_type" provided') if msg_type: await sio.emit( msg_type, data=data, to=msg_receivers, ) + else: + LOG.error(f'data={data} skipped - no "msg_type" provided') async def emit_error( diff --git a/services/klatchat_observer/controller.py b/services/klatchat_observer/controller.py index 8129e0b3..f8a3b141 100644 --- a/services/klatchat_observer/controller.py +++ b/services/klatchat_observer/controller.py @@ -294,6 +294,16 @@ def register_sio_handlers(self): self._sio.on( "request_neon_translations", handler=self.request_neon_translations ) + self._sio.on("ban_submind", handler=self.request_ban_submind) + self._sio.on( + "ban_submind_from_conversation", + handler=self.request_ban_submind_from_conversation, + ) + self._sio.on("revoke_submind_ban", handler=self.request_revoke_submind_ban) + self._sio.on( + "revoke_submind_ban_from_conversation", + handler=self.request_revoke_submind_ban_from_conversation, + ) @retry(use_self=True) def connect_sio(self): @@ -717,3 +727,35 @@ def on_subminds_state(self, body: dict): LOG.info(f"Received submind state: {body}") body["msg_type"] = "subminds_state" self.sio.emit("broadcast", data=body) + + def request_ban_submind(self, data: dict): + self.send_message( + request_data=data, + vhost=self.get_vhost("chatbots"), + queue="ban_submind", + expiration=3000, + ) + + def request_ban_submind_from_conversation(self, data: dict): + self.send_message( + request_data=data, + vhost=self.get_vhost("chatbots"), + queue="ban_submind_from_conversation", + expiration=3000, + ) + + def request_revoke_submind_ban(self, data: dict): + self.send_message( + request_data=data, + vhost=self.get_vhost("chatbots"), + queue="revoke_submind_ban", + expiration=3000, + ) + + def request_revoke_submind_ban_from_conversation(self, data: dict): + self.send_message( + request_data=data, + vhost=self.get_vhost("chatbots"), + queue="revoke_submind_ban_from_conversation", + expiration=3000, + )