diff --git a/config/config.go b/config/config.go index 87e5428..f71d176 100644 --- a/config/config.go +++ b/config/config.go @@ -8,11 +8,12 @@ import ( ) type Config struct { - Url string `mapstructure:"url"` - Token string `mapstructure:"token"` - ThreadCount int `mapstructure:"thread-count"` - IsRunOnStart bool `mapstructure:"is-run-on-start"` - LookingPath string `mapstructure:"looking-path"` + Url string `mapstructure:"url"` + Token string `mapstructure:"token"` + ThreadCount int `mapstructure:"thread-count"` + HandleFileCount int `mapstructure:"handle-file-count"` + IsRunOnStart bool `mapstructure:"is-run-on-start"` + LookingPath string `mapstructure:"looking-path"` } var APPConfig Config @@ -20,15 +21,17 @@ var APPConfig Config func InitConfig() { // 设置默认配置 defaultConfig := Config{ - Url: "http://127.0.0.1:8080", - Token: "", - ThreadCount: 10, - IsRunOnStart: false, - LookingPath: "", + Url: "http://127.0.0.1:8080", + Token: "", + ThreadCount: 10, + HandleFileCount: 50, + IsRunOnStart: false, + LookingPath: "", } viper.SetDefault("url", defaultConfig.Url) viper.SetDefault("token", defaultConfig.Token) viper.SetDefault("thread-count", defaultConfig.ThreadCount) + viper.SetDefault("handle-file-count", defaultConfig.HandleFileCount) viper.SetDefault("is-run-on-start", defaultConfig.IsRunOnStart) viper.SetDefault("looking-path", defaultConfig.LookingPath) diff --git a/main.go b/main.go index 10c1e56..185c48d 100644 --- a/main.go +++ b/main.go @@ -25,13 +25,14 @@ import ( var isRun = false var logText = widget.NewMultiLineEntry() +var ctx, cancel = context.WithCancel(context.Background()) func main() { config.InitConfig() myApp := app.New() myWindow := myApp.NewWindow("抖音数据上传工具") - myWindow.Resize(fyne.NewSize(900, 550)) + myWindow.Resize(fyne.NewSize(930, 600)) logText.Scroll = container.ScrollVerticalOnly @@ -63,11 +64,22 @@ func main() { threadCountLabel.OnChanged = func(s string) { i, err := strconv.Atoi(threadCountLabel.Text) if err != nil { - AddLog("输入上传线程错误") + AddLog("输入 上传线程 错误") } config.WriteConfig("thread-count", i) } + handleFileCountLabel := widget.NewEntry() + handleFileCountLabel.SetPlaceHolder("50") + handleFileCountLabel.Text = strconv.Itoa(config.APPConfig.HandleFileCount) + handleFileCountLabel.OnChanged = func(s string) { + i, err := strconv.Atoi(handleFileCountLabel.Text) + if err != nil { + AddLog("输入 同时处理文件数 错误") + } + config.WriteConfig("handle-file-count", i) + } + isRunOnStartWidget := widget.NewCheck("启动程序时启动上传程序", func(b bool) { config.WriteConfig("is-run-on-start", b) }) @@ -86,19 +98,48 @@ func main() { // 开始运行按钮 startBtn := widget.NewButton("开始运行", func() { + s := "===============================" + + AddLog(s) if strings.TrimSpace(tokenEntry.Text) == "" { AddLog("错误:请输入Token") return } - - AddLog(fmt.Sprintf("Token:%s", tokenEntry.Text)) - AddLog(fmt.Sprintf("检测目录:%s", selectedDirLabel.Text)) - AddLog("===============================") + AddLog(fmt.Sprintf("服务器地址:%s", config.APPConfig.Url)) + AddLog(fmt.Sprintf("Token:%s", config.APPConfig.Token)) + AddLog(fmt.Sprintf("检测目录:%s", config.APPConfig.LookingPath)) + AddLog(fmt.Sprintf("同时处理文件数:%v", config.APPConfig.HandleFileCount)) + AddLog(fmt.Sprintf("单文件上传线程:%v", config.APPConfig.ThreadCount)) + AddLog(s) isRun = true - go StartLooking(selectedDirLabel.Text) + go StartLooking(ctx, selectedDirLabel.Text) }) + stopBtn := widget.NewButton("停止运行", func() { + cancel() + ctx, cancel = context.WithCancel(context.Background()) + isRun = false + }) + + // 按钮状态同步 + go func() { + for { + if isRun && (!startBtn.Disabled() || stopBtn.Disabled()) { + fyne.Do(func() { + startBtn.Disable() + stopBtn.Enable() + }) + } else if !isRun && (startBtn.Disabled() || !stopBtn.Disabled()) { + fyne.Do(func() { + startBtn.Enable() + stopBtn.Disable() + }) + } + time.Sleep(100 * time.Microsecond) + } + }() + // 清除日志按钮 clearLogBtn := widget.NewButton("清除日志", func() { logText.SetText("") @@ -110,8 +151,9 @@ func main() { nil, // 底部 - 放置按钮 container.NewVBox( - //isRunOnStartWidget, + isRunOnStartWidget, startBtn, + stopBtn, clearLogBtn, ), nil, @@ -128,6 +170,10 @@ func main() { selectedDirLabel, selectDirBtn, widget.NewSeparator(), + container.NewHBox( + widget.NewLabelWithStyle("同时处理文件数:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}), + handleFileCountLabel, + ), container.NewHBox( widget.NewLabelWithStyle("单文件上传线程:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}), threadCountLabel, @@ -147,6 +193,29 @@ func main() { splitContainer := container.NewHSplit(leftPanel, rightPanel) splitContainer.SetOffset(0.35) // 左侧占35%宽度 + go func() { + if !config.APPConfig.IsRunOnStart { + return + } + time.Sleep(time.Second) + s := "===============================" + + AddLog(s) + if strings.TrimSpace(tokenEntry.Text) == "" { + AddLog("错误:请输入Token") + return + } + AddLog(fmt.Sprintf("服务器地址:%s", config.APPConfig.Url)) + AddLog(fmt.Sprintf("Token:%s", config.APPConfig.Token)) + AddLog(fmt.Sprintf("检测目录:%s", config.APPConfig.LookingPath)) + AddLog(fmt.Sprintf("同时处理文件数:%v", config.APPConfig.HandleFileCount)) + AddLog(fmt.Sprintf("单文件上传线程:%v", config.APPConfig.ThreadCount)) + AddLog(s) + + isRun = true + go StartLooking(ctx, selectedDirLabel.Text) + }() + myWindow.SetContent(splitContainer) myWindow.ShowAndRun() } @@ -159,7 +228,9 @@ func AddLog(message string) { }) } +// // 上传数据代码 +// var httpClient = &http.Client{ Transport: &http.Transport{ @@ -175,10 +246,11 @@ type Task struct { FileLines int } -func StartLooking(lookingPath string) { - //检测./ - AddLog("程序启动成功,正在检测txt文件") - for { +func StartLooking(ctx context.Context, lookingPath string) { + t := time.NewTicker(time.Minute) + defer t.Stop() + + f := func() { var path = "./" if lookingPath != "" { path = lookingPath @@ -186,14 +258,23 @@ func StartLooking(lookingPath string) { files, err := getTxtFiles(path) if err != nil { AddLog(err.Error()) - continue + return + } + if files == nil { + return } - if files != nil { - start := time.Now() - fileLines := make(map[string]int) - AddLog(fmt.Sprintf("正在统计 %v 个文件行数", len(files))) - for _, filePath := range files { + //检测到文件 + start := time.Now() + //统计文件行数 + fileLines := make(map[string]int) + AddLog(fmt.Sprintf("正在统计 %v 个文件行数", len(files))) + for _, filePath := range files { + select { + case <-ctx.Done(): + AddLog("上传程序已退出") + return + default: file, err := os.Open(filePath) if err != nil { AddLog("打开文件失败:" + err.Error()) @@ -210,48 +291,57 @@ func StartLooking(lookingPath string) { } fileLines[filepath.Base(filePath)] = lineCount AddLog(fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount)) - } - - var tasks []Task - for k, v := range fileLines { - tasks = append(tasks, Task{FilePath: k, FileLines: v}) - } - - // 使用errgroup控制并发 - g, ctx := errgroup.WithContext(context.Background()) - g.SetLimit(50) // 设置最大并发数为50 - // 执行所有任务 - for _, task := range tasks { - task := task // 创建局部变量 - g.Go(func() error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - AddLog("正在上传文件:" + filepath.Base(task.FilePath)) - processFile(task.FilePath, task.FileLines) - err := os.Truncate(task.FilePath, 0) - if err != nil { - AddLog("清空文件失败:" + err.Error()) - } - return nil - } - }) - } - - // 等待所有任务完成 - if err := g.Wait(); err != nil { - AddLog(fmt.Sprintf("任务执行出错: %v", err)) - } else { - AddLog("所有任务执行完成!") - } - - AddLog(fmt.Sprintf("上传完成,耗时:%s\n", time.Since(start))) } - time.Sleep(time.Minute) + + //添加任务 + var tasks []Task + for k, v := range fileLines { + tasks = append(tasks, Task{FilePath: k, FileLines: v}) + } + + // 使用errgroup控制并发 + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(config.APPConfig.HandleFileCount) // 设置最大同时处理文件数为50 + // 执行所有任务 + for _, task := range tasks { + task := task // 创建局部变量 + g.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + AddLog("正在上传文件:" + filepath.Base(task.FilePath)) + processFile(ctx, task.FilePath, task.FileLines) + err := os.Truncate(task.FilePath, 0) + if err != nil { + AddLog("清空文件失败:" + err.Error()) + } + return nil + } + }) + } + + // 等待所有任务完成 + if err := g.Wait(); err != nil { + AddLog(fmt.Sprintf("任务执行出错: %v", err)) + } else { + AddLog("所有任务执行完成!") + } + + AddLog(fmt.Sprintf("上传完成,耗时:%s\n", time.Since(start))) + } + + for { + f() + select { + case <-ctx.Done(): + AddLog("上传程序已退出") + return + case <-t.C: + f() + } } - AddLog("上传程序已退出") } // 获取目录中的所有txt文件 @@ -277,7 +367,7 @@ func getTxtFiles(dir string) (txtFiles []string, err error) { return txtFiles, err } -func processFile(filePath string, fileLines int) { +func processFile(ctx context.Context, filePath string, fileLines int) { var wg sync.WaitGroup // 打开文件 file, err := os.Open(filePath) @@ -289,13 +379,12 @@ func processFile(filePath string, fileLines int) { // 创建行通道 lines := make(chan string, 100) - // 创建10个worker处理文件上传 for i := 0; i < config.APPConfig.ThreadCount; i++ { wg.Add(1) go func(workerID int) { - defer wg.Done() - processLines(lines, workerID, filePath) + processLines(ctx, lines, workerID, filePath) + wg.Done() }(i) } @@ -321,15 +410,20 @@ func processFile(filePath string, fileLines int) { AddLog(fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据\n", filePath, lineCount)) } -func processLines(lines <-chan string, workerID int, filePath string) { +func processLines(ctx context.Context, lines <-chan string, workerID int, filePath string) { for line := range lines { - // 跳过空行 - if strings.TrimSpace(line) == "" { - continue - } - // 上传数据 - if err := api.UploadDataToServer(httpClient, line); err != nil { - AddLog(fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err)) + select { + case <-ctx.Done(): + return + default: + // 跳过空行 + if strings.TrimSpace(line) == "" { + continue + } + // 上传数据 + if err := api.UploadDataToServer(httpClient, line); err != nil { + AddLog(fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err)) + } } } }