streams.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package streams
  2. import (
  3. "net/http"
  4. "time"
  5. "github.com/0xJacky/Nginx-UI/internal/config"
  6. "github.com/0xJacky/Nginx-UI/internal/helper"
  7. "github.com/0xJacky/Nginx-UI/internal/nginx"
  8. "github.com/0xJacky/Nginx-UI/internal/stream"
  9. "github.com/0xJacky/Nginx-UI/model"
  10. "github.com/0xJacky/Nginx-UI/query"
  11. "github.com/gin-gonic/gin"
  12. "github.com/samber/lo"
  13. "github.com/spf13/cast"
  14. "github.com/uozi-tech/cosy"
  15. "gorm.io/gorm/clause"
  16. )
  17. type Stream struct {
  18. ModifiedAt time.Time `json:"modified_at"`
  19. Advanced bool `json:"advanced"`
  20. Status config.ConfigStatus `json:"status"`
  21. Name string `json:"name"`
  22. Config string `json:"config"`
  23. Tokenized *nginx.NgxConfig `json:"tokenized,omitempty"`
  24. Filepath string `json:"filepath"`
  25. EnvGroupID uint64 `json:"env_group_id"`
  26. EnvGroup *model.EnvGroup `json:"env_group,omitempty"`
  27. SyncNodeIDs []uint64 `json:"sync_node_ids" gorm:"serializer:json"`
  28. ProxyTargets []config.ProxyTarget `json:"proxy_targets,omitempty"`
  29. }
  30. func GetStreams(c *gin.Context) {
  31. // Parse query parameters
  32. options := &stream.ListOptions{
  33. Search: c.Query("search"),
  34. Name: c.Query("name"),
  35. Status: c.Query("status"),
  36. OrderBy: c.Query("order_by"),
  37. Sort: c.DefaultQuery("sort", "desc"),
  38. EnvGroupID: cast.ToUint64(c.Query("env_group_id")),
  39. }
  40. // Get streams from database
  41. s := query.Stream
  42. eg := query.EnvGroup
  43. // Get environment groups for association
  44. envGroups, err := eg.Find()
  45. if err != nil {
  46. cosy.ErrHandler(c, err)
  47. return
  48. }
  49. // Create environment group map for quick lookup
  50. envGroupMap := lo.SliceToMap(envGroups, func(item *model.EnvGroup) (uint64, *model.EnvGroup) {
  51. return item.ID, item
  52. })
  53. // Get streams with optional filtering
  54. var streams []*model.Stream
  55. if options.EnvGroupID != 0 {
  56. streams, err = s.Where(s.EnvGroupID.Eq(options.EnvGroupID)).Find()
  57. } else {
  58. streams, err = s.Find()
  59. }
  60. if err != nil {
  61. cosy.ErrHandler(c, err)
  62. return
  63. }
  64. // Associate streams with their environment groups
  65. for _, stream := range streams {
  66. if stream.EnvGroupID > 0 {
  67. stream.EnvGroup = envGroupMap[stream.EnvGroupID]
  68. }
  69. }
  70. // Get stream configurations using the internal logic
  71. configs, err := stream.GetStreamConfigs(c, options, streams)
  72. if err != nil {
  73. cosy.ErrHandler(c, err)
  74. return
  75. }
  76. c.JSON(http.StatusOK, gin.H{
  77. "data": configs,
  78. })
  79. }
  80. func GetStream(c *gin.Context) {
  81. name := helper.UnescapeURL(c.Param("name"))
  82. // Get stream information using internal logic
  83. info, err := stream.GetStreamInfo(name)
  84. if err != nil {
  85. cosy.ErrHandler(c, err)
  86. return
  87. }
  88. // Build response based on advanced mode
  89. response := Stream{
  90. ModifiedAt: info.FileInfo.ModTime(),
  91. Advanced: info.Model.Advanced,
  92. Status: info.Status,
  93. Name: name,
  94. Filepath: info.Path,
  95. EnvGroupID: info.Model.EnvGroupID,
  96. EnvGroup: info.Model.EnvGroup,
  97. SyncNodeIDs: info.Model.SyncNodeIDs,
  98. }
  99. if info.Model.Advanced {
  100. response.Config = info.RawContent
  101. } else {
  102. response.Config = info.NgxConfig.FmtCode()
  103. response.Tokenized = info.NgxConfig
  104. }
  105. c.JSON(http.StatusOK, response)
  106. }
  107. func SaveStream(c *gin.Context) {
  108. name := helper.UnescapeURL(c.Param("name"))
  109. var json struct {
  110. Content string `json:"content" binding:"required"`
  111. EnvGroupID uint64 `json:"env_group_id"`
  112. SyncNodeIDs []uint64 `json:"sync_node_ids"`
  113. Overwrite bool `json:"overwrite"`
  114. PostAction string `json:"post_action"`
  115. }
  116. // Validate input JSON
  117. if !cosy.BindAndValid(c, &json) {
  118. return
  119. }
  120. // Save stream configuration using internal logic
  121. err := stream.SaveStreamConfig(name, json.Content, json.EnvGroupID, json.SyncNodeIDs, json.Overwrite, json.PostAction)
  122. if err != nil {
  123. cosy.ErrHandler(c, err)
  124. return
  125. }
  126. // Return the updated stream
  127. GetStream(c)
  128. }
  129. func EnableStream(c *gin.Context) {
  130. // Enable the stream by creating a symlink in streams-enabled directory
  131. err := stream.Enable(helper.UnescapeURL(c.Param("name")))
  132. if err != nil {
  133. cosy.ErrHandler(c, err)
  134. return
  135. }
  136. c.JSON(http.StatusOK, gin.H{
  137. "message": "ok",
  138. })
  139. }
  140. func DisableStream(c *gin.Context) {
  141. // Disable the stream by removing the symlink from streams-enabled directory
  142. err := stream.Disable(helper.UnescapeURL(c.Param("name")))
  143. if err != nil {
  144. cosy.ErrHandler(c, err)
  145. return
  146. }
  147. c.JSON(http.StatusOK, gin.H{
  148. "message": "ok",
  149. })
  150. }
  151. func DeleteStream(c *gin.Context) {
  152. // Delete the stream configuration file and its symbolic link if exists
  153. err := stream.Delete(helper.UnescapeURL(c.Param("name")))
  154. if err != nil {
  155. cosy.ErrHandler(c, err)
  156. return
  157. }
  158. c.JSON(http.StatusOK, gin.H{
  159. "message": "ok",
  160. })
  161. }
  162. func RenameStream(c *gin.Context) {
  163. oldName := helper.UnescapeURL(c.Param("name"))
  164. var json struct {
  165. NewName string `json:"new_name"`
  166. }
  167. // Validate input JSON
  168. if !cosy.BindAndValid(c, &json) {
  169. return
  170. }
  171. // Rename the stream configuration file
  172. err := stream.Rename(oldName, json.NewName)
  173. if err != nil {
  174. cosy.ErrHandler(c, err)
  175. return
  176. }
  177. c.JSON(http.StatusOK, gin.H{
  178. "message": "ok",
  179. })
  180. }
  181. func BatchUpdateStreams(c *gin.Context) {
  182. cosy.Core[model.Stream](c).SetValidRules(gin.H{
  183. "env_group_id": "required",
  184. }).SetItemKey("path").
  185. BeforeExecuteHook(func(ctx *cosy.Ctx[model.Stream]) {
  186. effectedPath := make([]string, len(ctx.BatchEffectedIDs))
  187. var streams []*model.Stream
  188. for i, name := range ctx.BatchEffectedIDs {
  189. path := nginx.GetConfPath("streams-available", name)
  190. effectedPath[i] = path
  191. streams = append(streams, &model.Stream{
  192. Path: path,
  193. })
  194. }
  195. s := query.Stream
  196. err := s.Clauses(clause.OnConflict{
  197. DoNothing: true,
  198. }).Create(streams...)
  199. if err != nil {
  200. ctx.AbortWithError(err)
  201. return
  202. }
  203. ctx.BatchEffectedIDs = effectedPath
  204. }).BatchModify()
  205. }