feat(app): 添加自动启动和日志滚动功能并优化上传逻辑
- 增加了运行时自动启动上传配置选项 - 实现了日志输出的滚动控制功能 - 优化了上传进度显示和状态同步机制 - 提升了HTTP客户端连接池配置至500 - 改进了文件上传完成后的清理逻辑 - 添加了上下文取消检查避免资源泄露 - 完善了上传开始时的日志信息输出
This commit is contained in:
+2
-2
@@ -11,8 +11,8 @@ import (
|
||||
|
||||
var httpClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
MaxIdleConns: 200,
|
||||
MaxIdleConnsPerHost: 200,
|
||||
MaxIdleConns: 500,
|
||||
MaxIdleConnsPerHost: 500,
|
||||
IdleConnTimeout: 30 * time.Second,
|
||||
},
|
||||
Timeout: 30 * time.Second,
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -33,6 +34,13 @@ type Progress struct {
|
||||
}
|
||||
|
||||
func StartLooking(ctx context.Context, logChan *chan string, lookingPath string) {
|
||||
AddLog(logChan, "===============================================")
|
||||
AddLog(logChan, `服务器: `+config.APPConfig.Url)
|
||||
AddLog(logChan, `Token: `+config.APPConfig.Token)
|
||||
AddLog(logChan, `检测目录: `+config.APPConfig.CheckDir)
|
||||
AddLog(logChan, `同时处理文件数: `+strconv.Itoa(config.APPConfig.HandleFileCount))
|
||||
AddLog(logChan, `单文件上传线程: `+strconv.Itoa(config.APPConfig.ThreadCount))
|
||||
AddLog(logChan, "===============================================")
|
||||
//推送上传进度
|
||||
go func() {
|
||||
for {
|
||||
@@ -108,6 +116,7 @@ func uploadData(ctx context.Context, logChan *chan string, lookingPath string) {
|
||||
fileLines[filepath.Base(filePath)] = lineCount
|
||||
isAllEmpty = false
|
||||
AddLog(logChan, fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount))
|
||||
progress.Store(filepath.Base(filePath), Progress{FileName: filepath.Base(filePath), Total: lineCount, Uploaded: 0, Percentage: 0})
|
||||
}
|
||||
}
|
||||
if isAllEmpty {
|
||||
@@ -135,25 +144,35 @@ func uploadData(ctx context.Context, logChan *chan string, lookingPath string) {
|
||||
|
||||
processFile(egctx, logChan, task.FilePath, task.FileLines)
|
||||
|
||||
//上传完成,清空文件
|
||||
err := os.Truncate(task.FilePath, 0)
|
||||
if err != nil {
|
||||
AddLog(logChan, "清空文件失败:"+err.Error())
|
||||
select {
|
||||
case <-egctx.Done():
|
||||
return egctx.Err()
|
||||
default:
|
||||
//上传完成,清空文件
|
||||
err := os.Truncate(task.FilePath, 0)
|
||||
if err != nil {
|
||||
AddLog(logChan, "清空文件失败:"+err.Error())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// 等待所有任务完成
|
||||
if err := g.Wait(); err != nil {
|
||||
AddLog(logChan, fmt.Sprintf("任务执行出错: %v", err))
|
||||
} else {
|
||||
AddLog(logChan, "所有任务执行完成!")
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
// 等待所有任务完成
|
||||
if err := g.Wait(); err != nil {
|
||||
AddLog(logChan, fmt.Sprintf("任务执行出错: %v", err))
|
||||
} else {
|
||||
AddLog(logChan, "所有任务执行完成!")
|
||||
}
|
||||
|
||||
AddLog(logChan, fmt.Sprintf("上传完成,耗时:%s", time.Since(start).String()))
|
||||
progress.Clear()
|
||||
AddLog(logChan, fmt.Sprintf("上传完成,耗时:%s", time.Since(start).String()))
|
||||
progress.Clear()
|
||||
}
|
||||
}
|
||||
|
||||
// 获取目录中的所有txt文件
|
||||
@@ -194,9 +213,14 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil
|
||||
var countLine int32 = 0
|
||||
// 创建指定个worker同时处理文件上传
|
||||
for i := 0; i < config.APPConfig.ThreadCount; i++ {
|
||||
wg.Go(func() {
|
||||
processLines(ctx, logChan, &lines, i, filePath, &countLine)
|
||||
})
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
wg.Go(func() {
|
||||
processLines(ctx, logChan, &lines, i, filePath, &countLine)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// 读取文件并发送到通道
|
||||
@@ -257,6 +281,7 @@ func processLines(ctx context.Context, logChan *chan string, lines *chan string,
|
||||
}
|
||||
}
|
||||
|
||||
// AddLog 添加日志
|
||||
func AddLog(logChan *chan string, message string) {
|
||||
*logChan <- message
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user