// Package uploader periodically scans a directory tree for .txt files and
// uploads their contents line by line via api.UploadDataToServer.
package uploader

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"dypid-client/api"

	"github.com/spf13/viper"
	"golang.org/x/sync/errgroup"
)

// httpClient is the single HTTP client shared by every upload worker so that
// connections are pooled and reused across requests.
var httpClient = &http.Client{
	Transport: &http.Transport{
		MaxIdleConns:        200,
		MaxIdleConnsPerHost: 100,
		IdleConnTimeout:     30 * time.Second,
	},
	Timeout: 30 * time.Second,
}

// Task describes one file scheduled for upload.
type Task struct {
	FilePath  string // path used to open and later truncate the file
	FileLines int    // line count measured before the upload; used for progress output
}

// StartLooking scans lookingPath (recursively) for .txt files once per
// minute, uploads every non-empty file and truncates it afterwards.
// An empty lookingPath falls back to "./". The function never returns.
func StartLooking(lookingPath string) {
	const scanInterval = time.Minute

	if lookingPath == "" {
		lookingPath = "./"
	}

	fmt.Println("程序启动成功,正在检测txt文件")
	for {
		files, err := getTxtFiles(lookingPath)
		switch {
		case err != nil:
			fmt.Println(err)
		case len(files) > 0:
			uploadFiles(files)
		}
		// Sleep on every iteration, including after an error, so that a
		// persistent walk failure cannot turn into a busy loop.
		time.Sleep(scanInterval)
	}
}

// uploadFiles counts the lines of every file, then uploads the non-empty ones
// concurrently and truncates each file whose content was read completely.
func uploadFiles(files []string) {
	const maxConcurrentFiles = 50

	start := time.Now()
	fmt.Println("正在统计", len(files), "个文件行数")

	tasks := make([]Task, 0, len(files))
	for _, path := range files {
		file, err := os.Open(path)
		if err != nil {
			fmt.Println("打开文件失败:", err)
			continue
		}
		lineCount, err := countLines(file)
		file.Close()
		if err != nil {
			// The file cannot be read to the end (for example a line that
			// exceeds the scanner's token limit); leave it untouched.
			fmt.Printf("读取文件 %s 错误: %v\n", path, err)
			continue
		}
		if lineCount == 0 {
			continue
		}
		fmt.Println(filepath.Base(path), "文件行数:", lineCount)
		// Keep the full path: the walk is recursive, so the base name alone
		// neither identifies nor locates the file.
		tasks = append(tasks, Task{FilePath: path, FileLines: lineCount})
	}

	// errgroup bounds how many files are processed at the same time.
	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(maxConcurrentFiles)

	for _, task := range tasks {
		task := task // per-iteration copy for toolchains older than Go 1.22
		g.Go(func() error {
			if err := ctx.Err(); err != nil {
				return err
			}
			fmt.Println("正在上传文件:", filepath.Base(task.FilePath))
			if err := processFile(task.FilePath, task.FileLines); err != nil {
				// processFile already reported the problem. Do not truncate a
				// file that was not read completely, and return nil so the
				// remaining files are still processed.
				return nil
			}
			// NOTE: anything appended to the file between the end of the read
			// and this truncation is discarded.
			if err := os.Truncate(task.FilePath, 0); err != nil {
				fmt.Println("清空文件失败:", err)
			}
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Printf("任务执行出错: %v\n", err)
	} else {
		fmt.Println("所有任务执行完成!")
	}
	fmt.Printf("上传完成,耗时:%s\n", time.Since(start))
}

// countLines returns the number of lines bufio.Scanner finds in r, or the
// scanner's error if r could not be read to the end.
func countLines(r io.Reader) (int, error) {
	scanner := bufio.NewScanner(r)
	count := 0
	for scanner.Scan() {
		count++
	}
	if err := scanner.Err(); err != nil {
		return 0, err
	}
	return count, nil
}

// getTxtFiles walks dir recursively and returns the paths of all regular
// files whose extension is ".txt" (case-insensitive). The walk stops at the
// first error, which is returned together with the paths collected so far.
func getTxtFiles(dir string) (txtFiles []string, err error) {
	err = filepath.Walk(dir, func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		// Only regular files qualify; directories and other entries are skipped.
		if !info.Mode().IsRegular() {
			return nil
		}
		if strings.EqualFold(filepath.Ext(path), ".txt") {
			txtFiles = append(txtFiles, path)
		}
		return nil
	})
	return txtFiles, err
}

// processFile streams filePath line by line to a pool of upload workers and
// blocks until every line has been handed to a worker and all workers have
// finished. fileLines is the expected line count and is used only for the
// progress percentage.
//
// The pool size comes from the "thread-count" configuration key; a missing or
// non-positive value falls back to 10 workers so the producer can never block
// on a channel that nobody drains.
//
// The returned error is non-nil when the file could not be opened or read to
// the end; it has already been printed. Failures of individual uploads are
// reported by the workers and do not affect the return value.
func processFile(filePath string, fileLines int) error {
	const defaultWorkers = 10

	file, err := os.Open(filePath)
	if err != nil {
		fmt.Printf("无法打开文件 %s: %v\n", filePath, err)
		return err
	}
	defer file.Close()

	workers := viper.GetInt("thread-count")
	if workers <= 0 {
		workers = defaultWorkers
	}

	// The buffer decouples the file reader from the upload workers.
	lines := make(chan string, 100)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(workerID int) {
			defer wg.Done()
			processLines(lines, workerID, filePath)
		}(i)
	}

	scanner := bufio.NewScanner(file)
	lineCount := 0
	for scanner.Scan() {
		lines <- scanner.Text()
		lineCount++
		if lineCount%10000 == 0 {
			fmt.Printf("文件【%s】处理进度:%.2f%%\n", filePath, float64(lineCount)/float64(fileLines)*100)
		}
	}
	// Closing the channel lets the workers drain the remaining lines and exit.
	close(lines)
	wg.Wait()

	if err := scanner.Err(); err != nil {
		fmt.Printf("读取文件 %s 错误: %v\n", filePath, err)
		return fmt.Errorf("reading %s: %w", filePath, err)
	}

	fmt.Printf("文件【%s】处理完成,共处理 %d 行数据\n", filePath, lineCount)
	return nil
}

// processLines uploads every non-blank line received on lines until the
// channel is closed. Upload failures are printed and the line is dropped.
// workerID and filePath are used only in log output.
func processLines(lines <-chan string, workerID int, filePath string) {
	for line := range lines {
		if strings.TrimSpace(line) == "" {
			continue
		}
		if err := api.UploadDataToServer(httpClient, line); err != nil {
			fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err)
		}
	}
}