Files
dypid/tool/update-dypid.go
YGXB_net b2c643cf73 feat: 优化Redis上传操作 添加文件上传工具
使用Lua脚本确保Redis数据检测重复与插入的原子化
2025-09-01 13:56:30 +08:00

216 lines
4.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"bufio"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
)
// UploadData holds the ten string fields of one record to upload.
// Each field is sent as the request parameter named in its trailing comment.
type UploadData struct {
	Dyid, Uid, Secuid       string // dyid, uid, secuid
	Pid, CommentId          string // pid, comment_id
	Id1, Id2, Id3, Id4, Id5 string // id1 .. id5
}
// httpClient is the package-level HTTP client used for uploads. It bounds the
// idle connection pool and applies a 30 second timeout to every request.
var httpClient = func() *http.Client {
	const thirtySeconds = 30 * time.Second

	transport := &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     thirtySeconds,
	}
	return &http.Client{
		Transport: transport,
		Timeout:   thirtySeconds,
	}
}()
// main loads the configuration from ./config.toml (falling back to built-in
// defaults), then polls the ./upload directory every two seconds. Each .txt
// file found is processed in its own goroutine and removed afterwards. The
// loop only ends when the directory can no longer be scanned.
func main() {
	// Defaults used for any key the config file does not provide.
	viper.SetDefault("url", "http://localhost:8080")
	viper.SetDefault("token", "")
	viper.SetDefault("thread-count", 10)

	// The configuration file is ./config.toml.
	viper.AddConfigPath(".")
	viper.SetConfigName("config")
	viper.SetConfigType("toml")
	// Best effort: this writes the defaults only when no config file exists
	// yet and reports an error otherwise, so the result is ignored on purpose.
	_ = viper.SafeWriteConfig()

	// A failed read is reported but not fatal; the defaults above still apply.
	if err := viper.ReadInConfig(); err != nil {
		fmt.Printf("无法读取配置文件: %v\n", err)
	}
	viper.WatchConfig()
	viper.OnConfigChange(func(e fsnotify.Event) {
		if err := viper.ReadInConfig(); err != nil {
			fmt.Printf("无法读取配置文件: %v\n", err)
		}
	})

	fmt.Println("程序启动成功正在检测upload")
	// MkdirAll succeeds when the directory already exists, so any error here
	// means the upload directory is genuinely unusable.
	if err := os.MkdirAll("./upload", os.ModePerm); err != nil {
		fmt.Println(err)
		return
	}

	for {
		files, err := getTxtFiles("./upload")
		if err != nil {
			fmt.Println(err)
			return
		}

		var wg sync.WaitGroup
		for _, filePath := range files {
			fmt.Println("正在上传文件:", filePath)
			wg.Add(1)
			// The path is passed as an argument so every goroutine works on
			// its own copy, whatever loop-variable semantics the toolchain has.
			go func(path string) {
				defer wg.Done()
				processFile(path)
				// A file that cannot be removed would be uploaded again on the
				// next poll, so make the failure visible.
				if err := os.Remove(path); err != nil {
					fmt.Printf("无法删除文件 %s: %v\n", path, err)
				}
			}(filePath)
		}
		wg.Wait()

		time.Sleep(2 * time.Second)
	}
}
// uploadDataToServer sends one record to the configured server with a POST to
// <url>/api/data. The token and all record fields travel as query parameters;
// the request body is empty.
//
// It returns the transport error if the request could not be completed, or an
// error describing the status when the server answers with a non-2xx code.
func uploadDataToServer(data UploadData) error {
	params := url.Values{}
	params.Add("token", viper.GetString("token"))
	params.Add("dyid", data.Dyid)
	params.Add("uid", data.Uid)
	params.Add("secuid", data.Secuid)
	params.Add("pid", data.Pid)
	params.Add("comment_id", data.CommentId)
	params.Add("id1", data.Id1)
	params.Add("id2", data.Id2)
	params.Add("id3", data.Id3)
	params.Add("id4", data.Id4)
	params.Add("id5", data.Id5)

	endpoint := viper.GetString("url") + "/api/data?" + params.Encode()
	resp, err := httpClient.Post(endpoint, "application/x-www-form-urlencoded", strings.NewReader(""))
	if err != nil {
		// resp is nil on error, so it must not be touched here.
		return err
	}
	defer resp.Body.Close()

	// Drain the body so the underlying connection can be reused. The payload
	// itself is not needed, so a copy error is deliberately ignored.
	_, _ = io.Copy(io.Discard, resp.Body)

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("服务器返回异常状态: %s", resp.Status)
	}
	return nil
}
// getTxtFiles walks dir recursively and returns the paths of all regular files
// whose extension is ".txt", compared case-insensitively. Directories and
// other non-regular entries are skipped. The first error met during the walk
// stops it and is returned together with the paths collected so far.
func getTxtFiles(dir string) ([]string, error) {
	var matches []string

	visit := func(path string, info os.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil:
			// Propagate the error; info may be nil in this case.
			return walkErr
		case info.Mode().IsRegular() && strings.ToLower(filepath.Ext(path)) == ".txt":
			matches = append(matches, path)
		}
		return nil
	}

	walkErr := filepath.Walk(dir, visit)
	return matches, walkErr
}
// processFile reads filePath line by line and hands every line to a pool of
// worker goroutines running processLines. The pool size comes from the
// "thread-count" setting. The function returns once the whole file has been
// read and all workers have finished; problems are reported on stdout.
func processFile(filePath string) {
	file, err := os.Open(filePath)
	if err != nil {
		fmt.Printf("无法打开文件 %s: %v\n", filePath, err)
		return
	}
	defer file.Close()

	// Read the setting once. With no worker at all nothing would drain the
	// channel and the producer below would block forever once the buffer is
	// full, so always run at least one worker.
	workers := viper.GetInt("thread-count")
	if workers < 1 {
		workers = 1
	}

	// Buffered channel that carries the lines from the reader to the workers.
	lines := make(chan string, 100)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			processLines(lines, workerID, filePath)
		}(i)
	}

	// Feed the file to the workers. lineCount counts every line read,
	// including lines the workers later skip.
	scanner := bufio.NewScanner(file)
	lineCount := 0
	for scanner.Scan() {
		lines <- scanner.Text()
		lineCount++
	}
	// Closing the channel lets the workers' range loops terminate.
	close(lines)
	wg.Wait()

	if err := scanner.Err(); err != nil {
		fmt.Printf("读取文件 %s 错误: %v\n", filePath, err)
	}
	fmt.Printf("文件 %s 处理完成,共处理 %d 行数据\n", filePath, lineCount)
}
// processLines consumes lines until the channel is closed. Each non-blank
// line is split on "----" into up to ten fields and uploaded through
// uploadDataToServer. Lines with fewer than three fields are reported and
// skipped; missing trailing fields are sent as empty strings. workerID and
// filePath are only used in the messages printed on stdout.
func processLines(lines <-chan string, workerID int, filePath string) {
	const (
		requiredFields = 3  // dyid, uid and secuid must be present
		totalFields    = 10 // number of fields in UploadData
	)

	for line := range lines {
		// Skip blank lines.
		if strings.TrimSpace(line) == "" {
			continue
		}

		parts := strings.Split(line, "----")

		// Validate before padding: once the slice has been padded its length
		// no longer says anything about the input.
		if len(parts) < requiredFields {
			fmt.Printf("Worker %d (文件 %s): 行数据字段不足,跳过: %s\n", workerID, filePath, line)
			continue
		}

		// Pad the optional trailing fields so the indexing below is safe.
		for len(parts) < totalFields {
			parts = append(parts, "")
		}

		uploadData := UploadData{
			Dyid:      parts[0],
			Uid:       parts[1],
			Secuid:    parts[2],
			Pid:       parts[3],
			CommentId: parts[4],
			Id1:       parts[5],
			Id2:       parts[6],
			Id3:       parts[7],
			Id4:       parts[8],
			Id5:       parts[9],
		}

		// A failed upload is reported and the worker moves on to the next line.
		if err := uploadDataToServer(uploadData); err != nil {
			fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err)
		}
	}
}