336 lines
8.3 KiB
Go
336 lines
8.3 KiB
Go
package main
|
||
|
||
import (
|
||
"bufio"
|
||
"context"
|
||
"dypid-client/api"
|
||
"dypid-client/config"
|
||
"dypid-client/utils/folder"
|
||
"fmt"
|
||
"net/http"
|
||
"os"
|
||
"path/filepath"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"fyne.io/fyne/v2"
|
||
"fyne.io/fyne/v2/app"
|
||
"fyne.io/fyne/v2/container"
|
||
"fyne.io/fyne/v2/layout"
|
||
"fyne.io/fyne/v2/widget"
|
||
"golang.org/x/sync/errgroup"
|
||
)
|
||
|
||
var isRun = false
|
||
var logText = widget.NewMultiLineEntry()
|
||
|
||
func main() {
|
||
config.InitConfig()
|
||
|
||
myApp := app.New()
|
||
myWindow := myApp.NewWindow("抖音数据上传工具")
|
||
myWindow.Resize(fyne.NewSize(900, 550))
|
||
|
||
logText.Scroll = container.ScrollVerticalOnly
|
||
|
||
// 创建界面组件
|
||
urlEntry := widget.NewEntry()
|
||
urlEntry.SetPlaceHolder("http://127.0.0.1:8080")
|
||
urlEntry.Text = config.APPConfig.Url
|
||
urlEntry.OnChanged = func(s string) {
|
||
config.WriteConfig("url", urlEntry.Text)
|
||
}
|
||
|
||
tokenEntry := widget.NewEntry()
|
||
tokenEntry.SetPlaceHolder("请输入Token")
|
||
tokenEntry.Text = config.APPConfig.Token
|
||
tokenEntry.OnChanged = func(s string) {
|
||
config.WriteConfig("token", tokenEntry.Text)
|
||
}
|
||
|
||
selectedDirLabel := widget.NewEntry()
|
||
selectedDirLabel.SetPlaceHolder("未选择目录(默认为程序运行目录)")
|
||
selectedDirLabel.Text = config.APPConfig.LookingPath
|
||
selectedDirLabel.OnChanged = func(s string) {
|
||
config.WriteConfig("looking-path", selectedDirLabel.Text)
|
||
}
|
||
|
||
threadCountLabel := widget.NewEntry()
|
||
threadCountLabel.SetPlaceHolder("10")
|
||
threadCountLabel.Text = strconv.Itoa(config.APPConfig.ThreadCount)
|
||
threadCountLabel.OnChanged = func(s string) {
|
||
i, err := strconv.Atoi(threadCountLabel.Text)
|
||
if err != nil {
|
||
AddLog("输入上传线程错误")
|
||
}
|
||
config.WriteConfig("thread-count", i)
|
||
}
|
||
|
||
isRunOnStartWidget := widget.NewCheck("启动程序时启动上传程序", func(b bool) {
|
||
config.WriteConfig("is-run-on-start", b)
|
||
})
|
||
isRunOnStartWidget.Checked = config.APPConfig.IsRunOnStart
|
||
|
||
// 使用Windows原生目录选择按钮
|
||
selectDirBtn := widget.NewButton("选择检测目录", func() {
|
||
// 调用CGO实现的Windows原生对话框
|
||
selectedPath := folder.OpenFolderDialog()
|
||
if selectedPath == "" {
|
||
return
|
||
}
|
||
selectedDirLabel.SetText(selectedPath)
|
||
config.WriteConfig("looking-path", selectedPath)
|
||
})
|
||
|
||
// 开始运行按钮
|
||
startBtn := widget.NewButton("开始运行", func() {
|
||
if strings.TrimSpace(tokenEntry.Text) == "" {
|
||
AddLog("错误:请输入Token")
|
||
return
|
||
}
|
||
|
||
AddLog(fmt.Sprintf("Token:%s", tokenEntry.Text))
|
||
AddLog(fmt.Sprintf("检测目录:%s", selectedDirLabel.Text))
|
||
AddLog("===============================")
|
||
|
||
isRun = true
|
||
go StartLooking(selectedDirLabel.Text)
|
||
})
|
||
|
||
// 清除日志按钮
|
||
clearLogBtn := widget.NewButton("清除日志", func() {
|
||
logText.SetText("")
|
||
AddLog("日志已清除")
|
||
})
|
||
|
||
// 组装左侧面板
|
||
leftPanel := container.NewBorder(
|
||
nil,
|
||
// 底部 - 放置按钮
|
||
container.NewVBox(
|
||
//isRunOnStartWidget,
|
||
startBtn,
|
||
clearLogBtn,
|
||
),
|
||
nil,
|
||
nil,
|
||
// 中间内容
|
||
container.NewVBox(
|
||
widget.NewLabelWithStyle("服务器地址:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}),
|
||
urlEntry,
|
||
widget.NewSeparator(),
|
||
widget.NewLabelWithStyle("Token:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}),
|
||
tokenEntry,
|
||
widget.NewSeparator(),
|
||
widget.NewLabelWithStyle("检测目录:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}),
|
||
selectedDirLabel,
|
||
selectDirBtn,
|
||
widget.NewSeparator(),
|
||
container.NewHBox(
|
||
widget.NewLabelWithStyle("单文件上传线程:", fyne.TextAlignLeading, fyne.TextStyle{Bold: true}),
|
||
threadCountLabel,
|
||
),
|
||
widget.NewSeparator(),
|
||
layout.NewSpacer(), // 添加一个弹性空间,将内容向上推
|
||
),
|
||
)
|
||
// 组装右侧面板(日志显示)
|
||
rightPanel := container.NewBorder(
|
||
widget.NewLabelWithStyle("运行日志", fyne.TextAlignCenter, fyne.TextStyle{Bold: true}),
|
||
nil, nil, nil,
|
||
container.NewScroll(logText),
|
||
)
|
||
|
||
// 使用HSplit容器创建可调整大小的左右分割布局
|
||
splitContainer := container.NewHSplit(leftPanel, rightPanel)
|
||
splitContainer.SetOffset(0.35) // 左侧占35%宽度
|
||
|
||
myWindow.SetContent(splitContainer)
|
||
myWindow.ShowAndRun()
|
||
}
|
||
|
||
func AddLog(message string) {
|
||
fyne.Do(func() {
|
||
logText.Append(message + "\n")
|
||
// 自动滚动到底部
|
||
logText.CursorRow = len(logText.Text)
|
||
})
|
||
}
|
||
|
||
// Data upload code.

// httpClient is the single HTTP client handed to api.UploadDataToServer by
// every upload worker. Requests time out after 30 seconds, and idle
// connections are kept for up to 30 seconds (at most 200 in total, 100 per
// host).
var httpClient = &http.Client{
	Timeout: 30 * time.Second,
	Transport: &http.Transport{
		IdleConnTimeout:     30 * time.Second,
		MaxIdleConnsPerHost: 100,
		MaxIdleConns:        200,
	},
}
|
||
|
||
// Task describes one file queued for upload by StartLooking.
type Task struct {
	// FilePath is the path passed to processFile and os.Truncate.
	FilePath string

	// FileLines is the line count measured before the upload starts; it is
	// used as the denominator for progress reporting.
	FileLines int
}
|
||
|
||
func StartLooking(lookingPath string) {
|
||
//检测./
|
||
AddLog("程序启动成功,正在检测txt文件")
|
||
for {
|
||
var path = "./"
|
||
if lookingPath != "" {
|
||
path = lookingPath
|
||
}
|
||
files, err := getTxtFiles(path)
|
||
if err != nil {
|
||
AddLog(err.Error())
|
||
continue
|
||
}
|
||
if files != nil {
|
||
start := time.Now()
|
||
|
||
fileLines := make(map[string]int)
|
||
AddLog(fmt.Sprintf("正在统计 %v 个文件行数", len(files)))
|
||
for _, filePath := range files {
|
||
file, err := os.Open(filePath)
|
||
if err != nil {
|
||
AddLog("打开文件失败:" + err.Error())
|
||
}
|
||
// 使用 bufio.Scanner 逐行读取
|
||
scanner := bufio.NewScanner(file)
|
||
lineCount := 0
|
||
for scanner.Scan() {
|
||
lineCount++
|
||
}
|
||
file.Close()
|
||
if lineCount == 0 {
|
||
continue
|
||
}
|
||
fileLines[filepath.Base(filePath)] = lineCount
|
||
AddLog(fmt.Sprintf("%s 文件行数:%v", filepath.Base(filePath), lineCount))
|
||
|
||
}
|
||
|
||
var tasks []Task
|
||
for k, v := range fileLines {
|
||
tasks = append(tasks, Task{FilePath: k, FileLines: v})
|
||
}
|
||
|
||
// 使用errgroup控制并发
|
||
g, ctx := errgroup.WithContext(context.Background())
|
||
g.SetLimit(50) // 设置最大并发数为50
|
||
// 执行所有任务
|
||
for _, task := range tasks {
|
||
task := task // 创建局部变量
|
||
g.Go(func() error {
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
default:
|
||
AddLog("正在上传文件:" + filepath.Base(task.FilePath))
|
||
processFile(task.FilePath, task.FileLines)
|
||
err := os.Truncate(task.FilePath, 0)
|
||
if err != nil {
|
||
AddLog("清空文件失败:" + err.Error())
|
||
}
|
||
return nil
|
||
}
|
||
})
|
||
}
|
||
|
||
// 等待所有任务完成
|
||
if err := g.Wait(); err != nil {
|
||
AddLog(fmt.Sprintf("任务执行出错: %v", err))
|
||
} else {
|
||
AddLog("所有任务执行完成!")
|
||
}
|
||
|
||
AddLog(fmt.Sprintf("上传完成,耗时:%s\n", time.Since(start)))
|
||
}
|
||
time.Sleep(time.Minute)
|
||
}
|
||
AddLog("上传程序已退出")
|
||
}
|
||
|
||
// getTxtFiles walks dir recursively and returns the paths of all regular files
// whose extension is ".txt", compared case-insensitively. Directories,
// symlinks and other non-regular entries are skipped. The walk stops at the
// first error, in which case the returned slice is nil.
func getTxtFiles(dir string) (txtFiles []string, err error) {
	// WalkDir reports each entry's type from the directory listing, avoiding
	// the per-entry lstat that filepath.Walk performs.
	err = filepath.WalkDir(dir, func(path string, entry os.DirEntry, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}

		// Only regular files are candidates.
		if !entry.Type().IsRegular() {
			return nil
		}

		if strings.EqualFold(filepath.Ext(path), ".txt") {
			txtFiles = append(txtFiles, path)
		}
		return nil
	})
	if err != nil {
		// Do not hand back a partial list together with an error.
		return nil, err
	}
	return txtFiles, nil
}
|
||
|
||
func processFile(filePath string, fileLines int) {
|
||
var wg sync.WaitGroup
|
||
// 打开文件
|
||
file, err := os.Open(filePath)
|
||
if err != nil {
|
||
AddLog(fmt.Sprintf("无法打开文件 %s: %v\n", filePath, err))
|
||
return
|
||
}
|
||
defer file.Close()
|
||
|
||
// 创建行通道
|
||
lines := make(chan string, 100)
|
||
|
||
// 创建10个worker处理文件上传
|
||
for i := 0; i < config.APPConfig.ThreadCount; i++ {
|
||
wg.Add(1)
|
||
go func(workerID int) {
|
||
defer wg.Done()
|
||
processLines(lines, workerID, filePath)
|
||
}(i)
|
||
}
|
||
|
||
// 读取文件并发送到通道
|
||
scanner := bufio.NewScanner(file)
|
||
lineCount := 0
|
||
for scanner.Scan() {
|
||
lines <- scanner.Text()
|
||
lineCount++
|
||
if lineCount%10000 == 0 {
|
||
AddLog(fmt.Sprintf("文件【%s】处理进度:%.2f%%\n", filePath, float64(lineCount)/float64(fileLines)*100))
|
||
}
|
||
}
|
||
|
||
close(lines)
|
||
wg.Wait()
|
||
|
||
if err := scanner.Err(); err != nil {
|
||
AddLog(fmt.Sprintf("读取文件 %s 错误: %v\n", filePath, err))
|
||
return
|
||
}
|
||
|
||
AddLog(fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据\n", filePath, lineCount))
|
||
}
|
||
|
||
func processLines(lines <-chan string, workerID int, filePath string) {
|
||
for line := range lines {
|
||
// 跳过空行
|
||
if strings.TrimSpace(line) == "" {
|
||
continue
|
||
}
|
||
// 上传数据
|
||
if err := api.UploadDataToServer(httpClient, line); err != nil {
|
||
AddLog(fmt.Sprintf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err))
|
||
}
|
||
}
|
||
}
|