feat(config): 添加文件并发处理配置和停止功能
All checks were successful
构建上传工具 / build-tool (push) Successful in 4m1s

This commit is contained in:
2025-10-15 18:12:47 +08:00
parent 84343031e8
commit 2f5708fbc2
2 changed files with 176 additions and 79 deletions

View File

@@ -11,6 +11,7 @@ type Config struct {
Url string `mapstructure:"url"` Url string `mapstructure:"url"`
Token string `mapstructure:"token"` Token string `mapstructure:"token"`
ThreadCount int `mapstructure:"thread-count"` ThreadCount int `mapstructure:"thread-count"`
HandleFileCount int `mapstructure:"handle-file-count"`
IsRunOnStart bool `mapstructure:"is-run-on-start"` IsRunOnStart bool `mapstructure:"is-run-on-start"`
LookingPath string `mapstructure:"looking-path"` LookingPath string `mapstructure:"looking-path"`
} }
@@ -23,12 +24,14 @@ func InitConfig() {
Url: "http://127.0.0.1:8080", Url: "http://127.0.0.1:8080",
Token: "", Token: "",
ThreadCount: 10, ThreadCount: 10,
HandleFileCount: 50,
IsRunOnStart: false, IsRunOnStart: false,
LookingPath: "", LookingPath: "",
} }
viper.SetDefault("url", defaultConfig.Url) viper.SetDefault("url", defaultConfig.Url)
viper.SetDefault("token", defaultConfig.Token) viper.SetDefault("token", defaultConfig.Token)
viper.SetDefault("thread-count", defaultConfig.ThreadCount) viper.SetDefault("thread-count", defaultConfig.ThreadCount)
viper.SetDefault("handle-file-count", defaultConfig.HandleFileCount)
viper.SetDefault("is-run-on-start", defaultConfig.IsRunOnStart) viper.SetDefault("is-run-on-start", defaultConfig.IsRunOnStart)
viper.SetDefault("looking-path", defaultConfig.LookingPath) viper.SetDefault("looking-path", defaultConfig.LookingPath)

146
main.go
View File

