详解NIO框架Mina的编解码的应用实例

JAVA herman 2694浏览 0评论
公告:“业余草”微信公众号提供免费CSDN下载服务(只下Java资源),关注业余草微信公众号,添加作者微信:xttblog,发送下载链接帮助你免费下载!
本博客日IP超过1800,PV 2600 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog,之前的微信号好友位已满,备注:返现
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
视频教程免费领

说到NIO框架,目前最流行的有Mina、Jetty、HP-Socket等。它们的实现原理基本相同,只是在细节上有些区别。说到网络编程,就离不开编解码,NIO的核心也就是让我们实现编解码,更多的关注于业务,而不再关注架构的设计。如果你会了编解码,那么网络编程就不再是问题。
今天小编就为大家详细的实现一个普通的编解码功能。先来看一下我们的协议:
mina编解码协议
解码代码如下:

public class XttblogDecoder extends CumulativeProtocolDecoder {
    private static final Logger log = Logger.getLogger(XttblogDecoder.class);
    private Charset charset =null;
    private static final String HUPU_TCP_BUFFER = "xttblog_tcp_msg_buffer";
    
    public XttblogDecoder(Charset charset) {
        this.charset=charset;
    }
    
    @Override
    protected boolean doDecode(IoSession session, IoBuffer ioBuffer, ProtocolDecoderOutput out) {
        return mergePocket(session, ioBuffer, out);
    }
    
    private boolean mergePocket(IoSession session, IoBuffer ioBuffer, ProtocolDecoderOutput out){
        // 上一次未处理完的byte数组 ,当前需要读的byte数组,两次合起来的总byte数组
        byte[] last_byte = {} , read_byte = null , total_byte = null;
        Object msg = session.getAttribute(HUPU_TCP_BUFFER);
        try {
            last_byte = (null == msg) ? last_byte : (byte[])msg;
            // 没有剩下的则解析完成
            if(ioBuffer.hasRemaining()){
                ioBuffer.order(ByteOrder.LITTLE_ENDIAN);
                // 获取消息剩下的长度
                int bufLen = ioBuffer.remaining();
                read_byte = new byte[bufLen];
                ioBuffer.get(read_byte);
                total_byte = new byte[last_byte.length + read_byte.length];
                System.arraycopy(last_byte, 0, total_byte, 0, last_byte.length);
                System.arraycopy(read_byte, 0, total_byte, last_byte.length, read_byte.length);
                return decodePocket(session,out,total_byte);
            }
            if (last_byte.length <= 0) {
                // 当前为空数据包,继续下一次读取
                return Boolean.TRUE;
            }
            return decodePocket(session,out,last_byte);
        } catch (Exception e) {
            disposeTcpConnect(session);
            log.error("TCP协议解码异常"+e.getMessage(),e);
        } finally{
            //等待回收
            last_byte = null;
            read_byte = null;
            total_byte = null;
            msg = null;
        }
        return Boolean.FALSE;
    }
    
    private boolean decodePocket(IoSession session,ProtocolDecoderOutput out,byte [] bufByte){
        int bufByteLen = bufByte.length;
        // 如果存在头部部分
        if(bufByteLen>8){
            MessageInfo msgInfo = new MessageInfo();
            msgInfo.setMsgIdentify((short) (((bufByte[1] & 0xff) << 8) | (bufByte[0] & 0xff)));
            msgInfo.setMsgBodySize(((bufByte[7] & 0xff) << 24) | ((bufByte[6] & 0xff) << 16) | ((bufByte[5] & 0xff) << 8) | (bufByte[4] & 0xff));
            int msgLen = msgInfo.getMsgBodySize() + 8;
            if(bufByteLen >= msgLen){
                byte [] body_byte = new byte[msgInfo.getMsgBodySize()];
                System.arraycopy(bufByte, 8, body_byte, 0, body_byte.length);
                msgInfo.setMsgBody(new String(body_byte,charset));
                out.write(msgInfo);
                if(bufByteLen == msgLen){
                    session.removeAttribute(HUPU_TCP_BUFFER);
                    return Boolean.TRUE;
                }
                byte [] last_byte = new byte[bufByteLen - msgLen];
                System.arraycopy(bufByte, msgLen , last_byte, 0, last_byte.length);
                //当前数据包未读取完整,存储到会话中,等待下次读取
                session.setAttribute(HUPU_TCP_BUFFER, last_byte);
                if(last_byte.length > 8){
                    return decodePocket(session, out, last_byte);
                }
                body_byte = null;
                last_byte = null;
            }else{
                session.setAttribute(HUPU_TCP_BUFFER, bufByte);
            }
        }
        return Boolean.FALSE;
    }
    
    private void disposeTcpConnect(IoSession session){
        try {
            // 关闭会话
            if (null != session) {
                session.close(true);
                //很多人因为没有销毁IoService出现打开文件过多问题
                //具体文章,查看:http://www.xttblog.com/?p=446
                session=null;
            }
        } catch (Exception e) {
            log.error("关闭TCP连接异常" + e.getMessage(), e);
        }
    }
}

编码代码如下:

public class XttblogEncoder extends ProtocolEncoderAdapter {
    private static final Logger log = Logger.getLogger(XttblogEncoder.class);
    private Charset charset = null;

    public XttblogEncoder(Charset charset) {
        this.charset = charset;
    }

    public void encode(IoSession session, Object message,ProtocolEncoderOutput out) {
        try {
            MessageInfo hmi = (MessageInfo) message;
            if (null != hmi) {
                IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true);
                buf.order(ByteOrder.LITTLE_ENDIAN);
                byte[] bytearr = hmi.getMsgBody().getBytes(charset);
                hmi.setMsgBodySize(bytearr.length);
                buf.putShort(hmi.getMsgIdentify());
                buf.putShort((short)0);//保留字段
                buf.putInt(hmi.getMsgBodySize());
                buf.put(bytearr);
                buf.flip();
                out.write(buf);
                out.flush();
            }
        } catch (Exception e) {
            log.error("TCP协议编码异常!"+e.getMessage(),e);
        }
    }
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

业余草公众号

最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加QQ1群:135430763(2000人群已满),QQ2群:454796847(已满),QQ3群:187424846(已满)。QQ群进群密码:xttblog,想加微信群的朋友,之前的微信号好友已满,请加博主新的微信号:xttblog,备注:“xttblog”,添加博主微信拉你进群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作可添加助理微信进行沟通!

本文原文出处:业余草: » 详解NIO框架Mina的编解码的应用实例