streams.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package streams
  2. import (
  3. "net/http"
  4. "time"
  5. "github.com/0xJacky/Nginx-UI/internal/config"
  6. "github.com/0xJacky/Nginx-UI/internal/helper"
  7. "github.com/0xJacky/Nginx-UI/internal/nginx"
  8. "github.com/0xJacky/Nginx-UI/internal/stream"
  9. "github.com/0xJacky/Nginx-UI/model"
  10. "github.com/0xJacky/Nginx-UI/query"
  11. "github.com/gin-gonic/gin"
  12. "github.com/samber/lo"
  13. "github.com/spf13/cast"
  14. "github.com/uozi-tech/cosy"
  15. "gorm.io/gorm/clause"
  16. )
// Stream is the JSON response body describing a single stream configuration.
// GetStream in this file builds it from the result of stream.GetStreamInfo.
type Stream struct {
	// ModifiedAt is the modification time reported by the config file's FileInfo.
	ModifiedAt time.Time `json:"modified_at"`
	// Advanced mirrors the stored model's Advanced flag; it decides how Config is filled.
	Advanced bool `json:"advanced"`
	// Status is the stream's config status as reported by stream.GetStreamInfo.
	Status config.ConfigStatus `json:"status"`
	// Name is the stream name taken from the request path.
	Name string `json:"name"`
	// Config holds the raw file content in advanced mode, otherwise the
	// formatted code produced from the parsed configuration.
	Config string `json:"config"`
	// Tokenized is the parsed configuration; GetStream sets it only when the
	// stream is not in advanced mode, and it is omitted from JSON when nil.
	Tokenized *nginx.NgxConfig `json:"tokenized,omitempty"`
	// Filepath is the path of the configuration file.
	Filepath string `json:"filepath"`
	// EnvGroupID is the ID of the environment group the stream belongs to.
	EnvGroupID uint64 `json:"env_group_id"`
	// EnvGroup is the associated environment group, omitted from JSON when nil.
	EnvGroup *model.EnvGroup `json:"env_group,omitempty"`
	// SyncNodeIDs lists the node IDs stored on the model for this stream.
	// NOTE(review): presumably the nodes the config is synced to — confirm.
	SyncNodeIDs []uint64 `json:"sync_node_ids" gorm:"serializer:json"`
	// ProxyTargets is omitted from JSON when empty.
	// NOTE(review): not populated by any handler visible in this file — confirm where it is set.
	ProxyTargets []config.ProxyTarget `json:"proxy_targets,omitempty"`
}
  30. func GetStreams(c *gin.Context) {
  31. // Parse query parameters
  32. options := &stream.ListOptions{
  33. Search: c.Query("search"),
  34. Status: c.Query("status"),
  35. OrderBy: c.Query("order_by"),
  36. Sort: c.DefaultQuery("sort", "desc"),
  37. EnvGroupID: cast.ToUint64(c.Query("env_group_id")),
  38. }
  39. // Get streams from database
  40. s := query.Stream
  41. eg := query.EnvGroup
  42. // Get environment groups for association
  43. envGroups, err := eg.Find()
  44. if err != nil {
  45. cosy.ErrHandler(c, err)
  46. return
  47. }
  48. // Create environment group map for quick lookup
  49. envGroupMap := lo.SliceToMap(envGroups, func(item *model.EnvGroup) (uint64, *model.EnvGroup) {
  50. return item.ID, item
  51. })
  52. // Get streams with optional filtering
  53. var streams []*model.Stream
  54. if options.EnvGroupID != 0 {
  55. streams, err = s.Where(s.EnvGroupID.Eq(options.EnvGroupID)).Find()
  56. } else {
  57. streams, err = s.Find()
  58. }
  59. if err != nil {
  60. cosy.ErrHandler(c, err)
  61. return
  62. }
  63. // Associate streams with their environment groups
  64. for _, stream := range streams {
  65. if stream.EnvGroupID > 0 {
  66. stream.EnvGroup = envGroupMap[stream.EnvGroupID]
  67. }
  68. }
  69. // Get stream configurations using the internal logic
  70. configs, err := stream.GetStreamConfigs(c, options, streams)
  71. if err != nil {
  72. cosy.ErrHandler(c, err)
  73. return
  74. }
  75. c.JSON(http.StatusOK, gin.H{
  76. "data": configs,
  77. })
  78. }
  79. func GetStream(c *gin.Context) {
  80. name := helper.UnescapeURL(c.Param("name"))
  81. // Get stream information using internal logic
  82. info, err := stream.GetStreamInfo(name)
  83. if err != nil {
  84. cosy.ErrHandler(c, err)
  85. return
  86. }
  87. // Build response based on advanced mode
  88. response := Stream{
  89. ModifiedAt: info.FileInfo.ModTime(),
  90. Advanced: info.Model.Advanced,
  91. Status: info.Status,
  92. Name: name,
  93. Filepath: info.Path,
  94. EnvGroupID: info.Model.EnvGroupID,
  95. EnvGroup: info.Model.EnvGroup,
  96. SyncNodeIDs: info.Model.SyncNodeIDs,
  97. }
  98. if info.Model.Advanced {
  99. response.Config = info.RawContent
  100. } else {
  101. response.Config = info.NgxConfig.FmtCode()
  102. response.Tokenized = info.NgxConfig
  103. }
  104. c.JSON(http.StatusOK, response)
  105. }
  106. func SaveStream(c *gin.Context) {
  107. name := helper.UnescapeURL(c.Param("name"))
  108. var json struct {
  109. Content string `json:"content" binding:"required"`
  110. EnvGroupID uint64 `json:"env_group_id"`
  111. SyncNodeIDs []uint64 `json:"sync_node_ids"`
  112. Overwrite bool `json:"overwrite"`
  113. PostAction string `json:"post_action"`
  114. }
  115. // Validate input JSON
  116. if !cosy.BindAndValid(c, &json) {
  117. return
  118. }
  119. // Save stream configuration using internal logic
  120. err := stream.SaveStreamConfig(name, json.Content, json.EnvGroupID, json.SyncNodeIDs, json.Overwrite, json.PostAction)
  121. if err != nil {
  122. cosy.ErrHandler(c, err)
  123. return
  124. }
  125. // Return the updated stream
  126. GetStream(c)
  127. }
  128. func EnableStream(c *gin.Context) {
  129. // Enable the stream by creating a symlink in streams-enabled directory
  130. err := stream.Enable(helper.UnescapeURL(c.Param("name")))
  131. if err != nil {
  132. cosy.ErrHandler(c, err)
  133. return
  134. }
  135. c.JSON(http.StatusOK, gin.H{
  136. "message": "ok",
  137. })
  138. }
  139. func DisableStream(c *gin.Context) {
  140. // Disable the stream by removing the symlink from streams-enabled directory
  141. err := stream.Disable(helper.UnescapeURL(c.Param("name")))
  142. if err != nil {
  143. cosy.ErrHandler(c, err)
  144. return
  145. }
  146. c.JSON(http.StatusOK, gin.H{
  147. "message": "ok",
  148. })
  149. }
  150. func DeleteStream(c *gin.Context) {
  151. // Delete the stream configuration file and its symbolic link if exists
  152. err := stream.Delete(helper.UnescapeURL(c.Param("name")))
  153. if err != nil {
  154. cosy.ErrHandler(c, err)
  155. return
  156. }
  157. c.JSON(http.StatusOK, gin.H{
  158. "message": "ok",
  159. })
  160. }
  161. func RenameStream(c *gin.Context) {
  162. oldName := helper.UnescapeURL(c.Param("name"))
  163. var json struct {
  164. NewName string `json:"new_name"`
  165. }
  166. // Validate input JSON
  167. if !cosy.BindAndValid(c, &json) {
  168. return
  169. }
  170. // Rename the stream configuration file
  171. err := stream.Rename(oldName, json.NewName)
  172. if err != nil {
  173. cosy.ErrHandler(c, err)
  174. return
  175. }
  176. c.JSON(http.StatusOK, gin.H{
  177. "message": "ok",
  178. })
  179. }
  180. func BatchUpdateStreams(c *gin.Context) {
  181. cosy.Core[model.Stream](c).SetValidRules(gin.H{
  182. "env_group_id": "required",
  183. }).SetItemKey("path").
  184. BeforeExecuteHook(func(ctx *cosy.Ctx[model.Stream]) {
  185. effectedPath := make([]string, len(ctx.BatchEffectedIDs))
  186. var streams []*model.Stream
  187. for i, name := range ctx.BatchEffectedIDs {
  188. path := nginx.GetConfPath("streams-available", name)
  189. effectedPath[i] = path
  190. streams = append(streams, &model.Stream{
  191. Path: path,
  192. })
  193. }
  194. s := query.Stream
  195. err := s.Clauses(clause.OnConflict{
  196. DoNothing: true,
  197. }).Create(streams...)
  198. if err != nil {
  199. ctx.AbortWithError(err)
  200. return
  201. }
  202. ctx.BatchEffectedIDs = effectedPath
  203. }).BatchModify()
  204. }