`

高性能RPC over MINA&google protobuf 代码&实例 (二)

 
阅读更多

在本系列上篇http://maoyidao.iteye.com/blog/1636923 实现了基于google protobuf的序列化反序列化,现在看看怎么把他们组装到MINA的nio中。本篇主要描述怎么处理断包。

 

使用MINA的CumulativeProtocolDecoder是个好主意,先从MINA自己的sample开始。在这个例子中如果接受到的IoBuffer只包含一部分消息, decoders应该继续接收消息知道接收到完整的消息之后再进行解码。

List1

 

public class CrLfTerminatedCommandLineDecoder
         extends CumulativeProtocolDecoder {

     private Command parseCommand(IoBuffer in) {
         // Convert the bytes in the specified buffer to a
         // Command object.
         ...
     }

     protected boolean doDecode(
             IoSession session, IoBuffer in, ProtocolDecoderOutput out)
             throws Exception {

         // Remember the initial position.
         int start = in.position();

         // Now find the first CRLF in the buffer.
         byte previous = 0;
         while (in.hasRemaining()) {
             byte current = in.get();

             if (previous == '\r' && current == '\n') {
                 // Remember the current position and limit.
                 int position = in.position();
                 int limit = in.limit();
                 try {
                     in.position(start);
                     in.limit(position);
                     // The bytes between in.position() and in.limit()
                     // now contain a full CRLF terminated line.
                     out.write(parseCommand(in.slice()));
                 } finally {
                     // Set the position to point right after the
                     // detected line and set the limit to the old
                     // one.
                     in.position(position);
                     in.limit(limit);
                 }
                 // Decoded one line; CumulativeProtocolDecoder will
                 // call me again until I return false. So just
                 // return true until there are no more lines in the
                 // buffer.
                 return true;
             }

             previous = current;
         }

         // Could not find CRLF in the buffer. Reset the initial
         // position to the one we recorded above.
         in.position(start);

         return false;
     }
 }

 

IoBuffer继承java.nio.ByteBuffer,MINA定义这个类有两个原因:

ByteBuffer没有提供get/putString这样方便的方法。

ByteBuffer只能分配固定大小的空间,IoBuffer通过setAutoExpand,expand等方法支持写入变长数据,例如:

 

String greeting = messageBundle.getMessage("hello"); 
IoBuffer buf = IoBuffer.allocate(16);
// Turn on autoExpand (it is off by default)
buf.setAutoExpand(true);
CharsetEncoder utf8encoder = Charset.forName("UTF-8").newEncoder();
buf.putString(greeting, utf8encoder);
 

IoBuffer重新分配了底层的ByteBuffer,因为在这个例子里数据超过了16 byte。扩展的时候,ByteBuffer容量(capacity)增倍,limit则指向写入内容的最后一个位置。

 

List1代码展示的有关断包的逻辑是找到了下\r\n,即认为上一个包已经全部接收到。这是一中常见的断包逻辑,但maoyidao更喜欢使用指定包长度的方法,逻辑更清晰,代码更简单。结合http://maoyidao.iteye.com/blog/1636923 中序列化时加入的验证码,decode代码应该是这样的:

 

 

 public class MaoyidaoDecoder
         extends CumulativeProtocolDecoder {
     protected boolean doDecode(
             IoSession session, IoBuffer in, ProtocolDecoderOutput out)
             throws Exception 
     {
          CodedInputStream cis = CodedInputStream.newInstance(getBytes(in, 10));
		    int pos1 = in.position();
		    int pos2 = in.limit();
		    
		    int flag1 = cis.readRawVarint32();
		    int flag2 = cis.readRawVarint32();
		    if(flag1 != 3 || flag2 != 7){
		    	// 校验码不对,丢弃这个包
		    	in.position(pos2);
		    	return true;
		    }else{
		    	// 校验码无问题,重设postion,读下面的内容
		    	in.position(pos1 + 2);
		    }
		    
		    int contentLength = CodedInputStream.newInstance(getBytes(in, 5)).readRawVarint32();
		    int contentLength0 = contentLength + CodedOutputStream.computeRawVarint32Size(contentLength);
		    
		    // 是否有足够的数据
		    if(in.remaining() >= contentLength0){			
		    	try {
		    		Maoyidao.MaoyidaoPacket.Builder builder = Maoyidao.MaoyidaoPacket.newBuilder();
		    		CodedInputStream.newInstance(getBytes(in, protocolLength)).readMessage(builder, 
		    			ExtensionRegistryLite.getEmptyRegistry());				
		    		out.write(builder.build());
		    		in.position(in.position() + protocolLength);
		    		return true;
		    	} catch (Exception e) {
		    		
		    	}			
		    }
		    // 没有足够的数据,返回false,IoBuffer退回到最初的位置
		    else {
		    	in.position(pos1);
		    	return false;
			}
     }
 }

通过这个decodefilter,向上层的outputstream流中写入了rpc对象protobuf序列化后的字符流,这样后续处理只需根据protobuf对象,即可。

 

本系列(一)(二)2篇文章介绍了基于protobuf/MINA构建JAVA RPC的主要代码逻辑。

 

 本文系maoyidao原创,转载请引用原链接:

http://maoyidao.iteye.com/blog/1637193

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics