feat(data): 添加数据格式支持 更改数据写入读取
All checks were successful
构建Docker镜像 / build-and-deploy (push) Successful in 1m33s
All checks were successful
构建Docker镜像 / build-and-deploy (push) Successful in 1m33s
This commit is contained in:
@@ -3,52 +3,65 @@ package controller
|
||||
import (
|
||||
"dypid/db"
|
||||
"dypid/global"
|
||||
"dypid/model"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func ReadDataHandler(c *gin.Context) {
|
||||
lLen := global.RDB.LLen(global.RCtx, fmt.Sprintf("list:%s", c.Query("token")))
|
||||
input := struct {
|
||||
Token string `form:"token" binding:"required"`
|
||||
}{}
|
||||
if err := c.ShouldBindQuery(&input); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "参数不能为空 " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
lLen := global.RDB.LLen(global.RCtx, fmt.Sprintf("list:%s", input.Token))
|
||||
if lLen.Val() == 0 {
|
||||
c.JSON(200, gin.H{"result": "数据库没有数据"})
|
||||
c.JSON(http.StatusOK, gin.H{"result": "数据库没有数据"})
|
||||
return
|
||||
}
|
||||
retData := global.RDB.BLPop(global.RCtx, 0, fmt.Sprintf("list:%s", c.Query("token")))
|
||||
newData := model.Data{}
|
||||
err := json.Unmarshal([]byte(retData.Val()[1]), &newData)
|
||||
if err != nil {
|
||||
c.JSON(400, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(200, gin.H{"result": newData})
|
||||
retData := global.RDB.BLPop(global.RCtx, 0, fmt.Sprintf("list:%s", input.Token)).Val()[1]
|
||||
|
||||
c.String(http.StatusOK, retData)
|
||||
}
|
||||
|
||||
func WriteDataHandler(c *gin.Context) {
|
||||
data := model.Data{}
|
||||
|
||||
if err := c.BindQuery(&data); err != nil {
|
||||
c.JSON(400, gin.H{"error": err.Error()})
|
||||
input := struct {
|
||||
Token string `form:"token" binding:"required"`
|
||||
Data string `form:"data" binding:"required"`
|
||||
}{}
|
||||
if err := c.ShouldBindQuery(&input); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "参数不能为空 " + err.Error()})
|
||||
return
|
||||
}
|
||||
dedupObject, err := db.GetDedupObject(data.Token)
|
||||
|
||||
dedupObject, err := db.GetDedupObject(input.Token)
|
||||
if err != nil {
|
||||
c.JSON(400, gin.H{"error": err.Error()})
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
err = createBF(fmt.Sprintf("dedup:%s:%s", data.Token, dedupObject), 0.01, 100000000)
|
||||
if err != nil && err.Error() != "ERR item exists" {
|
||||
c.JSON(400, gin.H{"error": err.Error()})
|
||||
dataIndex, err := getDataIndex(input.Token)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
dedupValue := strings.Split(input.Data, "----")[dataIndex[dedupObject]]
|
||||
|
||||
err = createBF(fmt.Sprintf("dedup:%s:%s", input.Token, dedupObject), 0.01, 100000000)
|
||||
if err != nil && err.Error() != "ERR item exists" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
luaScript := `
|
||||
local dedupKey = KEYS[1] -- KEYS[1]: 去重键 (dedup:token:object)
|
||||
local listKey = KEYS[2] -- KEYS[2]: 列表键 (list:token)
|
||||
local dedupValue = ARGV[1] -- ARGV[1]: 去重值
|
||||
local jsonData = ARGV[2] -- ARGV[2]: JSON序列化的数据
|
||||
local rawData = ARGV[2] -- ARGV[2]: 原始数据
|
||||
|
||||
-- 检查布隆过滤器中是否已存在该值
|
||||
local exists = redis.call('BF.EXISTS', dedupKey, dedupValue)
|
||||
@@ -61,41 +74,49 @@ end
|
||||
-- 添加到布隆过滤器
|
||||
redis.call('BF.ADD', dedupKey, dedupValue)
|
||||
-- 添加到列表
|
||||
redis.call('LPUSH', listKey, jsonData)
|
||||
redis.call('LPUSH', listKey, rawData)
|
||||
|
||||
-- 返回成功结果
|
||||
return "ok"
|
||||
`
|
||||
k1 := fmt.Sprintf("dedup:%s:%s", data.Token, dedupObject)
|
||||
k2 := fmt.Sprintf("list:%s", data.Token)
|
||||
v1 := c.Query(dedupObject)
|
||||
v2, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
c.JSON(500, gin.H{"error": "JSON序列化失败 " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
result, err := redis.NewScript(luaScript).Run(
|
||||
global.RCtx,
|
||||
global.RDB,
|
||||
[]string{k1, k2},
|
||||
v1,
|
||||
string(v2),
|
||||
[]string{
|
||||
fmt.Sprintf("dedup:%s:%s", input.Token, dedupObject),
|
||||
fmt.Sprintf("list:%s", input.Token),
|
||||
},
|
||||
dedupValue,
|
||||
input.Data,
|
||||
).Result()
|
||||
if err != nil {
|
||||
c.JSON(500, gin.H{"error": "Redis操作失败 " + err.Error()})
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Redis操作失败 " + err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
if resultMap, ok := result.(string); ok {
|
||||
c.JSON(200, gin.H{"result": resultMap})
|
||||
c.JSON(http.StatusOK, gin.H{"result": resultMap})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(500, gin.H{"error": "WriteDataHandler 错误"})
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "WriteDataHandler 错误"})
|
||||
}
|
||||
|
||||
func createBF(bloomFilter string, errorRate float64, capacity int64) error {
|
||||
_, err := global.RDB.BFReserve(global.RCtx, bloomFilter, errorRate, capacity).Result()
|
||||
return err
|
||||
}
|
||||
|
||||
func getDataIndex(token string) (index map[string]int, err error) {
|
||||
dataFormat, err := db.GetDataFormat(token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
f := strings.Split(dataFormat, "----")
|
||||
index = make(map[string]int)
|
||||
for i, s := range f {
|
||||
index[s] = i
|
||||
}
|
||||
return index, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user