Rpc的设计与实现


Rpc应该建立在

  • 网络通信(socket,netty……)
  • 消息(protobuf,thrift……)

的基础之上


分层

  • 应用层
  • RPC层
  • 消息传输层(protobuf on netty)
  • 网络框架层(netty)
  • 网络通信层(socket)

先暂时忽略掉底层 东西,考虑一个Rpc流程……

C(客户端), S(服务器)

APP(应用层), RPC(Rpc层)


  • CAPP:调用一个方法->
  • CRPC:将方法参数序列化通过网络传输到服务器->
  • SRPC:收到数据反序列化为消息,然后调用相应处理方法->
  • SAPP:一个普通方法被调用,并返回数据->
  • SRPC:将应用层返回的数据序列化,通过网络返回给客户端->
  • CRPC:收到服务器返回的数据,反序列化之后,传递给应用层->
  • CAPP:得到函数调用结果

为了简单,下面所讨论的RPC都只是一种简单的RPC, 请求和返回的都是继承自MessageLite的消息

即:

1
MessageLite rpcCall(MessageLite msg);

问题:

  • 怎么将一个请求同步或者异步的返回?
  • 请求与返回怎么对应上?

同步异步

Future & Promise

序列号

每一个消息中,都需要携带一个序列号,返回时,携带请求是的序列号一起返回,
客户端再根据序列号找到调用者

Future:

1
2
3
4
5
6
7
public interface Future<V> {
boolean isDone();
boolean isSuccess();
boolean isCancellable();
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
V get();
}

Promise:

1
2
3
4
5
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V result);
Promise<V> setFailure(Throwable cause);
Promise<V> sync();
}


带有序列号的 RpcWraper消息:

1
2
3
4
5
6
message RpcWraperMessage {
required int64 serialid = 1;
required bool is_request = 2;
required string msg_name = 3;
required bytes protoMsg = 4;
}


在Promise和RpcWraperMessage的基础上
RpcClient:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RpcClient extends Client {
//...

public <R> R sendRpcMsg(MessageLite msg) {
Promise<R> resPromise = sendRpcMsgAsync(msg);
resPromise.awaitUninterruptibly();
if (resPromise.isSuccess())
return resPromise.getNow();
return null;
}

public <R> Promise<R> sendRpcMsgAsync(MessageLite msg) {
Promise<R> resPromise = Server.getInstance().getClientGroup()
.next().newPromise();
long serialid = SerialIdGenerator.next();
TimeOutRpcCallCache.getInstance().add(serialid, resPromise);

RpcWraper.RpcWraperMessage rwm = RpcWraper.RpcWraperMessage.newBuilder()
.setSerialid(serialid)
.setMsgName(msg.getClass().getName())
.setIsRequest(true)
.setProtoMsg(msg.toByteString())
.build();
sendMsg(rwm);

return resPromise;
}
}


针对RpcWraperMessage的消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static class RpcWraperMessageHandler extends MessageHandler {
public void onMessage(Channel channel, MessageLite messageLite) {
RpcWraper.RpcWraperMessage rwm = (RpcWraper.RpcWraperMessage)messageLite;
MessageDispatcher.MessageMetaData mmd = MessageDispatcher.getInstance()
.getMessageMetaDataByClassName(rwm.getMsgName());

//...
MessageLite msg = mmd.getMessageLite().newBuilderForType()
.mergeFrom(rwm.getProtoMsg()).build();
if (rwm.getIsRequest()) {
MessageHandler handler = mmd.getHandler();
if (handler instanceof RpcMessageHandler) {
((RpcMessageHandler) handler).onMessage(channel,
msg, rwm.getSerialid());
}
} else {
handleResponse(channel, msg, rwm.getSerialid());
}
//...
}

private void handleResponse(Channel channel, MessageLite msg, long serialid) {
RpcCacheEntry entry = TimeOutRpcCallCache.getInstance()
.getAndRemove(serialid);
//...
entry.result.trySuccess(msg); // !!!这里设置结果
}
}

