package main import ( "bufio" "context" "fmt" "io" "net/http" "net/url" "os" "path/filepath" "strings" "sync" "time" "github.com/fsnotify/fsnotify" "github.com/spf13/viper" "golang.org/x/sync/errgroup" ) var httpClient = &http.Client{ Transport: &http.Transport{ MaxIdleConns: 200, MaxIdleConnsPerHost: 100, IdleConnTimeout: 30 * time.Second, }, Timeout: 30 * time.Second, } type Task struct { FilePath string FileLines int } func main() { initConfig() //检测./upload fmt.Println("程序启动成功,正在检测txt文件") for { files, err := getTxtFiles("./") if err != nil { fmt.Println(err) return } if files != nil { start := time.Now() fileLines := make(map[string]int) fmt.Println("正在统计", len(files), "个文件行数") for _, filePath := range files { file, err := os.Open(filePath) if err != nil { fmt.Println("打开文件失败:", err) } defer file.Close() // 使用 bufio.Scanner 逐行读取 scanner := bufio.NewScanner(file) lineCount := 0 for scanner.Scan() { lineCount++ } if lineCount == 0 { return } fileLines[filepath.Base(filePath)] = lineCount fmt.Println(filepath.Base(filePath), "文件行数:", lineCount) } var tasks []Task for k, v := range fileLines { tasks = append(tasks, Task{FilePath: k, FileLines: v}) } // 使用errgroup控制并发 g, ctx := errgroup.WithContext(context.Background()) g.SetLimit(50) // 设置最大并发数为50 // 执行所有任务 for _, task := range tasks { task := task // 创建局部变量 g.Go(func() error { select { case <-ctx.Done(): return ctx.Err() default: fmt.Println("正在上传文件:", filepath.Base(task.FilePath)) processFile(task.FilePath, task.FileLines) err := os.Truncate(task.FilePath, 0) if err != nil { fmt.Println("清空文件失败:", err) } return nil } }) } // 等待所有任务完成 if err := g.Wait(); err != nil { fmt.Printf("任务执行出错: %v\n", err) } else { fmt.Println("所有任务执行完成!") } fmt.Printf("上传完成,耗时:%s\n", time.Since(start)) } time.Sleep(time.Minute) } } func initConfig() { //程序配置 viper.SetDefault("url", "http://localhost:8080") viper.SetDefault("token", "") viper.SetDefault("thread-count", 10) //设置配置文件名和路径 ./config.toml viper.AddConfigPath(".") viper.SetConfigName("config") viper.SetConfigType("toml") viper.SafeWriteConfig() //安全写入默认配置 //读取配置文件 if err := viper.ReadInConfig(); err != nil { fmt.Errorf("无法读取配置文件: %w", err) } viper.WatchConfig() viper.OnConfigChange(func(e fsnotify.Event) { if err := viper.ReadInConfig(); err != nil { fmt.Errorf("无法读取配置文件: %w", err) } }) } func uploadDataToServer(data string) error { params := url.Values{} params.Set("token", viper.GetString("token")) params.Set("data", data) resp, err := httpClient.Post(viper.GetString("url")+"/api/data?"+params.Encode(), "application/x-www-form-urlencoded", strings.NewReader("")) if err != nil { return err } if resp != nil { _, _ = io.Copy(io.Discard, resp.Body) } return err } // 获取目录中的所有txt文件 func getTxtFiles(dir string) (txtFiles []string, err error) { err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } // 只处理普通文件,跳过目录 if !info.Mode().IsRegular() { return nil } // 检查文件扩展名是否为.txt if strings.ToLower(filepath.Ext(path)) == ".txt" { txtFiles = append(txtFiles, path) } return nil }) return txtFiles, err } func processFile(filePath string, fileLines int) { var wg sync.WaitGroup // 打开文件 file, err := os.Open(filePath) if err != nil { fmt.Printf("无法打开文件 %s: %v\n", filePath, err) return } defer file.Close() // 创建行通道 lines := make(chan string, 100) // 创建10个worker处理文件上传 for i := 0; i < viper.GetInt("thread-count"); i++ { wg.Add(1) go func(workerID int) { defer wg.Done() processLines(lines, workerID, filePath) }(i) } // 读取文件并发送到通道 scanner := bufio.NewScanner(file) lineCount := 0 for scanner.Scan() { lines <- scanner.Text() lineCount++ if lineCount%10000 == 0 { fmt.Printf("文件【%s】处理进度:%.2f%%\n", filePath, float64(lineCount)/float64(fileLines)*100) } } close(lines) wg.Wait() if err := scanner.Err(); err != nil { fmt.Printf("读取文件 %s 错误: %v\n", filePath, err) return } fmt.Printf("文件【%s】处理完成,共处理 %d 行数据\n", filePath, lineCount) } func processLines(lines <-chan string, workerID int, filePath string) { for line := range lines { // 跳过空行 if strings.TrimSpace(line) == "" { continue } // 上传数据 if err := uploadDataToServer(line); err != nil { fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err) } } }