// Command dypid-client is a small Fyne desktop tool that watches a directory
// for .txt files and uploads every non-empty line of each file to a server
// through api.UploadDataToServer.
package main

import (
	"bufio"
	"context"
	"fmt"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"fyne.io/fyne/v2"
	"fyne.io/fyne/v2/app"
	"fyne.io/fyne/v2/container"
	"fyne.io/fyne/v2/layout"
	"fyne.io/fyne/v2/widget"
	"golang.org/x/sync/errgroup"

	"dypid-client/api"
	"dypid-client/config"
	"dypid-client/utils/folder"
)

// isRun records that the upload loop has been started. It is only read and
// written from the start button's tap handler.
// NOTE(review): presumably Fyne invokes tap handlers on a single UI
// goroutine, so no extra synchronization is used here — confirm.
var isRun = false

// logText is the multi-line entry that displays the run log.
var logText = widget.NewMultiLineEntry()

// main builds the window (settings on the left, log on the right) and runs
// the Fyne event loop until the window is closed.
func main() {
	config.InitConfig()

	myApp := app.New()
	myWindow := myApp.NewWindow("抖音数据上传工具")
	myWindow.Resize(fyne.NewSize(900, 550))

	logText.Scroll = container.ScrollVerticalOnly

	// Build the input widgets. Every change is written straight to the config.
	urlEntry := widget.NewEntry()
	urlEntry.SetPlaceHolder("http://127.0.0.1:8080")
	urlEntry.Text = config.APPConfig.Url
	urlEntry.OnChanged = func(s string) {
		config.WriteConfig("url", s)
	}

	tokenEntry := widget.NewEntry()
	tokenEntry.SetPlaceHolder("请输入Token")
	tokenEntry.Text = config.APPConfig.Token
	tokenEntry.OnChanged = func(s string) {
		config.WriteConfig("token", s)
	}

	selectedDirLabel := widget.NewEntry()
	selectedDirLabel.SetPlaceHolder("未选择目录(默认为程序运行目录)")
	selectedDirLabel.Text = config.APPConfig.LookingPath
	selectedDirLabel.OnChanged = func(s string) {
		config.WriteConfig("looking-path", s)
	}

	threadCountLabel := widget.NewEntry()
	threadCountLabel.SetPlaceHolder("10")
	threadCountLabel.Text = strconv.Itoa(config.APPConfig.ThreadCount)
	threadCountLabel.OnChanged = func(s string) {
		n, err := strconv.Atoi(strings.TrimSpace(s))
		if err != nil || n < 1 {
			// Never persist an unparsable or non-positive worker count:
			// a count of zero would leave processFile without any consumer.
			AddLog("输入上传线程错误")
			return
		}
		config.WriteConfig("thread-count", n)
	}

	// NOTE(review): nothing in this file acts on IsRunOnStart at startup;
	// the checkbox only persists the flag — confirm whether auto-start is
	// handled elsewhere or still missing.
	isRunOnStartWidget := widget.NewCheck("启动程序时启动上传程序", func(b bool) {
		config.WriteConfig("is-run-on-start", b)
	})
	isRunOnStartWidget.Checked = config.APPConfig.IsRunOnStart

	// Directory picker button backed by folder.OpenFolderDialog.
	selectDirBtn := widget.NewButton("选择检测目录", func() {
		selectedPath := folder.OpenFolderDialog()
		if selectedPath == "" {
			// Dialog returned no path: keep the current setting.
			return
		}
		selectedDirLabel.SetText(selectedPath)
		config.WriteConfig("looking-path", selectedPath)
	})

	// Start button.
	startBtn := widget.NewButton("开始运行", func() {
		if isRun {
			// StartLooking never returns, so a second click would only
			// spawn a duplicate loop uploading the same files.
			AddLog("上传程序已在运行中")
			return
		}
		if strings.TrimSpace(tokenEntry.Text) == "" {
			AddLog("错误:请输入Token")
			return
		}

		AddLog(fmt.Sprintf("Token:%s", tokenEntry.Text))
		AddLog(fmt.Sprintf("检测目录:%s", selectedDirLabel.Text))
		AddLog("===============================")

		isRun = true
		go StartLooking(selectedDirLabel.Text)
	})

	// Clear-log button.
	clearLogBtn := widget.NewButton("清除日志", func() {
		logText.SetText("")
		AddLog("日志已清除")
	})

	// Left panel. NewBorder takes (top, bottom, left, right, content...).
	leftPanel := container.NewBorder(
		nil, // top: unused
		// bottom: action buttons
		container.NewVBox(
			isRunOnStartWidget,
			startBtn,
			clearLogBtn,
		),
		nil, // left: unused
		nil, // right: unused
		// centre: settings form
		container.NewVBox(
			widget.NewLabelWithStyle("服务器地址:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}),
			urlEntry,
			widget.NewSeparator(),
			widget.NewLabelWithStyle("Token:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}),
			tokenEntry,
			widget.NewSeparator(),
			widget.NewLabelWithStyle("检测目录:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}),
			selectedDirLabel,
			selectDirBtn,
			widget.NewSeparator(),
			container.NewHBox(
				widget.NewLabelWithStyle("单文件上传线程:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}),
				threadCountLabel,
			),
			widget.NewSeparator(),
			layout.NewSpacer(), // flexible space that pushes the form upwards
		),
	)

	// Right panel: title on top, scrollable log in the centre.
	rightPanel := container.NewBorder(
		widget.NewLabelWithStyle("运行日志", fyne.TextAlignCenter, fyne.TextStyle{Bold: true}),
		nil,
		nil,
		nil,
		container.NewScroll(logText),
	)

	// Resizable horizontal split; the left side gets 35% of the width.
	splitContainer := container.NewHSplit(leftPanel, rightPanel)
	splitContainer.SetOffset(0.35)

	myWindow.SetContent(splitContainer)
	myWindow.ShowAndRun()
}

// AddLog appends message plus a newline to the log view. The widget update is
// wrapped in fyne.Do so that it can be called from worker goroutines.
func AddLog(message string) {
	fyne.Do(func() {
		logText.Append(message + "\n")
		// Move the cursor towards the end so the view follows new output.
		// NOTE(review): this assigns the byte length of the text, not a row
		// index; presumably the widget tolerates an out-of-range row — confirm.
		logText.CursorRow = len(logText.Text)
	})
}

// httpClient is the shared client used for every upload request.
var httpClient = &http.Client{
	Transport: &http.Transport{
		MaxIdleConns:        200,
		MaxIdleConnsPerHost: 100,
		IdleConnTimeout:     30 * time.Second,
	},
	Timeout: 30 * time.Second,
}

// Task describes one file that is queued for upload.
type Task struct {
	FilePath  string // path passed to processFile and os.Truncate
	FileLines int    // line count, used only for progress reporting
}

// StartLooking scans lookingPath (or "./" when it is empty) for .txt files
// once a minute and uploads whatever it finds. It loops forever and never
// returns, so it is meant to be started in its own goroutine.
func StartLooking(lookingPath string) {
	AddLog("程序启动成功,正在检测txt文件")

	dir := "./"
	if lookingPath != "" {
		dir = lookingPath
	}

	for {
		runUploadCycle(dir)
		// Sleep after every cycle, including failed ones, so that a
		// persistent scan error cannot turn into a busy loop.
		time.Sleep(time.Minute)
	}
}

// runUploadCycle performs one scan of dir: it counts the lines of every .txt
// file, uploads each non-empty file and truncates it afterwards.
func runUploadCycle(dir string) {
	files, err := getTxtFiles(dir)
	if err != nil {
		AddLog(err.Error())
		return
	}
	if len(files) == 0 {
		return
	}

	start := time.Now()
	AddLog(fmt.Sprintf("正在统计 %v 个文件行数", len(files)))

	tasks := make([]Task, 0, len(files))
	for _, filePath := range files {
		lineCount, err := countLines(filePath)
		if err != nil {
			AddLog("打开文件失败:" + err.Error())
			continue
		}
		if lineCount == 0 {
			continue
		}
		AddLog(fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount))
		// Keep the full path returned by the walk: the base name alone
		// would be resolved against the working directory and would collide
		// for equally named files in different subdirectories.
		tasks = append(tasks, Task{FilePath: filePath, FileLines: lineCount})
	}

	// Upload files concurrently, at most 50 at a time.
	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(50)

	for _, task := range tasks {
		task := task // per-iteration copy for toolchains before Go 1.22
		g.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}

			AddLog("正在上传文件:" + filepath.Base(task.FilePath))
			processFile(task.FilePath, task.FileLines)
			// NOTE(review): the file is emptied even when processFile
			// logged read or upload failures, so those lines are not
			// retried — confirm this best-effort behaviour is intended.
			if err := os.Truncate(task.FilePath, 0); err != nil {
				AddLog("清空文件失败:" + err.Error())
			}
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		AddLog(fmt.Sprintf("任务执行出错: %v", err))
	} else {
		AddLog("所有任务执行完成!")
	}
	AddLog(fmt.Sprintf("上传完成,耗时:%s\n", time.Since(start)))
}

// countLines returns the number of lines bufio.Scanner reads from path. Only
// a failure to open the file is reported as an error; a scan error merely
// yields a partial count, which is acceptable because the count is used for
// progress output only.
func countLines(path string) (int, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	n := 0
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		n++
	}
	return n, nil
}

// getTxtFiles walks dir recursively and returns the paths of all regular
// files whose extension is ".txt" (case-insensitive). A walk error aborts the
// traversal and is returned together with the paths collected so far.
func getTxtFiles(dir string) (txtFiles []string, err error) {
	err = filepath.Walk(dir, func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		// Skip directories and anything else that is not a regular file.
		if !info.Mode().IsRegular() {
			return nil
		}
		if strings.EqualFold(filepath.Ext(path), ".txt") {
			txtFiles = append(txtFiles, path)
		}
		return nil
	})
	return txtFiles, err
}

