diff options
Diffstat (limited to 'bot.go')
| -rw-r--r-- | bot.go | 51 |
1 files changed, 44 insertions, 7 deletions
@@ -46,6 +46,7 @@ var ( ragger *rag.RAG chunkParser ChunkParser lastToolCall *models.FuncCall + lastRespStats *models.ResponseStats //nolint:unused // TTS_ENABLED conditionally uses this orator Orator asr STT @@ -532,6 +533,19 @@ func extractDetailedErrorFromBytes(body []byte, statusCode int) string { return fmt.Sprintf("HTTP Status: %d, Response Body: %s", statusCode, string(body)) } +func finalizeRespStats(tokenCount int, startTime time.Time) { + duration := time.Since(startTime).Seconds() + var tps float64 + if duration > 0 { + tps = float64(tokenCount) / duration + } + lastRespStats = &models.ResponseStats{ + Tokens: tokenCount, + Duration: duration, + TokensPerSec: tps, + } +} + // sendMsgToLLM expects streaming resp func sendMsgToLLM(body io.Reader) { choseChunkParser() @@ -590,6 +604,8 @@ func sendMsgToLLM(body io.Reader) { defer resp.Body.Close() reader := bufio.NewReader(resp.Body) counter := uint32(0) + tokenCount := 0 + startTime := time.Now() hasReasoning := false reasoningSent := false for { @@ -601,6 +617,7 @@ func sendMsgToLLM(body io.Reader) { // to stop from spiriling in infinity read of bad bytes that happens with poor connection if cfg.ChunkLimit > 0 && counter > cfg.ChunkLimit { logger.Warn("response hit chunk limit", "limit", cfg.ChunkLimit) + finalizeRespStats(tokenCount, startTime) streamDone <- true break } @@ -624,6 +641,7 @@ func sendMsgToLLM(body io.Reader) { logger.Error("failed to notify", "error", err) } } + finalizeRespStats(tokenCount, startTime) streamDone <- true break // } @@ -639,6 +657,7 @@ func sendMsgToLLM(body io.Reader) { line = line[6:] logger.Debug("debugging resp", "line", string(line)) if bytes.Equal(line, []byte("[DONE]\n")) { + finalizeRespStats(tokenCount, startTime) streamDone <- true break } @@ -652,6 +671,7 @@ func sendMsgToLLM(body io.Reader) { if err := notifyUser("LLM Response Error", "Failed to parse LLM response: "+err.Error()); err != nil { logger.Error("failed to notify user", "error", err) } + finalizeRespStats(tokenCount, startTime) streamDone <- true break } @@ -667,12 +687,15 @@ func sendMsgToLLM(body io.Reader) { // Close the thinking block if we were streaming reasoning and haven't closed it yet if hasReasoning && !reasoningSent { chunkChan <- "</think>" + tokenCount++ } if chunk.Chunk != "" { logger.Warn("text inside of finish llmchunk", "chunk", chunk, "counter", counter) answerText = strings.ReplaceAll(chunk.Chunk, "\n\n", "\n") chunkChan <- answerText + tokenCount++ } + finalizeRespStats(tokenCount, startTime) streamDone <- true break } @@ -684,12 +707,14 @@ func sendMsgToLLM(body io.Reader) { if !hasReasoning { // First reasoning chunk - send opening tag chunkChan <- "<think>" + tokenCount++ hasReasoning = true } // Stream reasoning content immediately answerText = strings.ReplaceAll(chunk.Reasoning, "\n\n", "\n") if answerText != "" { chunkChan <- answerText + tokenCount++ } } @@ -697,6 +722,7 @@ func sendMsgToLLM(body io.Reader) { if chunk.Chunk != "" && hasReasoning && !reasoningSent { // Close the thinking block before sending actual content chunkChan <- "</think>" + tokenCount++ reasoningSent = true } @@ -708,10 +734,12 @@ func sendMsgToLLM(body io.Reader) { if chunkParser.GetAPIType() == models.APITypeCompletion && slices.Contains(stopStrings, answerText) { logger.Debug("stop string detected on client side for completion endpoint", "stop_string", answerText) + finalizeRespStats(tokenCount, startTime) streamDone <- true } if answerText != "" { chunkChan <- answerText + tokenCount++ } openAIToolChan <- chunk.ToolChunk if chunk.FuncName != "" { @@ -723,6 +751,7 @@ func sendMsgToLLM(body io.Reader) { if interruptResp { // read bytes, so it would not get into beginning of the next req interruptResp = false logger.Info("interrupted bot response", "chunk_counter", counter) + finalizeRespStats(tokenCount, startTime) streamDone <- true break } @@ -914,7 +943,6 @@ out: textView.ScrollToEnd() } case <-streamDone: - // drain any remaining chunks from chunkChan before exiting for len(chunkChan) > 0 { chunk := <-chunkChan fmt.Fprint(textView, chunk) @@ -923,31 +951,40 @@ out: textView.ScrollToEnd() } if cfg.TTS_ENABLED { - // Send chunk to audio stream handler TTSTextChan <- chunk } } if cfg.TTS_ENABLED { - // msg is done; flush it down TTSFlushChan <- true } break out } } + var msgStats *models.ResponseStats + if lastRespStats != nil { + msgStats = &models.ResponseStats{ + Tokens: lastRespStats.Tokens, + Duration: lastRespStats.Duration, + TokensPerSec: lastRespStats.TokensPerSec, + } + lastRespStats = nil + } botRespMode = false - // numbers in chatbody and displayed must be the same if r.Resume { chatBody.Messages[len(chatBody.Messages)-1].Content += respText.String() - // lastM.Content = lastM.Content + respText.String() - // Process the updated message to check for known_to tags in resumed response updatedMsg := chatBody.Messages[len(chatBody.Messages)-1] processedMsg := processMessageTag(&updatedMsg) chatBody.Messages[len(chatBody.Messages)-1] = *processedMsg + if msgStats != nil && chatBody.Messages[len(chatBody.Messages)-1].Role != cfg.ToolRole { + chatBody.Messages[len(chatBody.Messages)-1].Stats = msgStats + } } else { - // Message was already added at the start, just process it for known_to tags chatBody.Messages[msgIdx].Content = respText.String() processedMsg := processMessageTag(&chatBody.Messages[msgIdx]) chatBody.Messages[msgIdx] = *processedMsg + if msgStats != nil && chatBody.Messages[msgIdx].Role != cfg.ToolRole { + chatBody.Messages[msgIdx].Stats = msgStats + } stopTTSIfNotForUser(&chatBody.Messages[msgIdx]) } cleanChatBody() |
