diff --git a/go.mod b/go.mod index f1dd0cf..e0e9580 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( golang.org/x/arch v0.18.0 // indirect golang.org/x/crypto v0.39.0 // indirect golang.org/x/net v0.41.0 // indirect + golang.org/x/sync v0.17.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.26.0 // indirect google.golang.org/protobuf v1.36.6 // indirect diff --git a/go.sum b/go.sum index 0360803..ad46009 100644 --- a/go.sum +++ b/go.sum @@ -109,6 +109,8 @@ golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= diff --git a/tool/update-dypid.go b/tool/update-dypid.go index 2cd867a..bcda6d0 100644 --- a/tool/update-dypid.go +++ b/tool/update-dypid.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "context" "fmt" "io" "net/http" @@ -14,6 +15,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/spf13/viper" + "golang.org/x/sync/errgroup" ) var httpClient = &http.Client{ @@ -25,6 +27,11 @@ var httpClient = &http.Client{ Timeout: 30 * time.Second, } +type Task struct { + FilePath string + FileLines int +} + func main() { initConfig() @@ -39,20 +46,61 @@ func main() { if files != nil { start := time.Now() - wg := sync.WaitGroup{} + fileLines := make(map[string]int) + fmt.Println("正在统计", len(files), "个文件行数") for _, filePath := range files { - fmt.Println("正在上传文件:", filePath) - wg.Add(1) - go func() { - processFile(filePath) - err := os.Truncate(filePath, 0) - if err != nil { - fmt.Println("清空文件失败:", err) - } - wg.Done() - }() + + file, err := os.Open(filePath) + if err != nil { + fmt.Println("打开文件失败:", err) + } + defer file.Close() + // 使用 bufio.Scanner 逐行读取 + scanner := bufio.NewScanner(file) + lineCount := 0 + for scanner.Scan() { + lineCount++ + } + if lineCount == 0 { + return + } + fileLines[filepath.Base(filePath)] = lineCount + fmt.Println(filepath.Base(filePath), "文件行数:", lineCount) + } + + var tasks []Task + for k, v := range fileLines { + tasks = append(tasks, Task{FilePath: k, FileLines: v}) + } + + // 使用errgroup控制并发 + g, ctx := errgroup.WithContext(context.Background()) + g.SetLimit(50) // 设置最大并发数为50 + // 执行所有任务 + for _, task := range tasks { + task := task // 创建局部变量 + g.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + fmt.Println("正在上传文件:", filepath.Base(task.FilePath)) + processFile(task.FilePath, task.FileLines) + err := os.Truncate(task.FilePath, 0) + if err != nil { + fmt.Println("清空文件失败:", err) + } + return nil + } + }) + } + + // 等待所有任务完成 + if err := g.Wait(); err != nil { + fmt.Printf("任务执行出错: %v\n", err) + } else { + fmt.Println("所有任务执行完成!") } - wg.Wait() fmt.Printf("上传完成,耗时:%s\n", time.Since(start)) } @@ -120,7 +168,7 @@ func getTxtFiles(dir string) (txtFiles []string, err error) { return txtFiles, err } -func processFile(filePath string) { +func processFile(filePath string, fileLines int) { var wg sync.WaitGroup // 打开文件 file, err := os.Open(filePath) @@ -146,11 +194,10 @@ func processFile(filePath string) { scanner := bufio.NewScanner(file) lineCount := 0 for scanner.Scan() { - line := scanner.Text() - lines <- line + lines <- scanner.Text() lineCount++ if lineCount%10000 == 0 { - fmt.Printf("文件【%s】处理进度:%v%%\n", filePath, float64(lineCount)/40000*100) + fmt.Printf("文件【%s】处理进度:%.2f%%\n", filePath, float64(lineCount)/float64(fileLines)*100) } }