diff --git a/internal/uploader/uploader.go b/internal/uploader/uploader.go index 052eecb..1128834 100644 --- a/internal/uploader/uploader.go +++ b/internal/uploader/uploader.go @@ -57,21 +57,23 @@ func StartUpload(ctx context.Context, logChan *chan string) { case <-ctx.Done(): return default: - var pg []Progress - progress.Range(func(_, value any) bool { - pg = append(pg, value.(Progress)) - return true - }) - wailsruntime.EventsEmit(ctx, "progress", pg) - - time.Sleep(250 * time.Millisecond) } + + var pg []Progress + progress.Range(func(_, value any) bool { + pg = append(pg, value.(Progress)) + return true + }) + wailsruntime.EventsEmit(ctx, "progress", pg) + + time.Sleep(250 * time.Millisecond) } }() //开启上传程序 for { uploadData(ctx, logChan) + select { case <-ctx.Done(): return @@ -120,6 +122,12 @@ func uploadData(ctx context.Context, logChan *chan string) { if config.APPConfig.ClearFilesNoPrompt { //不用提示直接复制文件到tmp for _, p := range f { + select { + case <-ctx.Done(): + return + default: + } + err := copyFile(p, "./tmp/"+filepath.Base(p)) if err != nil { AddLog(logChan, "复制文件失败:"+err.Error()) @@ -143,6 +151,12 @@ func uploadData(ctx context.Context, logChan *chan string) { if <-confirm { for _, p := range f { + select { + case <-ctx.Done(): + return + default: + } + err := copyFile(p, "./tmp/"+filepath.Base(p)) if err != nil { AddLog(logChan, "复制文件失败:"+err.Error()) @@ -173,31 +187,32 @@ func uploadData(ctx context.Context, logChan *chan string) { case <-ctx.Done(): return default: - file, err := os.Open(filePath) - if err != nil { - AddLog(logChan, "打开文件失败:"+err.Error()) - } - - // 使用 bufio.Scanner 逐行读取 - scanner := bufio.NewScanner(file) - lineCount := 0 - for scanner.Scan() { - lineCount++ - } - err = file.Close() - - if lineCount == 0 { - continue - } - filesInfo[filepath.Base(filePath)] = fileInfo{ - FilePath: filePath, - FileLines: lineCount, - } - - isAllEmpty = false - - AddLog(logChan, fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount)) } + + file, err := os.Open(filePath) + if err != nil { + AddLog(logChan, "打开文件失败:"+err.Error()) + } + + // 使用 bufio.Scanner 逐行读取 + scanner := bufio.NewScanner(file) + lineCount := 0 + for scanner.Scan() { + lineCount++ + } + err = file.Close() + + if lineCount == 0 { + continue + } + filesInfo[filepath.Base(filePath)] = fileInfo{ + FilePath: filePath, + FileLines: lineCount, + } + + isAllEmpty = false + + AddLog(logChan, fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount)) } if isAllEmpty { @@ -228,41 +243,45 @@ func uploadData(ctx context.Context, logChan *chan string) { case <-egctx.Done(): return default: - g.Go(func() error { - select { - case <-egctx.Done(): - return egctx.Err() - default: - AddLog(logChan, "正在上传文件:"+fileName) - - processFile(egctx, logChan, info.FilePath, info.FileLines) - - select { - case <-egctx.Done(): - return egctx.Err() - default: - //上传完成,删除缓存文件 - err := os.Remove(info.FilePath) - if err != nil { - AddLog(logChan, "删除缓存文件失败:"+err.Error()) - } - return nil - } - } - }) } + + g.Go(func() error { + select { + case <-egctx.Done(): + return egctx.Err() + default: + } + + AddLog(logChan, "正在上传文件:"+fileName) + + processFile(egctx, logChan, info.FilePath, info.FileLines) + + select { + case <-egctx.Done(): + return egctx.Err() + default: + } + + //上传完成,删除缓存文件 + err := os.Remove(info.FilePath) + if err != nil { + AddLog(logChan, "删除缓存文件失败:"+err.Error()) + } + return nil + }) } select { case <-ctx.Done(): return default: - // 等待所有任务完成 - g.Wait() - - AddLog(logChan, "所有任务执行完成!") - AddLog(logChan, fmt.Sprintf("上传完成,耗时:%s", time.Since(start).String())) } + + // 等待所有任务完成 + g.Wait() + + AddLog(logChan, "所有任务执行完成!") + AddLog(logChan, fmt.Sprintf("上传完成,耗时:%s", time.Since(start).String())) } // copyFile 快速拷贝文件 src -> dst @@ -336,10 +355,11 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil close(lines) return default: - go func() { - processLines(ctx, logChan, &lines, i, filePath, &countLine) - }() } + + go func() { + processLines(ctx, logChan, &lines, i, filePath, &countLine) + }() } // 读取文件并发送到通道 @@ -356,8 +376,9 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil case <-ctx.Done(): return default: - lines <- scanner.Text() } + + lines <- scanner.Text() } }() @@ -368,15 +389,16 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil close(lines) //关闭processLines中的上传线程 return default: - progress.Store(filepath.Base(filePath), - Progress{ - FileName: filepath.Base(filePath), - Total: fileLines, - Uploaded: int(countLine), - Percentage: int(float64(countLine) / float64(fileLines) * 100), - }, - ) } + + progress.Store(filepath.Base(filePath), + Progress{ + FileName: filepath.Base(filePath), + Total: fileLines, + Uploaded: int(countLine), + Percentage: int(float64(countLine) / float64(fileLines) * 100), + }, + ) } //上传完成,进度设为100 progress.Store(filepath.Base(filePath), @@ -404,16 +426,17 @@ func processLines(ctx context.Context, logChan *chan string, lines *chan string, case <-ctx.Done(): return default: - // 跳过空行 - if strings.TrimSpace(line) == "" { - continue - } - // 上传数据 - if err := api.UploadDataToServer(ctx, line); err != nil { - AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filepath.Base(filePath), err)) - } - atomic.AddInt32(countLine, 1) } + + // 跳过空行 + if strings.TrimSpace(line) == "" { + continue + } + // 上传数据 + if err := api.UploadDataToServer(ctx, line); err != nil { + AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filepath.Base(filePath), err)) + } + atomic.AddInt32(countLine, 1) } }