将存储部分迁移到Redis

This commit is contained in:
wtz
2026-02-20 19:41:02 +08:00
parent 13d2b0e1dc
commit a272dad5f1
9 changed files with 704 additions and 147 deletions

View File

@@ -20,11 +20,13 @@ var upgrader = websocket.Upgrader{
}
type Client struct {
ID string
RoomID string
Conn *websocket.Conn
Send chan []byte
Hub *Hub
ID string
RoomID string
Conn *websocket.Conn
Send chan []byte
Hub *Hub
done chan struct{}
doneOnce sync.Once
}
type Hub struct {
@@ -33,6 +35,8 @@ type Hub struct {
Unregister chan *Client
GameMgr *game.GameManager
mu sync.RWMutex
stopChan chan struct{}
stopOnce sync.Once
}
func NewHub(gameMgr *game.GameManager) *Hub {
@@ -41,6 +45,7 @@ func NewHub(gameMgr *game.GameManager) *Hub {
Register: make(chan *Client, 64),
Unregister: make(chan *Client, 64),
GameMgr: gameMgr,
stopChan: make(chan struct{}),
}
}
@@ -49,39 +54,71 @@ func (h *Hub) Run() {
select {
case c := <-h.Register:
h.mu.Lock()
if old, exists := h.Clients[c.ID]; exists {
old.close()
delete(h.Clients, old.ID)
}
h.Clients[c.ID] = c
h.mu.Unlock()
case c := <-h.Unregister:
h.mu.Lock()
if _, ok := h.Clients[c.ID]; ok {
delete(h.Clients, c.ID)
close(c.Send)
c.close()
}
h.mu.Unlock()
if c.RoomID != "" {
h.GameMgr.LeaveRoom(c.RoomID, c.ID)
h.GameMgr.LeaveRoomWithTTL(c.RoomID, c.ID)
h.broadcastRoomState(c.RoomID)
}
case <-h.stopChan:
h.mu.Lock()
for _, c := range h.Clients {
c.close()
}
h.Clients = make(map[string]*Client)
h.mu.Unlock()
return
}
}
}
func (h *Hub) Stop() {
h.stopOnce.Do(func() {
close(h.stopChan)
})
}
func (c *Client) close() {
c.doneOnce.Do(func() {
close(c.done)
})
}
func (c *Client) ReadPump() {
defer func() {
c.Hub.Unregister <- c
c.Conn.Close()
}()
c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.Conn.SetPongHandler(func(string) error {
c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
_, data, err := c.Conn.ReadMessage()
if err != nil {
break
select {
case <-c.done:
return
default:
_, data, err := c.Conn.ReadMessage()
if err != nil {
return
}
c.handleMessage(data)
}
c.handleMessage(data)
}
}
@@ -91,8 +128,13 @@ func (c *Client) WritePump() {
ticker.Stop()
c.Conn.Close()
}()
for {
select {
case <-c.done:
c.Conn.WriteMessage(websocket.CloseMessage, nil)
return
case msg, ok := <-c.Send:
c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
@@ -102,6 +144,7 @@ func (c *Client) WritePump() {
if err := c.Conn.WriteMessage(websocket.TextMessage, msg); err != nil {
return
}
case <-ticker.C:
c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
@@ -210,6 +253,7 @@ func (c *Client) sendError(msg string) {
})
select {
case c.Send <- data:
case <-c.done:
default:
}
}
@@ -222,6 +266,7 @@ func (h *Hub) broadcastToRoom(roomID string, msg models.Message) {
if c.RoomID == roomID {
select {
case c.Send <- data:
case <-c.done:
default:
}
}
@@ -248,6 +293,7 @@ func (h *Hub) broadcastRoomState(roomID string) {
})
select {
case c.Send <- data:
case <-c.done:
default:
}
}
@@ -274,15 +320,15 @@ func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
Conn: conn,
Send: make(chan []byte, 256),
Hub: hub,
done: make(chan struct{}),
}
hub.mu.Lock()
hub.Clients[client.ID] = client
hub.mu.Unlock()
hub.GameMgr.MarkPlayerOnline(roomID, playerID)
go client.WritePump()
go client.ReadPump()
hub.Register <- client
time.Sleep(50 * time.Millisecond)
go client.WritePump()
go client.ReadPump()
hub.broadcastRoomState(roomID)
}