// Package uploader 上传数据 package uploader import ( "bufio" "context" "dypid-client/internal/api" "dypid-client/internal/config" "fmt" "os" "path/filepath" "runtime" "strconv" "strings" "sync" "sync/atomic" "time" wailsruntime "github.com/wailsapp/wails/v2/pkg/runtime" "golang.org/x/sync/errgroup" ) var progress sync.Map type Task struct { FilePath string FileLines int } type Progress struct { FileName string `json:"name"` Total int `json:"total"` Uploaded int `json:"uploaded"` Percentage int `json:"percentage"` } func StartLooking(ctx context.Context, logChan *chan string, lookingPath string) { AddLog(logChan, "===============================================") AddLog(logChan, `服务器: `+config.APPConfig.Url) AddLog(logChan, `Token: `+config.APPConfig.Token) AddLog(logChan, `检测目录: `+config.APPConfig.CheckDir) AddLog(logChan, `同时处理文件数: `+strconv.Itoa(config.APPConfig.HandleFileCount)) AddLog(logChan, `单文件上传线程: `+strconv.Itoa(config.APPConfig.ThreadCount)) AddLog(logChan, "===============================================") AddLog(logChan, "正在创建连接池(连接池可避免首次大量上传时出现网络错误)") api.InitConn() AddLog(logChan, "创建连接池完成,开始运行程序") progress.Clear() //推送上传进度 go func() { for { select { case <-ctx.Done(): return default: time.Sleep(250 * time.Millisecond) var pg []Progress progress.Range(func(key, value any) bool { p := value.(Progress) pg = append(pg, p) return true }) wailsruntime.EventsEmit(ctx, "progress", pg) } } }() for { uploadData(ctx, logChan, lookingPath) select { case <-time.After(time.Minute): case <-ctx.Done(): return } } } func uploadData(ctx context.Context, logChan *chan string, lookingPath string) { var path = "./" if lookingPath != "" { path = lookingPath } //获取文件列表 files, err := getTxtFiles(path) if err != nil { AddLog(logChan, "获取文件列表失败:"+err.Error()) return } if files == nil { return } start := time.Now() //检测到文件 //统计文件行数 fileLines := make(map[string]int) AddLog(logChan, fmt.Sprintf("正在统计 %v 个文件行数", len(files))) isAllEmpty := true for _, filePath := range files { select { case <-ctx.Done(): return default: file, err := os.Open(filePath) if err != nil { AddLog(logChan, "打开文件失败:"+err.Error()) } // 使用 bufio.Scanner 逐行读取 scanner := bufio.NewScanner(file) lineCount := 0 for scanner.Scan() { lineCount++ } err = file.Close() if lineCount == 0 { continue } fileLines[filepath.Base(filePath)] = lineCount isAllEmpty = false AddLog(logChan, fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount)) } } if isAllEmpty { AddLog(logChan, "所有文件都为空,不进行上传") return } progress.Clear() //添加文件上传任务参数(文件路径,文件行数) var tasks []Task for fileName, lines := range fileLines { tasks = append(tasks, Task{FilePath: path + "/" + fileName, FileLines: lines}) progress.Store(fileName, Progress{FileName: fileName, Total: lines, Uploaded: 0, Percentage: 0}) } // 使用 errgroup 控制同时处理的文件数,并开始上传文件任务 g, egctx := errgroup.WithContext(ctx) g.SetLimit(config.APPConfig.HandleFileCount) // 设置同时处理文件数 // 执行所有任务 for _, task := range tasks { select { case <-egctx.Done(): return default: g.Go(func() error { select { case <-egctx.Done(): return egctx.Err() default: AddLog(logChan, "正在上传文件:"+filepath.Base(task.FilePath)) processFile(egctx, logChan, task.FilePath, task.FileLines) select { case <-egctx.Done(): return egctx.Err() default: //上传完成,清空文件 err := os.Truncate(task.FilePath, 0) if err != nil { AddLog(logChan, "清空文件失败:"+err.Error()) } return nil } } }) } } select { case <-ctx.Done(): return default: // 等待所有任务完成 if err := g.Wait(); err != nil { AddLog(logChan, fmt.Sprintf("任务执行出错: %v", err)) } else { AddLog(logChan, "所有任务执行完成!") } AddLog(logChan, fmt.Sprintf("上传完成,耗时:%s", time.Since(start).String())) } } // 获取目录中的所有txt文件 func getTxtFiles(dir string) (txtFiles []string, err error) { err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } // 只处理普通文件,跳过目录 if !info.Mode().IsRegular() { return nil } // 检查文件扩展名是否为.txt if strings.ToLower(filepath.Ext(path)) == ".txt" { txtFiles = append(txtFiles, path) } return nil }) return txtFiles, err } func processFile(ctx context.Context, logChan *chan string, filePath string, fileLines int) { // 打开文件 file, err := os.Open(filePath) if err != nil { AddLog(logChan, fmt.Sprintf("[processFile] 无法打开文件 %s: %v", filePath, err)) return } defer file.Close() // 创建行通道 lines := make(chan string, 200) var countLine int32 = 0 // 创建指定个worker同时处理文件上传 for i := 0; i < config.APPConfig.ThreadCount; i++ { select { case <-ctx.Done(): return default: go func() { processLines(ctx, logChan, &lines, i, filePath, &countLine) }() } } // 读取文件并发送到通道 scanner := bufio.NewScanner(file) go func() { defer func() { if r := recover(); r != nil { _, f, l, _ := runtime.Caller(0) fmt.Println("panic:", f+":"+strconv.Itoa(l), r) } }() for scanner.Scan() { select { case <-ctx.Done(): return default: lines <- scanner.Text() } } }() // 等待所有行处理完成并推送进度 for int(countLine) != fileLines { select { case <-ctx.Done(): close(lines) //关闭processLines中的上传线程 return default: progress.Store(filepath.Base(filePath), Progress{ FileName: filepath.Base(filePath), Total: fileLines, Uploaded: int(countLine), Percentage: int(float64(countLine) / float64(fileLines) * 100), }, ) } } //上传完成,进度设为100 progress.Store(filepath.Base(filePath), Progress{ FileName: filepath.Base(filePath), Total: fileLines, Uploaded: int(countLine), Percentage: 100, }, ) close(lines) //关闭processLines中的上传线程 if err := scanner.Err(); err != nil { AddLog(logChan, fmt.Sprintf("读取文件 %s 错误: %v", filePath, err)) return } AddLog(logChan, fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据", filepath.Base(filePath), countLine)) } // processLines 处理接受到的每一行数据并上传(chan 管道接受数据) func processLines(ctx context.Context, logChan *chan string, lines *chan string, workerID int, filePath string, countLine *int32) { for line := range *lines { select { case <-ctx.Done(): return default: // 跳过空行 if strings.TrimSpace(line) == "" { continue } // 上传数据 if err := api.UploadDataToServer(ctx, line); err != nil { AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filepath.Base(filePath), err)) } atomic.AddInt32(countLine, 1) } } } // AddLog 添加日志 func AddLog(logChan *chan string, message string) { *logChan <- message }