Files
dypid-client/internal/uploader/uploader.go
T
ygxbnet 602c4c8546
构建上传工具 / build-tool (push) Successful in 1m1s
fix(uploader): 修复上传进度初始化逻辑
- 在处理文件前先清除之前的进度记录
- 将循环变量名从 k, v 更改为 fileName, lines 提高可读性
- 移动进度初始化位置确保每个文件都有正确的进度跟踪
- 删除重复的进度清除操作避免潜在的数据丢失问题
2026-04-28 01:35:16 +08:00

296 lines
6.7 KiB
Go

// Package uploader 上传数据
package uploader
import (
"bufio"
"context"
"dypid-client/internal/api"
"dypid-client/internal/config"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/wailsapp/wails/v2/pkg/runtime"
"golang.org/x/sync/errgroup"
)
var progress sync.Map
type Task struct {
FilePath string
FileLines int
}
type Progress struct {
FileName string `json:"name"`
Total int `json:"total"`
Uploaded int `json:"uploaded"`
Percentage int `json:"percentage"`
}
func StartLooking(ctx context.Context, logChan *chan string, lookingPath string) {
AddLog(logChan, "===============================================")
AddLog(logChan, `服务器: `+config.APPConfig.Url)
AddLog(logChan, `Token: `+config.APPConfig.Token)
AddLog(logChan, `检测目录: `+config.APPConfig.CheckDir)
AddLog(logChan, `同时处理文件数: `+strconv.Itoa(config.APPConfig.HandleFileCount))
AddLog(logChan, `单文件上传线程: `+strconv.Itoa(config.APPConfig.ThreadCount))
AddLog(logChan, "===============================================")
progress.Clear()
//推送上传进度
go func() {
for {
select {
case <-ctx.Done():
return
default:
time.Sleep(250 * time.Millisecond)
var pg []Progress
progress.Range(func(key, value any) bool {
p := value.(Progress)
pg = append(pg, p)
return true
})
runtime.EventsEmit(ctx, "progress", pg)
}
}
}()
for {
uploadData(ctx, logChan, lookingPath)
select {
case <-time.After(time.Minute):
case <-ctx.Done():
return
}
}
}
func uploadData(ctx context.Context, logChan *chan string, lookingPath string) {
var path = "./"
if lookingPath != "" {
path = lookingPath
}
//获取文件列表
files, err := getTxtFiles(path)
if err != nil {
AddLog(logChan, "获取文件列表失败:"+err.Error())
return
}
if files == nil {
return
}
start := time.Now()
//检测到文件
//统计文件行数
fileLines := make(map[string]int)
AddLog(logChan, fmt.Sprintf("正在统计 %v 个文件行数", len(files)))
isAllEmpty := true
for _, filePath := range files {
select {
case <-ctx.Done():
return
default:
file, err := os.Open(filePath)
if err != nil {
AddLog(logChan, "打开文件失败:"+err.Error())
}
// 使用 bufio.Scanner 逐行读取
scanner := bufio.NewScanner(file)
lineCount := 0
for scanner.Scan() {
lineCount++
}
err = file.Close()
if lineCount == 0 {
continue
}
fileLines[filepath.Base(filePath)] = lineCount
isAllEmpty = false
AddLog(logChan, fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount))
}
}
if isAllEmpty {
AddLog(logChan, "所有文件都为空,不进行上传")
return
}
progress.Clear()
//添加文件上传任务参数(文件路径,文件行数)
var tasks []Task
for fileName, lines := range fileLines {
tasks = append(tasks, Task{FilePath: path + "/" + fileName, FileLines: lines})
progress.Store(fileName, Progress{FileName: fileName, Total: lines, Uploaded: 0, Percentage: 0})
}
// 使用 errgroup 控制同时处理的文件数,并开始上传文件任务
g, egctx := errgroup.WithContext(ctx)
g.SetLimit(config.APPConfig.HandleFileCount) // 设置同时处理文件数
// 执行所有任务
for _, task := range tasks {
select {
case <-egctx.Done():
return
default:
g.Go(func() error {
select {
case <-egctx.Done():
return egctx.Err()
default:
AddLog(logChan, "正在上传文件:"+filepath.Base(task.FilePath))
processFile(egctx, logChan, task.FilePath, task.FileLines)
select {
case <-egctx.Done():
return egctx.Err()
default:
//上传完成,清空文件
err := os.Truncate(task.FilePath, 0)
if err != nil {
AddLog(logChan, "清空文件失败:"+err.Error())
}
return nil
}
}
})
}
}
select {
case <-ctx.Done():
return
default:
// 等待所有任务完成
if err := g.Wait(); err != nil {
AddLog(logChan, fmt.Sprintf("任务执行出错: %v", err))
} else {
AddLog(logChan, "所有任务执行完成!")
}
AddLog(logChan, fmt.Sprintf("上传完成,耗时:%s", time.Since(start).String()))
}
}
// 获取目录中的所有txt文件
func getTxtFiles(dir string) (txtFiles []string, err error) {
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// 只处理普通文件,跳过目录
if !info.Mode().IsRegular() {
return nil
}
// 检查文件扩展名是否为.txt
if strings.ToLower(filepath.Ext(path)) == ".txt" {
txtFiles = append(txtFiles, path)
}
return nil
})
return txtFiles, err
}
func processFile(ctx context.Context, logChan *chan string, filePath string, fileLines int) {
var wg sync.WaitGroup
// 打开文件
file, err := os.Open(filePath)
if err != nil {
AddLog(logChan, fmt.Sprintf("[processFile] 无法打开文件 %s: %v", filePath, err))
return
}
defer file.Close()
// 创建行通道
lines := make(chan string, 100)
var countLine int32 = 0
// 创建指定个worker同时处理文件上传
for i := 0; i < config.APPConfig.ThreadCount; i++ {
select {
case <-ctx.Done():
return
default:
wg.Go(func() {
processLines(ctx, logChan, &lines, i, filePath, &countLine)
})
}
}
// 读取文件并发送到通道
scanner := bufio.NewScanner(file)
go func() {
for scanner.Scan() {
select {
case <-ctx.Done():
return
default:
lines <- scanner.Text()
}
}
}()
for int(countLine) != fileLines {
select {
case <-ctx.Done():
close(lines)
wg.Wait()
return
default:
progress.Store(filepath.Base(filePath),
Progress{FileName: filepath.Base(filePath),
Total: fileLines, Uploaded: int(countLine),
Percentage: int(float64(countLine)/float64(fileLines)*100) + 1,
})
time.Sleep(500 * time.Millisecond)
}
}
close(lines)
wg.Wait()
if err := scanner.Err(); err != nil {
AddLog(logChan, fmt.Sprintf("读取文件 %s 错误: %v", filePath, err))
return
}
AddLog(logChan, fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据", filepath.Base(filePath), countLine))
}
func processLines(ctx context.Context, logChan *chan string, lines *chan string, workerID int, filePath string, countLine *int32) {
for line := range *lines {
select {
case <-ctx.Done():
return
default:
// 跳过空行
if strings.TrimSpace(line) == "" {
continue
}
// 上传数据
if err := api.UploadDataToServer(ctx, line); err != nil {
AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filepath.Base(filePath), err))
}
atomic.AddInt32(countLine, 1)
}
}
}
// AddLog 添加日志
func AddLog(logChan *chan string, message string) {
*logChan <- message
}