180 lines
3.8 KiB
Go
180 lines
3.8 KiB
Go
package main
|
||
|
||
import (
|
||
"bufio"
|
||
"fmt"
|
||
"io"
|
||
"net/http"
|
||
"net/url"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/fsnotify/fsnotify"
|
||
"github.com/spf13/viper"
|
||
)
|
||
|
||
// httpClient is the one HTTP client used for every upload request, so that
// keep-alive connections are pooled and reused instead of being re-created.
var httpClient = &http.Client{
	// Upper bound for a whole request, including reading the response body.
	Timeout: 30 * time.Second,
	Transport: &http.Transport{
		// Idle keep-alive connections are closed after 30 seconds.
		IdleConnTimeout: 30 * time.Second,
		// Idle connection pool limits: 200 in total, 100 per host.
		MaxIdleConns:        200,
		MaxIdleConnsPerHost: 100,
	},
}
|
||
|
||
func main() {
|
||
initConfig()
|
||
|
||
//检测./upload
|
||
fmt.Println("程序启动成功,正在检测txt文件")
|
||
for {
|
||
files, err := getTxtFiles("./")
|
||
if err != nil {
|
||
fmt.Println(err)
|
||
return
|
||
}
|
||
if files != nil {
|
||
start := time.Now()
|
||
|
||
wg := sync.WaitGroup{}
|
||
for _, filePath := range files {
|
||
fmt.Println("正在上传文件:", filePath)
|
||
wg.Add(1)
|
||
go func() {
|
||
processFile(filePath)
|
||
err := os.Truncate(filePath, 0)
|
||
if err != nil {
|
||
fmt.Println("清空文件失败:", err)
|
||
}
|
||
wg.Done()
|
||
}()
|
||
}
|
||
wg.Wait()
|
||
|
||
fmt.Printf("上传完成,耗时:%s\n", time.Since(start))
|
||
}
|
||
time.Sleep(time.Minute)
|
||
}
|
||
}
|
||
|
||
func initConfig() {
|
||
//程序配置
|
||
viper.SetDefault("url", "http://localhost:8080")
|
||
viper.SetDefault("token", "")
|
||
viper.SetDefault("thread-count", 10)
|
||
//设置配置文件名和路径 ./config.toml
|
||
viper.AddConfigPath(".")
|
||
viper.SetConfigName("config")
|
||
viper.SetConfigType("toml")
|
||
viper.SafeWriteConfig() //安全写入默认配置
|
||
//读取配置文件
|
||
if err := viper.ReadInConfig(); err != nil {
|
||
fmt.Errorf("无法读取配置文件: %w", err)
|
||
}
|
||
viper.WatchConfig()
|
||
viper.OnConfigChange(func(e fsnotify.Event) {
|
||
if err := viper.ReadInConfig(); err != nil {
|
||
fmt.Errorf("无法读取配置文件: %w", err)
|
||
}
|
||
})
|
||
}
|
||
|
||
func uploadDataToServer(data string) error {
|
||
params := url.Values{}
|
||
params.Set("token", viper.GetString("token"))
|
||
params.Set("data", data)
|
||
|
||
resp, err := httpClient.Post(viper.GetString("url")+"/api/data?"+params.Encode(), "application/x-www-form-urlencoded", strings.NewReader(""))
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if resp != nil {
|
||
_, _ = io.Copy(io.Discard, resp.Body)
|
||
}
|
||
return err
|
||
}
|
||
|
||
// getTxtFiles walks dir recursively and collects the paths of all regular
// files whose extension is ".txt", compared case-insensitively.
//
// The walk stops at the first error; that error is returned together with the
// paths collected up to that point.
func getTxtFiles(dir string) (txtFiles []string, err error) {
	visit := func(path string, info os.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil:
			// Abort the walk on the first entry that cannot be inspected.
			return walkErr
		case !info.Mode().IsRegular():
			// Directories and other non-regular entries are not candidates.
			return nil
		}

		if ext := strings.ToLower(filepath.Ext(path)); ext == ".txt" {
			txtFiles = append(txtFiles, path)
		}
		return nil
	}

	err = filepath.Walk(dir, visit)
	return txtFiles, err
}
|
||
|
||
func processFile(filePath string) {
|
||
var wg sync.WaitGroup
|
||
// 打开文件
|
||
file, err := os.Open(filePath)
|
||
if err != nil {
|
||
fmt.Printf("无法打开文件 %s: %v\n", filePath, err)
|
||
return
|
||
}
|
||
defer file.Close()
|
||
|
||
// 创建行通道
|
||
lines := make(chan string, 100)
|
||
|
||
// 创建10个worker处理文件上传
|
||
for i := 0; i < viper.GetInt("thread-count"); i++ {
|
||
wg.Add(1)
|
||
go func(workerID int) {
|
||
defer wg.Done()
|
||
processLines(lines, workerID, filePath)
|
||
}(i)
|
||
}
|
||
|
||
// 读取文件并发送到通道
|
||
scanner := bufio.NewScanner(file)
|
||
lineCount := 0
|
||
for scanner.Scan() {
|
||
line := scanner.Text()
|
||
lines <- line
|
||
lineCount++
|
||
if lineCount%10000 == 0 {
|
||
fmt.Printf("文件【%s】处理进度:%v%%\n", filePath, float64(lineCount)/40000*100)
|
||
}
|
||
}
|
||
|
||
close(lines)
|
||
wg.Wait()
|
||
|
||
if err := scanner.Err(); err != nil {
|
||
fmt.Printf("读取文件 %s 错误: %v\n", filePath, err)
|
||
return
|
||
}
|
||
|
||
fmt.Printf("文件【%s】处理完成,共处理 %d 行数据\n", filePath, lineCount)
|
||
}
|
||
|
||
func processLines(lines <-chan string, workerID int, filePath string) {
|
||
for line := range lines {
|
||
// 跳过空行
|
||
if strings.TrimSpace(line) == "" {
|
||
continue
|
||
}
|
||
// 上传数据
|
||
if err := uploadDataToServer(line); err != nil {
|
||
fmt.Printf("Worker %d (文件 %s): 上传失败: %v\n", workerID, filePath, err)
|
||
}
|
||
}
|
||
}
|