Files
dypid/tool/update-dypid.go

224 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"bufio"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
)
// UploadData holds the fields of one record submitted to the upload API.
// The trailing comment on each field names the request parameter it is sent as.
type UploadData struct {
	Dyid      string // "dyid"
	Uid       string // "uid"
	Secid     string // "secid"
	Pid       string // "pid"
	CommentId string // "comment_id"

	// Extra columns, sent as "id1" through "id5".
	Id1, Id2, Id3, Id4, Id5 string
}
// httpClient is the shared client used by uploadDataToServer.
// Every request is limited to 30 seconds. The transport keeps at most 200 idle
// connections in total (100 per host) and closes them after 30 seconds idle.
var httpClient = &http.Client{
	Timeout: 30 * time.Second,
	Transport: &http.Transport{
		IdleConnTimeout:     30 * time.Second,
		MaxIdleConnsPerHost: 100,
		MaxIdleConns:        200,
	},
}
// main loads the configuration from ./config.toml (writing a file with the
// defaults when none exists), then polls the current directory tree every two
// seconds. Every .txt file found is processed concurrently and removed
// afterwards. The program exits only if the directory scan itself fails.
func main() {
	// Defaults used for any key missing from the config file.
	viper.SetDefault("url", "http://localhost:8080")
	viper.SetDefault("token", "")
	viper.SetDefault("thread-count", 10)
	viper.SetDefault("index.uid", 0)
	viper.SetDefault("index.secid", 1)
	viper.SetDefault("index.pid", 2)
	viper.SetDefault("index.comment-id", 3)

	// Config file name and location: ./config.toml
	viper.AddConfigPath(".")
	viper.SetConfigName("config")
	viper.SetConfigType("toml")

	// Persist the defaults on first run. SafeWriteConfig refuses to overwrite
	// an existing file, so an error here is expected on later runs and is
	// deliberately ignored.
	_ = viper.SafeWriteConfig()

	// A read failure is reported but not fatal: the defaults above still apply.
	if err := viper.ReadInConfig(); err != nil {
		fmt.Printf("无法读取配置文件: %v\n", err)
	}
	viper.WatchConfig()
	viper.OnConfigChange(func(e fsnotify.Event) {
		if err := viper.ReadInConfig(); err != nil {
			fmt.Printf("无法读取配置文件: %v\n", err)
		}
	})

	fmt.Println("程序启动成功正在检测txt文件")
	for {
		files, err := getTxtFiles("./")
		if err != nil {
			fmt.Println(err)
			return
		}

		var wg sync.WaitGroup
		for _, filePath := range files {
			fmt.Println("正在上传文件:", filePath)
			wg.Add(1)
			// Pass the path as an argument so each goroutine owns its own copy
			// regardless of the Go version's loop-variable semantics.
			go func(path string) {
				defer wg.Done()
				processFile(path)
				// A file that cannot be removed will be picked up again on the
				// next poll, so make the failure visible.
				if err := os.Remove(path); err != nil {
					fmt.Printf("无法删除文件 %s: %v\n", path, err)
				}
			}(filePath)
		}
		// Finish the whole batch before scanning again so no file is handled twice.
		wg.Wait()

		time.Sleep(2 * time.Second)
	}
}
// uploadDataToServer submits one record to the configured server.
//
// The record's fields and the configured "token" are sent as query parameters
// of a POST to <url>/api/data with an empty body. It returns an error when the
// request cannot be completed or when the server replies with a non-2xx status.
func uploadDataToServer(data UploadData) error {
	params := url.Values{}
	params.Set("token", viper.GetString("token"))
	params.Set("dyid", data.Dyid)
	params.Set("uid", data.Uid)
	params.Set("secid", data.Secid)
	params.Set("pid", data.Pid)
	params.Set("comment_id", data.CommentId)
	params.Set("id1", data.Id1)
	params.Set("id2", data.Id2)
	params.Set("id3", data.Id3)
	params.Set("id4", data.Id4)
	params.Set("id5", data.Id5)

	endpoint := viper.GetString("url") + "/api/data?" + params.Encode()
	resp, err := httpClient.Post(endpoint, "application/x-www-form-urlencoded", http.NoBody)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Drain the body before closing so the transport can reuse the connection.
	// A failure while discarding is irrelevant to the upload result.
	_, _ = io.Copy(io.Discard, resp.Body)

	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return fmt.Errorf("unexpected response status %s", resp.Status)
	}
	return nil
}
// getTxtFiles walks dir recursively and returns the path of every regular file
// whose extension is ".txt", compared case-insensitively. If the walk reports
// an error, that error is returned together with the paths collected so far.
func getTxtFiles(dir string) ([]string, error) {
	var matches []string

	visit := func(p string, fi os.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil:
			// Propagate the failure; this stops the walk.
			return walkErr
		case fi.Mode().IsRegular() && strings.ToLower(filepath.Ext(p)) == ".txt":
			// Directories and other non-regular entries never reach this case.
			matches = append(matches, p)
		}
		return nil
	}

	walkResult := filepath.Walk(dir, visit)
	return matches, walkResult
}
// processFile streams filePath line by line to a pool of workers running
// processLines and blocks until every line has been handled.
//
// The pool size is taken from the "thread-count" setting and is never less
// than one. Open and read errors are printed; nothing is returned to the
// caller.
func processFile(filePath string) {
	file, err := os.Open(filePath)
	if err != nil {
		fmt.Printf("无法打开文件 %s: %v\n", filePath, err)
		return
	}
	defer file.Close()

	// Read the setting once so a config reload cannot change the pool size
	// half-way through start-up. With zero workers nothing would drain the
	// channel: the producer below would block forever once the buffer fills,
	// and shorter files would be dropped without a single upload.
	workers := viper.GetInt("thread-count")
	if workers < 1 {
		workers = 1
	}

	// Buffered so the reader can run slightly ahead of the uploaders.
	lines := make(chan string, 100)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			processLines(lines, workerID, filePath)
		}(i)
	}

	// NOTE(review): the progress percentage is computed against a fixed total
	// of 40000 lines, so it is only meaningful for files of about that size
	// (and exceeds 100 for larger ones). Confirm the expected file size.
	const assumedTotalLines = 40000

	scanner := bufio.NewScanner(file)
	lineCount := 0
	for scanner.Scan() {
		lines <- scanner.Text()
		lineCount++
		if lineCount%10000 == 0 {
			fmt.Printf("文件【%s】处理进度%v%%\n", filePath, float64(lineCount)/assumedTotalLines*100)
		}
	}

	// Closing the channel lets the workers' range loops finish.
	close(lines)
	wg.Wait()

	if err := scanner.Err(); err != nil {
		fmt.Printf("读取文件 %s 错误: %v\n", filePath, err)
	}
	fmt.Printf("文件【%s】处理完成共处理 %d 行数据\n", filePath, lineCount)
}
// processLines consumes lines until the channel is closed, converts each one
// into an UploadData and uploads it with uploadDataToServer.
//
// A line is split on "----". Blank lines are ignored; lines with fewer than
// three fields are reported and skipped. The positions of uid, secid, pid and
// comment id are read from the "index.*" settings for every line, while dyid
// and id1..id5 come from the fixed positions 4 through 9. A position outside
// the line yields an empty string. workerID and filePath are used only in the
// printed messages. Upload failures are printed and do not stop the loop.
func processLines(lines <-chan string, workerID int, filePath string) {
	const minFields = 3

	for line := range lines {
		if strings.TrimSpace(line) == "" {
			continue
		}

		parts := strings.Split(line, "----")

		// Validate the real field count. Checking after padding the slice
		// would make this test impossible to fail.
		if len(parts) < minFields {
			fmt.Printf("Worker %d (文件 %s): 行数据字段不足,跳过: %s\n", workerID, filePath, line)
			continue
		}

		// field returns parts[idx], or "" when idx is out of range. This covers
		// both short lines and misconfigured (negative or too large) indexes
		// without panicking.
		field := func(idx int) string {
			if idx < 0 || idx >= len(parts) {
				return ""
			}
			return parts[idx]
		}

		uploadData := UploadData{
			Uid:       field(viper.GetInt("index.uid")),
			Secid:     field(viper.GetInt("index.secid")),
			Pid:       field(viper.GetInt("index.pid")),
			CommentId: field(viper.GetInt("index.comment-id")),
			Dyid:      field(4),
			Id1:       field(5),
			Id2:       field(6),
			Id3:       field(7),
			Id4:       field(8),
			Id5:       field(9),
		}

		if err := uploadDataToServer(uploadData); err != nil {
			fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err)
		}
	}
}