Skip to content

Commit e44ea81

Browse files
authored
Update dist.py
1 parent d4380a3 commit e44ea81

File tree

1 file changed

+46
-22
lines changed

1 file changed

+46
-22
lines changed

utils/dist.py

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
CUCKOO_ROOT = os.path.join(os.path.abspath(os.path.dirname(__file__)), "..")
3636
sys.path.append(CUCKOO_ROOT)
3737

38+
from lib.cuckoo.common.iocs import dump_iocs, load_iocs
3839
from lib.cuckoo.common.cleaners_utils import free_space_monitor
3940
from lib.cuckoo.common.config import Config
4041
from lib.cuckoo.common.dist_db import ExitNodes, Machine, Node, Task, create_session
@@ -54,6 +55,7 @@
5455
init_database,
5556
)
5657
from lib.cuckoo.core.database import Task as MD_Task
58+
from dev_utils.mongodb import mongo_update_one
5759

5860
dist_conf = Config("distributed")
5961
main_server_name = dist_conf.distributed.get("main_server_name", "master")
@@ -369,7 +371,7 @@ def node_submit_task(task_id, node_id, main_task_id):
369371
7. Logs relevant information and errors during the process.
370372
"""
371373
db = session()
372-
node = db.scalar(select(Node.id, Node.name, Node.url, Node.apikey).where(Node.id == node_id))
374+
node = db.scalar(select(Node).where(Node.id == node_id))
373375
task = db.get(Task, task_id)
374376
check = False
375377
try:
@@ -939,6 +941,11 @@ def fetch_latest_reports_nfs(self):
939941

940942
start_copy = timeit.default_timer()
941943
copied = node_get_report_nfs(t.task_id, node.name, t.main_task_id)
944+
945+
if not copied:
946+
log.error("Can't copy report %d from node: %s for task: %d", t.task_id, node.name, t.main_task_id)
947+
continue
948+
942949
timediff = timeit.default_timer() - start_copy
943950
log.info(
944951
"It took %s seconds to copy report %d from node: %s for task: %d",
@@ -948,17 +955,20 @@ def fetch_latest_reports_nfs(self):
948955
t.main_task_id,
949956
)
950957

951-
if not copied:
952-
log.error("Can't copy report %d from node: %s for task: %d", t.task_id, node.name, t.main_task_id)
953-
continue
954-
955958
# this doesn't exist for some reason
956959
if path_exists(t.path):
957960
sample_sha256 = None
961+
sample_parent = None
958962
with main_db.session.begin():
959963
samples = main_db.find_sample(task_id=t.main_task_id)
960964
if samples:
961965
sample_sha256 = samples[0].sample.sha256
966+
if hasattr(samples[0].sample, "parent_links"):
967+
for parent in samples[0].sample.parent_links:
968+
if parent.task_id == t.main_task_id:
969+
sample_parent = parent.parent.to_dict()
970+
break
971+
962972
if sample_sha256 is None:
963973
# keep fallback for now
964974
sample = open(t.path, "rb").read()
@@ -980,6 +990,18 @@ def fetch_latest_reports_nfs(self):
980990

981991
self.delete_target_file(t.main_task_id, sample_sha256, t.path)
982992

993+
if sample_parent:
994+
try:
995+
report = load_iocs(t.main_task_id, detail=True)
996+
report["info"].update({"parent_sample": sample_parent})
997+
dump_iocs(report, t.main_task_id)
998+
# ToDo insert into mongo
999+
mongo_update_one(
1000+
"analysis", {"info.id": int(t.main_task_id)}, {"$set": {"info.parent_sample": sample_parent}}
1001+
)
1002+
except Exception as e:
1003+
log.exception("Failed to save iocs for parent sample: %s", str(e))
1004+
9831005
t.retrieved = True
9841006
t.finished = True
9851007
db.commit()
@@ -1059,7 +1081,7 @@ def fetch_latest_reports(self):
10591081
main_db.set_status(t.main_task_id, TASK_REPORTED)
10601082

10611083
# Fetch each requested report.
1062-
node = db.scalar(select(Node.id, Node.name, Node.url, Node.apikey).where(Node.id == node_id))
1084+
node = db.scalar(select(Node).where(Node.id == node_id))
10631085
report = node_get_report(t.task_id, "dist/", node.url, node.apikey, stream=True)
10641086

10651087
if report is None:
@@ -1217,12 +1239,12 @@ class StatusThread(threading.Thread):
12171239
The main loop that continuously checks the status of nodes and submits tasks.
12181240
"""
12191241

