diff --git a/config/config.go b/config/config.go index 06268b5..85625d6 100644 --- a/config/config.go +++ b/config/config.go @@ -30,7 +30,9 @@ func InitConfig() { DB: 0, }, } - viper.SetDefault("redis", defaultConf) + viper.SetDefault("host", defaultConf.Host) + viper.SetDefault("run-mode", defaultConf.RunMode) + viper.SetDefault("redis", defaultConf.Redis) //设置配置文件名和路径 ./config.toml viper.AddConfigPath(".") viper.SetConfigName("config") diff --git a/controller/dataController.go b/controller/dataController.go index ece6596..912b66c 100644 --- a/controller/dataController.go +++ b/controller/dataController.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/gin-gonic/gin" + "github.com/redis/go-redis/v9" ) func ReadDataHandler(c *gin.Context) { @@ -43,17 +44,55 @@ func WriteDataHandler(c *gin.Context) { return } - exists := global.RDB.BFExists(global.RCtx, fmt.Sprintf("dedup:%s:%s", data.Token, dedupObject), c.Query(dedupObject)) - if exists.Val() { - return - } - marshal, err := json.Marshal(data) + luaScript := ` +local dedupKey = KEYS[1] -- KEYS[1]: 去重键 (dedup:token:object) +local listKey = KEYS[2] -- KEYS[2]: 列表键 (list:token) +local dedupValue = ARGV[1] -- ARGV[1]: 去重值 +local jsonData = ARGV[2] -- ARGV[2]: JSON序列化的数据 + +-- 检查布隆过滤器中是否已存在该值 +local exists = redis.call('BF.EXISTS', dedupKey, dedupValue) + +-- 如果已存在,返回已去重标记 +if exists == 1 then + return "已去重" +end + +-- 添加到布隆过滤器 +redis.call('BF.ADD', dedupKey, dedupValue) +-- 添加到列表 +redis.call('LPUSH', listKey, jsonData) + +-- 返回成功结果 +return "ok" +` + k1 := fmt.Sprintf("dedup:%s:%s", data.Token, dedupObject) + k2 := fmt.Sprintf("list:%s", data.Token) + v1 := c.Query(dedupObject) + v2, err := json.Marshal(data) if err != nil { + c.JSON(500, gin.H{"error": "JSON序列化失败 " + err.Error()}) return } - global.RDB.LPush(global.RCtx, fmt.Sprintf("list:%s", data.Token), marshal) - global.RDB.BFAdd(global.RCtx, fmt.Sprintf("dedup:%s:%s", data.Token, dedupObject), c.Query(dedupObject)) - c.JSON(200, gin.H{"result": "ok"}) + + result, err := redis.NewScript(luaScript).Run( + global.RCtx, + global.RDB, + []string{k1, k2}, + v1, + string(v2), + ).Result() + if err != nil { + c.JSON(500, gin.H{"error": "Redis操作失败 " + err.Error()}) + return + } + + if resultMap, ok := result.(string); ok { + c.JSON(200, gin.H{"result": resultMap}) + return + } + + c.JSON(500, gin.H{"error": "WriteDataHandler 错误"}) } func createBF(bloomFilter string, errorRate float64, capacity int64) error { diff --git a/tool/.gitignore b/tool/.gitignore new file mode 100644 index 0000000..0d7baac --- /dev/null +++ b/tool/.gitignore @@ -0,0 +1,2 @@ +./upload +config.toml \ No newline at end of file diff --git a/tool/update-dypid.go b/tool/update-dypid.go new file mode 100644 index 0000000..4aeaf1d --- /dev/null +++ b/tool/update-dypid.go @@ -0,0 +1,215 @@ +package main + +import ( + "bufio" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/spf13/viper" +) + +type UploadData struct { + Dyid string // dyid + Uid string // uid + Secuid string // secuid + Pid string // pid + CommentId string // comment_id + Id1 string // id1 + Id2 string // id2 + Id3 string // id3 + Id4 string // id4 + Id5 string // id5 +} + +var httpClient = &http.Client{ + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 30 * time.Second, + }, + Timeout: 30 * time.Second, +} + +func main() { + //程序配置 + //设置默认值 + viper.SetDefault("url", "http://localhost:8080") + viper.SetDefault("token", "") + viper.SetDefault("thread-count", 10) + //设置配置文件名和路径 ./config.toml + viper.AddConfigPath(".") + viper.SetConfigName("config") + viper.SetConfigType("toml") + viper.SafeWriteConfig() //安全写入默认配置 + //读取配置文件 + if err := viper.ReadInConfig(); err != nil { + fmt.Errorf("无法读取配置文件: %w", err) + } + viper.WatchConfig() + viper.OnConfigChange(func(e fsnotify.Event) { + if err := viper.ReadInConfig(); err != nil { + fmt.Errorf("无法读取配置文件: %w", err) + } + }) + + //检测./upload + fmt.Println("程序启动成功,正在检测upload") + os.Mkdir("./upload", os.ModePerm) + for { + files, err := getTxtFiles("./upload") + if err != nil { + fmt.Println(err) + return + } + if files != nil { + wg := sync.WaitGroup{} + for _, filePath := range files { + fmt.Println("正在上传文件:", filePath) + wg.Add(1) + go func() { + processFile(filePath) + os.Remove(filePath) + wg.Done() + }() + } + wg.Wait() + + } + time.Sleep(2 * time.Second) + } +} +func uploadDataToServer(data UploadData) error { + params := url.Values{} + params.Add("token", viper.GetString("token")) + params.Add("dyid", data.Dyid) + params.Add("uid", data.Uid) + params.Add("secuid", data.Secuid) + params.Add("pid", data.Pid) + params.Add("comment_id", data.CommentId) + params.Add("id1", data.Id1) + params.Add("id2", data.Id2) + params.Add("id3", data.Id3) + params.Add("id4", data.Id4) + params.Add("id5", data.Id5) + + resp, err := httpClient.Post(viper.GetString("url")+"/api/data?"+params.Encode(), "application/x-www-form-urlencoded", strings.NewReader("")) + _, _ = io.Copy(io.Discard, resp.Body) + return err +} + +// 获取目录中的所有txt文件 +func getTxtFiles(dir string) ([]string, error) { + var txtFiles []string + + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + // 只处理普通文件,跳过目录 + if !info.Mode().IsRegular() { + return nil + } + + // 检查文件扩展名是否为.txt + if strings.ToLower(filepath.Ext(path)) == ".txt" { + txtFiles = append(txtFiles, path) + } + + return nil + }) + + return txtFiles, err +} + +func processFile(filePath string) { + var wg sync.WaitGroup + // 打开文件 + file, err := os.Open(filePath) + if err != nil { + fmt.Printf("无法打开文件 %s: %v\n", filePath, err) + return + } + defer file.Close() + + // 创建行通道 + lines := make(chan string, 100) + + // 创建10个worker处理文件上传 + for i := 0; i < viper.GetInt("thread-count"); i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + processLines(lines, workerID, filePath) + }(i) + } + + // 读取文件并发送到通道 + scanner := bufio.NewScanner(file) + lineCount := 0 + for scanner.Scan() { + line := scanner.Text() + lines <- line + lineCount++ + } + + close(lines) + wg.Wait() + + if err := scanner.Err(); err != nil { + fmt.Printf("读取文件 %s 错误: %v\n", filePath, err) + } + + fmt.Printf("文件 %s 处理完成,共处理 %d 行数据\n", filePath, lineCount) +} + +func processLines(lines <-chan string, workerID int, filePath string) { + for line := range lines { + // 跳过空行 + if strings.TrimSpace(line) == "" { + continue + } + + // 使用----分割行数据 + parts := strings.Split(line, "----") + for i := 0; i < 10; i++ { + parts = append(parts, "") + } + + // 确保有足够的字段 + if len(parts) < 3 { + fmt.Printf("Worker %d (文件 %s): 行数据字段不足,跳过: %s\n", workerID, filePath, line) + continue + } + + // 创建上传数据结构 + uploadData := UploadData{ + Dyid: parts[0], + Uid: parts[1], + Secuid: parts[2], + Pid: parts[3], + CommentId: parts[4], + Id1: parts[5], + Id2: parts[6], + Id3: parts[7], + Id4: parts[8], + Id5: parts[9], + } + + // 上传数据 + if err := uploadDataToServer(uploadData); err != nil { + fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err) + } + + // 添加短暂延迟避免服务器过载 + //time.Sleep(10 * time.Millisecond) + } +} diff --git a/web/src/router/index.ts b/web/src/router/index.ts index 920de3a..8a990cf 100644 --- a/web/src/router/index.ts +++ b/web/src/router/index.ts @@ -5,7 +5,7 @@ import TokenDetailView from '@/views/TokenDetailView.vue' const routes = [ {path: '/', name: "TokenList", component: TokenListView}, - {path: '/token', name: "TokenDetail", component: TokenDetailView}, + {path: '/', name: "TokenDetail", component: TokenDetailView}, ]; const router = createRouter({ diff --git a/web/src/views/TokenDetailView.vue b/web/src/views/TokenDetailView.vue index de34043..62a615d 100644 --- a/web/src/views/TokenDetailView.vue +++ b/web/src/views/TokenDetailView.vue @@ -24,6 +24,7 @@ const deleteDedup = () => { dedup_bf: true } }) + getInfo() } const deleteRedis = () => { axios.delete('/api/token/info', { @@ -32,6 +33,7 @@ const deleteRedis = () => { cache_list: true } }) + getInfo() } getInfo()