From 08f3748a34fe4d7d42da4670c8c46b7b73fbb720 Mon Sep 17 00:00:00 2001 From: GrailFinder Date: Thu, 16 Feb 2023 14:45:10 +0300 Subject: Feat: attempt concurrency --- ffmpeg.go | 1 - main.go | 26 +++++++++++++++++++------- workers.go | 15 +++++++++++++++ 3 files changed, 34 insertions(+), 8 deletions(-) create mode 100644 workers.go diff --git a/ffmpeg.go b/ffmpeg.go index 329ae01..1157707 100644 --- a/ffmpeg.go +++ b/ffmpeg.go @@ -16,6 +16,5 @@ func cutoutClipAndTranscode(ut *Utterance) error { "ar": "22050", "metadata": fmt.Sprintf(`source="%s"`, ut.FD.VttPath), }).OverWriteOutput().ErrorToStdOut().Run() - return err } diff --git a/main.go b/main.go index 13729b9..52db403 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,6 @@ import ( "strings" ) -// TODO: to config file const ( subExt = ".vtt" outdir = "data/utterances" @@ -73,9 +72,10 @@ func linesToUtterances(lines []string, fd *FileData) []*Utterance { RightTime: strings.TrimSpace(splitted[1]), FD: fd, } - u.OutPath = fmt.Sprintf("%s/%s_%s_%s.opus", outdir, fd.AudioBase, + u.OutPath = fmt.Sprintf("%s/%s_%s_%s.wav", outdir, fd.AudioBase, u.LeftTime, u.RightTime) + // todo: compare and filter time if u.LeftTime == u.RightTime { continue } @@ -162,15 +162,27 @@ func main() { panic(err) } - filteredUtterances := []*Utterance{} + // filteredUtterances := []*Utterance{} + + // === concurrent === + queue := make(chan *Utterance, 1000) + done := make(chan bool, 1) + + workers := 10 + for i := 0; i < workers; i++ { + go worker(queue, i, done) + } for _, ut := range utterances { if _, err := os.Stat(ut.OutPath); os.IsNotExist(err) { - if err := cutoutClipAndTranscode(ut); err == nil { - filteredUtterances = append(filteredUtterances, ut) - } + queue <- ut + // if err := cutoutClipAndTranscode(ut); err == nil { + // filteredUtterances = append(filteredUtterances, ut) + // } } } - newMeta := utterancesToFileTextMap(filteredUtterances) + done <- true + close(done) + newMeta := utterancesToFileTextMap(utterances) writeCSV(mapToCSV(newMeta)) } diff --git a/workers.go b/workers.go new file mode 100644 index 0000000..bbd5223 --- /dev/null +++ b/workers.go @@ -0,0 +1,15 @@ +package main + +import "fmt" + +func worker(queue chan *Utterance, worknumber int, done chan bool) { + for { + select { + case ut := <-queue: + cutoutClipAndTranscode(ut) + case <-done: + fmt.Println("worker stoped, number", worknumber) + return + } + } +} -- cgit v1.2.3