993814cdfa
- 将变量名 fInfo 重命名为 filesInfo 以提高可读性 - 调整代码顺序,将 AddLog 调用移到变量声明后 - 统一使用新变量名在所有相关位置进行引用 - 移动 g.SetLimit 注释位置以提高代码可读性
424 lines
9.8 KiB
Go
424 lines
9.8 KiB
Go
// Package uploader 上传数据
|
|
package uploader
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"dypid-client/internal/api"
|
|
"dypid-client/internal/config"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
wailsruntime "github.com/wailsapp/wails/v2/pkg/runtime"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
type fileInfo struct {
|
|
FilePath string
|
|
FileLines int
|
|
}
|
|
|
|
type Progress struct {
|
|
FileName string `json:"name"`
|
|
Total int `json:"total"`
|
|
Uploaded int `json:"uploaded"`
|
|
Percentage int `json:"percentage"`
|
|
}
|
|
|
|
var progress sync.Map
|
|
|
|
func StartUpload(ctx context.Context, logChan *chan string) {
|
|
AddLog(logChan, "===============================================")
|
|
AddLog(logChan, `服务器: `+config.APPConfig.Url)
|
|
AddLog(logChan, `Token: `+config.APPConfig.Token)
|
|
AddLog(logChan, `检测目录: `+config.APPConfig.CheckDir)
|
|
AddLog(logChan, `同时处理文件数: `+strconv.Itoa(config.APPConfig.HandleFileCount))
|
|
AddLog(logChan, `单文件上传线程: `+strconv.Itoa(config.APPConfig.ThreadCount))
|
|
AddLog(logChan, "===============================================")
|
|
|
|
AddLog(logChan, "正在创建连接池(连接池可避免首次大量上传时出现网络错误)")
|
|
api.InitConn()
|
|
AddLog(logChan, "创建连接池完成,开始运行程序")
|
|
|
|
progress.Clear()
|
|
|
|
//推送上传进度
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
var pg []Progress
|
|
progress.Range(func(_, value any) bool {
|
|
pg = append(pg, value.(Progress))
|
|
return true
|
|
})
|
|
wailsruntime.EventsEmit(ctx, "progress", pg)
|
|
|
|
time.Sleep(250 * time.Millisecond)
|
|
}
|
|
}
|
|
}()
|
|
|
|
//开启上传程序
|
|
for {
|
|
uploadData(ctx, logChan)
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(time.Minute):
|
|
}
|
|
}
|
|
}
|
|
|
|
func uploadData(ctx context.Context, logChan *chan string) {
|
|
start := time.Now()
|
|
|
|
// 获取检测目录
|
|
var checkPath = "./"
|
|
if config.APPConfig.CheckDir != "" {
|
|
checkPath = config.APPConfig.CheckDir
|
|
}
|
|
|
|
//要上传的文件路径字符串数组
|
|
var files []string
|
|
|
|
//先检测tmp目录有没有残余文件
|
|
os.Mkdir("./tmp", os.ModePerm)
|
|
tmpFiles, err := getTxtFiles("./tmp")
|
|
if err != nil {
|
|
AddLog(logChan, "获取 tmp 文件列表失败:"+err.Error())
|
|
}
|
|
|
|
//tmp有文件,优先上传(else:tmp没文件扫描指定文件夹,并复制文件到tmp)
|
|
if tmpFiles != nil {
|
|
AddLog(logChan, "当前 tmp 目录下还有未上传完成文件,将优先上传 tmp 目录文件")
|
|
files = tmpFiles
|
|
} else {
|
|
//tmp没文件,扫描指定文件夹
|
|
f, err := getTxtFiles(checkPath)
|
|
if err != nil {
|
|
AddLog(logChan, "获取文件列表失败:"+err.Error())
|
|
return
|
|
}
|
|
|
|
//指定文件夹没文件,退出函数
|
|
if f == nil {
|
|
return
|
|
}
|
|
|
|
//是否向用户提示清空文件,并复制文件到tmp
|
|
if config.APPConfig.ClearFilesNoPrompt {
|
|
//不用提示直接复制文件到tmp
|
|
for _, p := range f {
|
|
err := copyFile(p, "./tmp/"+filepath.Base(p))
|
|
if err != nil {
|
|
AddLog(logChan, "复制文件失败:"+err.Error())
|
|
} else {
|
|
files = append(files, "./tmp/"+filepath.Base(p))
|
|
err := os.Truncate(p, 0)
|
|
if err != nil {
|
|
AddLog(logChan, "清空文件失败:"+err.Error())
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
//提示用户
|
|
wailsruntime.EventsEmit(ctx, "clear-files", f)
|
|
|
|
confirm := make(chan bool)
|
|
|
|
wailsruntime.EventsOn(ctx, "confirm-clear-files", func(optionalData ...any) {
|
|
confirm <- optionalData[0].(bool)
|
|
})
|
|
|
|
if <-confirm {
|
|
for _, p := range f {
|
|
err := copyFile(p, "./tmp/"+filepath.Base(p))
|
|
if err != nil {
|
|
AddLog(logChan, "复制文件失败:"+err.Error())
|
|
} else {
|
|
files = append(files, "./tmp/"+filepath.Base(p))
|
|
err := os.Truncate(p, 0)
|
|
if err != nil {
|
|
AddLog(logChan, "清空文件失败:"+err.Error())
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
AddLog(logChan, "已取消上传,1分钟后再运行")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
//检测到文件
|
|
//统计文件行数
|
|
var filesInfo = make(map[string]fileInfo)
|
|
|
|
isAllEmpty := true
|
|
|
|
AddLog(logChan, fmt.Sprintf("正在统计 %v 个文件行数", len(files)))
|
|
for _, filePath := range files {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
file, err := os.Open(filePath)
|
|
if err != nil {
|
|
AddLog(logChan, "打开文件失败:"+err.Error())
|
|
}
|
|
|
|
// 使用 bufio.Scanner 逐行读取
|
|
scanner := bufio.NewScanner(file)
|
|
lineCount := 0
|
|
for scanner.Scan() {
|
|
lineCount++
|
|
}
|
|
err = file.Close()
|
|
|
|
if lineCount == 0 {
|
|
continue
|
|
}
|
|
filesInfo[filepath.Base(filePath)] = fileInfo{
|
|
FilePath: filePath,
|
|
FileLines: lineCount,
|
|
}
|
|
|
|
isAllEmpty = false
|
|
|
|
AddLog(logChan, fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount))
|
|
}
|
|
}
|
|
|
|
if isAllEmpty {
|
|
AddLog(logChan, "所有文件都为空,不进行上传")
|
|
return
|
|
}
|
|
|
|
//刷新文件上传进度
|
|
progress.Clear()
|
|
for fileName, info := range filesInfo {
|
|
progress.Store(fileName,
|
|
Progress{
|
|
FileName: fileName,
|
|
Total: info.FileLines,
|
|
Uploaded: 0,
|
|
Percentage: 0,
|
|
},
|
|
)
|
|
}
|
|
|
|
// 使用 errgroup 控制同时处理的文件数,并开始上传文件任务
|
|
g, egctx := errgroup.WithContext(ctx)
|
|
// 设置同时处理文件数
|
|
g.SetLimit(config.APPConfig.HandleFileCount)
|
|
// 执行文件上传任务参数(文件路径,文件行数)
|
|
for fileName, info := range filesInfo {
|
|
select {
|
|
case <-egctx.Done():
|
|
return
|
|
default:
|
|
g.Go(func() error {
|
|
select {
|
|
case <-egctx.Done():
|
|
return egctx.Err()
|
|
default:
|
|
AddLog(logChan, "正在上传文件:"+fileName)
|
|
|
|
processFile(egctx, logChan, info.FilePath, info.FileLines)
|
|
|
|
select {
|
|
case <-egctx.Done():
|
|
return egctx.Err()
|
|
default:
|
|
//上传完成,删除缓存文件
|
|
err := os.Remove(info.FilePath)
|
|
if err != nil {
|
|
AddLog(logChan, "删除缓存文件失败:"+err.Error())
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
// 等待所有任务完成
|
|
g.Wait()
|
|
|
|
AddLog(logChan, "所有任务执行完成!")
|
|
AddLog(logChan, fmt.Sprintf("上传完成,耗时:%s", time.Since(start).String()))
|
|
}
|
|
}
|
|
|
|
// copyFile 快速拷贝文件 src -> dst
|
|
func copyFile(src, dst string) error {
|
|
// 打开源文件
|
|
sourceFile, err := os.Open(src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer sourceFile.Close()
|
|
|
|
// 创建目标文件
|
|
destFile, err := os.Create(dst)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer destFile.Close()
|
|
|
|
// 核心:最快拷贝,底层使用操作系统零拷贝技术
|
|
_, err = io.Copy(destFile, sourceFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 强制刷入磁盘,保证数据完整
|
|
return destFile.Sync()
|
|
}
|
|
|
|
// 获取目录中的所有txt文件(文件大小为0的不返回)
|
|
func getTxtFiles(dir string) (txtFiles []string, err error) {
|
|
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 跳过目录,只处理普通文件
|
|
if !info.Mode().IsRegular() {
|
|
return nil
|
|
}
|
|
|
|
// 检查文件扩展名是否为.txt
|
|
if strings.ToLower(filepath.Ext(path)) == ".txt" {
|
|
if info.Size() != 0 {
|
|
txtFiles = append(txtFiles, path)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
return txtFiles, err
|
|
}
|
|
|
|
// processFile 处理每个文件
|
|
func processFile(ctx context.Context, logChan *chan string, filePath string, fileLines int) {
|
|
// 打开文件
|
|
file, err := os.Open(filePath)
|
|
if err != nil {
|
|
AddLog(logChan, fmt.Sprintf("[processFile] 无法打开文件 %s: %v", filePath, err))
|
|
return
|
|
}
|
|
defer file.Close()
|
|
|
|
// 创建行通道
|
|
lines := make(chan string, 200)
|
|
var countLine int32 = 0
|
|
// 创建指定个worker同时处理文件上传
|
|
for i := 0; i < config.APPConfig.ThreadCount; i++ {
|
|
select {
|
|
case <-ctx.Done():
|
|
close(lines)
|
|
return
|
|
default:
|
|
go func() {
|
|
processLines(ctx, logChan, &lines, i, filePath, &countLine)
|
|
}()
|
|
}
|
|
}
|
|
|
|
// 读取文件并发送到通道
|
|
scanner := bufio.NewScanner(file)
|
|
go func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
_, f, l, _ := runtime.Caller(0)
|
|
fmt.Println("panic:", f+":"+strconv.Itoa(l), r)
|
|
}
|
|
}()
|
|
for scanner.Scan() {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
lines <- scanner.Text()
|
|
}
|
|
}
|
|
}()
|
|
|
|
// 等待所有行处理完成并推送进度
|
|
for int(countLine) != fileLines {
|
|
select {
|
|
case <-ctx.Done():
|
|
close(lines) //关闭processLines中的上传线程
|
|
return
|
|
default:
|
|
progress.Store(filepath.Base(filePath),
|
|
Progress{
|
|
FileName: filepath.Base(filePath),
|
|
Total: fileLines,
|
|
Uploaded: int(countLine),
|
|
Percentage: int(float64(countLine) / float64(fileLines) * 100),
|
|
},
|
|
)
|
|
}
|
|
}
|
|
//上传完成,进度设为100
|
|
progress.Store(filepath.Base(filePath),
|
|
Progress{
|
|
FileName: filepath.Base(filePath),
|
|
Total: fileLines,
|
|
Uploaded: int(countLine),
|
|
Percentage: 100,
|
|
},
|
|
)
|
|
close(lines) //关闭processLines中的上传线程
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
AddLog(logChan, fmt.Sprintf("读取文件 %s 错误: %v", filePath, err))
|
|
return
|
|
}
|
|
|
|
AddLog(logChan, fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据", filepath.Base(filePath), countLine))
|
|
}
|
|
|
|
// processLines 处理接受到的每一行数据并上传(chan 管道接受数据)
|
|
func processLines(ctx context.Context, logChan *chan string, lines *chan string, workerID int, filePath string, countLine *int32) {
|
|
for line := range *lines {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
// 跳过空行
|
|
if strings.TrimSpace(line) == "" {
|
|
continue
|
|
}
|
|
// 上传数据
|
|
if err := api.UploadDataToServer(ctx, line); err != nil {
|
|
AddLog(logChan, fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v", workerID, filepath.Base(filePath), err))
|
|
}
|
|
atomic.AddInt32(countLine, 1)
|
|
}
|
|
}
|
|
}
|
|
|
|
// AddLog 添加日志
|
|
func AddLog(logChan *chan string, message string) {
|
|
*logChan <- message
|
|
}
|