123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- package router
- import (
- "bytes"
- "crypto/tls"
- "encoding/json"
- "fmt"
- "github.com/0xJacky/Nginx-UI/internal/analytic"
- "github.com/0xJacky/Nginx-UI/internal/logger"
- "github.com/gin-gonic/gin"
- "github.com/pkg/errors"
- "io"
- "net/http"
- "net/url"
- "regexp"
- "sync"
- )
// ErrorRes is the JSON error payload used by this file: an object carrying a
// single "message" field.
type ErrorRes struct {
	// Message holds the error text; it is serialized under the "message" key.
	Message string `json:"message"`
}
- type toolBodyWriter struct {
- gin.ResponseWriter
- body *bytes.Buffer
- }
- func (r toolBodyWriter) Write(b []byte) (int, error) {
- return r.body.Write(b)
- }
- // OperationSync 针对配置了vip的环境操作进行同步
- func OperationSync() gin.HandlerFunc {
- return func(c *gin.Context) {
- bodyBytes, _ := PeekRequest(c.Request)
- wb := &toolBodyWriter{
- body: &bytes.Buffer{},
- ResponseWriter: c.Writer,
- }
- c.Writer = wb
- c.Next()
- if c.Request.Method == "GET" || !statusValid(c.Writer.Status()) { // 请求有问题,无需执行同步操作
- return
- }
- totalCount := 0
- successCount := 0
- detailMsg := ""
- // 后置处理操作同步
- wg := sync.WaitGroup{}
- for _, node := range analytic.NodeMap {
- wg.Add(1)
- node := node
- go func(data analytic.Node) {
- defer wg.Done()
- if node.OperationSync && node.Status && requestUrlMatch(c.Request.URL.Path, data) { // 开启操作同步且当前状态正常
- totalCount++
- if err := syncNodeOperation(c, data, bodyBytes); err != nil {
- detailMsg += fmt.Sprintf("node_name: %s, err_msg: %s; ", data.Name, err)
- return
- }
- successCount++
- }
- }(*node)
- }
- wg.Wait()
- if successCount < totalCount { // 如果有错误,替换原来的消息内容
- originBytes := wb.body
- logger.Infof("origin response body: %s", originBytes)
- // clear Origin Buffer
- wb.body = &bytes.Buffer{}
- wb.ResponseWriter.WriteHeader(http.StatusInternalServerError)
- errorRes := ErrorRes{
- Message: fmt.Sprintf("operation sync failed, total: %d, success: %d, fail: %d, detail: %s", totalCount, successCount, totalCount-successCount, detailMsg),
- }
- byts, _ := json.Marshal(errorRes)
- _, err := wb.Write(byts)
- if err != nil {
- logger.Error(err)
- }
- }
- _, err := wb.ResponseWriter.Write(wb.body.Bytes())
- if err != nil {
- logger.Error(err)
- }
- }
- }
// PeekRequest returns the complete body of request while leaving the request
// readable: after a successful read, request.Body is replaced by a fresh
// reader over the same bytes. A request without a body yields an empty,
// non-nil slice. If reading fails, the error is returned and request.Body is
// not replaced.
func PeekRequest(request *http.Request) ([]byte, error) {
	if request.Body == nil {
		return []byte{}, nil
	}

	// io.ReadAll exists since Go 1.16; older toolchains need ioutil.ReadAll.
	payload, err := io.ReadAll(request.Body)
	if err != nil {
		return nil, err
	}

	// Rewind for the next consumer by swapping in a reader over the copy.
	request.Body = io.NopCloser(bytes.NewReader(payload))
	return payload, nil
}
- func requestUrlMatch(url string, node analytic.Node) bool {
- p, _ := regexp.Compile(node.SyncApiRegex)
- result := p.FindAllString(url, -1)
- if len(result) > 0 && result[0] == url {
- return true
- }
- return false
- }
// statusValid reports whether code lies below the 3xx range, i.e. is smaller
// than http.StatusMultipleChoices (300).
func statusValid(code int) bool {
	const firstInvalid = http.StatusMultipleChoices
	return firstInvalid > code
}
- func syncNodeOperation(c *gin.Context, node analytic.Node, bodyBytes []byte) error {
- u, err := url.JoinPath(node.URL, c.Request.RequestURI)
- if err != nil {
- return err
- }
- decodedUri, err := url.QueryUnescape(u)
- if err != nil {
- return err
- }
- logger.Debugf("syncNodeOperation request: %s, node_id: %d, node_name: %s", decodedUri, node.ID, node.Name)
- client := http.Client{
- Transport: &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
- },
- }
- req, err := http.NewRequest(c.Request.Method, decodedUri, bytes.NewReader(bodyBytes))
- req.Header.Set("X-Node-Secret", node.Token)
- res, err := client.Do(req)
- if err != nil {
- return err
- }
- defer res.Body.Close()
- byts, err := io.ReadAll(res.Body)
- if err != nil {
- return err
- }
- if !statusValid(res.StatusCode) {
- errRes := ErrorRes{}
- if err = json.Unmarshal(byts, &errRes); err != nil {
- return err
- }
- return errors.New(errRes.Message)
- }
- logger.Debug("syncNodeOperation result: ", string(byts))
- return nil
- }
|