RPC Introduction

What is RPC

Remote Procedure Call (RPC) is a protocol that one program can use to request a service from a program located in another computer on a network without having to understand the network’s details. A procedure call is also sometimes known as a function call or a subroutine call. A procedure call is also sometimes known as a function call or a subroutine call.

RPC 是一种远程过程调用协议,一个程序可以使用该协议从位于网络上另一台计算机中的程序请求服务,而无需了解网络的详细信息。RPC 有时也称为函数调用或子程序调用。RPC 是进程间通信的一种方式。

Why is RPC

网上很多关于 RPC 的优点,笔者都不太赞同。不能将那些成熟的 RPC 框架解决的问题全部归结为 RPC 的优点。比如 RPC 的概念中并没有提到服务治理等问题,但是常见的 RPC 框架(如 DUBBO)均解决了服务治理的相关问题,这显然不能作为 RPC 的优点。要解释“Why is RPC?”的问题,还是要回归到 RPC 的概念上来。

  • RPC 可以使远程过程调用变得像本地调用一样简单。
  • 方便计算能力的横向扩展。

有兴趣可以阅读 dubbo 诞生的背景 以及dubbo 解决了哪些需求

RPC vs HTTP

其实这两者不是同一个维度的概念。从 RPC 的概念上来讲,HTTP 其实就是 RPC 的一种实现。

RPC vs RMI

这两者也不是同一个维度的概念。

RMIJAVA 提供的一种远程方法调用,而 RPC 的变成模型是远程过程调用RPC 并没有规定采用哪种编程语言实现,我们甚至可以使用 JAVA 语言实现一个具体的 RPC 协议,内部采用 RMI 来实现数据的传输,例如 dubbo 中就可以采用 rmi 协议

How to implement RPC

我们先思考这么一个问题:要怎么去调用远程的服务呢?

  1. 要知道 ip 和端口,以确定远程唯一的一个进程。
  2. 要知道调用远程的进程中的什么服务,以确定方法和参数
  3. 调用远程的服务之后,要返回结果。

根据上述的三点,我们来看一下如何实现一个 RPC 框架。

原理

在一个 RPC 的架构中包含 4 个组件:

  1. Client:即客户端,服务调用方
  2. Client Stub:客户端存根,用于存方服务端地址信息,将客户端的信息打包成网络消息,再通过网络发送给服务提供者
  3. Server Stub:服务端存根,接收客户端发送过来的消息,并调用本地服务
  4. Server:服务提供者

一个 RPC 的调用过程:

  1. Client 调用以本地调用方式调用服务;
  2. Client Stub 接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体(在 Java 中是序列化的过程);
  3. Client Stub 找到服务地址,并将消息体通过网络发送到服务端;
  4. Server Stub 收到消息后进行解码(在 Java 中是反序列化的过程);
  5. Server Stub 根据解码结果调用 Server 处理;
  6. Server 将结果返回给 Server Stub
  7. Server Stub 将返回结果打包成消息体(在 Java 中是序列化的过程);
  8. Server Stub 将打包后的消息通过网络并发送至消费方;
  9. Client Stub 接收到消息,并进行解码(在 Java 中是反序列化的过程);
  10. Client 得到最终结果。

一图顶千言

实现

我们简单的采用 Java 语言来实现一个 RPC 框架。

命名

作为钢铁侠粉丝,这里我们将项目命名为 Edith

Even dead,I am hero. -Tony Stark

伊人已逝,仍是英雄。永远纪念 _Tony Stark_。

暴露服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class RpcFramework {
public static void export(final Object service, int port) throws Exception {
if (service == null) {
throw new IllegalArgumentException("Service instance is null");
}
if (port <= 0 || port > 65535) {
throw new IllegalArgumentException("Invalid port " + port);
}

System.out.println("Export service " + service.getClass().getName() + " on port " + port);
ServerSocket server = new ServerSocket(port);
for (; ; ) {
try {
final Socket socket = server.accept();
new Thread(() -> {
try {
try {
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
Method method = service.getClass().getMethod(methodName, parameterTypes);
Object result = method.invoke(service, arguments);
output.writeObject(result);
} catch (Throwable t) {
output.writeObject(t);
} finally {
output.close();
}
} finally {
input.close();
}
} finally {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

引用服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class RpcFramework {
public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) {
if (interfaceClass == null) {
throw new IllegalArgumentException("Interface class is null");
}
if (!interfaceClass.isInterface()) {
throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
}
if (host == null || host.length() == 0) {
throw new IllegalArgumentException("Host is null!");
}
if (port <= PORT_MIN || port > PORT_MAX) {
throw new IllegalArgumentException("Invalid port " + port);
}
System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, (proxy, method, arguments) -> {
Socket socket = new Socket(host, port);
try {
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(arguments);
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable) result;
}
return result;
} finally {
input.close();
}
} finally {
output.close();
}
} finally {
socket.close();
}
});
}
}

定义接口

1
2
3
4
5
6
7
8
9
10
public interface HelloService {

/**
* 返回一个字符串
*
* @param name name
* @return Hello name
*/
String hello(String name);
}

编写服务提供者

先实现 HelloService

1
2
3
4
5
6
7
8
9
public class HelloServiceImpl implements HelloService {

@Override
public String hello(String name) {
String result = "Hello " + name;
System.out.println(Thread.currentThread().getName() + " " + result);
return result;
}
}

启动并暴漏服务

1
2
3
4
5
6
7
public class Provider {

public static void main(String[] args) throws Exception {
HelloService helloService = new HelloServiceImpl();
RpcFramework.export(helloService, 8888);
}
}

编写服务调用者

1
2
3
4
5
6
7
8
public class Consumer {

public static void main(String[] args) {
HelloService helloService = RpcFramework.refer(HelloService.class, "127.0.0.1", 8888);
String name = "Peter Parker";
System.out.println(helloService.hello(name));
}
}

执行 Consumer,查看控制台输出:

1
Hello Peter Parker

总结

本文的例子,采用了阻塞的 Socket IO 流来进行 Client 和 Server 之间的端对端通信。方法的远程调用采用了 Java 的动态代理,参数的序列化与反序列化采用 ObjectStream。成熟的 RPC 框架会对上述的实现方式进行替换,比如采用 NettyNIO 来实现非阻塞通信,采用成熟的注册中心来统一管理服务的注册与发现,采用 hession2fastjson 等序列化方式等等。成熟的 RPC 框架还会考虑调用过程中的诸多问题,比如重试机制、超时配置等等。

本文参考了 徐靖峰的博客