// Command main watches the ./upload directory for .txt files, parses each line
// into an UploadData record and posts it to the configured server. Files are
// removed after they have been processed.
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
	"github.com/spf13/viper"
)

const (
	// uploadDir is the directory scanned for .txt files to upload.
	uploadDir = "./upload"
	// pollInterval is the pause between two scans of uploadDir.
	pollInterval = 2 * time.Second
	// fieldSeparator delimits the fields of one input line.
	fieldSeparator = "----"
	// minFields is the number of leading fields a line must contain to be uploaded.
	minFields = 3
	// totalFields is the number of fields mapped onto UploadData.
	totalFields = 10
)

// UploadData is one record parsed from an input line. Each field is sent to the
// server as the query parameter named in its comment.
type UploadData struct {
	Dyid      string // dyid
	Uid       string // uid
	Secuid    string // secuid
	Pid       string // pid
	CommentId string // comment_id
	Id1       string // id1
	Id2       string // id2
	Id3       string // id3
	Id4       string // id4
	Id5       string // id5
}

// httpClient is the shared client used for every upload so that idle
// connections are pooled and every request has a 30 second timeout.
var httpClient = &http.Client{
	Transport: &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     30 * time.Second,
	},
	Timeout: 30 * time.Second,
}

// main loads the configuration from ./config.toml (creating it with defaults
// when it does not exist), then polls uploadDir forever, uploading and deleting
// every .txt file it finds.
func main() {
	// Defaults used when the config file does not provide a value.
	viper.SetDefault("url", "http://localhost:8080")
	viper.SetDefault("token", "")
	viper.SetDefault("thread-count", 10)

	// Config file: ./config.toml
	viper.AddConfigPath(".")
	viper.SetConfigName("config")
	viper.SetConfigType("toml")
	// Best effort: SafeWriteConfig only writes when the file is missing and
	// reports an error when it already exists, which is the normal case.
	_ = viper.SafeWriteConfig()

	if err := viper.ReadInConfig(); err != nil {
		fmt.Printf("无法读取配置文件: %v\n", err)
	}
	// Register the callback before starting the watcher so it is in place
	// when the first change event arrives.
	viper.OnConfigChange(func(e fsnotify.Event) {
		if err := viper.ReadInConfig(); err != nil {
			fmt.Printf("无法读取配置文件: %v\n", err)
		}
	})
	viper.WatchConfig()

	fmt.Println("程序启动成功,正在检测upload")
	// MkdirAll succeeds when the directory already exists.
	if err := os.MkdirAll(uploadDir, os.ModePerm); err != nil {
		fmt.Printf("无法创建目录 %s: %v\n", uploadDir, err)
		return
	}

	for {
		files, err := getTxtFiles(uploadDir)
		if err != nil {
			fmt.Println(err)
			return
		}

		var wg sync.WaitGroup
		for _, filePath := range files {
			fmt.Println("正在上传文件:", filePath)
			wg.Add(1)
			// Pass the path as an argument so every goroutine works on its own
			// file even when the loop variable is shared (Go < 1.22).
			go func(path string) {
				defer wg.Done()
				processFile(path)
				if err := os.Remove(path); err != nil {
					// A file that cannot be removed is picked up again on the next scan.
					fmt.Printf("无法删除文件 %s: %v\n", path, err)
				}
			}(filePath)
		}
		wg.Wait()

		time.Sleep(pollInterval)
	}
}

