10 Triple 协议

dubbo 基于 3.2.6 版本

dubbo 3.x 中,新增了一种协议,那就是 triple 协议,可以兼容 grpc 协议, 这两个协议的底层都是 http2 协议。 triple 协议实现的比较复杂,所以我会把关键代码贴出来。

export 导出服务

源码位置: org.apache.dubbo.rpc.protocol.tri.TripleProtocol#export

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    String key = serviceKey(url);
    ...
    invokers.add(invoker);

    // 添加到 pathResolver, 这个很关键
    Invoker<?> previous = pathResolver.add(url.getServiceKey(), invoker);
    if (previous != null) {
       ...
    }
    ...
    // 初始化线程池
    ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).createExecutorIfAbsent(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME));
    
    // 绑定端口,开启服务, 注意 DefaultPuHandler 是空实现,这是和 DubboProtocol 实现的主要区别
    PortUnificationExchanger.bind(url, new DefaultPuHandler());
    // 序列化,不用关心
    optimizeSerialization(url);
    return exporter;
}

源码位置: org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationTransporter#bind

1
2
3
4
5
6
// PortUnificationExchanger#bind 最终会调用此方法
// 在 NettyPortUnificationServer 的父类构造函数中会调用 doOpen 方法
@Override
public AbstractPortUnificationServer bind(URL url, ChannelHandler handler) throws RemotingException {
    return new NettyPortUnificationServer(url, handler);
}

源码位置: org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationServer#doOpen

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 下面的代码是标准的 netty 代码, 我们只需要关注其中的 channelHandler 就可以了
public void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    ...
    bootstrap.group(bossGroup, workerGroup)
        .channel(NettyEventLoopFactory.serverSocketChannelClass())
        .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
        .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Do not add idle state handler here, because it should be added in the protocol handler.
                final ChannelPipeline p = ch.pipeline();
                // 负责心跳,不用关心
                NettyChannelHandler nettyChannelHandler = new NettyChannelHandler(dubboChannels, getUrl(), NettyPortUnificationServer.this);
                // puHandler 是最重要的 channelHandler, 负责检测是 grpc 还是 triple 协议
                NettyPortUnificationServerHandler puHandler = new NettyPortUnificationServerHandler(getUrl(), true, getProtocols(),
                    NettyPortUnificationServer.this,
                    getSupportedUrls(), getSupportedHandlers());
                p.addLast("channel-handler", nettyChannelHandler);
                p.addLast("negotiation-protocol", puHandler);
            }
        });
    ...
}

源码位置: org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationServerHandler#decode

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 当接受到请求时,netty 会回调这个方法
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
    throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    ...
    if (providerConnectionConfig != null && isSsl(in)) {
        // 检测 SSL,就是判断前5个字符
        enableSsl(ctx, providerConnectionConfig);
    } else {
        // 检测是 grpc 还是 triple
        for (final WireProtocol protocol : protocols) {
            in.markReaderIndex();
            ChannelBuffer buf = new NettyBackedChannelBuffer(in);
            final ProtocolDetector.Result result = protocol.detector().detect(buf);
            in.resetReaderIndex();
            switch (result) {
                case UNRECOGNIZED:
                    continue;
                case RECOGNIZED:
                    String protocolName = url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class)
                        .getExtensionName(protocol);
                    // 获取 handler 和 url, 不用关心
                    ChannelHandler localHandler = this.handlerMapper.getOrDefault(protocolName, handler);
                    URL localURL = this.urlMapper.getOrDefault(protocolName, url);
                    channel.setUrl(localURL);
                    NettyConfigOperator operator = new NettyConfigOperator(channel, localHandler);
                    // 配置 channelHandler,非常重要,后面继续解析
                    protocol.configServerProtocolHandler(url, operator);
                    // 移除当前 channelHandler,下一次就不需要在检测了
                    ctx.pipeline().remove(this);
                case NEED_MORE_DATA:
                    return;
                default:
                    return;
            }
        }
        ...
    }
}

