Skip to content

Commit

Permalink
Added support for banning/unbanning subminds and admins endpoint for …
Browse files Browse the repository at this point in the history
…fetching chats data
  • Loading branch information
kirgrim committed Jan 5, 2024
1 parent eb85ddc commit fdd4e91
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 16 deletions.
28 changes: 27 additions & 1 deletion chat_server/blueprints/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@

from fastapi import APIRouter
from starlette.requests import Request
from starlette.responses import JSONResponse

from chat_server.server_utils.db_utils import DbUtils
from utils.logging_utils import LOG
from utils.http_utils import respond

from chat_server.server_config import k8s_config
from chat_server.server_config import k8s_config, db_controller
from chat_server.server_utils.auth import login_required
from chat_server.server_utils.k8s_utils import restart_deployment
from chat_server.server_utils.admin_utils import run_mq_validation
Expand Down Expand Up @@ -72,3 +74,27 @@ async def refresh_state(
else:
return respond(f"Unknown refresh type: {service_name!r}", 404)
return respond("OK")


@router.get("/chats/list")
@login_required(tmp_allowed=False, required_roles=["admin"])
async def chats_overview(request: Request, search_str: str = ""):
conversations_data = DbUtils.get_conversation_data(
search_str=search_str,
limit=100,
allow_regex_search=True,
)
result_data = []

for conversation_data in conversations_data:

result_data.append(
{
"cid": conversation_data["_id"],
"conversation_name": conversation_data["conversation_name"],
"bound_service": conversation_data.get("bound_service", ""),
}
)
# TODO: sort it based on PopularityCounter.get_first_n_items

return JSONResponse(content=dict(data=result_data))
42 changes: 29 additions & 13 deletions chat_server/server_utils/db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import re
from typing import List, Tuple, Union, Dict

import pymongo
Expand Down Expand Up @@ -129,12 +129,18 @@ def list_items(

@classmethod
def get_conversation_data(
cls, search_str: Union[list, str], column_identifiers: List[str] = None
cls,
search_str: Union[list, str],
column_identifiers: List[str] = None,
limit: int = 1,
allow_regex_search: bool = False,
) -> Union[None, dict]:
"""
Gets matching conversation data
:param search_str: search string to lookup
:param column_identifiers: desired column identifiers to lookup
:param column_identifiers: desired column identifiers to look up
:param limit: limit found conversations
:param allow_regex_search: to allow search for matching entries that CONTAIN :param search_str
"""
if isinstance(search_str, str):
search_str = [search_str]
Expand All @@ -148,21 +154,31 @@ def get_conversation_data(
or_expression.append({identifier: ObjectId(_keyword)})
except:
pass
if allow_regex_search:
if not _keyword:
expression = ".*"
else:
expression = f".*{_keyword}.*"
_keyword = re.compile(expression, re.IGNORECASE)
or_expression.append({identifier: _keyword})

conversation_data = cls.db_controller.exec_query(
MongoQuery(
command=MongoCommands.FIND_ONE,
document=MongoDocuments.CHATS,
filters=MongoFilter(
value=or_expression, logical_operator=MongoLogicalOperators.OR
conversations_data = list(
cls.db_controller.exec_query(
MongoQuery(
command=MongoCommands.FIND_ALL,
document=MongoDocuments.CHATS,
filters=MongoFilter(
value=or_expression, logical_operator=MongoLogicalOperators.OR
),
result_filters={"limit": limit},
),
)
)
if not conversation_data:
return
conversation_data["_id"] = str(conversation_data["_id"])
return conversation_data
for conversation_data in conversations_data:
conversation_data["_id"] = str(conversation_data["_id"])
if conversations_data and limit == 1:
conversations_data = conversations_data[0]
return conversations_data

@classmethod
def fetch_shout_data(
Expand Down
4 changes: 2 additions & 2 deletions chat_server/sio.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,14 +738,14 @@ async def broadcast(sid, data):
# TODO: introduce certification mechanism to forward messages only from trusted entities
msg_type = data.pop("msg_type", None)
msg_receivers = data.pop("to", None)
if not msg_type:
LOG.error(f'data={data} skipped - no "msg_type" provided')
if msg_type:
await sio.emit(
msg_type,
data=data,
to=msg_receivers,
)
else:
LOG.error(f'data={data} skipped - no "msg_type" provided')


async def emit_error(
Expand Down
42 changes: 42 additions & 0 deletions services/klatchat_observer/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,16 @@ def register_sio_handlers(self):
self._sio.on(
"request_neon_translations", handler=self.request_neon_translations
)
self._sio.on("ban_submind", handler=self.request_ban_submind)
self._sio.on(
"ban_submind_from_conversation",
handler=self.request_ban_submind_from_conversation,
)
self._sio.on("revoke_submind_ban", handler=self.request_revoke_submind_ban)
self._sio.on(
"revoke_submind_ban_from_conversation",
handler=self.request_revoke_submind_ban_from_conversation,
)

@retry(use_self=True)
def connect_sio(self):
Expand Down Expand Up @@ -717,3 +727,35 @@ def on_subminds_state(self, body: dict):
LOG.info(f"Received submind state: {body}")
body["msg_type"] = "subminds_state"
self.sio.emit("broadcast", data=body)

def request_ban_submind(self, data: dict):
self.send_message(
request_data=data,
vhost=self.get_vhost("chatbots"),
queue="ban_submind",
expiration=3000,
)

def request_ban_submind_from_conversation(self, data: dict):
self.send_message(
request_data=data,
vhost=self.get_vhost("chatbots"),
queue="ban_submind_from_conversation",
expiration=3000,
)

def request_revoke_submind_ban(self, data: dict):
self.send_message(
request_data=data,
vhost=self.get_vhost("chatbots"),
queue="revoke_submind_ban",
expiration=3000,
)

def request_revoke_submind_ban_from_conversation(self, data: dict):
self.send_message(
request_data=data,
vhost=self.get_vhost("chatbots"),
queue="revoke_submind_ban_from_conversation",
expiration=3000,
)

0 comments on commit fdd4e91

Please sign in to comment.