// uploadDataToServer posts one record to <url>/api/data, passing the configured
// token and all record fields as query parameters with an empty request body.
// It returns the transport error, or an error describing the status when the
// server answers with a non-2xx code.
func uploadDataToServer(data UploadData) error {
	params := url.Values{}
	params.Add("token", viper.GetString("token"))
	params.Add("dyid", data.Dyid)
	params.Add("uid", data.Uid)
	params.Add("secuid", data.Secuid)
	params.Add("pid", data.Pid)
	params.Add("comment_id", data.CommentId)
	params.Add("id1", data.Id1)
	params.Add("id2", data.Id2)
	params.Add("id3", data.Id3)
	params.Add("id4", data.Id4)
	params.Add("id5", data.Id5)

	resp, err := httpClient.Post(
		viper.GetString("url")+"/api/data?"+params.Encode(),
		"application/x-www-form-urlencoded",
		strings.NewReader(""),
	)
	if err != nil {
		// resp is nil on error, so it must not be touched.
		return err
	}
	defer resp.Body.Close()

	// Drain the body so the transport can reuse the connection; the content
	// itself is not needed.
	_, _ = io.Copy(io.Discard, resp.Body)

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("服务器返回异常状态: %s", resp.Status)
	}
	return nil
}

// getTxtFiles walks dir recursively and returns the paths of all regular files
// whose extension is .txt (case-insensitive). The walk stops at the first error.
func getTxtFiles(dir string) ([]string, error) {
	var txtFiles []string
	err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		// Only regular files are candidates; directories and special files are skipped.
		if !info.Mode().IsRegular() {
			return nil
		}
		if strings.EqualFold(filepath.Ext(path), ".txt") {
			txtFiles = append(txtFiles, path)
		}
		return nil
	})
	return txtFiles, err
}

// processFile reads filePath line by line and hands every line to a pool of
// workers that parse and upload it. The pool size comes from the
// "thread-count" setting. It returns once all lines have been handled.
func processFile(filePath string) {
	file, err := os.Open(filePath)
	if err != nil {
		fmt.Printf("无法打开文件 %s: %v\n", filePath, err)
		return
	}
	defer file.Close()

	// At least one worker is required, otherwise the reader below would block
	// forever once the channel buffer is full.
	workers := viper.GetInt("thread-count")
	if workers < 1 {
		workers = 1
	}

	// Buffered so the reader can run ahead of the uploaders.
	lines := make(chan string, 100)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			processLines(lines, workerID, filePath)
		}(i)
	}

	scanner := bufio.NewScanner(file)
	lineCount := 0
	for scanner.Scan() {
		lines <- scanner.Text()
		lineCount++
	}
	// Closing the channel lets the workers finish their range loops.
	close(lines)
	wg.Wait()

	if err := scanner.Err(); err != nil {
		fmt.Printf("读取文件 %s 错误: %v\n", filePath, err)
	}
	fmt.Printf("文件 %s 处理完成,共处理 %d 行数据\n", filePath, lineCount)
}

// processLines consumes lines until the channel is closed. Blank lines are
// ignored, lines with fewer than minFields fields are reported and skipped,
// and every other line is uploaded. Upload failures are logged, not retried.
func processLines(lines <-chan string, workerID int, filePath string) {
	for line := range lines {
		if strings.TrimSpace(line) == "" {
			continue
		}

		uploadData, ok := parseLine(line)
		if !ok {
			fmt.Printf("Worker %d (文件 %s): 行数据字段不足,跳过: %s\n", workerID, filePath, line)
			continue
		}

		if err := uploadDataToServer(uploadData); err != nil {
			fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err)
		}
	}
}

// parseLine splits line on fieldSeparator and maps the first totalFields
// fields onto an UploadData. It reports false when the line has fewer than
// minFields fields. Missing trailing fields are left empty and fields beyond
// totalFields are ignored.
func parseLine(line string) (UploadData, bool) {
	parts := strings.Split(line, fieldSeparator)
	// Validate before padding; padding first would make this check always pass.
	if len(parts) < minFields {
		return UploadData{}, false
	}
	for len(parts) < totalFields {
		parts = append(parts, "")
	}
	return UploadData{
		Dyid:      parts[0],
		Uid:       parts[1],
		Secuid:    parts[2],
		Pid:       parts[3],
		CommentId: parts[4],
		Id1:       parts[5],
		Id2:       parts[6],
		Id3:       parts[7],
		Id4:       parts[8],
		Id5:       parts[9],
	}, true
}