1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
| // client 流式处理请求
// onNext: 处理请求
// onError 和 onCompleted 来变换 grpc server
private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub,
final GrpcConnection grpcConn) {
return streamStub.requestBiStream(new StreamObserver<Payload>() {
@Override
public void onNext(Payload payload) {
LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}",
grpcConn.getConnectionId(), payload.toString());
try {
Object parseBody = GrpcUtils.parse(payload);
final Request request = (Request) parseBody;
if (request != null) {
try {
// 处理服务端请求
Response response = handleServerRequest(request);
if (response != null) {
response.setRequestId(request.getRequestId());
// 响应
sendResponse(response);
} else {
LOGGER.warn("[{}]Fail to process server request, ackId->{}", grpcConn.getConnectionId(),
request.getRequestId());
}
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}",
grpcConn.getConnectionId(), payload.toString(), e.getMessage());
Response errResponse = ErrorResponse.build(NacosException.CLIENT_ERROR,
"Handle server request error");
errResponse.setRequestId(request.getRequestId());
sendResponse(errResponse);
}
}
} catch (Exception e) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}",
grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8());
}
}
@Override
public void onError(Throwable throwable) {
boolean isRunning = isRunning();
boolean isAbandon = grpcConn.isAbandon();
if (isRunning && !isAbandon) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}",
grpcConn.getConnectionId(), throwable);
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
} else {
LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}",
grpcConn.getConnectionId(), isRunning, isAbandon);
}
}
@Override
public void onCompleted() {
boolean isRunning = isRunning();
boolean isAbandon = grpcConn.isAbandon();
if (isRunning && !isAbandon) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server",
grpcConn.getConnectionId());
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsync();
}
} else {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Ignore complete event,isRunning:{},isAbandon={}",
grpcConn.getConnectionId(), isRunning, isAbandon);
}
}
});
}
|