diff --git a/internal/api/api.go b/internal/api/api.go index 8f20428..38baf9a 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "net/url" + "sync" "time" ) @@ -28,17 +29,25 @@ func init() { // InitConn 创建连接池 func InitConn() { - for i := 0; i < 200; i++ { - resp, err := httpClient.Get(config.APPConfig.Url + "/api/test") - if err != nil { - fmt.Println(err) - return - } - defer func() { - io.Copy(io.Discard, resp.Body) - resp.Body.Close() - }() + wg := &sync.WaitGroup{} + + for i := 0; i < 10; i++ { + wg.Go(func() { + for i := 0; i < 50; i++ { + resp, err := httpClient.Get(config.APPConfig.Url + "/api/test") + if err != nil { + fmt.Println(err) + return + } + defer func() { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + }() + } + }) } + + wg.Wait() } func UploadDataToServer(ctx context.Context, data string) error { diff --git a/internal/uploader/uploader.go b/internal/uploader/uploader.go index 1666afc..041400f 100644 --- a/internal/uploader/uploader.go +++ b/internal/uploader/uploader.go @@ -211,7 +211,6 @@ func getTxtFiles(dir string) (txtFiles []string, err error) { } func processFile(ctx context.Context, logChan *chan string, filePath string, fileLines int) { - var wg sync.WaitGroup // 打开文件 file, err := os.Open(filePath) if err != nil { @@ -221,8 +220,7 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil defer file.Close() // 创建行通道 - lines := make(chan string, 100) - defer close(lines) + lines := make(chan string, 200) var countLine int32 = 0 // 创建指定个worker同时处理文件上传 for i := 0; i < config.APPConfig.ThreadCount; i++ { @@ -230,9 +228,9 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil case <-ctx.Done(): return default: - wg.Go(func() { + go func() { processLines(ctx, logChan, &lines, i, filePath, &countLine) - }) + }() } } @@ -255,22 +253,33 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil } }() + // 等待所有行处理完成并推送进度 for int(countLine) != fileLines { select { case <-ctx.Done(): - wg.Wait() + close(lines) //关闭processLines中的上传线程 return default: progress.Store(filepath.Base(filePath), - Progress{FileName: filepath.Base(filePath), - Total: fileLines, Uploaded: int(countLine), - Percentage: int(float64(countLine)/float64(fileLines)*100) + 1, - }) - time.Sleep(500 * time.Millisecond) + Progress{ + FileName: filepath.Base(filePath), + Total: fileLines, + Uploaded: int(countLine), + Percentage: int(float64(countLine) / float64(fileLines) * 100), + }, + ) } } - - wg.Wait() + //上传完成,进度设为100 + progress.Store(filepath.Base(filePath), + Progress{ + FileName: filepath.Base(filePath), + Total: fileLines, + Uploaded: int(countLine), + Percentage: 100, + }, + ) + close(lines) //关闭processLines中的上传线程 if err := scanner.Err(); err != nil { AddLog(logChan, fmt.Sprintf("读取文件 %s 错误: %v", filePath, err)) @@ -280,6 +289,7 @@ func processFile(ctx context.Context, logChan *chan string, filePath string, fil AddLog(logChan, fmt.Sprintf("文件【%s】处理完成,共处理 %d 行数据", filepath.Base(filePath), countLine)) } +// processLines 处理接受到的每一行数据并上传(chan 管道接受数据) func processLines(ctx context.Context, logChan *chan string, lines *chan string, workerID int, filePath string, countLine *int32) { for line := range *lines { select {