From 75de353af673900b7fafd41845a97c6c580a3e71 Mon Sep 17 00:00:00 2001 From: YGXB_net Date: Tue, 28 Apr 2026 01:07:43 +0800 Subject: [PATCH] =?UTF-8?q?refactor(uploader):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E4=B8=8A=E4=BC=A0=E5=A4=84=E7=90=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E5=92=8C=E8=B5=84=E6=BA=90=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 简化响应体关闭逻辑,移除不必要的nil检查 - 调整后台状态推送频率,从500ms改为250ms - 修复前端事件监听器注册顺序 - 移除未使用的进度变量 - 优化goroutine中的任务执行逻辑 - 改进文件路径显示,统一使用文件名而非完整路径 - 添加waitgroup等待确保资源正确释放 --- app.go | 5 ++-- frontend/src/App.vue | 25 ++++++++------------ internal/api/api.go | 7 +++--- internal/uploader/uploader.go | 44 ++++++++++++++++++++--------------- 4 files changed, 41 insertions(+), 40 deletions(-) diff --git a/app.go b/app.go index 793749b..0fe4cb8 100644 --- a/app.go +++ b/app.go @@ -32,21 +32,22 @@ func (a *App) startup(ctx context.Context) { go func() { for log := range a.logChan { runtime.EventsEmit(a.ctx, "log", log) + time.Sleep(time.Millisecond) } }() // 后台 goroutine 持续推送运行状态 go func() { for { - time.Sleep(500 * time.Millisecond) + time.Sleep(250 * time.Millisecond) runtime.EventsEmit(a.ctx, "is-run", a.isRun) } }() //在程序启动时运行上传程序 - a.uploaderCTX, a.uploaderCancel = context.WithCancel(a.ctx) if config.APPConfig.IsRunOnStart { time.Sleep(time.Second) + a.uploaderCTX, a.uploaderCancel = context.WithCancel(a.ctx) go uploader.StartLooking(a.uploaderCTX, &a.logChan, config.APPConfig.CheckDir) a.isRun = true } diff --git a/frontend/src/App.vue b/frontend/src/App.vue index 2a3f210..5c4f446 100644 --- a/frontend/src/App.vue +++ b/frontend/src/App.vue @@ -14,7 +14,6 @@ const concurrentFiles = ref(1) const uploadThreads = ref(1) const autoStart = ref(false) -const progress = ref(0) const isRunning = ref(false) const logOutput = ref([]) const logContentRef = ref() @@ -45,7 +44,7 @@ const sortedProgressList = computed(() => { const addLog = (msg: string) => { logOutput.value.push(`[${new Date().toLocaleString()}]` + msg) nextTick(() => { - if (logContentRef.value && logRoll.value){ + if (logContentRef.value && logRoll.value) { logContentRef.value.scrollTop = logContentRef.value.scrollHeight } }) @@ -71,19 +70,15 @@ const startRun = () => { ElMessage.warning('请选择检测目录') return } - isRunning.value = true - progress.value = 0 + StartUpload() } const stopRun = () => { - if (isRunning.value) { - isRunning.value = false - addLog(`正在停止运行`) - StopUpload().then(() => { - ElMessage.info('已停止运行') - }) - } + addLog(`正在停止运行`) + StopUpload().then(() => { + ElMessage.info('已停止运行') + }) } const clearLog = () => { @@ -133,14 +128,14 @@ watch(autoStart, () => { WriteConfig("is-run-on-start", autoStart.value) }) -EventsOn("log", (msg) => { - addLog(msg) +EventsOn("is-run", (run) => { + isRunning.value = run }) EventsOn("progress", (progress) => { progressList.value = progress }) -EventsOn("is-run", (run) => { - isRunning.value = run +EventsOn("log", (msg) => { + addLog(msg) }) diff --git a/internal/api/api.go b/internal/api/api.go index 2b8b6e1..4efe694 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -38,9 +38,8 @@ func UploadDataToServer(ctx context.Context, data string) error { if err != nil { return err } - if resp != nil { - _, _ = io.Copy(io.Discard, resp.Body) - resp.Body.Close() - } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + return err } diff --git a/internal/uploader/uploader.go b/internal/uploader/uploader.go index 170cfd7..8d17d44 100644 --- a/internal/uploader/uploader.go +++ b/internal/uploader/uploader.go @@ -50,7 +50,7 @@ func StartLooking(ctx context.Context, logChan *chan string, lookingPath string) case <-ctx.Done(): return default: - time.Sleep(500 * time.Millisecond) + time.Sleep(250 * time.Millisecond) var pg []Progress progress.Range(func(key, value any) bool { @@ -137,28 +137,33 @@ func uploadData(ctx context.Context, logChan *chan string, lookingPath string) { g.SetLimit(config.APPConfig.HandleFileCount) // 设置同时处理文件数 // 执行所有任务 for _, task := range tasks { - g.Go(func() error { - select { - case <-egctx.Done(): - return egctx.Err() - default: - AddLog(logChan, "正在上传文件:"+filepath.Base(task.FilePath)) - - processFile(egctx, logChan, task.FilePath, task.FileLines) - + select { + case <-egctx.Done(): + return + default: + g.Go(func() error { select { case <-egctx.Done(): return egctx.Err() default: - //上传完成,清空文件 - err := os.Truncate(task.FilePath, 0) - if err != nil { - AddLog(logChan, "清空文件失败:"+err.Error()) + AddLog(logChan, "正在上传文件:"+filepath.Base(task.FilePath)) + + processFile(egctx, logChan, task.FilePath, task.FileLines) + + select { + case <-egctx.Done(): + return egctx.Err() + default: + //上传完成,清空文件 + err := os.Truncate(task.FilePath, 0) + if err != nil { + AddLog(logChan, "清空文件失败:"+err.Error()) + } + return nil } - return nil } - } - }) + }) + } } select { @@ -242,6 +247,7 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil select { case <-ctx.Done(): close(lines) + wg.Wait() return default: progress.Store(filepath.Base(filePath), @@ -261,7 +267,7 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil return } - AddLog(logChan, fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据", filePath, countLine)) + AddLog(logChan, fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据", filepath.Base(filePath), countLine)) } func processLines(ctx context.Context, logChan *chan string, lines *chan string, workerID int, filePath string, countLine *int32) { @@ -276,7 +282,7 @@ func processLines(ctx context.Context, logChan *chan string, lines *chan string, } // 上传数据 if err := api.UploadDataToServer(ctx, line); err != nil { - AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filePath, err)) + AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filepath.Base(filePath), err)) } atomic.AddInt32(countLine, 1) }