Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions # dify_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# dify_integration.py
from account_manager import AccountManager

account_manager = AccountManager()

def authenticate_user(username, password):
"""用于Dify认证的API端点"""
account = account_manager.authenticate(username, password)
if account:
return {
"success": True,
"user_id": account['user_id'],
"account_type": account['account_type']
}
return {"success": False, "message": "Invalid credentials"}

def get_account_info(user_id):
"""获取账户信息供智能体使用"""
account = account_manager.get_account(user_id)
if account:
return {
"balance": account['account_balance'],
"phone": account['phone'],
"transactions": account['transactions']
}
return None
37 changes: 37 additions & 0 deletions # generate_test_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# generate_test_data.py
import json
import random
import hashlib
from datetime import datetime, timedelta

def generate_accounts(num=10):
accounts = []
for i in range(1, num+1):
user_id = f"100{i:02d}"
accounts.append({
"user_id": user_id,
"username": f"customer{i}",
"password": hashlib.sha256(f"Password123!{i}".encode()).hexdigest(),
"phone": f"13800138{random.randint(100,999)}",
"account_type": random.choice(["personal", "business"]),
"account_balance": round(random.uniform(1000, 1000000), 2),
"last_login": (datetime.utcnow() - timedelta(days=random.randint(0,30))).isoformat() + 'Z',
"transactions": generate_transactions()
})
return accounts

def generate_transactions(max_txns=5):
txns = []
for i in range(random.randint(1, max_txns)):
txns.append({
"id": f"txn{random.randint(1000,9999)}",
"date": (datetime.utcnow() - timedelta(days=random.randint(1,90))).strftime("%Y-%m-%d"),
"amount": round(random.uniform(10, 10000), 2),
"type": random.choice(["deposit", "withdrawal", "transfer"])
})
return txns

if __name__ == "__main__":
accounts = generate_accounts()
with open('bank_users.json', 'w') as f:
json.dump(accounts, f, indent=2)
39 changes: 39 additions & 0 deletions # test_asr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# test_asr.py
import requests
import os

def test_asr():
"""测试ASR功能"""
print("测试语音识别功能...")

# 你可以录制一个英文语音文件来测试,或者使用现有的
test_audio_file = "E:\guolei\Documents\bank-user\tts_output\tts_1756829756_5376c557.wav" # 替换为你的测试文件

if not os.path.exists(test_audio_file):
print("请先创建一个测试音频文件")
return

try:
with open(test_audio_file, 'rb') as f:
files = {'audio': (test_audio_file, f, 'audio/wav')}
data = {'language': 'en'}

response = requests.post(
"http://127.0.0.1:8080/asr/transcribe",
files=files,
data=data,
timeout=30
)

print(f"状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
print(f"识别结果: {result}")
else:
print(f"错误: {response.text}")

except Exception as e:
print(f"测试失败: {e}")

if __name__ == "__main__":
test_asr()
46 changes: 46 additions & 0 deletions account_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# account_manager.py
import json
import hashlib
from datetime import datetime

class AccountManager:
def __init__(self, json_file='accounts.json'):
self.json_file = json_file
self.accounts = self._load_accounts()

def _load_accounts(self):
try:
with open(self.json_file, 'r') as f:
return json.load(f)
except FileNotFoundError:
return []

def _save_accounts(self):
with open(self.json_file, 'w') as f:
json.dump(self.accounts, f, indent=2)

def hash_password(self, password):
return hashlib.sha256(password.encode()).hexdigest()

def authenticate(self, username, password):
hashed_pw = self.hash_password(password)
for account in self.accounts:
if account['username'] == username and account['password'] == hashed_pw:
account['last_login'] = datetime.utcnow().isoformat() + 'Z'
self._save_accounts()
return account
return None

def get_account(self, user_id):
for account in self.accounts:
if account['user_id'] == user_id:
return account
return None

def update_account(self, user_id, updates):
for i, account in enumerate(self.accounts):
if account['user_id'] == user_id:
self.accounts[i].update(updates)
self._save_accounts()
return True
return False
Loading