diff options
Diffstat (limited to 'rag/rag.go')
| -rw-r--r-- | rag/rag.go | 26 |
1 files changed, 2 insertions, 24 deletions
@@ -19,10 +19,7 @@ import ( "github.com/neurosnap/sentences/english" ) -const ( - // batchTimeout is the maximum time allowed for embedding a single batch - batchTimeout = 2 * time.Minute -) +const () var ( // Status messages for TUI integration @@ -102,10 +99,6 @@ func New(l *slog.Logger, s storage.FullRepo, cfg *config.Config) (*RAG, error) { return rag, nil } -func wordCounter(sentence string) int { - return len(strings.Split(strings.TrimSpace(sentence), " ")) -} - func createChunks(sentences []string, wordLimit, overlapWords uint32) []string { if len(sentences) == 0 { return nil @@ -181,7 +174,6 @@ func (r *RAG) LoadRAG(fpath string) error { func (r *RAG) LoadRAGWithContext(ctx context.Context, fpath string) error { r.mu.Lock() defer r.mu.Unlock() - fileText, err := ExtractText(fpath) if err != nil { return err @@ -190,7 +182,6 @@ func (r *RAG) LoadRAGWithContext(ctx context.Context, fpath string) error { // Send initial status (non-blocking with retry) r.sendStatusNonBlocking(LoadedFileRAGStatus) - tokenizer, err := english.NewSentenceTokenizer(nil) if err != nil { return err @@ -210,7 +201,6 @@ func (r *RAG) LoadRAGWithContext(ctx context.Context, fpath string) error { if len(paragraphs) == 0 { return errors.New("no valid paragraphs found in file") } - totalBatches := (len(paragraphs) + r.cfg.RAGBatchSize - 1) / r.cfg.RAGBatchSize r.logger.Debug("starting parallel embedding", "total_batches", totalBatches, "batch_size", r.cfg.RAGBatchSize) @@ -223,7 +213,7 @@ func (r *RAG) LoadRAGWithContext(ctx context.Context, fpath string) error { concurrency = 1 } // If using ONNX embedder, limit concurrency to 1 due to mutex serialization - isONNX := false + var isONNX bool if _, isONNX = r.embedder.(*ONNXEmbedder); isONNX { concurrency = 1 } @@ -258,7 +248,6 @@ func (r *RAG) LoadRAGWithContext(ctx context.Context, fpath string) error { // Ensure task channel is closed when this goroutine exits defer close(taskCh) r.logger.Debug("task distributor started", "total_batches", totalBatches) - for i := 0; i < totalBatches; i++ { start := i * r.cfg.RAGBatchSize end := start + r.cfg.RAGBatchSize @@ -304,7 +293,6 @@ func (r *RAG) LoadRAGWithContext(ctx context.Context, fpath string) error { resultsBuffer := make(map[int]batchResult) filename := path.Base(fpath) batchesProcessed := 0 - for { select { case <-ctx.Done(): @@ -382,7 +370,6 @@ func (r *RAG) LoadRAGWithContext(ctx context.Context, fpath string) error { break } } - r.logger.Debug("finished writing vectors", "batches", batchesProcessed) r.resetIdleTimer() r.sendStatusNonBlocking(FinishedRAGStatus) @@ -406,7 +393,6 @@ func (r *RAG) embeddingWorker(ctx context.Context, workerID int, taskCh <-chan b } } }() - for task := range taskCh { select { case <-ctx.Done(): @@ -432,7 +418,6 @@ func (r *RAG) embeddingWorker(ctx context.Context, workerID int, taskCh <-chan b r.logger.Debug("worker sent empty batch", "worker", workerID, "batch", task.batchIndex) continue } - // Embed with retry for API embedder embeddings, err := r.embedWithRetry(ctx, task.paragraphs, 3) if err != nil { @@ -444,7 +429,6 @@ func (r *RAG) embeddingWorker(ctx context.Context, workerID int, taskCh <-chan b } return } - // Send result with context awareness select { case resultCh <- batchResult{ @@ -465,7 +449,6 @@ func (r *RAG) embeddingWorker(ctx context.Context, workerID int, taskCh <-chan b // embedWithRetry attempts embedding with exponential backoff for API embedder func (r *RAG) embedWithRetry(ctx context.Context, paragraphs []string, maxRetries int) ([][]float32, error) { var lastErr error - for attempt := 0; attempt < maxRetries; attempt++ { if attempt > 0 { // Exponential backoff @@ -473,13 +456,11 @@ func (r *RAG) embedWithRetry(ctx context.Context, paragraphs []string, maxRetrie if backoff > 10*time.Second { backoff = 10 * time.Second } - select { case <-time.After(backoff): case <-ctx.Done(): return nil, ctx.Err() } - r.logger.Debug("retrying embedding", "attempt", attempt, "max_retries", maxRetries) } @@ -499,7 +480,6 @@ func (r *RAG) embedWithRetry(ctx context.Context, paragraphs []string, maxRetrie break } } - return nil, fmt.Errorf("embedding failed after %d attempts: %w", maxRetries, lastErr) } @@ -509,7 +489,6 @@ func (r *RAG) writeBatchToStorage(ctx context.Context, result batchResult, filen // Empty batch, skip return nil } - // Check context before starting select { case <-ctx.Done(): @@ -534,7 +513,6 @@ func (r *RAG) writeBatchToStorage(ctx context.Context, result batchResult, filen r.sendStatusNonBlocking(ErrRAGStatus) return fmt.Errorf("failed to write vectors batch: %w", err) } - r.logger.Debug("wrote batch to db", "batch", result.batchIndex+1, "size", len(result.paragraphs)) return nil } |
