Move crawl and QA logs to new mongo collection #2791

Open · wants to merge 14 commits into main
181 changes: 181 additions & 0 deletions backend/btrixcloud/crawl_logs.py
@@ -0,0 +1,181 @@
"""crawl logs"""

from typing import TYPE_CHECKING, Any, Optional, Dict, Tuple, List

import json
from uuid import UUID, uuid4

from fastapi import HTTPException
import pymongo

from .models import CrawlLogLine, Organization
from .pagination import DEFAULT_PAGE_SIZE

if TYPE_CHECKING:
from .orgs import OrgOps
else:
OrgOps = object


# ============================================================================
class CrawlLogOps:
"""crawl log management"""

org_ops: OrgOps

# pylint: disable=too-many-locals, too-many-arguments, invalid-name

def __init__(self, mdb, org_ops):
self.logs = mdb["crawl_logs"]
self.org_ops = org_ops

async def init_index(self):
"""init index for crawl logs"""
await self.logs.create_index(
[
("crawlId", pymongo.HASHED),
("oid", pymongo.ASCENDING),
("qaRunId", pymongo.ASCENDING),
("timestamp", pymongo.ASCENDING),
]
)
await self.logs.create_index(
[
("crawlId", pymongo.HASHED),
("oid", pymongo.ASCENDING),
("qaRunId", pymongo.ASCENDING),
("logLevel", pymongo.ASCENDING),
]
)
await self.logs.create_index(
[
("crawlId", pymongo.HASHED),
("oid", pymongo.ASCENDING),
("qaRunId", pymongo.ASCENDING),
("context", pymongo.ASCENDING),
]
)
await self.logs.create_index(
[
("crawlId", pymongo.HASHED),
("oid", pymongo.ASCENDING),
("qaRunId", pymongo.ASCENDING),
("message", pymongo.ASCENDING),
]
)

async def add_log_line(
self,
crawl_id: str,
oid: UUID,
log_line: str,
qa_run_id: Optional[str] = None,
) -> bool:
"""add crawl log line to database"""
try:
log_dict = json.loads(log_line)

# Ensure details are a dictionary
# If they are a list, convert to a dict
details = None
log_dict_details = log_dict.get("details")
if log_dict_details:
if isinstance(log_dict_details, dict):
details = log_dict_details
else:
details = {"items": log_dict_details}

log_to_add = CrawlLogLine(
id=uuid4(),
crawlId=crawl_id,
oid=oid,
qaRunId=qa_run_id,
timestamp=log_dict["timestamp"],
logLevel=log_dict["logLevel"],
context=log_dict["context"],
message=log_dict["message"],
details=details,
)
res = await self.logs.insert_one(log_to_add.to_dict())
return res is not None
# pylint: disable=broad-exception-caught
except Exception as err:
print(
f"Error adding log line for crawl {crawl_id} to database: {err}",
flush=True,
)
return False

async def get_crawl_logs(
self,
org: Organization,
crawl_id: str,
page_size: int = DEFAULT_PAGE_SIZE,
page: int = 1,
sort_by: str = "timestamp",
sort_direction: int = -1,
contexts: Optional[List[str]] = None,
log_levels: Optional[List[str]] = None,
qa_run_id: Optional[str] = None,
) -> Tuple[list[CrawlLogLine], int]:
"""list all logs for particular crawl"""
# pylint: disable=too-many-locals, duplicate-code

# Zero-index page for query
page = page - 1
skip = page_size * page

match_query: Dict[str, Any] = {
"oid": org.id,
"crawlId": crawl_id,
"qaRunId": qa_run_id,
}

if contexts:
match_query["context"] = {"$in": contexts}

if log_levels:
match_query["logLevel"] = {"$in": log_levels}

aggregate: List[Dict[str, Any]] = [{"$match": match_query}]

if sort_by:
if sort_by not in (
"timestamp",
"logLevel",
"context",
"message",
):
raise HTTPException(status_code=400, detail="invalid_sort_by")
if sort_direction not in (1, -1):
raise HTTPException(status_code=400, detail="invalid_sort_direction")

aggregate.extend([{"$sort": {sort_by: sort_direction}}])

aggregate.extend(
[
{
"$facet": {
"items": [
{"$skip": skip},
{"$limit": page_size},
],
"total": [{"$count": "count"}],
}
},
]
)

cursor = self.logs.aggregate(aggregate)
results = await cursor.to_list(length=1)
result = results[0]
items = result["items"]

try:
total = int(result["total"][0]["count"])
except (IndexError, ValueError):
total = 0

log_lines = [CrawlLogLine.from_dict(res) for res in items]

return log_lines, total
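
For context while reviewing, a minimal usage sketch of the new CrawlLogOps class, assuming a motor async client against a local MongoDB; the connection string, database name, and crawl id are placeholders, and in the real app mdb and org_ops are wired up at startup rather than constructed like this. get_crawl_logs additionally takes the Organization plus the same context/log-level filters that the endpoints in crawls.py expose.

import asyncio
from uuid import uuid4

from motor.motor_asyncio import AsyncIOMotorClient

from btrixcloud.crawl_logs import CrawlLogOps


async def main():
    # Placeholder connection; the real app injects mdb and org_ops at startup
    mdb = AsyncIOMotorClient("mongodb://localhost:27017")["btrix-example"]
    log_ops = CrawlLogOps(mdb, org_ops=None)
    await log_ops.init_index()

    # One JSON log line in the shape add_log_line expects from the crawler;
    # the list-valued "details" gets normalized to {"items": [...]} before insert
    line = (
        '{"timestamp": "2025-01-01T00:00:00Z", "logLevel": "error", '
        '"context": "general", "message": "example error", "details": ["a", "b"]}'
    )
    stored = await log_ops.add_log_line("example-crawl-id", uuid4(), line)
    print("stored:", stored)


asyncio.run(main())
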
108 changes: 59 additions & 49 deletions backend/btrixcloud/crawls.py
@@ -10,7 +10,7 @@
from datetime import datetime
from uuid import UUID

from typing import Optional, List, Dict, Union, Any, Sequence, AsyncIterator
from typing import Optional, List, Dict, Union, Any, Sequence, AsyncIterator, Tuple

from fastapi import Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
@@ -22,14 +22,14 @@
from .utils import (
dt_now,
date_to_str,
parse_jsonl_log_messages,
stream_dict_list_as_csv,
validate_regexes,
scale_from_browser_windows,
browser_windows_from_scale,
)
from .basecrawls import BaseCrawlOps
from .crawlmanager import CrawlManager
from .crawl_logs import CrawlLogOps
from .models import (
UpdateCrawl,
DeleteCrawlList,
@@ -66,6 +66,7 @@
CrawlScaleResponse,
CrawlQueueResponse,
MatchCrawlQueueResponse,
CrawlLogLine,
)


@@ -80,9 +81,10 @@ class CrawlOps(BaseCrawlOps):

crawl_manager: CrawlManager

def __init__(self, crawl_manager: CrawlManager, *args):
def __init__(self, crawl_manager: CrawlManager, log_ops: CrawlLogOps, *args):
super().__init__(*args)
self.crawl_manager = crawl_manager
self.log_ops = log_ops
self.crawl_configs.set_crawl_ops(self)
self.colls.set_crawl_ops(self)
self.event_webhook_ops.set_crawl_ops(self)
@@ -669,31 +671,6 @@ async def is_upload(self, crawl_id: str):
return False
return res.get("type") == "upload"

async def add_crawl_error(
self,
crawl_id: str,
is_qa: bool,
error: str,
) -> bool:
"""add crawl error from redis to mongodb errors field"""
prefix = "" if not is_qa else "qa."

res = await self.crawls.find_one_and_update(
{"_id": crawl_id}, {"$push": {f"{prefix}errors": error}}
)
return res is not None

async def add_crawl_behavior_log(
self,
crawl_id: str,
log_line: str,
) -> bool:
"""add crawl behavior log from redis to mongodb behaviorLogs field"""
res = await self.crawls.find_one_and_update(
{"_id": crawl_id}, {"$push": {"behaviorLogs": log_line}}
)
return res is not None

async def add_crawl_file(
self, crawl_id: str, is_qa: bool, crawl_file: CrawlFile, size: int
) -> bool:
@@ -1141,6 +1118,31 @@ async def get_qa_run_aggregate_stats(
textMatch=text_results,
)

async def get_crawl_logs(
self,
org: Organization,
crawl_id: str,
page_size: int = DEFAULT_PAGE_SIZE,
page: int = 1,
sort_by: str = "timestamp",
sort_direction: int = 1,
contexts: Optional[List[str]] = None,
log_levels: Optional[List[str]] = None,
qa_run_id: Optional[str] = None,
) -> Tuple[list[CrawlLogLine], int]:
"""get crawl logs"""
return await self.log_ops.get_crawl_logs(
org,
crawl_id,
page_size=page_size,
page=page,
sort_by=sort_by,
sort_direction=sort_direction,
contexts=contexts,
log_levels=log_levels,
qa_run_id=qa_run_id,
)


# ============================================================================
async def recompute_crawl_file_count_and_size(crawls, crawl_id: str):
@@ -1162,11 +1164,13 @@ async def recompute_crawl_file_count_and_size(crawls, crawl_id: str):

# ============================================================================
# pylint: disable=too-many-arguments, too-many-locals, too-many-statements
def init_crawls_api(crawl_manager: CrawlManager, app, user_dep, *args):
def init_crawls_api(
crawl_manager: CrawlManager, crawl_log_ops: CrawlLogOps, app, user_dep, *args
):
"""API for crawl management, including crawl done callback"""
# pylint: disable=invalid-name, duplicate-code

ops = CrawlOps(crawl_manager, *args)
ops = CrawlOps(crawl_manager, crawl_log_ops, *args)

org_viewer_dep = ops.orgs.org_viewer_dep
org_crawl_dep = ops.orgs.org_crawl_dep
@@ -1694,15 +1698,20 @@ async def get_crawl_errors(
pageSize: int = DEFAULT_PAGE_SIZE,
page: int = 1,
org: Organization = Depends(org_viewer_dep),
sortBy: str = "timestamp",
sortDirection: int = 1,
):
crawl = await ops.get_crawl(crawl_id, org)

skip = (page - 1) * pageSize
upper_bound = skip + pageSize

errors = crawl.errors[skip:upper_bound] if crawl.errors else []
parsed_errors = parse_jsonl_log_messages(errors)
return paginated_format(parsed_errors, len(crawl.errors or []), page, pageSize)
log_lines, total = await ops.get_crawl_logs(
org,
crawl_id,
page_size=pageSize,
page=page,
sort_by=sortBy,
sort_direction=sortDirection,
log_levels=["error", "fatal"],
qa_run_id=None,
)
return paginated_format(log_lines, total, page, pageSize)

@app.get(
"/orgs/{oid}/crawls/{crawl_id}/behaviorLogs",
@@ -1714,18 +1723,19 @@ async def get_crawl_behavior_logs(
pageSize: int = DEFAULT_PAGE_SIZE,
page: int = 1,
org: Organization = Depends(org_viewer_dep),
sortBy: str = "timestamp",
sortDirection: int = 1,
):
crawl = await ops.get_crawl(crawl_id, org)

skip = (page - 1) * pageSize
upper_bound = skip + pageSize

behavior_logs = (
crawl.behaviorLogs[skip:upper_bound] if crawl.behaviorLogs else []
)
parsed_logs = parse_jsonl_log_messages(behavior_logs)
return paginated_format(
parsed_logs, len(crawl.behaviorLogs or []), page, pageSize
log_lines, total = await ops.get_crawl_logs(
org,
crawl_id,
page_size=pageSize,
page=page,
sort_by=sortBy,
sort_direction=sortDirection,
contexts=["behavior", "behaviorScript", "behaviorScriptCustom"],
qa_run_id=None,
)
return paginated_format(log_lines, total, page, pageSize)

return ops
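
And a request-level sketch of the reworked errors endpoint, which now pages over the crawl_logs collection filtered to error and fatal levels instead of slicing the old errors array. The host, org id, crawl id, and token are placeholders, and the response shape (items/total/page/pageSize) is assumed from paginated_format.

import requests

API = "https://app.example.com/api"  # placeholder deployment
TOKEN = "REPLACE_ME"                 # placeholder bearer token
ORG_ID = "00000000-0000-0000-0000-000000000000"
CRAWL_ID = "example-crawl-id"

resp = requests.get(
    f"{API}/orgs/{ORG_ID}/crawls/{CRAWL_ID}/errors",
    headers={"Authorization": f"Bearer {TOKEN}"},
    # sortBy / sortDirection are the new query params added in this PR
    params={"page": 1, "pageSize": 50, "sortBy": "timestamp", "sortDirection": -1},
)
resp.raise_for_status()
data = resp.json()
print(f"{data['total']} error/fatal log lines, showing {len(data['items'])}")
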