1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
| // 处理消息
// 代码非常多,沉下心来看
/**
 * Handles a single send-message request: builds a {@code MessageExtBrokerInner} from the
 * request header and body, stores it (as a normal message or as a transaction "prepare"
 * message), and reports the outcome through {@code sendMessageCallback}.
 *
 * <p>NOTE(review): this listing is an excerpt; part of the body is elided (see the
 * {@code ...} marker below), so the documentation only covers the visible statements.
 *
 * @param ctx                 channel context of the request; its remote address becomes the born host
 * @param request             the incoming command; its body becomes the message body
 * @param sendMessageContext  context object passed through to result handling and the callback
 * @param requestHeader       decoded header (topic, queue id, flag, properties, born timestamp, reconsume times)
 * @param mappingContext      topic-queue mapping context, passed through to result handling
 * @param sendMessageCallback invoked with {@code sendMessageContext} and the response after the store result is handled
 * @return the response when {@code preSend} already set a code other than -1, when
 *         {@code handleRetryAndDLQ} returns false, when transaction messages are rejected,
 *         or on the synchronous path; {@code null} on the asynchronous path, where the
 *         reply is sent later from the future's callback
 * @throws RemotingCommandException declared by the signature; presumably raised by header
 *         decoding in the helpers called here (not visible in this excerpt)
 */
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader,
final TopicQueueMappingContext mappingContext,
final SendMessageCallback sendMessageCallback) throws RemotingCommandException {
// Build the response via preSend (per the original note it also adds the
// PROPERTY_MSG_REGION and PROPERTY_TRACE_SWITCH properties - not visible here).
// A code other than -1 means preSend already decided the outcome, so return it as-is.
final RemotingCommand response = preSend(ctx, request, requestHeader);
if (response.getCode() != -1) {
return response;
}
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
// Look up the topic configuration
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
// A negative queue id means none was requested: pick one via randomQueueId
// over the topic's write-queue count
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
// Handle retry and dead-letter-queue (DLQ) messages; a false result means
// the response is returned to the caller without storing anything
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {
return response;
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
// Ensure the message has a unique id: if the client supplied none (null or empty),
// create one and record it in the original properties
String uniqKey = oriProps.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqKey == null || uniqKey.length() <= 0) {
uniqKey = MessageClientIDSetter.createUniqID();
oriProps.put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, uniqKey);
}
// NOTE(review): code is elided at this point in the listing
...
// Compute the tags code from the message tags and the topic's filter type
// (described as the tag hash code in the original note)
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
// A null reconsume count in the header is treated as 0
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
// Serialize the properties only after the cluster property has been added
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
// Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
// Transaction check: the message is a transaction "prepare" message when the flag parses
// to true, unless it has both reconsume times > 0 and a delay level > 0
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
// Broker configured to reject transaction messages: answer NO_PERMISSION
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
sendTransactionPrepareMessage = true;
}
// Start time taken from the message store, passed on to result handling
long beginTimeMillis = this.brokerController.getMessageStore().now();
// Asynchronous or synchronous send, chosen by broker config; the store result is
// handled by the same handlePutMessageResult call on both paths
if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
CompletableFuture<PutMessageResult> asyncPutMessageFuture;
if (sendTransactionPrepareMessage) {
// Store the transaction prepare message
asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// Store the message (analyzed in later chapters of the original write-up)
asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
// Final copies so the lambda below can capture them
final int finalQueueIdInt = queueIdInt;
final MessageExtBrokerInner finalMsgInner = msgInner;
asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
RemotingCommand responseFuture =
// Handle the store result; per the original note this sets the response code
// and remark and records metrics (helper not visible here)
handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
ctx, finalQueueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
// Only write a reply when result handling produced one
if (responseFuture != null) {
doResponse(ctx, request, responseFuture);
}
sendMessageCallback.onComplete(sendMessageContext, response);
}, this.brokerController.getPutMessageFutureExecutor());
// Returns null to release the send message thread
return null;
} else {
// Synchronous store
PutMessageResult putMessageResult = null;
if (sendTransactionPrepareMessage) {
// Store the transaction prepare message
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
// Store the message (analyzed in later chapters of the original write-up)
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
// Handle the store result; per the original note this sets the response code and
// remark and records metrics. Its return value is not used on this path.
handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
sendMessageCallback.onComplete(sendMessageContext, response);
return response;
}
}
|