源码位置: org.apache.dubbo.rpc.protocol.tri.TripleHttp2Protocol#configServerProtocolHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 配置 http2 相关的 channelHandler
@Override
public void configServerProtocolHandler(URL url, ChannelOperator operator) {
    ...
    final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forServer()
         ...
        .build();
    ExecutorSupport executorSupport = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).getExecutorSupport(url);
    codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter());
    TripleWriteQueue writeQueue = new TripleWriteQueue();
    final Http2MultiplexHandler handler = new Http2MultiplexHandler(
        new ChannelInitializer<Http2StreamChannel>() {
            @Override
            protected void initChannel(Http2StreamChannel ch) {
                final ChannelPipeline p = ch.pipeline();
                p.addLast(new TripleCommandOutBoundHandler());
                // TripleHttp2FrameServerHandler 就是真实处理 http2 请求
                p.addLast(new TripleHttp2FrameServerHandler(frameworkModel, executorSupport,
                    headFilters, ch, writeQueue));
            }
        });
    // 添加一系列的 channelHandler
    List<ChannelHandler> handlers = new ArrayList<>();
    handlers.add(new ChannelHandlerPretender(codec)); // http2 的编解码
    handlers.add(new ChannelHandlerPretender(new FlushConsolidationHandler(64, true))); // 减少 flush 次数
    handlers.add(new ChannelHandlerPretender(new TripleServerConnectionHandler())); // 连接管理
    handlers.add(new ChannelHandlerPretender(handler)); // 处理 http2 frame
    handlers.add(new ChannelHandlerPretender(new TripleTailHandler())); // 释放 ReferenceCounted
    operator.configChannelHandler(handlers);
}

TripleHttp2FrameServerHandler

接受客户端的消息

源码位置: org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2FrameServerHandler#TripleHttp2FrameServerHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// http2 中的每个 stream 都会接受到回调方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof Http2HeadersFrame) {
        // 处理 Http2HeadersFrame, 读取 serviceName,methodName
        onHeadersRead(ctx, (Http2HeadersFrame) msg);
    } else if (msg instanceof Http2DataFrame) {
        // 处理 Http2DataFrame, 读取数据
        onDataRead(ctx, (Http2DataFrame) msg);
    } else if (msg instanceof ReferenceCounted) {
        // ignored
        ReferenceCountUtil.release(msg);
    }
}


// 处理 Http2DataFrame
public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exception {
    tripleServerStream.transportObserver.onData(msg.content(), msg.isEndStream());
}

// 处理 Http2HeadersFrame
public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {
    Executor executor = executorSupport.getExecutor(msg.headers());
    tripleServerStream.setExecutor(executor);
    // 调用 ServerTransportObserver#onHeader 方法
    tripleServerStream.transportObserver.onHeader(msg.headers(), msg.isEndStream());
}

源码位置: org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream.ServerTransportObserver#onHeader

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
public void onHeader(Http2Headers headers, boolean endStream) {
    executor.execute(() -> processHeader(headers, endStream));
}

private void processHeader(Http2Headers headers, boolean endStream) {
    ...
    String[] parts = path.split("/");
    if (parts.length != 3) {
        responseErr(TriRpcStatus.UNIMPLEMENTED.withDescription("Bad path format:" + path));
        return;
    }
    String serviceName = parts[1];
    String originalMethodName = parts[2];

    // 从 pathResolver 中获取 invoker
    Invoker<?> invoker = getInvoker(headers, serviceName);
    ...

    // headers 转换为 map
    Map<String, Object> requestMetadata = headersToMap(headers, () -> {
        return Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()))
            .map(CharSequence::toString)
            .orElse(null);
    });
    boolean hasStub = pathResolver.hasNativeStub(path);
    if (hasStub) {
        listener = new StubAbstractServerCall(invoker, TripleServerStream.this,
            frameworkModel,
            acceptEncoding, serviceName, originalMethodName, executor);
    } else {
        // 常用的就是这个,下面以这个为例子
        listener = new ReflectionAbstractServerCall(invoker, TripleServerStream.this,
            frameworkModel, acceptEncoding, serviceName, originalMethodName, filters,
            executor);
    }
    // must before onHeader
    deframer = new TriDecoder(deCompressor, new ServerDecoderListener(listener));
    // 根据 methodDescriptor 来获取最终调用的 listener,非常重要
    listener.onHeader(requestMetadata);
}

源码位置: org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCall#onHeader

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Override
public void onHeader(Map<String, Object> requestMetadata) {
    this.requestMetadata = requestMetadata;
    ...
    startCall();
}

// 注意 startCall 应该调用子类的方法,在这里忽略,直接分析父类的方法逻辑
protected void startCall() {
    // 构建 RpcInvocation
    RpcInvocation invocation = buildInvocation(methodDescriptor);
    // 非常重要
    listener = startInternalCall(invocation, methodDescriptor, invoker);
}

源码位置: org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCall#startInternalCall

