序
最近一个项目上用到消息推送功能,boss说写个简单的ws让前端自行处理消息就好(前端:MMP)。
看到网上用到比较多的框架是gorilla/websocket,但是要验证第三方包的稳定性还是需要一定时间,还是用官方的websocket包吧(又不是不能用.jpg)。
思路其实很简单,一端接收到消息之后直接推入一个channel。另启一个协程去接收这个channel的消息,然后广播到所有ws连接。
Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| package main
import ( "context" "log" "net/http" "sync"
"golang.org/x/net/websocket" )
var ( WsSrv *WsServer )
const ( MessageChanSize = 1024 )
type WsServer struct { Clients map[string]*WsClient MessageChan chan string Codec websocket.Codec sync.RWMutex }
type WsClient struct { websocket *websocket.Conn }
func InitWsServer() { WsSrv = &WsServer{ Clients: map[string]*WsClient{}, MessageChan: make(chan string, MessageChanSize), Codec: websocket.Message, } }
func WsReceiver(ws *websocket.Conn) { var ( msg string err error ) defer func() { if err = ws.Close(); err != nil { log.Printf("[WsReceiver] ws(%v) closed err: %v\n", ws.RemoteAddr(), err) } }()
userID := ws.Request().URL.Query().Get("user_id") if userID == "" { return }
defer func() { WsSrv.Lock() defer WsSrv.Unlock() delete(WsSrv.Clients, userID) }()
WsSrv.Lock() WsSrv.Clients[userID] = &WsClient{websocket: ws} WsSrv.Unlock()
for { if err = WsSrv.Codec.Receive(ws, &msg); err != nil { log.Printf("[WsReceiver] receive from remote result err: %v\n", err) return } WsSrv.MessageChan <- msg } }
func WsSender(ctx context.Context) { var err error for { select { case msg := <-WsSrv.MessageChan: for userID, conn := range WsSrv.Clients { if err = websocket.Message.Send(conn.websocket, msg); err != nil { log.Printf("[WsSender] send msg(%v) to user_id(%v) result err: %v\n", msg, userID, err) } } case <-ctx.Done(): return } } }
func main() { var err error
InitWsServer() go WsSender(context.Background())
mux := http.NewServeMux() mux.Handle("/im", websocket.Handler(WsReceiver)) mux.HandleFunc("/ping", func(writer http.ResponseWriter, request *http.Request) { _, _ = writer.Write([]byte("pong")) })
if err = http.ListenAndServe(":23333", mux); err != nil { log.Fatalln(err) } }
|
nginx转发事要
在http请求升级为ws请求的过程中会带着两个重要的header,一个是Upgrade: websocket
,另一个是Connection: upgrade
。只有带上这两个header才能在http/1.1下成功将http请求升级为ws请求,但是如果在服务器中有nginx反向代理的情况下往往会把这个头丢掉,所以要在配置文件上加上来,否则会出现著名的http状态码400错误
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| map $http_upgrade $connection_upgrade { default upgrade; "" close; }
upstream srv { server 127.0.0.1:23333; }
server { listen 80;
location ~ ^/im { proxy_pass http://srv; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header Host $host; } }
|
鸣谢
golang websocket的例子
Using NGINX as a WebSocket Proxy
websocket 在线测试工具
wscat