operation_sync.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package router
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/0xJacky/Nginx-UI/internal/analytic"
  8. "github.com/0xJacky/Nginx-UI/internal/logger"
  9. "github.com/gin-gonic/gin"
  10. "github.com/pkg/errors"
  11. "io"
  12. "net/http"
  13. "net/url"
  14. "regexp"
  15. "sync"
  16. )
  17. type ErrorRes struct {
  18. Message string `json:"message"`
  19. }
  20. type toolBodyWriter struct {
  21. gin.ResponseWriter
  22. body *bytes.Buffer
  23. }
  24. func (r toolBodyWriter) Write(b []byte) (int, error) {
  25. return r.body.Write(b)
  26. }
  27. // OperationSync 针对配置了vip的环境操作进行同步
  28. func OperationSync() gin.HandlerFunc {
  29. return func(c *gin.Context) {
  30. bodyBytes, _ := PeekRequest(c.Request)
  31. wb := &toolBodyWriter{
  32. body: &bytes.Buffer{},
  33. ResponseWriter: c.Writer,
  34. }
  35. c.Writer = wb
  36. c.Next()
  37. if c.Request.Method == "GET" || !statusValid(c.Writer.Status()) { // 请求有问题,无需执行同步操作
  38. return
  39. }
  40. totalCount := 0
  41. successCount := 0
  42. detailMsg := ""
  43. // 后置处理操作同步
  44. wg := sync.WaitGroup{}
  45. for _, node := range analytic.NodeMap {
  46. wg.Add(1)
  47. node := node
  48. go func(data analytic.Node) {
  49. defer wg.Done()
  50. if node.OperationSync && node.Status && requestUrlMatch(c.Request.URL.Path, data) { // 开启操作同步且当前状态正常
  51. totalCount++
  52. if err := syncNodeOperation(c, data, bodyBytes); err != nil {
  53. detailMsg += fmt.Sprintf("node_name: %s, err_msg: %s; ", data.Name, err)
  54. return
  55. }
  56. successCount++
  57. }
  58. }(*node)
  59. }
  60. wg.Wait()
  61. if successCount < totalCount { // 如果有错误,替换原来的消息内容
  62. originBytes := wb.body
  63. logger.Infof("origin response body: %s", originBytes)
  64. // clear Origin Buffer
  65. wb.body = &bytes.Buffer{}
  66. wb.ResponseWriter.WriteHeader(http.StatusInternalServerError)
  67. errorRes := ErrorRes{
  68. Message: fmt.Sprintf("operation sync failed, total: %d, success: %d, fail: %d, detail: %s", totalCount, successCount, totalCount-successCount, detailMsg),
  69. }
  70. byts, _ := json.Marshal(errorRes)
  71. _, err := wb.Write(byts)
  72. if err != nil {
  73. logger.Error(err)
  74. }
  75. }
  76. _, err := wb.ResponseWriter.Write(wb.body.Bytes())
  77. if err != nil {
  78. logger.Error(err)
  79. }
  80. }
  81. }
  82. func PeekRequest(request *http.Request) ([]byte, error) {
  83. if request.Body != nil {
  84. byts, err := io.ReadAll(request.Body) // io.ReadAll as Go 1.16, below please use ioutil.ReadAll
  85. if err != nil {
  86. return nil, err
  87. }
  88. request.Body = io.NopCloser(bytes.NewReader(byts))
  89. return byts, nil
  90. }
  91. return make([]byte, 0), nil
  92. }
  93. func requestUrlMatch(url string, node analytic.Node) bool {
  94. p, _ := regexp.Compile(node.SyncApiRegex)
  95. result := p.FindAllString(url, -1)
  96. if len(result) > 0 && result[0] == url {
  97. return true
  98. }
  99. return false
  100. }
  101. func statusValid(code int) bool {
  102. return code < http.StatusMultipleChoices
  103. }
  104. func syncNodeOperation(c *gin.Context, node analytic.Node, bodyBytes []byte) error {
  105. u, err := url.JoinPath(node.URL, c.Request.RequestURI)
  106. if err != nil {
  107. return err
  108. }
  109. decodedUri, err := url.QueryUnescape(u)
  110. if err != nil {
  111. return err
  112. }
  113. logger.Debugf("syncNodeOperation request: %s, node_id: %d, node_name: %s", decodedUri, node.ID, node.Name)
  114. client := http.Client{
  115. Transport: &http.Transport{
  116. TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
  117. },
  118. }
  119. req, err := http.NewRequest(c.Request.Method, decodedUri, bytes.NewReader(bodyBytes))
  120. req.Header.Set("X-Node-Secret", node.Token)
  121. res, err := client.Do(req)
  122. if err != nil {
  123. return err
  124. }
  125. defer res.Body.Close()
  126. byts, err := io.ReadAll(res.Body)
  127. if err != nil {
  128. return err
  129. }
  130. if !statusValid(res.StatusCode) {
  131. errRes := ErrorRes{}
  132. if err = json.Unmarshal(byts, &errRes); err != nil {
  133. return err
  134. }
  135. return errors.New(errRes.Message)
  136. }
  137. logger.Debug("syncNodeOperation result: ", string(byts))
  138. return nil
  139. }