设置调用监听器, 比如 UnaryServerCallListener, ServerStreamServerCallListener, BiStreamServerCallListener

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected ServerCall.Listener startInternalCall(
    RpcInvocation invocation,
    MethodDescriptor methodDescriptor,
    Invoker<?> invoker) {
    this.cancellationContext = RpcContext.getCancellationContext();
    ServerCallToObserverAdapter<Object> responseObserver =
        new ServerCallToObserverAdapter<>(this, cancellationContext);
    try {
        ServerCall.Listener listener;
        switch (methodDescriptor.getRpcType()) {
            case UNARY:
                listener = new UnaryServerCallListener(invocation, invoker, responseObserver, packableMethod.needWrapper());
                request(2);
                break;
            case SERVER_STREAM:
                listener = new ServerStreamServerCallListener(invocation, invoker,
                    responseObserver);
                request(2);
                break;
            case BI_STREAM:
            case CLIENT_STREAM:
                listener = new BiStreamServerCallListener(invocation, invoker,
                    responseObserver);
                request(1);
                break;
            default:
                throw new IllegalStateException("Can not reach here");
        }
        return listener;
    } catch (Exception e) {
        ...
    }
    return null;
}

下面以 UnaryServerCallListener 来分析是怎么处理请求, 这里一定要仔细看。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
onHeadersRead(ctx, (Http2HeadersFrame) msg); 
// 读取 headers,设置了当前的 listener 为 UnaryServerCallListener

onDataRead(ctx, (Http2DataFrame) msg); 
// endStream 为 false,会调用 deframer.deframe(data) 解析数据
// 最终调用 UnaryServerCallListener#onMessage 方法来设置参数

onDataRead(ctx, (Http2DataFrame) msg); 
// endStream 为 true,会调用 deframer.close() 关闭 listener
// 最终调用 UnaryServerCallListener#onComplete 方法来执行 invoke 方法

onReturn(r.getValue());
// 把结果返回给客户端

refer 引用服务

源码位置: org.apache.dubbo.rpc.protocol.tri.TripleProtocol#refer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 序列化优化,不用关心
    optimizeSerialization(url);
    ExecutorService streamExecutor = getOrCreateStreamExecutor(
        url.getOrDefaultApplicationModel(), url);
    // 连接端口,注意 DefaultPuHandler 是空实现,这是和 DubboProtocol 实现的主要区别
    AbstractConnectionClient connectionClient = PortUnificationExchanger.connect(url, new DefaultPuHandler());
    // 包装为 tripleInvoker
    TripleInvoker<T> invoker = new TripleInvoker<>(type, url, acceptEncodings,
        connectionClient, invokers, streamExecutor);
    invokers.add(invoker);
    return invoker;
}

源码位置: org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationTransporter#connect

1
2
3
4
5
6
7
// PortUnificationExchanger#connect 最终会调用此方法
@Override
public AbstractConnectionClient connect(URL url, ChannelHandler handler) throws RemotingException {
    ConnectionManager manager = url.getOrDefaultFrameworkModel().getExtensionLoader(ConnectionManager.class).getExtension(MultiplexProtocolConnectionManager.NAME);
    // 连接, 最终会调用 NettyConnectionManager#connect 方法
    return manager.connect(url, handler);
}

源码位置: org.apache.dubbo.remoting.transport.netty4.NettyConnectionManager#connect

1
2
3
4
5
6
7
8
9
// 在 NettyConnectionClient 的父类构造方法中会调用 doOpen 和 doConnect 方法
@Override
public AbstractConnectionClient connect(URL url, ChannelHandler handler) {
    try {
        return new NettyConnectionClient(url, handler);
    } catch (RemotingException e) {
        throw new RuntimeException(e);
    }
}

源码位置: org.apache.dubbo.remoting.transport.netty4.NettyConnectionClient#doOpen

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Override
protected void doOpen() throws Throwable {
    initConnectionClient();
    // 初始化 netty 的 bootstrap, 设置了 http2 的编解码
    initBootstrap();
}

@Override
protected void doConnect() throws RemotingException {
    ...
    createConnectingPromise();
    // 连接端口
    final ChannelFuture promise = bootstrap.connect();
    ...忽略错误处理逻辑
}

TripleInvoker

当客户端调用方法时,就会执行 TripleInvoker#doInvoke 方法, 接下来分析这部分逻辑。

源码位置: org.apache.dubbo.rpc.protocol.tri.TripleInvoker#doInvoke

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 根据不同的方式来调用,比如 invokeUnary, invokeServerStream, invokeBiOrClientStream
@Override
protected Result doInvoke(final Invocation invocation) {
    ...
    ClientCall call = new TripleClientCall(connectionClient, callbackExecutor,
        getUrl().getOrDefaultFrameworkModel(), writeQueue);
    AsyncRpcResult result;
    try {
        switch (methodDescriptor.getRpcType()) {
            case UNARY:
                // 重点分析这个
                result = invokeUnary(methodDescriptor, invocation, call, callbackExecutor);
                break;
            case SERVER_STREAM:
                result = invokeServerStream(methodDescriptor, invocation, call);
                break;
            case CLIENT_STREAM:
            case BI_STREAM:
                result = invokeBiOrClientStream(methodDescriptor, invocation, call);
                break;
            default:
                throw new IllegalStateException("Can not reach here");
        }
        return result;
    } catch (Throwable t) {
        ...省略处理错误逻辑 
    }
}

