29 changes: 1 addition & 28 deletions app/initial_data.py
@@ -413,34 +413,7 @@ def init_mattermost_documents(db:Session, bot_obj: MattermostUserModel) -> None:
    # nitmre-bot may be a member of 1000s of channels, this may take a lot of time
    # start with 2 channels for now: fm_618aoc, jicc618aoc
    channel_ids = ["49qb17rn4pyxzf8t7tn5q5i9by", "rzmytnht33fjxr7dy46p8aqb9e"]
    history_depth = 0
    filter_system_posts = True

    adf = pd.DataFrame()
    for channel_id in tqdm(channel_ids):
        channel_obj = crud_mattermost.get_or_create_mm_channel_object(db, channel_id=channel_id)
        df = mattermost_utils.get_channel_posts(
            settings.mm_base_url,
            settings.mm_token,
            channel_id,
            history_depth=history_depth,
            filter_system_types=filter_system_posts).assign(channel=channel_obj.id)
        adf = pd.concat([adf, df], ignore_index=True)
    channel_uuids = adf['channel'].unique()

    if not adf.empty:
        user_ids = adf['user_id'].unique()
        for uid in user_ids:
            user_obj = crud_mattermost.get_or_create_mm_user_object(db, user_id=uid)
            adf.loc[adf['user_id'] == uid, 'user'] = user_obj.id

        channel_document_objs = crud_mattermost.mattermost_documents.get_all_channel_documents(
            db, channels=channel_uuids)
        existing_ids = [obj.message_id for obj in channel_document_objs]
        adf = adf[~adf.id.isin(existing_ids)].drop_duplicates(subset='id')

        adf.rename(columns={'id': 'message_id'}, inplace=True)
        return crud_mattermost.mattermost_documents.create_all_using_df(db, ddf=adf, thread_type=ThreadTypeEnum.MESSAGE)
    crud_mattermost.create_document_objects(db, channel_ids=channel_ids, history_depth=0, filter_system_posts=True)

########## large object uploads ################

52 changes: 50 additions & 2 deletions app/mattermost/crud/crud_mattermost.py
@@ -2,6 +2,7 @@
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from tqdm import tqdm
from fastapi import HTTPException
from sqlalchemy.orm import Session
from app.core.config import settings
@@ -376,8 +377,8 @@ def get_or_create_mm_user_object(db: Session, *, user_id: str):
        user_name = mattermost_utils.get_user_name(
            settings.mm_base_url, settings.mm_token, user_id)
        if not user_name:
            raise HTTPException(
                status_code=422, detail="Mattermost user not found")
            logger.warning('Mattermost user %s not found' % user_id)
            return None
        user_obj = populate_mm_user_team_info(
            db, user_name=user_name)

@@ -397,6 +398,9 @@ def populate_mm_document_info(db: Session, *, document_df: pd.DataFrame):
        user_ids = set(cdf.user_id)
        for user_id in user_ids:
            user_obj = get_or_create_mm_user_object(db, user_id=user_id)
            if not user_obj:
                continue

            udf = cdf[cdf.user_id == user_id]
            udf['channel'] = channel_obj.id
            udf['user'] = user_obj.id
@@ -407,6 +411,50 @@ def populate_mm_document_info(db: Session, *, document_df: pd.DataFrame):

    return new_mattermost_docs

def create_document_objects(db: Session, *,
                            channel_ids: list[str],
                            history_depth: int = mattermost_utils.DEFAULT_HISTORY_DEPTH_DAYS,
                            filter_system_posts: bool = True,
                            usernames_to_filter: list[str] = []):
    adf = pd.DataFrame()
    for channel_id in tqdm(channel_ids):
        channel_obj = get_or_create_mm_channel_object(db, channel_id=channel_id)
        df = mattermost_utils.get_channel_posts(
            settings.mm_base_url,
            settings.mm_token,
            channel_id,
            history_depth=history_depth,
            filter_system_types=filter_system_posts,
            usernames_to_filter=usernames_to_filter).assign(channel=channel_obj.id)
        adf = pd.concat([adf, df], ignore_index=True)
    channel_uuids = adf['channel'].unique()

    # handle empty channels
    # https://github.com/orgs/MIT-AI-Accelerator/projects/2/views/1?pane=issue&itemId=44143308
    deleted_user_ids = []
    if not adf.empty:
        user_ids = adf['user_id'].unique()
        for uid in user_ids:
            user_obj = get_or_create_mm_user_object(db, user_id=uid)
            # handle deleted users
            # https://github.com/MIT-AI-Accelerator/c3po-model-server/issues/302
            if not user_obj:
                deleted_user_ids.append(uid)
                continue
            adf.loc[adf['user_id'] == uid, 'user'] = user_obj.id

        channel_document_objs = mattermost_documents.get_all_channel_documents(
            db, channels=channel_uuids)
        existing_ids = [obj.message_id for obj in channel_document_objs]
        adf = adf[~adf.id.isin(existing_ids)].drop_duplicates(subset='id')

        invalid_post_index = adf[adf['user_id'].isin(deleted_user_ids)].index
        adf = adf.drop(invalid_post_index)
        adf.rename(columns={'id': 'message_id'}, inplace=True)
        mattermost_documents.create_all_using_df(db, ddf=adf, thread_type=ThreadTypeEnum.MESSAGE)

    return channel_uuids


# Takes message utterances (i.e. individual rows) from chat dataframe, and converts them to conversation threads
# Returns a dataframe with original structure; messages updated to include full conversation
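For reference, a minimal sketch (not part of this diff) of how the consolidated helper above is expected to be called. The `SessionLocal` factory and the import paths are assumptions; the channel IDs are copied from the initial_data.py call site, and the keyword arguments follow the signature shown in the hunk.

```python
from app.mattermost.crud import crud_mattermost  # import path assumed from the call sites in this PR
from app.db.session import SessionLocal          # hypothetical session factory, not part of this PR

with SessionLocal() as db:
    # ingest posts for a set of channels; system and bot posts filtered out
    channel_uuids = crud_mattermost.create_document_objects(
        db,
        channel_ids=["49qb17rn4pyxzf8t7tn5q5i9by", "rzmytnht33fjxr7dy46p8aqb9e"],
        history_depth=0,  # mirrors the initial_data.py call site
        filter_system_posts=True,
        usernames_to_filter=["nitmre-bot"],
    )

    # the helper returns the UUIDs of the channels it touched; stored documents
    # can then be read back per channel
    docs = crud_mattermost.mattermost_documents.get_all_channel_documents(
        db, channels=channel_uuids)
```

Both call sites in this PR (initial_data.py and the router) follow this shape; only the history depth and the username filtering differ.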
33 changes: 5 additions & 28 deletions app/mattermost/router.py
@@ -113,34 +113,11 @@ async def upload_mm_channel_docs(request: UploadDocumentRequest, db: Session = D
    if request.filter_bot_posts:
        usernames_to_filter.add(mattermost_utils.MM_BOT_USERNAME)

    adf = pd.DataFrame()
    for channel_id in tqdm(request.channel_ids):
        channel_obj = crud_mattermost.get_or_create_mm_channel_object(db, channel_id=channel_id)
        df = mattermost_utils.get_channel_posts(
            settings.mm_base_url,
            settings.mm_token,
            channel_id,
            history_depth=request.history_depth,
            filter_system_types=request.filter_system_posts,
            usernames_to_filter=usernames_to_filter).assign(channel=channel_obj.id)
        adf = pd.concat([adf, df], ignore_index=True)
    channel_uuids = adf['channel'].unique()

    # handle empty channels
    # https://github.com/orgs/MIT-AI-Accelerator/projects/2/views/1?pane=issue&itemId=44143308
    if not adf.empty:
        user_ids = adf['user_id'].unique()
        for uid in user_ids:
            user_obj = crud_mattermost.get_or_create_mm_user_object(db, user_id=uid)
            adf.loc[adf['user_id'] == uid, 'user'] = user_obj.id

        channel_document_objs = crud_mattermost.mattermost_documents.get_all_channel_documents(
            db, channels=channel_uuids)
        existing_ids = [obj.message_id for obj in channel_document_objs]
        adf = adf[~adf.id.isin(existing_ids)].drop_duplicates(subset='id')

        adf.rename(columns={'id': 'message_id'}, inplace=True)
        crud_mattermost.mattermost_documents.create_all_using_df(db, ddf=adf, thread_type=ThreadTypeEnum.MESSAGE)
    channel_uuids = crud_mattermost.create_document_objects(db,
                                                             channel_ids=request.channel_ids,
                                                             history_depth=request.history_depth,
                                                             filter_system_posts=request.filter_system_posts,
                                                             usernames_to_filter=usernames_to_filter)

    return crud_mattermost.mattermost_documents.get_all_channel_documents(db,
                                                                           channels=channel_uuids,
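For completeness, an illustrative request to the upload endpoint refactored above. The route decorator is outside the visible hunk, so the URL path here is a placeholder; only the body fields are taken from the `request.*` attributes used in the diff.

```python
import requests

# hypothetical path and host; the field names match UploadDocumentRequest as used in router.py
resp = requests.post(
    "http://localhost:8000/mattermost/documents/upload",
    json={
        "channel_ids": ["49qb17rn4pyxzf8t7tn5q5i9by"],
        "history_depth": 45,
        "filter_system_posts": True,
        "filter_bot_posts": True,
    },
    timeout=60,
)
resp.raise_for_status()
documents = resp.json()
```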
35 changes: 28 additions & 7 deletions app/ppg_common/services/mattermost_utils.py
@@ -2,12 +2,14 @@
from datetime import datetime
import pandas as pd
import requests
import time as tm
from app.core.logging import logger

HTTP_REQUEST_TIMEOUT_S = 60
DEFAULT_HISTORY_DEPTH_DAYS = 45
MM_BOT_USERNAME = "nitmre-bot"


def get_all_pages(url, mm_token, is_channel=False, do_pagination=True):
    """iterate through pages of an http request"""

@@ -23,17 +25,36 @@ def get_all_pages(url, mm_token, is_channel=False, do_pagination=True):
        if not do_pagination:
            do_loop = False

        resp = requests.get(url, headers={'Authorization': f'Bearer {mm_token}'},
                            params={'page': page_num, 'per_page': per_page},
                            timeout=HTTP_REQUEST_TIMEOUT_S)
        if resp.status_code < 400:
            (rdf, rlen) = get_page_data(resp, rdf, per_page, is_channel)
        try:
            resp = requests.get(
                url,
                headers={'Authorization': f'Bearer {mm_token}'},
                params={'page': page_num, 'per_page': per_page},
                timeout=HTTP_REQUEST_TIMEOUT_S
            )

            # raise an HTTPError for bad responses (4xx and 5xx)
            resp.raise_for_status()

            rdf, rlen = get_page_data(resp, rdf, per_page, is_channel)

            if rlen < per_page:
                break

        else:
            logger.error(f"{resp.url} request failed: {resp.status_code}")
        except requests.exceptions.ReadTimeout as e:
            # a timed-out request has no response object to inspect, so log only the URL and error
            logger.error(f"{url} request timed out: {e}")
            break

        except requests.exceptions.HTTPError as e:
            logger.error(f"{url} request failed: {resp.status_code}. {e}")
            logger.error(f"response headers: {resp.headers}")
            logger.error(f"response content: {resp.text}")
            break

        except Exception as e:
            logger.error(f"{url} unexpected error: {e}")
            break

        page_num += 1
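The hunk above shows only the body of the pagination loop; the surrounding setup (per_page, page_num, the while loop, and the final return) is outside the visible range. Below is a simplified, self-contained sketch of the same fetch-and-stop pattern under stated assumptions: the function name, the 200 per-page default, and the JSON-list response are placeholders, and the JSON handling stands in for get_page_data().

```python
import logging
import requests
import pandas as pd

def fetch_all_pages(url: str, token: str, per_page: int = 200) -> pd.DataFrame:
    """Simplified sketch of the paginated-fetch pattern used by get_all_pages."""
    rows = []
    page_num = 0
    while True:
        try:
            resp = requests.get(
                url,
                headers={'Authorization': f'Bearer {token}'},
                params={'page': page_num, 'per_page': per_page},
                timeout=60,
            )
            resp.raise_for_status()  # turn 4xx/5xx responses into HTTPError
            batch = resp.json()      # assumes a JSON list; the real code builds a DataFrame via get_page_data()
            rows.extend(batch)
            if len(batch) < per_page:  # a short page means the last page was reached
                break
        except requests.exceptions.ReadTimeout as e:
            # no response object exists after a timeout, so only the URL and error are logged
            logging.error(f"{url} request timed out: {e}")
            break
        except requests.exceptions.HTTPError as e:
            logging.error(f"{url} request failed: {resp.status_code}. {e}")
            break
        page_num += 1
    return pd.DataFrame(rows)
```

As in the diff, any request failure ends pagination early rather than retrying.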