package main import ( "bufio" "fmt" "io" "net/http" "net/url" "os" "path/filepath" "strings" "sync" "time" "github.com/fsnotify/fsnotify" "github.com/spf13/viper" ) var httpClient = &http.Client{ Transport: &http.Transport{ MaxIdleConns: 200, MaxIdleConnsPerHost: 100, IdleConnTimeout: 30 * time.Second, }, Timeout: 30 * time.Second, } func main() { initConfig() //检测./upload fmt.Println("程序启动成功,正在检测txt文件") for { files, err := getTxtFiles("./") if err != nil { fmt.Println(err) return } if files != nil { start := time.Now() wg := sync.WaitGroup{} for _, filePath := range files { fmt.Println("正在上传文件:", filePath) wg.Add(1) go func() { processFile(filePath) err := os.Truncate(filePath, 0) if err != nil { fmt.Println("清空文件失败:", err) } wg.Done() }() } wg.Wait() fmt.Printf("上传完成,耗时:%s\n", time.Since(start)) } time.Sleep(time.Minute) } } func initConfig() { //程序配置 viper.SetDefault("url", "http://localhost:8080") viper.SetDefault("token", "") viper.SetDefault("thread-count", 10) //设置配置文件名和路径 ./config.toml viper.AddConfigPath(".") viper.SetConfigName("config") viper.SetConfigType("toml") viper.SafeWriteConfig() //安全写入默认配置 //读取配置文件 if err := viper.ReadInConfig(); err != nil { fmt.Errorf("无法读取配置文件: %w", err) } viper.WatchConfig() viper.OnConfigChange(func(e fsnotify.Event) { if err := viper.ReadInConfig(); err != nil { fmt.Errorf("无法读取配置文件: %w", err) } }) } func uploadDataToServer(data string) error { params := url.Values{} params.Set("token", viper.GetString("token")) params.Set("data", data) resp, err := httpClient.Post(viper.GetString("url")+"/api/data?"+params.Encode(), "application/x-www-form-urlencoded", strings.NewReader("")) if err != nil { return err } if resp != nil { _, _ = io.Copy(io.Discard, resp.Body) } return err } // 获取目录中的所有txt文件 func getTxtFiles(dir string) (txtFiles []string, err error) { err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } // 只处理普通文件,跳过目录 if !info.Mode().IsRegular() { return nil } // 检查文件扩展名是否为.txt if strings.ToLower(filepath.Ext(path)) == ".txt" { txtFiles = append(txtFiles, path) } return nil }) return txtFiles, err } func processFile(filePath string) { var wg sync.WaitGroup // 打开文件 file, err := os.Open(filePath) if err != nil { fmt.Printf("无法打开文件 %s: %v\n", filePath, err) return } defer file.Close() // 创建行通道 lines := make(chan string, 100) // 创建10个worker处理文件上传 for i := 0; i < viper.GetInt("thread-count"); i++ { wg.Add(1) go func(workerID int) { defer wg.Done() processLines(lines, workerID, filePath) }(i) } // 读取文件并发送到通道 scanner := bufio.NewScanner(file) lineCount := 0 for scanner.Scan() { line := scanner.Text() lines <- line lineCount++ if lineCount%10000 == 0 { fmt.Printf("文件【%s】处理进度:%v%%\n", filePath, float64(lineCount)/40000*100) } } close(lines) wg.Wait() if err := scanner.Err(); err != nil { fmt.Printf("读取文件 %s 错误: %v\n", filePath, err) return } fmt.Printf("文件【%s】处理完成,共处理 %d 行数据\n", filePath, lineCount) } func processLines(lines <-chan string, workerID int, filePath string) { for line := range lines { // 跳过空行 if strings.TrimSpace(line) == "" { continue } // 上传数据 if err := uploadDataToServer(line); err != nil { fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err) } } }