- 简化响应体关闭逻辑,移除不必要的nil检查 - 调整后台状态推送频率,从500ms改为250ms - 修复前端事件监听器注册顺序 - 移除未使用的进度变量 - 优化goroutine中的任务执行逻辑 - 改进文件路径显示,统一使用文件名而非完整路径 - 添加waitgroup等待确保资源正确释放
This commit is contained in:
@@ -50,7 +50,7 @@ func StartLooking(ctx context.Context, logChan *chan string, lookingPath string)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
|
||||
var pg []Progress
|
||||
progress.Range(func(key, value any) bool {
|
||||
@@ -137,28 +137,33 @@ func uploadData(ctx context.Context, logChan *chan string, lookingPath string) {
|
||||
g.SetLimit(config.APPConfig.HandleFileCount) // 设置同时处理文件数
|
||||
// 执行所有任务
|
||||
for _, task := range tasks {
|
||||
g.Go(func() error {
|
||||
select {
|
||||
case <-egctx.Done():
|
||||
return egctx.Err()
|
||||
default:
|
||||
AddLog(logChan, "正在上传文件:"+filepath.Base(task.FilePath))
|
||||
|
||||
processFile(egctx, logChan, task.FilePath, task.FileLines)
|
||||
|
||||
select {
|
||||
case <-egctx.Done():
|
||||
return
|
||||
default:
|
||||
g.Go(func() error {
|
||||
select {
|
||||
case <-egctx.Done():
|
||||
return egctx.Err()
|
||||
default:
|
||||
//上传完成,清空文件
|
||||
err := os.Truncate(task.FilePath, 0)
|
||||
if err != nil {
|
||||
AddLog(logChan, "清空文件失败:"+err.Error())
|
||||
AddLog(logChan, "正在上传文件:"+filepath.Base(task.FilePath))
|
||||
|
||||
processFile(egctx, logChan, task.FilePath, task.FileLines)
|
||||
|
||||
select {
|
||||
case <-egctx.Done():
|
||||
return egctx.Err()
|
||||
default:
|
||||
//上传完成,清空文件
|
||||
err := os.Truncate(task.FilePath, 0)
|
||||
if err != nil {
|
||||
AddLog(logChan, "清空文件失败:"+err.Error())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
@@ -242,6 +247,7 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(lines)
|
||||
wg.Wait()
|
||||
return
|
||||
default:
|
||||
progress.Store(filepath.Base(filePath),
|
||||
@@ -261,7 +267,7 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil
|
||||
return
|
||||
}
|
||||
|
||||
AddLog(logChan, fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据", filePath, countLine))
|
||||
AddLog(logChan, fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据", filepath.Base(filePath), countLine))
|
||||
}
|
||||
|
||||
func processLines(ctx context.Context, logChan *chan string, lines *chan string, workerID int, filePath string, countLine *int32) {
|
||||
@@ -276,7 +282,7 @@ func processLines(ctx context.Context, logChan *chan string, lines *chan string,
|
||||
}
|
||||
// 上传数据
|
||||
if err := api.UploadDataToServer(ctx, line); err != nil {
|
||||
AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filePath, err))
|
||||
AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filepath.Base(filePath), err))
|
||||
}
|
||||
atomic.AddInt32(countLine, 1)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user