feat: 添加文件上传工具及CI/CD配置
Some checks failed
构建上传工具 / build-tool (push) Failing after 22s

This commit is contained in:
2025-10-11 12:34:06 +08:00
parent cd068268b0
commit 0407102d70
5 changed files with 360 additions and 0 deletions

226
main.go Normal file
View File

@@ -0,0 +1,226 @@
package main
import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)
var httpClient = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 200,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 30 * time.Second,
},
Timeout: 30 * time.Second,
}
type Task struct {
FilePath string
FileLines int
}
func main() {
initConfig()
//检测./upload
fmt.Println("程序启动成功正在检测txt文件")
for {
files, err := getTxtFiles("./")
if err != nil {
fmt.Println(err)
continue
}
if files != nil {
start := time.Now()
fileLines := make(map[string]int)
fmt.Println("正在统计", len(files), "个文件行数")
for _, filePath := range files {
file, err := os.Open(filePath)
if err != nil {
fmt.Println("打开文件失败:", err)
}
// 使用 bufio.Scanner 逐行读取
scanner := bufio.NewScanner(file)
lineCount := 0
for scanner.Scan() {
lineCount++
}
file.Close()
if lineCount == 0 {
continue
}
fileLines[filepath.Base(filePath)] = lineCount
fmt.Println(filepath.Base(filePath), "文件行数:", lineCount)
}
var tasks []Task
for k, v := range fileLines {
tasks = append(tasks, Task{FilePath: k, FileLines: v})
}
// 使用errgroup控制并发
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(50) // 设置最大并发数为50
// 执行所有任务
for _, task := range tasks {
task := task // 创建局部变量
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
fmt.Println("正在上传文件:", filepath.Base(task.FilePath))
processFile(task.FilePath, task.FileLines)
err := os.Truncate(task.FilePath, 0)
if err != nil {
fmt.Println("清空文件失败:", err)
}
return nil
}
})
}
// 等待所有任务完成
if err := g.Wait(); err != nil {
fmt.Printf("任务执行出错: %v\n", err)
} else {
fmt.Println("所有任务执行完成!")
}
fmt.Printf("上传完成,耗时:%s\n", time.Since(start))
}
time.Sleep(time.Minute)
}
}
func initConfig() {
//程序配置
viper.SetDefault("url", "http://localhost:8080")
viper.SetDefault("token", "")
viper.SetDefault("thread-count", 10)
//设置配置文件名和路径 ./config.toml
viper.AddConfigPath(".")
viper.SetConfigName("config")
viper.SetConfigType("toml")
viper.SafeWriteConfig() //安全写入默认配置
//读取配置文件
if err := viper.ReadInConfig(); err != nil {
fmt.Errorf("无法读取配置文件: %w", err)
}
viper.WatchConfig()
viper.OnConfigChange(func(e fsnotify.Event) {
if err := viper.ReadInConfig(); err != nil {
fmt.Errorf("无法读取配置文件: %w", err)
}
})
}
func uploadDataToServer(data string) error {
params := url.Values{}
params.Set("token", viper.GetString("token"))
params.Set("data", data)
resp, err := httpClient.Post(viper.GetString("url")+"/api/data?"+params.Encode(), "application/x-www-form-urlencoded", strings.NewReader(""))
if err != nil {
return err
}
if resp != nil {
_, _ = io.Copy(io.Discard, resp.Body)
}
return err
}
// 获取目录中的所有txt文件
func getTxtFiles(dir string) (txtFiles []string, err error) {
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// 只处理普通文件,跳过目录
if !info.Mode().IsRegular() {
return nil
}
// 检查文件扩展名是否为.txt
if strings.ToLower(filepath.Ext(path)) == ".txt" {
txtFiles = append(txtFiles, path)
}
return nil
})
return txtFiles, err
}
func processFile(filePath string, fileLines int) {
var wg sync.WaitGroup
// 打开文件
file, err := os.Open(filePath)
if err != nil {
fmt.Printf("无法打开文件 %s: %v\n", filePath, err)
return
}
defer file.Close()
// 创建行通道
lines := make(chan string, 100)
// 创建10个worker处理文件上传
for i := 0; i < viper.GetInt("thread-count"); i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
processLines(lines, workerID, filePath)
}(i)
}
// 读取文件并发送到通道
scanner := bufio.NewScanner(file)
lineCount := 0
for scanner.Scan() {
lines <- scanner.Text()
lineCount++
if lineCount%10000 == 0 {
fmt.Printf("文件【%s】处理进度%.2f%%\n", filePath, float64(lineCount)/float64(fileLines)*100)
}
}
close(lines)
wg.Wait()
if err := scanner.Err(); err != nil {
fmt.Printf("读取文件 %s 错误: %v\n", filePath, err)
return
}
fmt.Printf("文件【%s】处理完成共处理 %d 行数据\n", filePath, lineCount)
}
func processLines(lines <-chan string, workerID int, filePath string) {
for line := range lines {
// 跳过空行
if strings.TrimSpace(line) == "" {
continue
}
// 上传数据
if err := uploadDataToServer(line); err != nil {
fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err)
}
}
}