operation_sync.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package router
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/0xJacky/Nginx-UI/server/internal/analytic"
  8. "github.com/0xJacky/Nginx-UI/server/internal/logger"
  9. "github.com/gin-gonic/gin"
  10. "github.com/pkg/errors"
  11. "io"
  12. "net/http"
  13. "net/url"
  14. "regexp"
  15. "sync"
  16. )
  17. type ErrorRes struct {
  18. Message string `json:"message"`
  19. }
  20. type toolBodyWriter struct {
  21. gin.ResponseWriter
  22. body *bytes.Buffer
  23. }
  24. func (r toolBodyWriter) Write(b []byte) (int, error) {
  25. return r.body.Write(b)
  26. }
  27. // OperationSync 针对配置了vip的环境操作进行同步
  28. func OperationSync() gin.HandlerFunc {
  29. return func(c *gin.Context) {
  30. bodyBytes, _ := PeekRequest(c.Request)
  31. wb := &toolBodyWriter{
  32. body: &bytes.Buffer{},
  33. ResponseWriter: c.Writer,
  34. }
  35. c.Writer = wb
  36. c.Next()
  37. if c.Request.Method == "GET" || !statusValid(c.Writer.Status()) { // 请求有问题,无需执行同步操作
  38. wb.ResponseWriter.Write(wb.body.Bytes())
  39. return
  40. }
  41. totalCount := 0
  42. successCount := 0
  43. detailMsg := ""
  44. // 后置处理操作同步
  45. wg := sync.WaitGroup{}
  46. for _, node := range analytic.NodeMap {
  47. wg.Add(1)
  48. go func(data analytic.Node) {
  49. defer wg.Done()
  50. if *node.OperationSync && node.Status && requestUrlMatch(c.Request.URL.Path, data) { // 开启操作同步且当前状态正常
  51. totalCount++
  52. if err := syncNodeOperation(c, data, bodyBytes); err != nil {
  53. detailMsg += fmt.Sprintf("node_name: %s, err_msg: %s; ", data.Name, err)
  54. return
  55. }
  56. successCount++
  57. }
  58. }(*node)
  59. }
  60. wg.Wait()
  61. if successCount < totalCount { // 如果有错误,替换原来的消息内容
  62. originBytes := wb.body
  63. logger.Infof("origin response body: %s", originBytes)
  64. // clear Origin Buffer
  65. wb.body = &bytes.Buffer{}
  66. wb.ResponseWriter.WriteHeader(http.StatusInternalServerError)
  67. errorRes := ErrorRes{
  68. Message: fmt.Sprintf("operation sync failed, total: %d, success: %d, fail: %d, detail: %s", totalCount, successCount, totalCount-successCount, detailMsg),
  69. }
  70. byts, _ := json.Marshal(errorRes)
  71. wb.Write(byts)
  72. }
  73. wb.ResponseWriter.Write(wb.body.Bytes())
  74. }
  75. }
  76. func PeekRequest(request *http.Request) ([]byte, error) {
  77. if request.Body != nil {
  78. byts, err := io.ReadAll(request.Body) // io.ReadAll as Go 1.16, below please use ioutil.ReadAll
  79. if err != nil {
  80. return nil, err
  81. }
  82. request.Body = io.NopCloser(bytes.NewReader(byts))
  83. return byts, nil
  84. }
  85. return make([]byte, 0), nil
  86. }
  87. func requestUrlMatch(url string, node analytic.Node) bool {
  88. p, _ := regexp.Compile(node.SyncApiRegex)
  89. result := p.FindAllString(url, -1)
  90. if len(result) > 0 && result[0] == url {
  91. return true
  92. }
  93. return false
  94. }
  95. func statusValid(code int) bool {
  96. return code < http.StatusMultipleChoices
  97. }
  98. func syncNodeOperation(c *gin.Context, node analytic.Node, bodyBytes []byte) error {
  99. u, err := url.JoinPath(node.URL, c.Request.RequestURI)
  100. if err != nil {
  101. return err
  102. }
  103. decodedUri, err := url.QueryUnescape(u)
  104. if err != nil {
  105. return err
  106. }
  107. logger.Debugf("syncNodeOperation request: %s, node_id: %d, node_name: %s", decodedUri, node.ID, node.Name)
  108. client := http.Client{
  109. Transport: &http.Transport{
  110. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  111. },
  112. }
  113. req, err := http.NewRequest(c.Request.Method, decodedUri, bytes.NewReader(bodyBytes))
  114. req.Header.Set("X-Node-Secret", node.Token)
  115. res, err := client.Do(req)
  116. if err != nil {
  117. return err
  118. }
  119. defer res.Body.Close()
  120. byts, err := io.ReadAll(res.Body)
  121. if err != nil {
  122. return err
  123. }
  124. if !statusValid(res.StatusCode) {
  125. errRes := ErrorRes{}
  126. if err = json.Unmarshal(byts, &errRes); err != nil {
  127. return err
  128. }
  129. return errors.New(errRes.Message)
  130. }
  131. logger.Debug("syncNodeOperation result: ", string(byts))
  132. return nil
  133. }