Skip to content

Commit 19a08d8

Browse files
committed
feat: added v1 datalake, improved communication with data lake handler in main and a2a_handler
1 parent a0c6a69 commit 19a08d8

File tree

1 file changed

+274
-0
lines changed

1 file changed

+274
-0
lines changed

agentic_rag/test_data_lake.py

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
"""
2+
Test script for the Oracle DB Data Lake Event Logging system
3+
4+
This script demonstrates the data lake functionality by:
5+
1. Initializing the event logger
6+
2. Logging sample events of each type
7+
3. Querying and displaying statistics
8+
4. Showing recent events
9+
"""
10+
11+
import sys
12+
import time
13+
from datetime import datetime
14+
15+
try:
16+
from OraDBEventLogger import OraDBEventLogger
17+
except ImportError:
18+
print("❌ Error: Could not import OraDBEventLogger")
19+
print("Make sure Oracle DB credentials are configured in config.yaml")
20+
sys.exit(1)
21+
22+
23+
def print_section(title):
24+
"""Print a formatted section header"""
25+
print("\n" + "="*70)
26+
print(f" {title}")
27+
print("="*70 + "\n")
28+
29+
30+
def test_event_logging():
31+
"""Test the event logging system"""
32+
33+
print_section("Oracle DB Data Lake Event Logger Test")
34+
35+
# Initialize logger
36+
print("1️⃣ Initializing Event Logger...")
37+
try:
38+
logger = OraDBEventLogger()
39+
print("✅ Event logger initialized successfully\n")
40+
except Exception as e:
41+
print(f"❌ Failed to initialize event logger: {str(e)}")
42+
return
43+
44+
# Test A2A Event Logging
45+
print_section("Testing A2A Event Logging")
46+
47+
print("Logging Planner Agent event...")
48+
logger.log_a2a_event(
49+
agent_id="planner_agent_v1",
50+
agent_name="Strategic Planner",
51+
method="agent.query",
52+
system_prompt="You are a strategic planning agent with expertise in problem decomposition.",
53+
user_prompt="How can I build a distributed RAG system?",
54+
response="Step 1: Design the architecture\nStep 2: Implement A2A protocol\nStep 3: Deploy specialized agents\nStep 4: Set up monitoring",
55+
metadata={"query_type": "planning", "steps_generated": 4},
56+
duration_ms=1234.5,
57+
status="success"
58+
)
59+
print("✅ Planner event logged\n")
60+
61+
print("Logging Researcher Agent event...")
62+
logger.log_a2a_event(
63+
agent_id="researcher_agent_v1",
64+
agent_name="Deep Researcher",
65+
method="agent.query",
66+
system_prompt="You are a research agent with expertise in information gathering.",
67+
user_prompt="Research distributed systems architecture",
68+
response="Key findings: Microservices, event-driven architecture, message queues...",
69+
metadata={"findings_count": 5, "sources_consulted": 3},
70+
duration_ms=2345.6,
71+
status="success"
72+
)
73+
print("✅ Researcher event logged\n")
74+
75+
print("Logging Synthesizer Agent event...")
76+
logger.log_a2a_event(
77+
agent_id="synthesizer_agent_v1",
78+
agent_name="Knowledge Synthesizer",
79+
method="agent.query",
80+
system_prompt="You are a synthesis agent that combines multiple perspectives.",
81+
user_prompt="Synthesize findings about distributed RAG",
82+
response="Based on the research, a distributed RAG system requires careful consideration of...",
83+
metadata={"reasoning_steps_count": 4},
84+
duration_ms=1567.8,
85+
status="success"
86+
)
87+
print("✅ Synthesizer event logged\n")
88+
89+
# Test API Event Logging
90+
print_section("Testing API Event Logging")
91+
92+
print("Logging /query endpoint event...")
93+
logger.log_api_event(
94+
endpoint="/query",
95+
method="POST",
96+
request_data={
97+
"query": "What is machine learning?",
98+
"use_cot": True,
99+
"model": "qwen2"
100+
},
101+
response_data={
102+
"answer_length": 450,
103+
"context_chunks": 3
104+
},
105+
status_code=200,
106+
duration_ms=3456.7,
107+
user_agent="Mozilla/5.0",
108+
client_ip="127.0.0.1"
109+
)
110+
print("✅ API event logged\n")
111+
112+
print("Logging /a2a endpoint event...")
113+
logger.log_api_event(
114+
endpoint="/a2a",
115+
method="POST",
116+
request_data={
117+
"method": "document.query",
118+
"params": {"query": "Explain neural networks", "collection": "PDF"}
119+
},
120+
response_data={
121+
"result": "Neural networks are...",
122+
"sources": ["paper1.pdf", "paper2.pdf"]
123+
},
124+
status_code=200,
125+
duration_ms=2567.8
126+
)
127+
print("✅ A2A API event logged\n")
128+
129+
# Test Model Event Logging
130+
print_section("Testing Model Event Logging")
131+
132+
print("Logging qwen2 model inference...")
133+
logger.log_model_event(
134+
model_name="qwen2",
135+
model_type="ollama",
136+
system_prompt="You are a helpful AI assistant.",
137+
user_prompt="Explain quantum computing in simple terms",
138+
response="Quantum computing uses quantum mechanics principles...",
139+
collection_used="general_knowledge",
140+
use_cot=False,
141+
tokens_used=256,
142+
duration_ms=1890.2,
143+
context_chunks=0
144+
)
145+
print("✅ Model event logged\n")
146+
147+
print("Logging deepseek-r1 model inference with CoT...")
148+
logger.log_model_event(
149+
model_name="deepseek-r1",
150+
model_type="ollama",
151+
system_prompt="You are an analytical reasoning agent.",
152+
user_prompt="How does A2A protocol improve distributed systems?",
153+
response="Through standardized communication, agent discovery, and task management...",
154+
collection_used="repository_documents",
155+
use_cot=True,
156+
tokens_used=512,
157+
duration_ms=3456.1,
158+
context_chunks=5
159+
)
160+
print("✅ Model event logged\n")
161+
162+
# Test Document Event Logging
163+
print_section("Testing Document Event Logging")
164+
165+
print("Logging PDF document processing...")
166+
logger.log_document_event(
167+
document_type="pdf",
168+
document_id="doc_12345",
169+
source="machine_learning_research.pdf",
170+
chunks_processed=45,
171+
processing_time_ms=5678.9,
172+
status="success"
173+
)
174+
print("✅ Document event logged\n")
175+
176+
print("Logging repository processing...")
177+
logger.log_document_event(
178+
document_type="repository",
179+
document_id="repo_67890",
180+
source="https://github.com/example/agentic-rag",
181+
chunks_processed=123,
182+
processing_time_ms=8901.2,
183+
status="success"
184+
)
185+
print("✅ Repository event logged\n")
186+
187+
# Test Query Event Logging
188+
print_section("Testing Query Event Logging")
189+
190+
print("Logging vector store query...")
191+
logger.log_query_event(
192+
query_text="machine learning algorithms",
193+
collection_name="pdf_documents",
194+
results_count=10,
195+
query_time_ms=123.4,
196+
metadata={"similarity_threshold": 0.7}
197+
)
198+
print("✅ Query event logged\n")
199+
200+
# Get Statistics
201+
print_section("Event Statistics")
202+
203+
stats = logger.get_statistics()
204+
print(f"Total Events: {stats['total_events']}")
205+
print(f"A2A Events: {stats['a2a_events']}")
206+
print(f"API Events: {stats['api_events']}")
207+
print(f"Model Events: {stats['model_events']}")
208+
print(f"Document Events: {stats['document_events']}")
209+
print(f"Query Events: {stats['query_events']}")
210+
print(f"\nAvg A2A Duration: {stats['avg_a2a_duration_ms']:.2f} ms")
211+
print(f"Avg Model Duration: {stats['avg_model_duration_ms']:.2f} ms")
212+
213+
if stats['top_models']:
214+
print("\nTop Models:")
215+
for i, model_stat in enumerate(stats['top_models'], 1):
216+
print(f" {i}. {model_stat['model']}: {model_stat['count']} calls")
217+
218+
# Show Recent Events
219+
print_section("Recent A2A Events (Last 5)")
220+
221+
recent_events = logger.get_events(event_type="a2a", limit=5)
222+
for i, event in enumerate(recent_events, 1):
223+
print(f"{i}. Agent: {event.get('AGENT_NAME', 'Unknown')}")
224+
print(f" Method: {event.get('METHOD', 'N/A')}")
225+
print(f" Duration: {event.get('DURATION_MS', 0):.2f} ms")
226+
print(f" Status: {event.get('STATUS', 'N/A')}")
227+
print(f" Time: {event.get('TIMESTAMP', 'N/A')}")
228+
print()
229+
230+
# Show Recent Model Events
231+
print_section("Recent Model Events (Last 3)")
232+
233+
model_events = logger.get_events(event_type="model", limit=3)
234+
for i, event in enumerate(model_events, 1):
235+
print(f"{i}. Model: {event.get('MODEL_NAME', 'Unknown')} ({event.get('MODEL_TYPE', 'N/A')})")
236+
print(f" CoT: {'Yes' if event.get('USE_COT') == 1 else 'No'}")
237+
print(f" Duration: {event.get('DURATION_MS', 0):.2f} ms")
238+
print(f" Context Chunks: {event.get('CONTEXT_CHUNKS', 0)}")
239+
print(f" Time: {event.get('TIMESTAMP', 'N/A')}")
240+
print()
241+
242+
# Show Event Counts
243+
print_section("Event Counts by Type")
244+
245+
for event_type in ["a2a", "api", "model", "document", "query"]:
246+
count = logger.get_event_count(event_type)
247+
print(f"{event_type.upper():12s}: {count:6d} events")
248+
249+
# Close connection
250+
print_section("Cleanup")
251+
logger.close()
252+
print("✅ Database connection closed")
253+
254+
print_section("Test Complete")
255+
print("✅ All event types tested successfully!")
256+
print("\nThe data lake is now storing all events in Oracle DB 23ai.")
257+
print("You can query these events using:")
258+
print(" - SQL queries directly on the database")
259+
print(" - REST API endpoints (/events/statistics, /events/{type})")
260+
print(" - OraDBEventLogger Python API")
261+
print()
262+
263+
264+
if __name__ == "__main__":
265+
try:
266+
test_event_logging()
267+
except KeyboardInterrupt:
268+
print("\n\n⚠️ Test interrupted by user")
269+
except Exception as e:
270+
print(f"\n\n❌ Test failed with error: {str(e)}")
271+
import traceback
272+
print("\nTraceback:")
273+
print(traceback.format_exc())
274+

0 commit comments

Comments
 (0)