构造
- netty 作为服务端
- protobuf 作为序列化数据的协定
- websocket 前端通信
演示
netty 服务端完成
Server.java
启动类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
//websocket长衔接示例
public class Server {
public static void main(String[] args) throws Exception{
// 主线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 从线程组
EventLoopGroup wokerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,wokerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ServerChannelInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8899)).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
wokerGroup.shutdownGracefully();
}
}
}
ServerChannelInitializer.java
import com.example.nettydemo.protobuf.MessageData;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.util.List;
import static io.netty.buffer.Unpooled.wrappedBuffer;
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// HTTP要求的解码和编码
pipeline.addLast(new HttpServerCodec());
// 把多个音讯转换为一个单一的FullHttpRequest或是FullHttpResponse,
// 原因是HTTP解码器会在每一个HTTP音讯中天生多个音讯对象HttpRequest/HttpResponse,HttpContent,LastHttpContent
pipeline.addLast(new HttpObjectAggregator(65536));
// 重要用于处置惩罚大数据流,比方一个1G大小的文件假如你直接传输肯定会撑暴jvm内存的; 增添以后就不必斟酌这个问题了
pipeline.addLast(new ChunkedWriteHandler());
// WebSocket数据压缩
pipeline.addLast(new WebSocketServerCompressionHandler());
// 协定包长度限定
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true));
// 协定包解码
pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() {
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception {
ByteBuf buf = ((BinaryWebSocketFrame) frame).content();
objs.add(buf);
buf.retain();
}
});
// 协定包编码
pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() {
@Override
protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
ByteBuf result = null;
if (msg instanceof MessageLite) {
result = wrappedBuffer(((MessageLite) msg).toByteArray());
}
if (msg instanceof MessageLite.Builder) {
result = wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
}
// ==== 上面代码片断是拷贝自TCP ProtobufEncoder 源码 ====
// 然后下面再转成websocket二进制流,由于客户端不能直接剖析protobuf编码天生的
WebSocketFrame frame = new BinaryWebSocketFrame(result);
out.add(frame);
}
});
// 协定包解码时指定Protobuf字节数实例化为CommonProtocol范例
pipeline.addLast(new ProtobufDecoder(MessageData.RequestUser.getDefaultInstance()));
// websocket定义了通报数据的6中frame范例
pipeline.addLast(new ServerFrameHandler());
}
}
ServerFrameHandler.java
import com.example.nettydemo.protobuf.MessageData;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.List;
//处置惩罚文本协定数据,处置惩罚TextWebSocketFrame范例的数据,websocket特地处置惩罚文本的frame就是TextWebSocketFrame
public class ServerFrameHandler extends SimpleChannelInboundHandler<MessageData.RequestUser> {
private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
//读到客户端的内容而且向客户端去写内容
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageData.RequestUser msg) throws Exception {
// channelGroup.add();
Channel channel = ctx.channel();
System.out.println(msg.getUserName());
System.out.println(msg.getAge());
System.out.println(msg.getPassword());
MessageData.ResponseUser bank = MessageData
.ResponseUser.newBuilder()
.setUserName("你好,叨教有什么能够协助你!")
.setAge(18).setPassword("11111").build();
channel.writeAndFlush(bank);
}
//每一个channel都有一个唯一的id值
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//打印出channel唯一值,asLongText要领是channel的id的全名
// System.out.println("handlerAdded:"+ctx.channel().id().asLongText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// System.out.println("handlerRemoved:" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("非常发作");
ctx.close();
}
}
protobuf 文件的运用
proto 文件
syntax ="proto2";
package com.example.nettydemo.protobuf;
//optimize_for 加速剖析的速率
option optimize_for = SPEED;
option java_package = "com.example.nettydemo.protobuf";
option java_outer_classname="MessageData";
// 客户端发送过来的音讯实体
message RequestUser{
optional string user_name = 1;
optional int32 age = 2;
optional string password = 3;
}
// 返回给客户端的音讯实体
message ResponseUser{
optional string user_name = 1;
optional int32 age = 2;
optional string password = 3;
}
天生 proto 的Java 类
批量天生东西,直接找到这个 bat 或许 sh 文件,在对应的平台实行就能够了详细能够自行百度 protobuf 怎样运用
Windows 版本
set outPath=../../java
set fileArray=(MessageDataProto ATestProto)
# 将.proto文件天生java类
for %%i in %fileArray% do (
echo generate cli protocol java code: %%i.proto
protoc --java_out=%outPath% ./%%i.proto
)
pause
sh 版本
地点: https://github.com/lmxdawn/ne…
#!/bin/bash
outPath=../../java
fileArray=(MessageDataProto ATestProto)
for i in ${fileArray[@]};
do
echo "generate cli protocol java code: ${i}.proto"
protoc --java_out=$outPath ./$i.proto
done
websocket 完成
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket客户端</title>
</head>
<body>
<script src="protobuf.min.js"></script>
<script type="text/javascript">
var socket;
//假如浏览器支撑WebSocket
if (window.WebSocket) {
//参数就是与服务器衔接的地点
socket = new WebSocket("ws://localhost:8899/ws");
//客户端收到服务器音讯的时刻就会实行这个回调要领
socket.onmessage = function (event) {
var ta = document.getElementById("responseText");
// 解码
responseUserDecoder({
data: event.data,
success: function (responseUser) {
var content = "客服小姐姐: " + responseUser.userName +
", 小姐姐岁数: " + responseUser.age +
", 暗码: " + responseUser.password;
ta.value = ta.value + "\n" + content;
},
fail: function (err) {
console.log(err);
},
complete: function () {
console.log("解码悉数完成")
}
})
}
//衔接竖立的回调函数
socket.onopen = function (event) {
var ta = document.getElementById("responseText");
ta.value = "衔接开启";
}
//衔接断掉的回调函数
socket.onclose = function (event) {
var ta = document.getElementById("responseText");
ta.value = ta.value + "\n" + "衔接封闭";
}
} else {
alert("浏览器不支撑WebSocket!");
}
//发送数据
function send(message) {
if (!window.WebSocket) {
return;
}
// socket.binaryType = "arraybuffer";
// 推断是不是开启
if (socket.readyState !== WebSocket.OPEN) {
alert("衔接没有开启");
return;
}
var data = {
userName: message,
age: 18,
password: "11111"
};
requestUserEncoder({
data: data,
success: function (buffer) {
console.log("编码胜利");
socket.send(buffer);
},
fail: function (err) {
console.log(err);
},
complete: function () {
console.log("编码悉数完成")
}
});
}
/**
* 发送的音讯编码成 protobuf
*/
function requestUserEncoder(obj) {
var data = obj.data;
var success = obj.success; // 胜利的回调
var fail = obj.fail; // 失利的回调
var complete = obj.complete; // 胜利或许失利都邑回调
protobuf.load("../proto/MessageDataProto.proto", function (err, root) {
if (err) {
if (typeof fail === "function") {
fail(err)
}
if (typeof complete === "function") {
complete()
}
return;
}
// Obtain a message type
var RequestUser = root.lookupType("com.example.nettydemo.protobuf.RequestUser");
// Exemplary payload
var payload = data;
// Verify the payload if necessary (i.e. when possibly incomplete or invalid)
var errMsg = RequestUser.verify(payload);
if (errMsg) {
if (typeof fail === "function") {
fail(errMsg)
}
if (typeof complete === "function") {
complete()
}
return;
}
// Create a new message
var message = RequestUser.create(payload); // or use .fromObject if conversion is necessary
// Encode a message to an Uint8Array (browser) or Buffer (node)
var buffer = RequestUser.encode(message).finish();
if (typeof success === "function") {
success(buffer)
}
if (typeof complete === "function") {
complete()
}
});
}
/**
* 接收到服务器二进制流的音讯举行解码
*/
function responseUserDecoder(obj) {
var data = obj.data;
var success = obj.success; // 胜利的回调
var fail = obj.fail; // 失利的回调
var complete = obj.complete; // 胜利或许失利都邑回调
protobuf.load("../proto/MessageDataProto.proto", function (err, root) {
if (err) {
if (typeof fail === "function") {
fail(err)
}
if (typeof complete === "function") {
complete()
}
return;
}
// Obtain a message type
var ResponseUser = root.lookupType("com.example.nettydemo.protobuf.ResponseUser");
var reader = new FileReader();
reader.readAsArrayBuffer(data);
reader.onload = function (e) {
var buf = new Uint8Array(reader.result);
var responseUser = ResponseUser.decode(buf);
if (typeof success === "function") {
success(responseUser)
}
if (typeof complete === "function") {
complete()
}
}
});
}
</script>
<h1>迎接接见客服体系</h1>
<form onsubmit="return false">
<textarea name="message" style="width: 400px;height: 200px"></textarea>
<input type="button" value="发送数据" onclick="send(this.form.message.value);">
<h3>复兴音讯:</h3>
<textarea id="responseText" style="width: 400px;height: 300px;"></textarea>
<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空数据">
</form>
</body>
</html>
扩大浏览
spring boot 完成的背景治理体系
vue + element-ui 完成的背景治理界面,接入 spring boot API接口