feat: 优化Redis上传操作 添加文件上传工具
使用Lua脚本确保Redis数据检测重复与插入的原子化
This commit is contained in:
@@ -30,7 +30,9 @@ func InitConfig() {
|
|||||||
DB: 0,
|
DB: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
viper.SetDefault("redis", defaultConf)
|
viper.SetDefault("host", defaultConf.Host)
|
||||||
|
viper.SetDefault("run-mode", defaultConf.RunMode)
|
||||||
|
viper.SetDefault("redis", defaultConf.Redis)
|
||||||
//设置配置文件名和路径 ./config.toml
|
//设置配置文件名和路径 ./config.toml
|
||||||
viper.AddConfigPath(".")
|
viper.AddConfigPath(".")
|
||||||
viper.SetConfigName("config")
|
viper.SetConfigName("config")
|
||||||
|
@@ -8,6 +8,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ReadDataHandler(c *gin.Context) {
|
func ReadDataHandler(c *gin.Context) {
|
||||||
@@ -43,17 +44,55 @@ func WriteDataHandler(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
exists := global.RDB.BFExists(global.RCtx, fmt.Sprintf("dedup:%s:%s", data.Token, dedupObject), c.Query(dedupObject))
|
luaScript := `
|
||||||
if exists.Val() {
|
local dedupKey = KEYS[1] -- KEYS[1]: 去重键 (dedup:token:object)
|
||||||
return
|
local listKey = KEYS[2] -- KEYS[2]: 列表键 (list:token)
|
||||||
}
|
local dedupValue = ARGV[1] -- ARGV[1]: 去重值
|
||||||
marshal, err := json.Marshal(data)
|
local jsonData = ARGV[2] -- ARGV[2]: JSON序列化的数据
|
||||||
|
|
||||||
|
-- 检查布隆过滤器中是否已存在该值
|
||||||
|
local exists = redis.call('BF.EXISTS', dedupKey, dedupValue)
|
||||||
|
|
||||||
|
-- 如果已存在,返回已去重标记
|
||||||
|
if exists == 1 then
|
||||||
|
return "已去重"
|
||||||
|
end
|
||||||
|
|
||||||
|
-- 添加到布隆过滤器
|
||||||
|
redis.call('BF.ADD', dedupKey, dedupValue)
|
||||||
|
-- 添加到列表
|
||||||
|
redis.call('LPUSH', listKey, jsonData)
|
||||||
|
|
||||||
|
-- 返回成功结果
|
||||||
|
return "ok"
|
||||||
|
`
|
||||||
|
k1 := fmt.Sprintf("dedup:%s:%s", data.Token, dedupObject)
|
||||||
|
k2 := fmt.Sprintf("list:%s", data.Token)
|
||||||
|
v1 := c.Query(dedupObject)
|
||||||
|
v2, err := json.Marshal(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
c.JSON(500, gin.H{"error": "JSON序列化失败 " + err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
global.RDB.LPush(global.RCtx, fmt.Sprintf("list:%s", data.Token), marshal)
|
|
||||||
global.RDB.BFAdd(global.RCtx, fmt.Sprintf("dedup:%s:%s", data.Token, dedupObject), c.Query(dedupObject))
|
result, err := redis.NewScript(luaScript).Run(
|
||||||
c.JSON(200, gin.H{"result": "ok"})
|
global.RCtx,
|
||||||
|
global.RDB,
|
||||||
|
[]string{k1, k2},
|
||||||
|
v1,
|
||||||
|
string(v2),
|
||||||
|
).Result()
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(500, gin.H{"error": "Redis操作失败 " + err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if resultMap, ok := result.(string); ok {
|
||||||
|
c.JSON(200, gin.H{"result": resultMap})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(500, gin.H{"error": "WriteDataHandler 错误"})
|
||||||
}
|
}
|
||||||
|
|
||||||
func createBF(bloomFilter string, errorRate float64, capacity int64) error {
|
func createBF(bloomFilter string, errorRate float64, capacity int64) error {
|
||||||
|
2
tool/.gitignore
vendored
Normal file
2
tool/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
./upload
|
||||||
|
config.toml
|
215
tool/update-dypid.go
Normal file
215
tool/update-dypid.go
Normal file
@@ -0,0 +1,215 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/fsnotify/fsnotify"
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
)
|
||||||
|
|
||||||
|
type UploadData struct {
|
||||||
|
Dyid string // dyid
|
||||||
|
Uid string // uid
|
||||||
|
Secuid string // secuid
|
||||||
|
Pid string // pid
|
||||||
|
CommentId string // comment_id
|
||||||
|
Id1 string // id1
|
||||||
|
Id2 string // id2
|
||||||
|
Id3 string // id3
|
||||||
|
Id4 string // id4
|
||||||
|
Id5 string // id5
|
||||||
|
}
|
||||||
|
|
||||||
|
var httpClient = &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
MaxIdleConns: 100,
|
||||||
|
MaxIdleConnsPerHost: 10,
|
||||||
|
IdleConnTimeout: 30 * time.Second,
|
||||||
|
},
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
//程序配置
|
||||||
|
//设置默认值
|
||||||
|
viper.SetDefault("url", "http://localhost:8080")
|
||||||
|
viper.SetDefault("token", "")
|
||||||
|
viper.SetDefault("thread-count", 10)
|
||||||
|
//设置配置文件名和路径 ./config.toml
|
||||||
|
viper.AddConfigPath(".")
|
||||||
|
viper.SetConfigName("config")
|
||||||
|
viper.SetConfigType("toml")
|
||||||
|
viper.SafeWriteConfig() //安全写入默认配置
|
||||||
|
//读取配置文件
|
||||||
|
if err := viper.ReadInConfig(); err != nil {
|
||||||
|
fmt.Errorf("无法读取配置文件: %w", err)
|
||||||
|
}
|
||||||
|
viper.WatchConfig()
|
||||||
|
viper.OnConfigChange(func(e fsnotify.Event) {
|
||||||
|
if err := viper.ReadInConfig(); err != nil {
|
||||||
|
fmt.Errorf("无法读取配置文件: %w", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
//检测./upload
|
||||||
|
fmt.Println("程序启动成功,正在检测upload")
|
||||||
|
os.Mkdir("./upload", os.ModePerm)
|
||||||
|
for {
|
||||||
|
files, err := getTxtFiles("./upload")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if files != nil {
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
for _, filePath := range files {
|
||||||
|
fmt.Println("正在上传文件:", filePath)
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
processFile(filePath)
|
||||||
|
os.Remove(filePath)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
}
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func uploadDataToServer(data UploadData) error {
|
||||||
|
params := url.Values{}
|
||||||
|
params.Add("token", viper.GetString("token"))
|
||||||
|
params.Add("dyid", data.Dyid)
|
||||||
|
params.Add("uid", data.Uid)
|
||||||
|
params.Add("secuid", data.Secuid)
|
||||||
|
params.Add("pid", data.Pid)
|
||||||
|
params.Add("comment_id", data.CommentId)
|
||||||
|
params.Add("id1", data.Id1)
|
||||||
|
params.Add("id2", data.Id2)
|
||||||
|
params.Add("id3", data.Id3)
|
||||||
|
params.Add("id4", data.Id4)
|
||||||
|
params.Add("id5", data.Id5)
|
||||||
|
|
||||||
|
resp, err := httpClient.Post(viper.GetString("url")+"/api/data?"+params.Encode(), "application/x-www-form-urlencoded", strings.NewReader(""))
|
||||||
|
_, _ = io.Copy(io.Discard, resp.Body)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取目录中的所有txt文件
|
||||||
|
func getTxtFiles(dir string) ([]string, error) {
|
||||||
|
var txtFiles []string
|
||||||
|
|
||||||
|
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 只处理普通文件,跳过目录
|
||||||
|
if !info.Mode().IsRegular() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查文件扩展名是否为.txt
|
||||||
|
if strings.ToLower(filepath.Ext(path)) == ".txt" {
|
||||||
|
txtFiles = append(txtFiles, path)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return txtFiles, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func processFile(filePath string) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
// 打开文件
|
||||||
|
file, err := os.Open(filePath)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("无法打开文件 %s: %v\n", filePath, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
// 创建行通道
|
||||||
|
lines := make(chan string, 100)
|
||||||
|
|
||||||
|
// 创建10个worker处理文件上传
|
||||||
|
for i := 0; i < viper.GetInt("thread-count"); i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(workerID int) {
|
||||||
|
defer wg.Done()
|
||||||
|
processLines(lines, workerID, filePath)
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 读取文件并发送到通道
|
||||||
|
scanner := bufio.NewScanner(file)
|
||||||
|
lineCount := 0
|
||||||
|
for scanner.Scan() {
|
||||||
|
line := scanner.Text()
|
||||||
|
lines <- line
|
||||||
|
lineCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
close(lines)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if err := scanner.Err(); err != nil {
|
||||||
|
fmt.Printf("读取文件 %s 错误: %v\n", filePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("文件 %s 处理完成,共处理 %d 行数据\n", filePath, lineCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
func processLines(lines <-chan string, workerID int, filePath string) {
|
||||||
|
for line := range lines {
|
||||||
|
// 跳过空行
|
||||||
|
if strings.TrimSpace(line) == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用----分割行数据
|
||||||
|
parts := strings.Split(line, "----")
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
parts = append(parts, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 确保有足够的字段
|
||||||
|
if len(parts) < 3 {
|
||||||
|
fmt.Printf("Worker %d (文件 %s): 行数据字段不足,跳过: %s\n", workerID, filePath, line)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建上传数据结构
|
||||||
|
uploadData := UploadData{
|
||||||
|
Dyid: parts[0],
|
||||||
|
Uid: parts[1],
|
||||||
|
Secuid: parts[2],
|
||||||
|
Pid: parts[3],
|
||||||
|
CommentId: parts[4],
|
||||||
|
Id1: parts[5],
|
||||||
|
Id2: parts[6],
|
||||||
|
Id3: parts[7],
|
||||||
|
Id4: parts[8],
|
||||||
|
Id5: parts[9],
|
||||||
|
}
|
||||||
|
|
||||||
|
// 上传数据
|
||||||
|
if err := uploadDataToServer(uploadData); err != nil {
|
||||||
|
fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 添加短暂延迟避免服务器过载
|
||||||
|
//time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
@@ -5,7 +5,7 @@ import TokenDetailView from '@/views/TokenDetailView.vue'
|
|||||||
|
|
||||||
const routes = [
|
const routes = [
|
||||||
{path: '/', name: "TokenList", component: TokenListView},
|
{path: '/', name: "TokenList", component: TokenListView},
|
||||||
{path: '/token', name: "TokenDetail", component: TokenDetailView},
|
{path: '/', name: "TokenDetail", component: TokenDetailView},
|
||||||
];
|
];
|
||||||
|
|
||||||
const router = createRouter({
|
const router = createRouter({
|
||||||
|
@@ -24,6 +24,7 @@ const deleteDedup = () => {
|
|||||||
dedup_bf: true
|
dedup_bf: true
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
getInfo()
|
||||||
}
|
}
|
||||||
const deleteRedis = () => {
|
const deleteRedis = () => {
|
||||||
axios.delete('/api/token/info', {
|
axios.delete('/api/token/info', {
|
||||||
@@ -32,6 +33,7 @@ const deleteRedis = () => {
|
|||||||
cache_list: true
|
cache_list: true
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
getInfo()
|
||||||
}
|
}
|
||||||
|
|
||||||
getInfo()
|
getInfo()
|
||||||
|
Reference in New Issue
Block a user