Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 61 additions & 57 deletions gpt_term/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def toggle_multi_line_mode(cls):


class ChatGPT:
def __init__(self, api_key: str, timeout: float):
def __init__(self, api_key: str, timeout: float = 30):
self.api_key = api_key
self.endpoint = "https://api.openai.com/v1/chat/completions"
self.headers = {
Expand Down Expand Up @@ -133,9 +133,8 @@ def add_total_tokens(self, tokens: int):

def send_request(self, data):
try:
with console.status(_("gpt_term.ChatGPT_thinking")):
response = requests.post(
self.endpoint, headers=self.headers, data=json.dumps(data), timeout=self.timeout, stream=ChatMode.stream_mode)
response = requests.post(
self.endpoint, headers=self.headers, data=json.dumps(data), timeout=self.timeout, stream=ChatMode.stream_mode)
# 匹配4xx错误,显示服务器返回的具体原因
if response.status_code // 100 == 4:
error_msg = response.json()['error']['message']
Expand Down Expand Up @@ -180,68 +179,42 @@ def send_request_silent(self, data):
return None

def process_stream_response(self, response: requests.Response):
    """Yield content fragments from a streaming (SSE) chat-completions response.

    Parses each server-sent event as JSON and yields the delta's "content"
    string as it arrives; yields a final "\n" when the server signals
    '[DONE]'. Rendering is left entirely to the caller.

    Args:
        response: a streaming `requests.Response` from the completions API.

    Yields:
        str: successive content fragments, then a trailing newline.
    """
    # Diff residue removed: the unused `reply` accumulator and the `Live`
    # context belonged to the pre-refactor version; this generator only
    # produces fragments and never renders them itself.
    client = sseclient.SSEClient(response)
    for event in client.events():
        if event.data == '[DONE]':
            # Stream finished; emit a newline so the rendered reply ends cleanly.
            yield '\n'
            break
        part = json.loads(event.data)
        delta = part["choices"][0]["delta"]
        if "content" in delta:
            yield delta["content"]
def process_response(self, response: requests.Response):
    """Convert an API response into an assistant message dict.

    In stream mode, consumes `process_stream_response` and renders tokens
    incrementally (plain text in raw mode, live Markdown otherwise); the
    partially built reply is returned even if the user interrupts with
    Ctrl-C. In non-stream mode, parses the JSON body and prints the full
    message.

    Args:
        response: the `requests.Response` returned by `send_request`.

    Returns:
        dict: {'role': 'assistant', 'content': ...} reply message.
    """
    if not ChatMode.stream_mode:
        response_json = response.json()
        log.debug(f"Response: {response_json}")
        reply_message = response_json["choices"][0]["message"]
        print_message(reply_message)
        return reply_message

    reply_message = {'role': 'assistant', 'content': ""}
    # Bind `live` up front: the interrupt handler below must not reference
    # an unbound name when raw mode (no Live context) was interrupted.
    live = None
    try:
        rprint("[bold cyan]ChatGPT: ")
        if ChatMode.raw_mode:
            for result in self.process_stream_response(response):
                reply_message['content'] += result
                rprint(result, end="", flush=True)
        else:
            with Live(console=console, auto_refresh=False, vertical_overflow=self.stream_overflow) as live:
                for result in self.process_stream_response(response):
                    reply_message['content'] += result
                    live.update(Markdown(reply_message['content']), refresh=True)
    except KeyboardInterrupt:
        # BUG FIX: `live.stop()` raised NameError when raw mode was aborted.
        if live is not None:
            live.stop()
        console.print(_('gpt_term.Aborted'))
    # BUG FIX: the original `finally: return reply_message` silently swallowed
    # every exception (network errors, JSON errors); a plain return after the
    # handler preserves the "return partial reply on interrupt" behavior
    # without masking real failures.
    return reply_message

def delete_first_conversation(self):
    """Delete the oldest question (and its answer, if present) from history.

    Requires at least 3 messages (system + question + more); otherwise
    prints a localized "nothing to delete" notice. Recounts and updates
    `self.current_tokens` after deletion.
    """
    if len(self.messages) >= 3:
        question = self.messages[1]
        del self.messages[1]
        if self.messages[1]['role'] == "assistant":
            # only delete the second message if it is the assistant's answer
            del self.messages[1]
        # Truncate the question to its first line for the confirmation notice.
        truncated_question = question['content'].split('\n')[0]
        if len(question['content']) > len(truncated_question):
            truncated_question += "..."

        # recount current tokens
        new_tokens = count_token(self.messages)
        tokens_saved = self.current_tokens - new_tokens
        self.current_tokens = new_tokens

        console.print(
            _('gpt_term.delete_first_conversation_yes',truncated_question=truncated_question,tokens_saved=tokens_saved))
    else:
        console.print(_('gpt_term.delete_first_conversation_no'))

def delete_all_conversation(self):
    """Delete all chat history in place (keeps the system prompt at index 0),
    reset the title, recount tokens, and clear the terminal screen.
    """
    del self.messages[1:]
    self.title = None
    # recount current tokens
    self.current_tokens = count_token(self.messages)
    # `cls` on Windows, `clear` elsewhere
    os.system('cls' if os.name == 'nt' else 'clear')
    console.print(_('gpt_term.delete_all'))

def handle(self, message: str):
try:
self.messages.append({"role": "user", "content": message})
Expand All @@ -251,16 +224,18 @@ def handle(self, message: str):
"stream": ChatMode.stream_mode,
"temperature": self.temperature
}
response = self.send_request(data)
with console.status(_("gpt_term.ChatGPT_thinking")):
response = self.send_request(data)
if response is None:
# 如果没有得到回复(中断或失败)
self.messages.pop()
if self.current_tokens >= self.tokens_limit:
if confirm(_('gpt_term.tokens_reached')):
self.delete_first_conversation()
return