1220-
def submit_tasks(self, node_id, pend_tasks_num, options_like=False, force_push_push=False, db=None):
1242+
def submit_tasks(self, node_name, pend_tasks_num, options_like=False, force_push_push=False, db=None):
12211243
"""
12221244
Submits tasks to a specified node.
12231245
12241246
Args:
1225-
node_id (str): The identifier of the node to which tasks will be submitted.
1247+
node_name (str): The identifier of the node to which tasks will be submitted.
12261248
pend_tasks_num (int): The number of pending tasks to be submitted.
12271249
options_like (bool, optional): Flag to filter tasks based on options. Defaults to False.
12281250
force_push_push (bool, optional): Flag to forcefully push tasks to the node. Defaults to False.
@@ -1238,7 +1260,7 @@ def submit_tasks(self, node_id, pend_tasks_num, options_like=False, force_push_p
12381260
# HACK do not create a new session if the current one (passed as parameter) is still valid.
12391261
try:
12401262
# ToDo name should be id?
1241-
node = db.scalar(select(Node).where(Node.name == node_id))
1263+
node = db.scalar(select(Node).where(Node.name == node_name))
12421264
except (OperationalError, SQLAlchemyError) as e:
12431265
log.warning("Got an operational Exception when trying to submit tasks: %s", str(e))
12441266
return False
@@ -1291,7 +1313,6 @@ def submit_tasks(self, node_id, pend_tasks_num, options_like=False, force_push_p
12911313
)
12921314
main_db.set_status(t.id, TASK_BANNED)
12931315
continue
1294-
12951316
force_push = False
12961317
try:
12971318
# check if node exist and its correct
@@ -1308,18 +1329,17 @@ def submit_tasks(self, node_id, pend_tasks_num, options_like=False, force_push_p
13081329
except Exception as e:
13091330
log.exception(e)
13101331
# wtf are you doing in pendings?
1311-
tasks = db.scalars(select(Task).where(Task.main_task_id == t.id))
1332+
tasks = db.scalars(select(Task).where(Task.main_task_id == t.id)).all()
13121333
if tasks:
13131334
for task in tasks:
1314-
# log.info("Deleting incorrectly uploaded file from dist db, main_task_id: %s", t.id)
1335+
log.info("Deleting incorrectly uploaded file from dist db, main_task_id: %s", t.id)
13151336
if node.name == main_server_name:
13161337
main_db.set_status(t.id, TASK_RUNNING)
13171338
else:
13181339
main_db.set_status(t.id, TASK_DISTRIBUTED)
13191340
# db.delete(task)
13201341
db.commit()
13211342
continue
1322-
13231343
# Convert array of tags into comma separated list
13241344
tags = ",".join([tag.name for tag in t.tags])
13251345
# Append a comma, to make LIKE searches more precise
@@ -1340,7 +1360,6 @@ def submit_tasks(self, node_id, pend_tasks_num, options_like=False, force_push_p
13401360
t.options = ",".join([f"{k}={v}" for k, v in options.items()])
13411361
if t.options:
13421362
t.options += ","
1343-
13441363
t.options += f"main_task_id={t.id}"
13451364
args = dict(
13461365
package=t.package,
@@ -1360,7 +1379,6 @@ def submit_tasks(self, node_id, pend_tasks_num, options_like=False, force_push_p
13601379
tlp=t.tlp,
13611380
)
13621381
task = Task(path=t.target, **args)
1363-
13641382
db.add(task)
13651383
try:
13661384
db.commit()
@@ -1370,7 +1388,6 @@ def submit_tasks(self, node_id, pend_tasks_num, options_like=False, force_push_p
13701388
db.rollback()
13711389
log.info(e)
13721390
continue
1373-
13741391
if force_push or force_push_push:
13751392
# Submit appropriate tasks to node
13761393
submitted = node_submit_task(task.id, node.id, t.id)
@@ -1392,9 +1409,11 @@ def submit_tasks(self, node_id, pend_tasks_num, options_like=False, force_push_p
13921409
if q is None:
13931410
db.commit()
13941411
return True
1412+
13951413
# Order by task priority and task id.
13961414
q = q.order_by(-Task.priority, Task.main_task_id)
13971415
# if we have node set in options push
1416+
13981417
if dist_conf.distributed.enable_tags:
13991418
# Create filter query from tasks in ta
14001419
tags = [getattr(Task, "tags") == ""]
@@ -1410,14 +1429,18 @@ def submit_tasks(self, node_id, pend_tasks_num, options_like=False, force_push_p
14101429
tags.append(and_(*t_combined))
14111430
# Filter by available tags
14121431
q = q.filter(or_(*tags))
1432+
14131433
to_upload = q.limit(pend_tasks_num).all()
14141434
"""
14151435
# 1. Start with a select() statement and initial filters.
1416-
stmt = select(Task).where(or_(Task.node_id.is_(None), Task.task_id.is_(None)), Task.finished.is_(False))
1417-
1418-
# 2. Apply ordering with modern syntax.
1419-
stmt = stmt.order_by(Task.priority.desc(), Task.main_task_id)
1420-
1436+
stmt = (
1437+
select(Task)
1438+
.where(or_(Task.node_id.is_(None), Task.task_id.is_(None)), Task.finished.is_(False))
1439+
.order_by(Task.priority.desc(), Task.main_task_id)
1440+
)
1441+
# print(stmt, "stmt")
1442+
# ToDo broken
1443+
"""
14211444
# 3. Apply the dynamic tag filter.
14221445
if dist_conf.distributed.enable_tags:
14231446
tags_conditions = [Task.tags == ""]
@@ -1432,9 +1455,10 @@ def submit_tasks(self, node_id, pend_tasks_num, options_like=False, force_push_p
14321455
tags_conditions.append(and_(*t_combined))
14331456
14341457
stmt = stmt.where(or_(*tags_conditions))
1435-
1458+
"""
14361459
# 4. Apply the limit and execute the query.
14371460
to_upload = db.scalars(stmt.limit(pend_tasks_num)).all()
1461+
print(to_upload, node.name, pend_tasks_num)
14381462

14391463
if not to_upload:
14401464
db.commit()

0 commit comments

Comments
 (0)