refactor(uploader): 优化上传器代码结构和上下文取消处理

- 在进度更新循环中添加上下文取消检查点
- 在文件复制操作前添加上下文取消检查点
- 重构代码缩进和括号位置以提高可读性
- 优化 goroutine 中的上下文取消处理逻辑
- 统一代码块的括号格式和缩进风格
This commit is contained in:
2026-05-01 22:47:39 +08:00
parent 993814cdfa
commit d426c16104
+103 -80
View File
@@ -57,21 +57,23 @@ func StartUpload(ctx context.Context, logChan *chan string) {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
var pg []Progress
progress.Range(func(_, value any) bool {
pg = append(pg, value.(Progress))
return true
})
wailsruntime.EventsEmit(ctx, "progress", pg)
time.Sleep(250 * time.Millisecond)
} }
var pg []Progress
progress.Range(func(_, value any) bool {
pg = append(pg, value.(Progress))
return true
})
wailsruntime.EventsEmit(ctx, "progress", pg)
time.Sleep(250 * time.Millisecond)
} }
}() }()
//开启上传程序 //开启上传程序
for { for {
uploadData(ctx, logChan) uploadData(ctx, logChan)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@@ -120,6 +122,12 @@ func uploadData(ctx context.Context, logChan *chan string) {
if config.APPConfig.ClearFilesNoPrompt { if config.APPConfig.ClearFilesNoPrompt {
//不用提示直接复制文件到tmp //不用提示直接复制文件到tmp
for _, p := range f { for _, p := range f {
select {
case <-ctx.Done():
return
default:
}
err := copyFile(p, "./tmp/"+filepath.Base(p)) err := copyFile(p, "./tmp/"+filepath.Base(p))
if err != nil { if err != nil {
AddLog(logChan, "复制文件失败:"+err.Error()) AddLog(logChan, "复制文件失败:"+err.Error())
@@ -143,6 +151,12 @@ func uploadData(ctx context.Context, logChan *chan string) {
if <-confirm { if <-confirm {
for _, p := range f { for _, p := range f {
select {
case <-ctx.Done():
return
default:
}
err := copyFile(p, "./tmp/"+filepath.Base(p)) err := copyFile(p, "./tmp/"+filepath.Base(p))
if err != nil { if err != nil {
AddLog(logChan, "复制文件失败:"+err.Error()) AddLog(logChan, "复制文件失败:"+err.Error())
@@ -173,31 +187,32 @@ func uploadData(ctx context.Context, logChan *chan string) {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
file, err := os.Open(filePath)
if err != nil {
AddLog(logChan, "打开文件失败:"+err.Error())
}
// 使用 bufio.Scanner 逐行读取
scanner := bufio.NewScanner(file)
lineCount := 0
for scanner.Scan() {
lineCount++
}
err = file.Close()
if lineCount == 0 {
continue
}
filesInfo[filepath.Base(filePath)] = fileInfo{
FilePath: filePath,
FileLines: lineCount,
}
isAllEmpty = false
AddLog(logChan, fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount))
} }
file, err := os.Open(filePath)
if err != nil {
AddLog(logChan, "打开文件失败:"+err.Error())
}
// 使用 bufio.Scanner 逐行读取
scanner := bufio.NewScanner(file)
lineCount := 0
for scanner.Scan() {
lineCount++
}
err = file.Close()
if lineCount == 0 {
continue
}
filesInfo[filepath.Base(filePath)] = fileInfo{
FilePath: filePath,
FileLines: lineCount,
}
isAllEmpty = false
AddLog(logChan, fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount))
} }
if isAllEmpty { if isAllEmpty {
@@ -228,41 +243,45 @@ func uploadData(ctx context.Context, logChan *chan string) {
case <-egctx.Done(): case <-egctx.Done():
return return
default: default:
g.Go(func() error {
select {
case <-egctx.Done():
return egctx.Err()
default:
AddLog(logChan, "正在上传文件:"+fileName)
processFile(egctx, logChan, info.FilePath, info.FileLines)
select {
case <-egctx.Done():
return egctx.Err()
default:
//上传完成,删除缓存文件
err := os.Remove(info.FilePath)
if err != nil {
AddLog(logChan, "删除缓存文件失败:"+err.Error())
}
return nil
}
}
})
} }
g.Go(func() error {
select {
case <-egctx.Done():
return egctx.Err()
default:
}
AddLog(logChan, "正在上传文件:"+fileName)
processFile(egctx, logChan, info.FilePath, info.FileLines)
select {
case <-egctx.Done():
return egctx.Err()
default:
}
//上传完成,删除缓存文件
err := os.Remove(info.FilePath)
if err != nil {
AddLog(logChan, "删除缓存文件失败:"+err.Error())
}
return nil
})
} }
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
// 等待所有任务完成
g.Wait()
AddLog(logChan, "所有任务执行完成!")
AddLog(logChan, fmt.Sprintf("上传完成,耗时:%s", time.Since(start).String()))
} }
// 等待所有任务完成
g.Wait()
AddLog(logChan, "所有任务执行完成!")
AddLog(logChan, fmt.Sprintf("上传完成,耗时:%s", time.Since(start).String()))
} }
// copyFile 快速拷贝文件 src -> dst // copyFile 快速拷贝文件 src -> dst
@@ -336,10 +355,11 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil
close(lines) close(lines)
return return
default: default:
go func() {
processLines(ctx, logChan, &lines, i, filePath, &countLine)
}()
} }
go func() {
processLines(ctx, logChan, &lines, i, filePath, &countLine)
}()
} }
// 读取文件并发送到通道 // 读取文件并发送到通道
@@ -356,8 +376,9 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
lines <- scanner.Text()
} }
lines <- scanner.Text()
} }
}() }()
@@ -368,15 +389,16 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil
close(lines) //关闭processLines中的上传线程 close(lines) //关闭processLines中的上传线程
return return
default: default:
progress.Store(filepath.Base(filePath),
Progress{
FileName: filepath.Base(filePath),
Total: fileLines,
Uploaded: int(countLine),
Percentage: int(float64(countLine) / float64(fileLines) * 100),
},
)
} }
progress.Store(filepath.Base(filePath),
Progress{
FileName: filepath.Base(filePath),
Total: fileLines,
Uploaded: int(countLine),
Percentage: int(float64(countLine) / float64(fileLines) * 100),
},
)
} }
//上传完成,进度设为100 //上传完成,进度设为100
progress.Store(filepath.Base(filePath), progress.Store(filepath.Base(filePath),
@@ -404,16 +426,17 @@ func processLines(ctx context.Context, logChan *chan string, lines *chan string,
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
// 跳过空行
if strings.TrimSpace(line) == "" {
continue
}
// 上传数据
if err := api.UploadDataToServer(ctx, line); err != nil {
AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filepath.Base(filePath), err))
}
atomic.AddInt32(countLine, 1)
} }
// 跳过空行
if strings.TrimSpace(line) == "" {
continue
}
// 上传数据
if err := api.UploadDataToServer(ctx, line); err != nil {
AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filepath.Base(filePath), err))
}
atomic.AddInt32(countLine, 1)
} }
} }