@@ -25,13 +25,14 @@ import (
var isRun = false var isRun = false
var logText = widget.NewMultiLineEntry() var logText = widget.NewMultiLineEntry()
var ctx, cancel = context.WithCancel(context.Background())
func main() { func main() {
config.InitConfig() config.InitConfig()
myApp := app.New() myApp := app.New()
myWindow := myApp.NewWindow("抖音数据上传工具") myWindow := myApp.NewWindow("抖音数据上传工具")
myWindow.Resize(fyne.NewSize(900, 550)) myWindow.Resize(fyne.NewSize(930, 600))
logText.Scroll = container.ScrollVerticalOnly logText.Scroll = container.ScrollVerticalOnly
@@ -63,11 +64,22 @@ func main() {
threadCountLabel.OnChanged = func(s string) { threadCountLabel.OnChanged = func(s string) {
i, err := strconv.Atoi(threadCountLabel.Text) i, err := strconv.Atoi(threadCountLabel.Text)
if err != nil { if err != nil {
AddLog("输入上传线程错误") AddLog("输入 上传线程 错误")
} }
config.WriteConfig("thread-count", i) config.WriteConfig("thread-count", i)
} }
handleFileCountLabel := widget.NewEntry()
handleFileCountLabel.SetPlaceHolder("50")
handleFileCountLabel.Text = strconv.Itoa(config.APPConfig.HandleFileCount)
handleFileCountLabel.OnChanged = func(s string) {
i, err := strconv.Atoi(handleFileCountLabel.Text)
if err != nil {
AddLog("输入 同时处理文件数 错误")
}
config.WriteConfig("handle-file-count", i)
}
isRunOnStartWidget := widget.NewCheck("启动程序时启动上传程序", func(b bool) { isRunOnStartWidget := widget.NewCheck("启动程序时启动上传程序", func(b bool) {
config.WriteConfig("is-run-on-start", b) config.WriteConfig("is-run-on-start", b)
}) })
@@ -86,19 +98,48 @@ func main() {
// 开始运行按钮 // 开始运行按钮
startBtn := widget.NewButton("开始运行", func() { startBtn := widget.NewButton("开始运行", func() {
s := "==============================="
AddLog(s)
if strings.TrimSpace(tokenEntry.Text) == "" { if strings.TrimSpace(tokenEntry.Text) == "" {
AddLog("错误请输入Token") AddLog("错误请输入Token")
return return
} }
AddLog(fmt.Sprintf("服务器地址:%s", config.APPConfig.Url))
AddLog(fmt.Sprintf("Token%s", tokenEntry.Text)) AddLog(fmt.Sprintf("Token%s", config.APPConfig.Token))
AddLog(fmt.Sprintf("检测目录:%s", selectedDirLabel.Text)) AddLog(fmt.Sprintf("检测目录:%s", config.APPConfig.LookingPath))
AddLog("===============================") AddLog(fmt.Sprintf("同时处理文件数:%v", config.APPConfig.HandleFileCount))
AddLog(fmt.Sprintf("单文件上传线程:%v", config.APPConfig.ThreadCount))
AddLog(s)
isRun = true isRun = true
go StartLooking(selectedDirLabel.Text) go StartLooking(ctx, selectedDirLabel.Text)
}) })
stopBtn := widget.NewButton("停止运行", func() {
cancel()
ctx, cancel = context.WithCancel(context.Background())
isRun = false
})
// 按钮状态同步
go func() {
for {
if isRun && (!startBtn.Disabled() || stopBtn.Disabled()) {
fyne.Do(func() {
startBtn.Disable()
stopBtn.Enable()
})
} else if !isRun && (startBtn.Disabled() || !stopBtn.Disabled()) {
fyne.Do(func() {
startBtn.Enable()
stopBtn.Disable()
})
}
time.Sleep(100 * time.Microsecond)
}
}()
// 清除日志按钮 // 清除日志按钮
clearLogBtn := widget.NewButton("清除日志", func() { clearLogBtn := widget.NewButton("清除日志", func() {
logText.SetText("") logText.SetText("")
@@ -110,8 +151,9 @@ func main() {
nil, nil,
// 底部 - 放置按钮 // 底部 - 放置按钮
container.NewVBox( container.NewVBox(
//isRunOnStartWidget, isRunOnStartWidget,
startBtn, startBtn,
stopBtn,
clearLogBtn, clearLogBtn,
), ),
nil, nil,
@@ -128,6 +170,10 @@ func main() {
selectedDirLabel, selectedDirLabel,
selectDirBtn, selectDirBtn,
widget.NewSeparator(), widget.NewSeparator(),
container.NewHBox(
widget.NewLabelWithStyle("同时处理文件数:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}),
handleFileCountLabel,
),
container.NewHBox( container.NewHBox(
widget.NewLabelWithStyle("单文件上传线程:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}), widget.NewLabelWithStyle("单文件上传线程:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}),
threadCountLabel, threadCountLabel,
@@ -147,6 +193,29 @@ func main() {
splitContainer := container.NewHSplit(leftPanel, rightPanel) splitContainer := container.NewHSplit(leftPanel, rightPanel)
splitContainer.SetOffset(0.35) // 左侧占35%宽度 splitContainer.SetOffset(0.35) // 左侧占35%宽度
go func() {
if !config.APPConfig.IsRunOnStart {
return
}
time.Sleep(time.Second)
s := "==============================="
AddLog(s)
if strings.TrimSpace(tokenEntry.Text) == "" {
AddLog("错误请输入Token")
return
}
AddLog(fmt.Sprintf("服务器地址:%s", config.APPConfig.Url))
AddLog(fmt.Sprintf("Token%s", config.APPConfig.Token))
AddLog(fmt.Sprintf("检测目录:%s", config.APPConfig.LookingPath))
AddLog(fmt.Sprintf("同时处理文件数:%v", config.APPConfig.HandleFileCount))
AddLog(fmt.Sprintf("单文件上传线程:%v", config.APPConfig.ThreadCount))
AddLog(s)
isRun = true
go StartLooking(ctx, selectedDirLabel.Text)
}()
myWindow.SetContent(splitContainer) myWindow.SetContent(splitContainer)
myWindow.ShowAndRun() myWindow.ShowAndRun()
} }
@@ -159,7 +228,9 @@ func AddLog(message string) {
}) })
} }
//
// 上传数据代码 // 上传数据代码
//
var httpClient = &http.Client{ var httpClient = &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
@@ -175,10 +246,11 @@ type Task struct {
FileLines int FileLines int
} }
func StartLooking(lookingPath string) { func StartLooking(ctx context.Context, lookingPath string) {
//检测./ t := time.NewTicker(time.Minute)
AddLog("程序启动成功正在检测txt文件") defer t.Stop()
for {
f := func() {
var path = "./" var path = "./"
if lookingPath != "" { if lookingPath != "" {
path = lookingPath path = lookingPath
@@ -186,14 +258,23 @@ func StartLooking(lookingPath string) {
files, err := getTxtFiles(path) files, err := getTxtFiles(path)
if err != nil { if err != nil {
AddLog(err.Error()) AddLog(err.Error())
continue return
}
if files == nil {
return
} }
if files != nil {
start := time.Now()
//检测到文件
start := time.Now()
//统计文件行数
fileLines := make(map[string]int) fileLines := make(map[string]int)
AddLog(fmt.Sprintf("正在统计 %v 个文件行数", len(files))) AddLog(fmt.Sprintf("正在统计 %v 个文件行数", len(files)))
for _, filePath := range files { for _, filePath := range files {
select {
case <-ctx.Done():
AddLog("上传程序已退出")
return
default:
file, err := os.Open(filePath) file, err := os.Open(filePath)
if err != nil { if err != nil {
AddLog("打开文件失败:" + err.Error()) AddLog("打开文件失败:" + err.Error())
@@ -210,17 +291,18 @@ func StartLooking(lookingPath string) {
} }
fileLines[filepath.Base(filePath)] = lineCount fileLines[filepath.Base(filePath)] = lineCount
AddLog(fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount)) AddLog(fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount))
}
} }
//添加任务
var tasks []Task var tasks []Task
for k, v := range fileLines { for k, v := range fileLines {
tasks = append(tasks, Task{FilePath: k, FileLines: v}) tasks = append(tasks, Task{FilePath: k, FileLines: v})
} }
// 使用errgroup控制并发 // 使用errgroup控制并发
g, ctx := errgroup.WithContext(context.Background()) g, ctx := errgroup.WithContext(ctx)
g.SetLimit(50) // 设置最大并发数为50 g.SetLimit(config.APPConfig.HandleFileCount) // 设置最大同时处理文件数为50
// 执行所有任务 // 执行所有任务
for _, task := range tasks { for _, task := range tasks {
task := task // 创建局部变量 task := task // 创建局部变量
@@ -230,7 +312,7 @@ func StartLooking(lookingPath string) {
return ctx.Err() return ctx.Err()
default: default:
AddLog("正在上传文件:" + filepath.Base(task.FilePath)) AddLog("正在上传文件:" + filepath.Base(task.FilePath))
processFile(task.FilePath, task.FileLines) processFile(ctx, task.FilePath, task.FileLines)
err := os.Truncate(task.FilePath, 0) err := os.Truncate(task.FilePath, 0)
if err != nil { if err != nil {
AddLog("清空文件失败:" + err.Error()) AddLog("清空文件失败:" + err.Error())
@@ -249,9 +331,17 @@ func StartLooking(lookingPath string) {
AddLog(fmt.Sprintf("上传完成,耗时:%s\n", time.Since(start))) AddLog(fmt.Sprintf("上传完成,耗时:%s\n", time.Since(start)))
} }
time.Sleep(time.Minute)
} for {
f()
select {
case <-ctx.Done():
AddLog("上传程序已退出") AddLog("上传程序已退出")
return
case <-t.C:
f()
}
}
} }
// 获取目录中的所有txt文件 // 获取目录中的所有txt文件
@@ -277,7 +367,7 @@ func getTxtFiles(dir string) (txtFiles []string, err error) {
return txtFiles, err return txtFiles, err
} }
func processFile(filePath string, fileLines int) { func processFile(ctx context.Context, filePath string, fileLines int) {
var wg sync.WaitGroup var wg sync.WaitGroup
// 打开文件 // 打开文件
file, err := os.Open(filePath) file, err := os.Open(filePath)
@@ -289,13 +379,12 @@ func processFile(filePath string, fileLines int) {
// 创建行通道 // 创建行通道
lines := make(chan string, 100) lines := make(chan string, 100)
// 创建10个worker处理文件上传 // 创建10个worker处理文件上传
for i := 0; i < config.APPConfig.ThreadCount; i++ { for i := 0; i < config.APPConfig.ThreadCount; i++ {
wg.Add(1) wg.Add(1)
go func(workerID int) { go func(workerID int) {
defer wg.Done() processLines(ctx, lines, workerID, filePath)
processLines(lines, workerID, filePath) wg.Done()
}(i) }(i)
} }
@@ -321,8 +410,12 @@ func processFile(filePath string, fileLines int) {
AddLog(fmt.Sprintf("文件【%s】处理完成共处理 %d 行数据\n", filePath, lineCount)) AddLog(fmt.Sprintf("文件【%s】处理完成共处理 %d 行数据\n", filePath, lineCount))
} }
func processLines(lines <-chan string, workerID int, filePath string) { func processLines(ctx context.Context, lines <-chan string, workerID int, filePath string) {
for line := range lines { for line := range lines {
select {
case <-ctx.Done():
return
default:
// 跳过空行 // 跳过空行
if strings.TrimSpace(line) == "" { if strings.TrimSpace(line) == "" {
continue continue
@@ -332,4 +425,5 @@ func processLines(lines <-chan string, workerID int, filePath string) {
AddLog(fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err)) AddLog(fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err))
} }
} }
}
} }