diff --git a/go.mod b/go.mod index e0e9580..c36def9 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module dypid go 1.25 require ( - github.com/fsnotify/fsnotify v1.8.0 github.com/gin-contrib/cors v1.7.6 github.com/gin-gonic/gin v1.10.1 github.com/redis/go-redis/v9 v9.12.1 @@ -16,6 +15,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.5 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/gabriel-vasile/mimetype v1.4.9 // indirect github.com/gin-contrib/sse v1.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect @@ -43,7 +43,6 @@ require ( golang.org/x/arch v0.18.0 // indirect golang.org/x/crypto v0.39.0 // indirect golang.org/x/net v0.41.0 // indirect - golang.org/x/sync v0.17.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.26.0 // indirect google.golang.org/protobuf v1.36.6 // indirect diff --git a/go.sum b/go.sum index ad46009..0360803 100644 --- a/go.sum +++ b/go.sum @@ -109,8 +109,6 @@ golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= diff --git a/tool/.gitignore b/tool/.gitignore deleted file mode 100644 index 0d7baac..0000000 --- a/tool/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -./upload -config.toml \ No newline at end of file diff --git a/tool/update-dypid.go b/tool/update-dypid.go deleted file mode 100644 index b252c34..0000000 --- a/tool/update-dypid.go +++ /dev/null @@ -1,226 +0,0 @@ -package main - -import ( - "bufio" - "context" - "fmt" - "io" - "net/http" - "net/url" - "os" - "path/filepath" - "strings" - "sync" - "time" - - "github.com/fsnotify/fsnotify" - "github.com/spf13/viper" - "golang.org/x/sync/errgroup" -) - -var httpClient = &http.Client{ - Transport: &http.Transport{ - MaxIdleConns: 200, - MaxIdleConnsPerHost: 100, - IdleConnTimeout: 30 * time.Second, - }, - Timeout: 30 * time.Second, -} - -type Task struct { - FilePath string - FileLines int -} - -func main() { - initConfig() - - //检测./upload - fmt.Println("程序启动成功,正在检测txt文件") - for { - files, err := getTxtFiles("./") - if err != nil { - fmt.Println(err) - continue - } - if files != nil { - start := time.Now() - - fileLines := make(map[string]int) - fmt.Println("正在统计", len(files), "个文件行数") - for _, filePath := range files { - file, err := os.Open(filePath) - if err != nil { - fmt.Println("打开文件失败:", err) - } - // 使用 bufio.Scanner 逐行读取 - scanner := bufio.NewScanner(file) - lineCount := 0 - for scanner.Scan() { - lineCount++ - } - file.Close() - if lineCount == 0 { - continue - } - fileLines[filepath.Base(filePath)] = lineCount - fmt.Println(filepath.Base(filePath), "文件行数:", lineCount) - - } - - var tasks []Task - for k, v := range fileLines { - tasks = append(tasks, Task{FilePath: k, FileLines: v}) - } - - // 使用errgroup控制并发 - g, ctx := errgroup.WithContext(context.Background()) - g.SetLimit(50) // 设置最大并发数为50 - // 执行所有任务 - for _, task := range tasks { - task := task // 创建局部变量 - g.Go(func() error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - fmt.Println("正在上传文件:", filepath.Base(task.FilePath)) - processFile(task.FilePath, task.FileLines) - err := os.Truncate(task.FilePath, 0) - if err != nil { - fmt.Println("清空文件失败:", err) - } - return nil - } - }) - } - - // 等待所有任务完成 - if err := g.Wait(); err != nil { - fmt.Printf("任务执行出错: %v\n", err) - } else { - fmt.Println("所有任务执行完成!") - } - - fmt.Printf("上传完成,耗时:%s\n", time.Since(start)) - } - time.Sleep(time.Minute) - } -} - -func initConfig() { - //程序配置 - viper.SetDefault("url", "http://localhost:8080") - viper.SetDefault("token", "") - viper.SetDefault("thread-count", 10) - //设置配置文件名和路径 ./config.toml - viper.AddConfigPath(".") - viper.SetConfigName("config") - viper.SetConfigType("toml") - viper.SafeWriteConfig() //安全写入默认配置 - //读取配置文件 - if err := viper.ReadInConfig(); err != nil { - fmt.Errorf("无法读取配置文件: %w", err) - } - viper.WatchConfig() - viper.OnConfigChange(func(e fsnotify.Event) { - if err := viper.ReadInConfig(); err != nil { - fmt.Errorf("无法读取配置文件: %w", err) - } - }) -} - -func uploadDataToServer(data string) error { - params := url.Values{} - params.Set("token", viper.GetString("token")) - params.Set("data", data) - - resp, err := httpClient.Post(viper.GetString("url")+"/api/data?"+params.Encode(), "application/x-www-form-urlencoded", strings.NewReader("")) - if err != nil { - return err - } - if resp != nil { - _, _ = io.Copy(io.Discard, resp.Body) - } - return err -} - -// 获取目录中的所有txt文件 -func getTxtFiles(dir string) (txtFiles []string, err error) { - err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - // 只处理普通文件,跳过目录 - if !info.Mode().IsRegular() { - return nil - } - - // 检查文件扩展名是否为.txt - if strings.ToLower(filepath.Ext(path)) == ".txt" { - txtFiles = append(txtFiles, path) - } - - return nil - }) - - return txtFiles, err -} - -func processFile(filePath string, fileLines int) { - var wg sync.WaitGroup - // 打开文件 - file, err := os.Open(filePath) - if err != nil { - fmt.Printf("无法打开文件 %s: %v\n", filePath, err) - return - } - defer file.Close() - - // 创建行通道 - lines := make(chan string, 100) - - // 创建10个worker处理文件上传 - for i := 0; i < viper.GetInt("thread-count"); i++ { - wg.Add(1) - go func(workerID int) { - defer wg.Done() - processLines(lines, workerID, filePath) - }(i) - } - - // 读取文件并发送到通道 - scanner := bufio.NewScanner(file) - lineCount := 0 - for scanner.Scan() { - lines <- scanner.Text() - lineCount++ - if lineCount%10000 == 0 { - fmt.Printf("文件【%s】处理进度:%.2f%%\n", filePath, float64(lineCount)/float64(fileLines)*100) - } - } - - close(lines) - wg.Wait() - - if err := scanner.Err(); err != nil { - fmt.Printf("读取文件 %s 错误: %v\n", filePath, err) - return - } - - fmt.Printf("文件【%s】处理完成,共处理 %d 行数据\n", filePath, lineCount) -} - -func processLines(lines <-chan string, workerID int, filePath string) { - for line := range lines { - // 跳过空行 - if strings.TrimSpace(line) == "" { - continue - } - // 上传数据 - if err := uploadDataToServer(line); err != nil { - fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err) - } - } -}