更进一步
RpcWraperMessage也只是一个普通的消息,那么底层消息是如何分发到指定的Handler中的?


首先:看到在RpcWraperMessage中存在一个msg_name字段,填充的内容则是msg.getClass().getName()

同样的,在底层消息设计时,也存在这样一个字段

1
2
3
4
5
6
{
int nameLen;
int msgLen;
bytes name;
bytes protobufMsg; // 这里保存的才是Protobuf转换之后的二进制数据
}


  • 注册:所有的消息和消息对应的Handler需要有一个注册的过程,将二者绑定在一起
1
2
3
4
MessageDispatcher.getInstance()
.register(RpcWraper.RpcWraperMessage.getDefaultInstance(), new RpcWraperMessageHandler());
MessageDispatcher.getInstance()
.register(BaseMessageProtos.LoginMessage.getDefaultInstance(), new LoginMessageHandler());

  • 接收消息:当收到消息时,根据消息结构中的name字段,找到对应的Handler,然后将消息分发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class MessageMetaData {
private MessageHandler handler;
private MessageLite messageLite;
}

// 这里的参数是已经被Decode之后的数据
private boolean doDispatch0(Channel channel,
String name,
byte[] msg)
{

MessageMetaData mmd = messageMap.get(name);
//...
MessageLite messageLite = mmd.getMessageLite().newBuilderForType()
.mergeFrom(msg).build();
mmd.getHandler().onMessage(channel, messageLite);
return true;
//...
}

消息的编解码:

Protobuf:

1
2
3
4
5
6
7
8
9
//消息的构造
BaseMessageProtos.LoginMessage msg = BaseMessageProtos.LoginMessage
.newBuilder()
// .setXXX(xxx)
.build();

//消息的解码
MessageLite messageLite = mmd.getMessageLite().newBuilderForType()
.mergeFrom(msg).build();


Netty层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
	//Encoder:
public class MessageEncoder extends MessageToByteEncoder<MessageLite> {
protected void encode(ChannelHandlerContext ctx,
MessageLite msg,
ByteBuf out) throws Exception
{

if (out.isWritable()) {
int msgLen = msg.getSerializedSize();
int nameLen = msg.getClass().getName().length();
out.writeInt(nameLen);
out.writeInt(msgLen);
out.writeBytes(msg.getClass().getName().getBytes());
out.writeBytes(msg.toByteArray());
}
}
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
	//Decoder:
public class MessageDecoder extends ReplayingDecoder<Integer> {
protected void decode(ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf,
List<Object> list) throws Exception
{

if (state() == null)
state(0);

int nameLen = byteBuf.readInt();
int msgLen = byteBuf.readInt();

byte[] byteName = new byte[nameLen];
byteBuf.readBytes(byteName, 0, nameLen);
String name = new String(byteName, "UTF-8");

byte[] byteMsg = new byte[msgLen];
byteBuf.readBytes(byteMsg, 0, msgLen);
}
}

Encoder和Decoder怎么用?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//Netty 的基础用法
//client
bootstrap = new Bootstrap().channel(NioSocketChannel.class)
.group(Server.getInstance().getClientGroup()).remoteAddress(address);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new MessageDecoder())
.addLast(new MessageEncoder())
.addLast(new WatchDog(Client.this));
}
});
bootstrap.connect()

//server
bootstrap = new ServerBootstrap();
bootstrap.group(serverGroup, clientGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new WatchDog(Server.this))
.addLast(new MessageEncoder())
.addLast(new MessageDecoder());
}
});
bootstrap.bind(port).syncUninterruptibly();

类比直接使用socket


Netty简单总结:

Promise&Future

ByteBuf

Decoder&Encoder

ChannelPipeline

EventLoopGroup

Bootstrap

所有都是异步


更多:
实现针对函数的真正RPC
利用动态代理、注解等实现App层与RPC层之间的调用


源码

Thanks~

文章目录
,