Compare commits

..

2 Commits

Author SHA1 Message Date
b2c643cf73 feat: 优化Redis上传操作 添加文件上传工具
使用Lua脚本确保Redis数据检测重复与插入的原子化
2025-09-01 13:56:30 +08:00
b22b4786e6 refactor: 移除token和key选项 2025-09-01 08:51:31 +08:00
7 changed files with 271 additions and 19 deletions

View File

@@ -30,7 +30,9 @@ func InitConfig() {
DB: 0, DB: 0,
}, },
} }
viper.SetDefault("redis", defaultConf) viper.SetDefault("host", defaultConf.Host)
viper.SetDefault("run-mode", defaultConf.RunMode)
viper.SetDefault("redis", defaultConf.Redis)
//设置配置文件名和路径 ./config.toml //设置配置文件名和路径 ./config.toml
viper.AddConfigPath(".") viper.AddConfigPath(".")
viper.SetConfigName("config") viper.SetConfigName("config")

View File

@@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/redis/go-redis/v9"
) )
func ReadDataHandler(c *gin.Context) { func ReadDataHandler(c *gin.Context) {
@@ -43,17 +44,55 @@ func WriteDataHandler(c *gin.Context) {
return return
} }
exists := global.RDB.BFExists(global.RCtx, fmt.Sprintf("dedup:%s:%s", data.Token, dedupObject), c.Query(dedupObject)) luaScript := `
if exists.Val() { local dedupKey = KEYS[1] -- KEYS[1]: 去重键 (dedup:token:object)
return local listKey = KEYS[2] -- KEYS[2]: 列表键 (list:token)
} local dedupValue = ARGV[1] -- ARGV[1]: 去重值
marshal, err := json.Marshal(data) local jsonData = ARGV[2] -- ARGV[2]: JSON序列化的数据
-- 检查布隆过滤器中是否已存在该值
local exists = redis.call('BF.EXISTS', dedupKey, dedupValue)
-- 如果已存在,返回已去重标记
if exists == 1 then
return "已去重"
end
-- 添加到布隆过滤器
redis.call('BF.ADD', dedupKey, dedupValue)
-- 添加到列表
redis.call('LPUSH', listKey, jsonData)
-- 返回成功结果
return "ok"
`
k1 := fmt.Sprintf("dedup:%s:%s", data.Token, dedupObject)
k2 := fmt.Sprintf("list:%s", data.Token)
v1 := c.Query(dedupObject)
v2, err := json.Marshal(data)
if err != nil { if err != nil {
c.JSON(500, gin.H{"error": "JSON序列化失败 " + err.Error()})
return return
} }
global.RDB.LPush(global.RCtx, fmt.Sprintf("list:%s", data.Token), marshal)
global.RDB.BFAdd(global.RCtx, fmt.Sprintf("dedup:%s:%s", data.Token, dedupObject), c.Query(dedupObject)) result, err := redis.NewScript(luaScript).Run(
c.JSON(200, gin.H{"result": "ok"}) global.RCtx,
global.RDB,
[]string{k1, k2},
v1,
string(v2),
).Result()
if err != nil {
c.JSON(500, gin.H{"error": "Redis操作失败 " + err.Error()})
return
}
if resultMap, ok := result.(string); ok {
c.JSON(200, gin.H{"result": resultMap})
return
}
c.JSON(500, gin.H{"error": "WriteDataHandler 错误"})
} }
func createBF(bloomFilter string, errorRate float64, capacity int64) error { func createBF(bloomFilter string, errorRate float64, capacity int64) error {

2
tool/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
./upload
config.toml

215
tool/update-dypid.go Normal file
View File

@@ -0,0 +1,215 @@
package main
import (
"bufio"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
)
type UploadData struct {
Dyid string // dyid
Uid string // uid
Secuid string // secuid
Pid string // pid
CommentId string // comment_id
Id1 string // id1
Id2 string // id2
Id3 string // id3
Id4 string // id4
Id5 string // id5
}
var httpClient = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 30 * time.Second,
},
Timeout: 30 * time.Second,
}
func main() {
//程序配置
//设置默认值
viper.SetDefault("url", "http://localhost:8080")
viper.SetDefault("token", "")
viper.SetDefault("thread-count", 10)
//设置配置文件名和路径 ./config.toml
viper.AddConfigPath(".")
viper.SetConfigName("config")
viper.SetConfigType("toml")
viper.SafeWriteConfig() //安全写入默认配置
//读取配置文件
if err := viper.ReadInConfig(); err != nil {
fmt.Errorf("无法读取配置文件: %w", err)
}
viper.WatchConfig()
viper.OnConfigChange(func(e fsnotify.Event) {
if err := viper.ReadInConfig(); err != nil {
fmt.Errorf("无法读取配置文件: %w", err)
}
})
//检测./upload
fmt.Println("程序启动成功正在检测upload")
os.Mkdir("./upload", os.ModePerm)
for {
files, err := getTxtFiles("./upload")
if err != nil {
fmt.Println(err)
return
}
if files != nil {
wg := sync.WaitGroup{}
for _, filePath := range files {
fmt.Println("正在上传文件:", filePath)
wg.Add(1)
go func() {
processFile(filePath)
os.Remove(filePath)
wg.Done()
}()
}
wg.Wait()
}
time.Sleep(2 * time.Second)
}
}
func uploadDataToServer(data UploadData) error {
params := url.Values{}
params.Add("token", viper.GetString("token"))
params.Add("dyid", data.Dyid)
params.Add("uid", data.Uid)
params.Add("secuid", data.Secuid)
params.Add("pid", data.Pid)
params.Add("comment_id", data.CommentId)
params.Add("id1", data.Id1)
params.Add("id2", data.Id2)
params.Add("id3", data.Id3)
params.Add("id4", data.Id4)
params.Add("id5", data.Id5)
resp, err := httpClient.Post(viper.GetString("url")+"/api/data?"+params.Encode(), "application/x-www-form-urlencoded", strings.NewReader(""))
_, _ = io.Copy(io.Discard, resp.Body)
return err
}
// 获取目录中的所有txt文件
func getTxtFiles(dir string) ([]string, error) {
var txtFiles []string
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// 只处理普通文件,跳过目录
if !info.Mode().IsRegular() {
return nil
}
// 检查文件扩展名是否为.txt
if strings.ToLower(filepath.Ext(path)) == ".txt" {
txtFiles = append(txtFiles, path)
}
return nil
})
return txtFiles, err
}
func processFile(filePath string) {
var wg sync.WaitGroup
// 打开文件
file, err := os.Open(filePath)
if err != nil {
fmt.Printf("无法打开文件 %s: %v\n", filePath, err)
return
}
defer file.Close()
// 创建行通道
lines := make(chan string, 100)
// 创建10个worker处理文件上传
for i := 0; i < viper.GetInt("thread-count"); i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
processLines(lines, workerID, filePath)
}(i)
}
// 读取文件并发送到通道
scanner := bufio.NewScanner(file)
lineCount := 0
for scanner.Scan() {
line := scanner.Text()
lines <- line
lineCount++
}
close(lines)
wg.Wait()
if err := scanner.Err(); err != nil {
fmt.Printf("读取文件 %s 错误: %v\n", filePath, err)
}
fmt.Printf("文件 %s 处理完成,共处理 %d 行数据\n", filePath, lineCount)
}
func processLines(lines <-chan string, workerID int, filePath string) {
for line := range lines {
// 跳过空行
if strings.TrimSpace(line) == "" {
continue
}
// 使用----分割行数据
parts := strings.Split(line, "----")
for i := 0; i < 10; i++ {
parts = append(parts, "")
}
// 确保有足够的字段
if len(parts) < 3 {
fmt.Printf("Worker %d (文件 %s): 行数据字段不足,跳过: %s\n", workerID, filePath, line)
continue
}
// 创建上传数据结构
uploadData := UploadData{
Dyid: parts[0],
Uid: parts[1],
Secuid: parts[2],
Pid: parts[3],
CommentId: parts[4],
Id1: parts[5],
Id2: parts[6],
Id3: parts[7],
Id4: parts[8],
Id5: parts[9],
}
// 上传数据
if err := uploadDataToServer(uploadData); err != nil {
fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err)
}
// 添加短暂延迟避免服务器过载
//time.Sleep(10 * time.Millisecond)
}
}

View File

@@ -5,7 +5,7 @@ import TokenDetailView from '@/views/TokenDetailView.vue'
const routes = [ const routes = [
{path: '/', name: "TokenList", component: TokenListView}, {path: '/', name: "TokenList", component: TokenListView},
{path: '/token', name: "TokenDetail", component: TokenDetailView}, {path: '/', name: "TokenDetail", component: TokenDetailView},
]; ];
const router = createRouter({ const router = createRouter({

View File

@@ -24,6 +24,7 @@ const deleteDedup = () => {
dedup_bf: true dedup_bf: true
} }
}) })
getInfo()
} }
const deleteRedis = () => { const deleteRedis = () => {
axios.delete('/api/token/info', { axios.delete('/api/token/info', {
@@ -32,6 +33,7 @@ const deleteRedis = () => {
cache_list: true cache_list: true
} }
}) })
getInfo()
} }
getInfo() getInfo()

View File

@@ -12,10 +12,6 @@ axios.get("/api/token").then(res => {
const input = ref('') const input = ref('')
const value = ref('') const value = ref('')
const options = [ const options = [
{
value: 'token',
label: 'token',
},
{ {
value: 'uid', value: 'uid',
label: 'uid', label: 'uid',
@@ -32,10 +28,6 @@ const options = [
value: 'dyid', value: 'dyid',
label: 'dyid', label: 'dyid',
}, },
{
value: 'key',
label: 'key',
},
{ {
value: 'comment_id', value: 'comment_id',
label: 'comment_id', label: 'comment_id',