// Package uploader 上传数据 package uploader import ( "bufio" "context" "dypid-client/internal/api" "dypid-client/internal/config" "fmt" "os" "path/filepath" "strings" "sync" "sync/atomic" "time" "github.com/wailsapp/wails/v2/pkg/runtime" "golang.org/x/sync/errgroup" ) var progress sync.Map type Task struct { FilePath string FileLines int } type Progress struct { FileName string `json:"name"` Total int `json:"total"` Uploaded int `json:"uploaded"` Percentage int `json:"percentage"` } func StartLooking(ctx context.Context, logChan *chan string, lookingPath string) { go func() { for { time.Sleep(500 * time.Millisecond) var pg []Progress progress.Range(func(key, value any) bool { p := value.(Progress) pg = append(pg, p) return true }) runtime.EventsEmit(ctx, "progress", pg) } }() for { uploadData(ctx, logChan, lookingPath) select { case <-time.After(time.Minute): case <-ctx.Done(): AddLog(logChan, "上传程序已退出") return } } } func uploadData(ctx context.Context, logChan *chan string, lookingPath string) { var path = "./" if lookingPath != "" { path = lookingPath } //获取文件列表 files, err := getTxtFiles(path) if err != nil { AddLog(logChan, "获取文件列表失败:"+err.Error()) return } if files == nil { return } start := time.Now() //检测到文件 //统计文件行数 fileLines := make(map[string]int) AddLog(logChan, fmt.Sprintf("正在统计 %v 个文件行数", len(files))) isAllEmpty := true for _, filePath := range files { select { case <-ctx.Done(): return default: file, err := os.Open(filePath) if err != nil { AddLog(logChan, "打开文件失败:"+err.Error()) } // 使用 bufio.Scanner 逐行读取 scanner := bufio.NewScanner(file) lineCount := 0 for scanner.Scan() { lineCount++ } err = file.Close() if lineCount == 0 { continue } fileLines[filepath.Base(filePath)] = lineCount isAllEmpty = false AddLog(logChan, fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount)) } } if isAllEmpty { AddLog(logChan, "所有文件都为空,不进行上传") return } //添加文件上传任务参数(文件路径,文件行数) var tasks []Task for k, v := range fileLines { tasks = append(tasks, Task{FilePath: path + "/" + k, FileLines: v}) } // 使用 errgroup 控制同时处理的文件数,并开始上传文件任务 g, egctx := errgroup.WithContext(ctx) g.SetLimit(config.APPConfig.HandleFileCount) // 设置同时处理文件数 // 执行所有任务 for _, task := range tasks { g.Go(func() error { select { case <-egctx.Done(): return egctx.Err() default: AddLog(logChan, "正在上传文件:"+filepath.Base(task.FilePath)) processFile(egctx, logChan, task.FilePath, task.FileLines) //上传完成,清空文件 err := os.Truncate(task.FilePath, 0) if err != nil { AddLog(logChan, "清空文件失败:"+err.Error()) } return nil } }) } // 等待所有任务完成 if err := g.Wait(); err != nil { AddLog(logChan, fmt.Sprintf("任务执行出错: %v", err)) } else { AddLog(logChan, "所有任务执行完成!") } AddLog(logChan, fmt.Sprintf("上传完成,耗时:%s", time.Since(start).String())) progress.Clear() } // 获取目录中的所有txt文件 func getTxtFiles(dir string) (txtFiles []string, err error) { err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } // 只处理普通文件,跳过目录 if !info.Mode().IsRegular() { return nil } // 检查文件扩展名是否为.txt if strings.ToLower(filepath.Ext(path)) == ".txt" { txtFiles = append(txtFiles, path) } return nil }) return txtFiles, err } func processFile(ctx context.Context, logChan *chan string, filePath string, fileLines int) { var wg sync.WaitGroup // 打开文件 file, err := os.Open(filePath) if err != nil { AddLog(logChan, fmt.Sprintf("[processFile] 无法打开文件 %s: %v", filePath, err)) return } defer file.Close() // 创建行通道 lines := make(chan string, 100) var countLine int32 = 0 // 创建指定个worker同时处理文件上传 for i := 0; i < config.APPConfig.ThreadCount; i++ { wg.Go(func() { processLines(ctx, logChan, &lines, i, filePath, &countLine) }) } // 读取文件并发送到通道 scanner := bufio.NewScanner(file) go func() { for scanner.Scan() { select { case <-ctx.Done(): return default: lines <- scanner.Text() } } }() for int(countLine) != fileLines { select { case <-ctx.Done(): close(lines) wg.Wait() return default: progress.Store(filepath.Base(filePath), Progress{FileName: filepath.Base(filePath), Total: fileLines, Uploaded: int(countLine), Percentage: int(float64(countLine)/float64(fileLines)*100) + 1, }) time.Sleep(500 * time.Millisecond) } } close(lines) wg.Wait() if err := scanner.Err(); err != nil { AddLog(logChan, fmt.Sprintf("读取文件 %s 错误: %v", filePath, err)) return } AddLog(logChan, fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据", filePath, countLine)) } func processLines(ctx context.Context, logChan *chan string, lines *chan string, workerID int, filePath string, countLine *int32) { for line := range *lines { select { case <-ctx.Done(): return default: // 跳过空行 if strings.TrimSpace(line) == "" { continue } // 上传数据 if err := api.UploadDataToServer(line); err != nil { AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filePath, err)) } atomic.AddInt32(countLine, 1) } } } func AddLog(logChan *chan string, message string) { *logChan <- message }