// Package uploader 上传数据 package uploader import ( "bufio" "context" "dypid-client/internal/api" "dypid-client/internal/config" "fmt" "io" "os" "path/filepath" "runtime" "strconv" "strings" "sync" "sync/atomic" "time" wailsruntime "github.com/wailsapp/wails/v2/pkg/runtime" "golang.org/x/sync/errgroup" ) type fileInfo struct { FilePath string FileLines int } type Progress struct { FileName string `json:"name"` Total int `json:"total"` Uploaded int `json:"uploaded"` Percentage int `json:"percentage"` } var progress sync.Map func StartUpload(ctx context.Context, logChan *chan string) { AddLog(logChan, "===============================================") AddLog(logChan, `服务器: `+config.APPConfig.Url) AddLog(logChan, `Token: `+config.APPConfig.Token) AddLog(logChan, `检测目录: `+config.APPConfig.CheckDir) AddLog(logChan, `同时处理文件数: `+strconv.Itoa(config.APPConfig.HandleFileCount)) AddLog(logChan, `单文件上传线程: `+strconv.Itoa(config.APPConfig.ThreadCount)) AddLog(logChan, "===============================================") AddLog(logChan, "正在创建连接池(连接池可避免首次大量上传时出现网络错误)") api.InitConn() AddLog(logChan, "创建连接池完成,开始运行程序") progress.Clear() //推送上传进度 go func() { for { select { case <-ctx.Done(): return default: var pg []Progress progress.Range(func(_, value any) bool { pg = append(pg, value.(Progress)) return true }) wailsruntime.EventsEmit(ctx, "progress", pg) time.Sleep(250 * time.Millisecond) } } }() //开启上传程序 for { uploadData(ctx, logChan) select { case <-ctx.Done(): return case <-time.After(time.Minute): } } } func uploadData(ctx context.Context, logChan *chan string) { start := time.Now() // 获取检测目录 var checkPath = "./" if config.APPConfig.CheckDir != "" { checkPath = config.APPConfig.CheckDir } //要上传的文件路径字符串数组 var files []string //先检测tmp目录有没有残余文件 os.Mkdir("./tmp", os.ModePerm) tmpFiles, err := getTxtFiles("./tmp") if err != nil { AddLog(logChan, "获取 tmp 文件列表失败:"+err.Error()) } //tmp有文件,优先上传(else:tmp没文件扫描指定文件夹,并复制文件到tmp) if tmpFiles != nil { AddLog(logChan, "当前 tmp 目录下还有未上传完成文件,将优先上传 tmp 目录文件") files = tmpFiles } else { //tmp没文件,扫描指定文件夹 f, err := getTxtFiles(checkPath) if err != nil { AddLog(logChan, "获取文件列表失败:"+err.Error()) return } //指定文件夹没文件,退出函数 if f == nil { return } //是否向用户提示清空文件,并复制文件到tmp if config.APPConfig.ClearFilesNoPrompt { //不用提示直接复制文件到tmp for _, p := range f { err := copyFile(p, "./tmp/"+filepath.Base(p)) if err != nil { AddLog(logChan, "复制文件失败:"+err.Error()) } else { files = append(files, "./tmp/"+filepath.Base(p)) err := os.Truncate(p, 0) if err != nil { AddLog(logChan, "清空文件失败:"+err.Error()) } } } } else { //提示用户 wailsruntime.EventsEmit(ctx, "clear-files", f) confirm := make(chan bool) wailsruntime.EventsOn(ctx, "confirm-clear-files", func(optionalData ...any) { confirm <- optionalData[0].(bool) }) if <-confirm { for _, p := range f { err := copyFile(p, "./tmp/"+filepath.Base(p)) if err != nil { AddLog(logChan, "复制文件失败:"+err.Error()) } else { files = append(files, "./tmp/"+filepath.Base(p)) err := os.Truncate(p, 0) if err != nil { AddLog(logChan, "清空文件失败:"+err.Error()) } } } } else { AddLog(logChan, "已取消上传,1分钟后再运行") return } } } //检测到文件 //统计文件行数 var fInfo = make(map[string]fileInfo) AddLog(logChan, fmt.Sprintf("正在统计 %v 个文件行数", len(files))) isAllEmpty := true for _, filePath := range files { select { case <-ctx.Done(): return default: file, err := os.Open(filePath) if err != nil { AddLog(logChan, "打开文件失败:"+err.Error()) } // 使用 bufio.Scanner 逐行读取 scanner := bufio.NewScanner(file) lineCount := 0 for scanner.Scan() { lineCount++ } err = file.Close() if lineCount == 0 { continue } fInfo[filepath.Base(filePath)] = fileInfo{ FilePath: filePath, FileLines: lineCount, } isAllEmpty = false AddLog(logChan, fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount)) } } if isAllEmpty { AddLog(logChan, "所有文件都为空,不进行上传") return } //刷新文件上传进度 progress.Clear() for fileName, info := range fInfo { progress.Store(fileName, Progress{ FileName: fileName, Total: info.FileLines, Uploaded: 0, Percentage: 0, }, ) } // 使用 errgroup 控制同时处理的文件数,并开始上传文件任务 g, egctx := errgroup.WithContext(ctx) g.SetLimit(config.APPConfig.HandleFileCount) // 设置同时处理文件数 // 执行文件上传任务参数(文件路径,文件行数) for fileName, info := range fInfo { select { case <-egctx.Done(): return default: g.Go(func() error { select { case <-egctx.Done(): return egctx.Err() default: AddLog(logChan, "正在上传文件:"+fileName) processFile(egctx, logChan, info.FilePath, info.FileLines) select { case <-egctx.Done(): return egctx.Err() default: //上传完成,删除缓存文件 err := os.Remove(info.FilePath) if err != nil { AddLog(logChan, "删除缓存文件失败:"+err.Error()) } return nil } } }) } } select { case <-ctx.Done(): return default: // 等待所有任务完成 g.Wait() AddLog(logChan, "所有任务执行完成!") AddLog(logChan, fmt.Sprintf("上传完成,耗时:%s", time.Since(start).String())) } } // copyFile 快速拷贝文件 src -> dst func copyFile(src, dst string) error { // 打开源文件 sourceFile, err := os.Open(src) if err != nil { return err } defer sourceFile.Close() // 创建目标文件 destFile, err := os.Create(dst) if err != nil { return err } defer destFile.Close() // 核心:最快拷贝,底层使用操作系统零拷贝技术 _, err = io.Copy(destFile, sourceFile) if err != nil { return err } // 强制刷入磁盘,保证数据完整 return destFile.Sync() } // 获取目录中的所有txt文件(文件大小为0的不返回) func getTxtFiles(dir string) (txtFiles []string, err error) { err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } // 跳过目录,只处理普通文件 if !info.Mode().IsRegular() { return nil } // 检查文件扩展名是否为.txt if strings.ToLower(filepath.Ext(path)) == ".txt" { if info.Size() != 0 { txtFiles = append(txtFiles, path) } } return nil }) return txtFiles, err } // processFile 处理每个文件 func processFile(ctx context.Context, logChan *chan string, filePath string, fileLines int) { // 打开文件 file, err := os.Open(filePath) if err != nil { AddLog(logChan, fmt.Sprintf("[processFile] 无法打开文件 %s: %v", filePath, err)) return } defer file.Close() // 创建行通道 lines := make(chan string, 200) var countLine int32 = 0 // 创建指定个worker同时处理文件上传 for i := 0; i < config.APPConfig.ThreadCount; i++ { select { case <-ctx.Done(): close(lines) return default: go func() { processLines(ctx, logChan, &lines, i, filePath, &countLine) }() } } // 读取文件并发送到通道 scanner := bufio.NewScanner(file) go func() { defer func() { if r := recover(); r != nil { _, f, l, _ := runtime.Caller(0) fmt.Println("panic:", f+":"+strconv.Itoa(l), r) } }() for scanner.Scan() { select { case <-ctx.Done(): return default: lines <- scanner.Text() } } }() // 等待所有行处理完成并推送进度 for int(countLine) != fileLines { select { case <-ctx.Done(): close(lines) //关闭processLines中的上传线程 return default: progress.Store(filepath.Base(filePath), Progress{ FileName: filepath.Base(filePath), Total: fileLines, Uploaded: int(countLine), Percentage: int(float64(countLine) / float64(fileLines) * 100), }, ) } } //上传完成,进度设为100 progress.Store(filepath.Base(filePath), Progress{ FileName: filepath.Base(filePath), Total: fileLines, Uploaded: int(countLine), Percentage: 100, }, ) close(lines) //关闭processLines中的上传线程 if err := scanner.Err(); err != nil { AddLog(logChan, fmt.Sprintf("读取文件 %s 错误: %v", filePath, err)) return } AddLog(logChan, fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据", filepath.Base(filePath), countLine)) } // processLines 处理接受到的每一行数据并上传(chan 管道接受数据) func processLines(ctx context.Context, logChan *chan string, lines *chan string, workerID int, filePath string, countLine *int32) { for line := range *lines { select { case <-ctx.Done(): return default: // 跳过空行 if strings.TrimSpace(line) == "" { continue } // 上传数据 if err := api.UploadDataToServer(ctx, line); err != nil { AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filepath.Base(filePath), err)) } atomic.AddInt32(countLine, 1) } } } // AddLog 添加日志 func AddLog(logChan *chan string, message string) { *logChan <- message }