最近被后台日志弄的很烦,看到有个项目简简单单,又能满足需要,顺便试下看看效果,做下记录。只是记录下一部分内容,就不全部读了,关于源码可以去https://github.com/xmge/seelog。
结构设计
// websocket客户端
type client struct {
id string
socket *websocket.Conn
send chan []byte
}
// 客户端管理
type clientManager struct {
clients map[*client]bool
broadcast chan []byte
register chan *client
unregister chan *client
}
WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。
WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。
程序使用管道作为通信基础
- clients 用来保存当前全部的Websocket
- broadcast 作为广播使用的管道,当收到消息,向所有的clients中的websocket进行传输信息
- register 当新的链接建立,将client指针放入注册管道
- unregister 当链接断开,将断开的链接对象放入取消管道
- client结构体内的send管道,当broadcast收到,将信息发到每个client的send管道中
func (manager *clientManager) start() {
defer func() {
if err := recover(); err != nil {
log.Printf("[seelog] error:%+v", err)
}
}()
for {
select {
case conn := <-manager.register:
manager.clients[conn] = true
case conn := <-manager.unregister:
if _, ok := manager.clients[conn]; ok {
close(conn.send)
delete(manager.clients, conn)
}
case message := <-manager.broadcast:
for conn := range manager.clients {
select {
case conn.send <- message:
default:
close(conn.send)
delete(manager.clients, conn)
}
}
}
}
}
使用select-case进行管道的数据处理,外部加一个for循环保持轮询的状态。
func (c *client) write() {
defer func() {
manager.unregister <- c
c.socket.Close()
}()
for {
select {
case message, ok := <-c.send:
if !ok {
c.socket.WriteClose(1)
return
}
c.socket.Write(message)
}
}
}
这个是在每个websocket启动的时候使用,每个socket保持一个for循环,使用defer用于关闭操作,当for被打断(即关闭网页之类的操作),socket被关闭,则会插入到取消管道中,clients键值对会删除这个连接的信息。
监控文件的内容变化
通过os.Stat获取文件信息,返回值为fileInfo的接口
fileInfo, err = os.Stat(filePath)
func (f *File) Stat() (FileInfo, error)
type FileInfo interface {
Name() string // base name of the file 文件名
Size() int64 // length in bytes for regular files; system-dependent for others 文件大小(byte长度)
Mode() FileMode // file mode bits 文件模式(只读、只写之类)
ModTime() time.Time // modification time
IsDir() bool // abbreviation for Mode().IsDir() 是否是目录
Sys() interface{} // underlying data source (can return nil) 基础数据源(可以返回nil)
}
获取当前的文件的截止位置
offset := fileInfo.Size()
获取新的文件大小,然后根据文件大小和之前的区别,构建一个新的byte数组,大小为新的字节数减去旧的字节数
msg := make([]byte, newOffset-offset)
使用Open方法打开一个文件,Open方法是以只读的方式读取数据
file, err := os.Open(filePath)
func Open(name string) (*File, error)
可以将文件读取的起点设置到某个位置,在seelog中,将读取起点设置到文件末尾,当文件的大小发生变化,则文件从上个起点开始读取文件内容
_, err = file.Seek(offset, 0)
func (f *File) Seek(offset int64, whence int) (ret int64, err error)
whence 存在3个参数
0:文件头的绝对位置偏移offset的距离
1:文件的相对位置,即当前位置偏移offset的距离
2:文件末尾的绝对位置偏移offset的距离
这个特性当文件以O_APPEND的模式打开是没有效果的
msg是之前构造的字节数组,将新增的内容读取到字节数组中
_, err = file.Read(msg)
使用管道作为消息传输的方式,manager在这里是一个全局的manager,当管道收到消息,就打印处理
manager.broadcast <- msg
最后记得将文件关闭,否则下次打开会出错
file.Close()