源码位置: org.apache.dubbo.rpc.protocol.tri.TripleInvoker#invokeUnary

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 一元请求逻辑
AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation invocation,
                           ClientCall call, ExecutorService callbackExecutor) {

    ...
    final AsyncRpcResult result;
    DeadlineFuture future = DeadlineFuture.newFuture(getUrl().getPath(),
        methodDescriptor.getMethodName(), getUrl().getAddress(), timeout, callbackExecutor);

    RequestMetadata request = createRequest(methodDescriptor, invocation, timeout);

    final Object pureArgument;

    // 封装参数
    if (methodDescriptor instanceof StubMethodDescriptor) {
        pureArgument = invocation.getArguments()[0];
    } else {
        if (methodDescriptor.isGeneric()) {
            Object[] args = new Object[3];
            args[0] = RpcUtils.getMethodName(invocation);
            args[1] = Arrays.stream(RpcUtils.getParameterTypes(invocation)).map(Class::getName).collect(Collectors.toList());
            args[2] = RpcUtils.getArguments(invocation);
            pureArgument = args;
        } else {
            pureArgument = invocation.getArguments();
        }
    }
    result = new AsyncRpcResult(future, invocation);
    ...
    ClientCall.Listener callListener = new UnaryClientCallListener(future);
    // start 方法非常重要,创建了 TripleClientStream, 并设置 channelHandler
    final StreamObserver<Object> requestObserver = call.start(request, callListener);
    // 发送请求
    requestObserver.onNext(pureArgument);
    requestObserver.onCompleted();
    return result;
}

源码位置: org.apache.dubbo.rpc.protocol.tri.call.TripleClientCall#start

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@Override
public StreamObserver<Object> start(RequestMetadata metadata,
                                    ClientCall.Listener responseListener) {
    this.requestMetadata = metadata;
    this.listener = responseListener;
    // 在构造方法中调用 initHttp2StreamChannel 方法
    this.stream = new TripleClientStream(frameworkModel, executor, (Channel) connectionClient.getChannel(true),
        this, writeQueue);
    return new ClientCallToObserverAdapter<>(this);
}

源码位置: org.apache.dubbo.rpc.protocol.tri.stream.TripleClientStream#initHttp2StreamChannel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 初始化 http2 stream 的 channelHandler
private TripleStreamChannelFuture initHttp2StreamChannel(Channel parent) {
    TripleStreamChannelFuture streamChannelFuture = new TripleStreamChannelFuture(parent);
    Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parent);
    bootstrap.handler(new ChannelInboundHandlerAdapter() {
            @Override
            public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                Channel channel = ctx.channel();
                // 负责发送请求
                channel.pipeline().addLast(new TripleCommandOutBoundHandler());
                // 负责接受响应
                channel.pipeline().addLast(new TripleHttp2ClientResponseHandler(createTransportListener()));
            }
        });
    CreateStreamQueueCommand cmd = CreateStreamQueueCommand.create(bootstrap, streamChannelFuture);
    this.writeQueue.enqueue(cmd);
    return streamChannelFuture;
}

TripleHttp2ClientResponseHandler

接受服务端的消息

源码位置: org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2ClientResponseHandler#channelRead0

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 负责接受响应
protected void channelRead0(ChannelHandlerContext ctx, Http2StreamFrame msg) throws Exception {
    if (msg instanceof Http2HeadersFrame) {
        final Http2HeadersFrame headers = (Http2HeadersFrame) msg;
        transportListener.onHeader(headers.headers(), headers.isEndStream());
    } else if (msg instanceof Http2DataFrame) {
        final Http2DataFrame data = (Http2DataFrame) msg;
        transportListener.onData(data.content(), data.isEndStream());
    } else {
        super.channelRead(ctx, msg);
    }
}

测试类

org.apache.dubbo.rpc.protocol.tri.TripleProtocolTest#testDemoProtocol

在调试过程,可能会出现超时,可以添加下面代码来解决。

1
2
3
4
URL consumerUrl = URL.valueOf(
    "tri://127.0.0.1:" + availablePort + "/" + IGreeter.class.getName());
// 添加下面代码
RpcContext.getClientAttachment().getObjectAttachments().put("timeout", 180000);
0%