reply_message = self.process_response(response)
if reply_message is not None:
if reply_message:
log.info(f"ChatGPT: {reply_message['content']}")
self.messages.append(reply_message)
self.current_tokens = count_token(self.messages)
Expand All @@ -283,6 +258,35 @@ def handle(self, message: str):

return reply_message

def delete_first_conversation(self):
    """Remove the oldest question from history, together with its answer
    when one directly follows, then recount the session's token total.

    With fewer than 3 messages (system prompt plus at most one question)
    there is nothing deletable; a localized notice is printed instead.
    """
    if len(self.messages) < 3:
        console.print(_('gpt_term.delete_first_conversation_no'))
        return

    question = self.messages.pop(1)
    # Drop the following message only when it is the assistant's reply.
    if self.messages[1]['role'] == "assistant":
        self.messages.pop(1)

    # First line of the removed question, with an ellipsis if it was longer.
    preview = question['content'].split('\n')[0]
    if len(question['content']) > len(preview):
        preview += "..."

    # Refresh the running token count and report how much was reclaimed.
    recounted = count_token(self.messages)
    reclaimed = self.current_tokens - recounted
    self.current_tokens = recounted

    console.print(
        _('gpt_term.delete_first_conversation_yes', truncated_question=preview, tokens_saved=reclaimed))

def delete_all_conversation(self):
    """Wipe the whole conversation (the system prompt at index 0 survives),
    drop the session title, recount tokens, and clear the terminal.
    """
    # Mutate the list in place so any external references stay valid.
    del self.messages[1:]
    self.title = None
    # Only the system message remains; recount from scratch.
    self.current_tokens = count_token(self.messages)
    clear_command = 'cls' if os.name == 'nt' else 'clear'
    os.system(clear_command)
    console.print(_('gpt_term.delete_all'))

def gen_title(self, force: bool = False):
# Empty the title if there is only system message left
if len(self.messages) < 2:
Expand Down