pipeline.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package pty
  2. import (
  3. "encoding/json"
  4. "github.com/0xJacky/Nginx-UI/settings"
  5. "github.com/creack/pty"
  6. "github.com/gorilla/websocket"
  7. "github.com/pkg/errors"
  8. "github.com/uozi-tech/cosy/logger"
  9. "os"
  10. "os/exec"
  11. "time"
  12. "unicode/utf8"
  13. )
  14. type Pipeline struct {
  15. Pty *os.File
  16. cmd *exec.Cmd
  17. ws *websocket.Conn
  18. }
  19. type Message struct {
  20. Type MsgType
  21. Data json.RawMessage
  22. }
  23. const bufferSize = 2048
  24. func NewPipeLine(conn *websocket.Conn) (p *Pipeline, err error) {
  25. c := exec.Command(settings.TerminalSettings.StartCmd)
  26. ptmx, err := pty.StartWithSize(c, &pty.Winsize{Cols: 90, Rows: 60})
  27. if err != nil {
  28. return nil, errors.Wrap(err, "start pty error")
  29. }
  30. p = &Pipeline{
  31. Pty: ptmx,
  32. cmd: c,
  33. ws: conn,
  34. }
  35. return
  36. }
  37. func (p *Pipeline) ReadWsAndWritePty(errorChan chan error) {
  38. for {
  39. msgType, payload, err := p.ws.ReadMessage()
  40. if err != nil && websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNoStatusReceived,
  41. websocket.CloseNormalClosure) {
  42. errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty unexpected close")
  43. return
  44. }
  45. if msgType != websocket.TextMessage {
  46. errorChan <- errors.Errorf("Error ReadWsAndWritePty Invalid msgType: %v", msgType)
  47. return
  48. }
  49. var msg Message
  50. err = json.Unmarshal(payload, &msg)
  51. if err != nil {
  52. errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty json.Unmarshal")
  53. return
  54. }
  55. switch msg.Type {
  56. case TypeData:
  57. var data string
  58. err = json.Unmarshal(msg.Data, &data)
  59. if err != nil {
  60. errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty json.Unmarshal msg.Data")
  61. return
  62. }
  63. _, err = p.Pty.Write([]byte(data))
  64. if err != nil {
  65. errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty write pty")
  66. return
  67. }
  68. case TypeResize:
  69. var win struct {
  70. Cols uint16
  71. Rows uint16
  72. }
  73. err = json.Unmarshal(msg.Data, &win)
  74. if err != nil {
  75. errorChan <- errors.Wrap(err, "Error ReadSktAndWritePty Invalid resize message")
  76. return
  77. }
  78. err = pty.Setsize(p.Pty, &pty.Winsize{Rows: win.Rows, Cols: win.Cols})
  79. if err != nil {
  80. errorChan <- errors.Wrap(err, "Error ReadSktAndWritePty set pty size")
  81. return
  82. }
  83. case TypePing:
  84. err = p.ws.WriteControl(websocket.PongMessage, []byte{}, time.Now().Add(time.Second))
  85. if err != nil {
  86. errorChan <- errors.Wrap(err, "Error ReadSktAndWritePty write pong")
  87. return
  88. }
  89. default:
  90. errorChan <- errors.Errorf("Error ReadWsAndWritePty unknown msg.Type %v", msg.Type)
  91. return
  92. }
  93. }
  94. }
  95. func (p *Pipeline) ReadPtyAndWriteWs(errorChan chan error) {
  96. buf := make([]byte, bufferSize)
  97. for {
  98. n, err := p.Pty.Read(buf)
  99. if err != nil {
  100. errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs read pty")
  101. return
  102. }
  103. processedOutput := validString(string(buf[:n]))
  104. err = p.ws.WriteMessage(websocket.TextMessage, []byte(processedOutput))
  105. if err != nil && websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
  106. errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs websocket write")
  107. return
  108. }
  109. }
  110. }
  111. func (p *Pipeline) Close() {
  112. err := p.Pty.Close()
  113. if err != nil {
  114. logger.Error(err)
  115. }
  116. err = p.cmd.Process.Kill()
  117. if err != nil {
  118. logger.Error(err)
  119. }
  120. _, err = p.cmd.Process.Wait()
  121. if err != nil {
  122. logger.Error(err)
  123. }
  124. }
  125. func validString(s string) string {
  126. if !utf8.ValidString(s) {
  127. v := make([]rune, 0, len(s))
  128. for i, r := range s {
  129. if r == utf8.RuneError {
  130. _, size := utf8.DecodeRuneInString(s[i:])
  131. if size == 1 {
  132. continue
  133. }
  134. }
  135. v = append(v, r)
  136. }
  137. s = string(v)
  138. }
  139. return s
  140. }