1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
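| // GrpcBiStreamRequestAcceptor handles bi-directional streaming requests.
// The flow is straightforward and splits into onNext (a normal message), onError (a failed stream) and onCompleted (the stream being closed).
// No requestHandler is involved in stream processing,
// because the stream is mainly used by the server to push data to the client, which replies with an ack response after receiving it.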
/**
 * Builds the observer that consumes the payloads arriving on a bi-directional gRPC stream.
 *
 * <p>Each inbound payload is parsed with {@code GrpcUtils.parse} and handled by type:
 * <ul>
 *   <li>{@code ConnectionSetupRequest}: a {@code GrpcConnection} is created around
 *       {@code responseObserver} and registered with the connection manager. If the server has not
 *       started yet (SDK sources only) or registration is refused, a {@code ConnectResetRequest} is
 *       sent and the connection is closed.</li>
 *   <li>{@code Response}: forwarded to {@code RpcAckCallbackSynchronizer.ackNotify} and the
 *       connection's active time is refreshed.</li>
 *   <li>anything else, a {@code null} parse result, or a parse failure: logged and dropped.</li>
 * </ul>
 *
 * <p>On {@code onError} and {@code onCompleted} the response stream is completed unless it has
 * already been cancelled.
 *
 * @param responseObserver observer used to write payloads back on the same stream
 * @return the observer receiving the payloads sent on the stream
 */
@Override
public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {
    return new StreamObserver<Payload>() {

        // Connection attributes are read from the gRPC context constants once, when the observer is created.
        final String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();

        final Integer localPort = GrpcServerConstants.CONTEXT_KEY_CONN_LOCAL_PORT.get();

        final int remotePort = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_PORT.get();

        final String remoteIp = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_IP.get();

        // Client ip reported by the most recent payload; used to decide whether to emit trace logs.
        String clientIp = "";

        @Override
        public void onNext(Payload payload) {
            clientIp = payload.getMetadata().getClientIp();
            traceDetailIfNecessary(payload);

            Object parseObj;
            try {
                // Deserialize the payload into a request/response object.
                parseObj = GrpcUtils.parse(payload);
            } catch (Throwable throwable) {
                Loggers.REMOTE_DIGEST
                        .warn("[{}]Grpc request bi stream,payload parse error={}", connectionId, throwable);
                return;
            }

            // Nothing to handle when the payload could not be turned into an object.
            if (parseObj == null) {
                Loggers.REMOTE_DIGEST
                        .warn("[{}]Grpc request bi stream,payload parse null ,body={},meta={}", connectionId,
                                payload.getBody().getValue().toStringUtf8(), payload.getMetadata());
                return;
            }

            if (parseObj instanceof ConnectionSetupRequest) {
                // Per the original author's note, the client sends this request when it starts up.
                registerConnection((ConnectionSetupRequest) parseObj);
            } else if (parseObj instanceof Response) {
                notifyAck((Response) parseObj);
            } else {
                Loggers.REMOTE_DIGEST
                        .warn("[{}]Grpc request bi stream,unknown payload receive ,parseObj={}", connectionId,
                                parseObj);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (connectionManager.traced(clientIp)) {
                Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on error,error={}", connectionId, t);
            }
            completeResponseStreamIfOpen();
        }

        @Override
        public void onCompleted() {
            if (connectionManager.traced(clientIp)) {
                Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on completed", connectionId);
            }
            completeResponseStreamIfOpen();
        }

        /** Creates the connection described by the setup request and registers it, rejecting it on failure. */
        private void registerConnection(ConnectionSetupRequest setUpRequest) {
            Map<String, String> labels = setUpRequest.getLabels();
            String appName = "-";
            if (labels != null && labels.containsKey(Constants.APPNAME)) {
                appName = labels.get(Constants.APPNAME);
            }

            ConnectionMeta metaInfo = new ConnectionMeta(connectionId, clientIp, remoteIp, remotePort, localPort,
                    ConnectionType.GRPC.getType(), setUpRequest.getClientVersion(), appName, labels);
            metaInfo.setTenant(setUpRequest.getTenant());

            Connection connection = new GrpcConnection(metaInfo, responseObserver,
                    GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());
            connection.setAbilities(setUpRequest.getAbilities());

            // SDK connections are refused while the server is still starting; register() is not
            // attempted in that case because of the short-circuit below.
            boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
            if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
                rejectConnection(connection, rejectSdkOnStarting);
            }
        }

        /**
         * Sends a reset request on a connection that was not registered, then closes it.
         * The close runs in {@code finally} so a failed or timed-out reset request cannot leave
         * the unregistered connection open.
         */
        private void rejectConnection(Connection connection, boolean rejectSdkOnStarting) {
            try {
                Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId,
                        rejectSdkOnStarting ? " server is not started" : " server is over limited.");
                connection.request(new ConnectResetRequest(), 3000L);
            } catch (Exception e) {
                traceResetFailure(e);
            } finally {
                try {
                    connection.close();
                } catch (Exception e) {
                    // Best effort: a failure while closing is only traced, as before.
                    traceResetFailure(e);
                }
            }
        }

        /** Logs a failure of the reset/close sequence, but only for traced client ips. */
        private void traceResetFailure(Exception e) {
            if (connectionManager.traced(clientIp)) {
                Loggers.REMOTE_DIGEST
                        .warn("[{}]Send connect reset request error,error={}", connectionId, e);
            }
        }

        /** Hands a response to the ack synchronizer and refreshes the connection's active time. */
        private void notifyAck(Response response) {
            if (connectionManager.traced(clientIp)) {
                Loggers.REMOTE_DIGEST
                        .warn("[{}]Receive response of server request ,response={}", connectionId, response);
            }
            RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
            connectionManager.refreshActiveTime(connectionId);
        }

        /**
         * Completes the response stream unless it is not a {@code ServerCallStreamObserver} or has
         * already been cancelled. Shared by {@code onError} and {@code onCompleted}.
         */
        private void completeResponseStreamIfOpen() {
            if (!(responseObserver instanceof ServerCallStreamObserver)) {
                return;
            }
            ServerCallStreamObserver<Payload> serverCallStreamObserver =
                    (ServerCallStreamObserver<Payload>) responseObserver;
            if (serverCallStreamObserver.isCancelled()) {
                // The stream was already cancelled (the original comment attributes this to the client closing it).
                return;
            }
            try {
                serverCallStreamObserver.onCompleted();
            } catch (Throwable ignored) {
                // Best effort: completing the stream may fail if it is already terminated.
            }
        }
    };
}
|