// processFile reads filePath line by line and hands every line to a pool of
// upload workers sized by config.APPConfig.ThreadCount. fileLines is the
// expected total and is used only to print progress percentages. Failures are
// logged through AddLog; the function returns once all workers have finished.
func processFile(filePath string, fileLines int) {
	file, err := os.Open(filePath)
	if err != nil {
		AddLog(fmt.Sprintf("无法打开文件 %s: %v\n", filePath, err))
		return
	}
	defer file.Close()

	// Without at least one worker nobody drains the channel and the reader
	// below would block forever once the buffer is full. The fallback mirrors
	// the "10" placeholder shown in the settings form.
	const fallbackWorkers = 10
	workers := config.APPConfig.ThreadCount
	if workers < 1 {
		workers = fallbackWorkers
	}

	lines := make(chan string, 100)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			processLines(lines, workerID, filePath)
		}(i)
	}

	// Feed the workers and report progress every 10000 lines.
	scanner := bufio.NewScanner(file)
	lineCount := 0
	for scanner.Scan() {
		lines <- scanner.Text()
		lineCount++
		if lineCount%10000 == 0 {
			AddLog(fmt.Sprintf("文件【%s】处理进度:%.2f%%\n", filePath, float64(lineCount)/float64(fileLines)*100))
		}
	}
	close(lines)
	wg.Wait()

	if err := scanner.Err(); err != nil {
		AddLog(fmt.Sprintf("读取文件 %s 错误: %v\n", filePath, err))
		return
	}

	AddLog(fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据\n", filePath, lineCount))
}

// processLines drains lines and uploads every non-blank line with the shared
// httpClient. Upload failures are logged together with workerID and filePath
// and do not stop the worker.
func processLines(lines <-chan string, workerID int, filePath string) {
	for line := range lines {
		if strings.TrimSpace(line) == "" {
			continue
		}
		if err := api.UploadDataToServer(httpClient, line); err != nil {
			AddLog(fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err))
		}
	}
}