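It has been a while since I last wrote anything. I recently reread the asynchronous communication logic in the new Kafka client and thought it was worth taking notes. Using the lookup of a partition's offset as the example, this post walks through how the client obtains the offset from the broker. Start with the entry point: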
private long listOffset(TopicPartition partition, long timestamp) {
while (true) {
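//Send a request for the offset to the Kafka cluster. Nothing is actually transmitted here; the request is only cached in a queue. The returned RequestFuture is what makes the communication asynchronous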
RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
//This is the key step: it contains the logic for both sending the request and receiving the response
client.poll(future);
//The request was sent and a response was received successfully
if (future.succeeded())
return future.value();
if (!future.isRetriable())
throw future.exception();
if (future.exception() instanceof InvalidMetadataException)
client.awaitMetadataUpdate();
else
time.sleep(retryBackoffMs);
}
}
/**
*Does not actually send the request; it only assembles the request object and queues it
*/
private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<>(1);
partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
PartitionInfo info = metadata.fetch().partition(topicPartition);
if (info == null) {
metadata.add(topicPartition.topic());
log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
return RequestFuture.staleMetadata();
} else if (info.leader() == null) {
log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
return RequestFuture.leaderNotAvailable();
} else {
//Get the partition leader; the leader receives the request and returns the requested data to the client
Node node = info.leader();
ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
return client.send(node, ApiKeys.LIST_OFFSETS, request)
.compose(new RequestFutureAdapter<ClientResponse, Long>() {
@Override
public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
handleListOffsetResponse(topicPartition, response, future);
}
});
}
}
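The retry loop in listOffset is a pattern worth isolating: try, return on success, rethrow on a non-retriable failure, otherwise back off and try again. Below is a minimal sketch of just that loop. `Attempt` and `RetryLoopSketch` are hypothetical names of my own and stand in for RequestFuture and the fetcher; the branch that waits for a metadata update is left out.

```java
import java.util.function.Supplier;

/** Hypothetical result holder standing in for RequestFuture<Long>. */
final class Attempt {
    final Long value;              // non-null on success
    final RuntimeException error;  // non-null on failure
    final boolean retriable;       // whether a failure may be retried

    private Attempt(Long value, RuntimeException error, boolean retriable) {
        this.value = value;
        this.error = error;
        this.retriable = retriable;
    }

    static Attempt ok(long v) { return new Attempt(v, null, false); }

    static Attempt fail(RuntimeException e, boolean retriable) {
        return new Attempt(null, e, retriable);
    }

    boolean succeeded() { return error == null; }
}

final class RetryLoopSketch {
    /** Repeats the attempt until it succeeds or fails with a non-retriable error. */
    static long callWithRetry(Supplier<Attempt> attempt, long backoffMs) {
        while (true) {
            Attempt result = attempt.get();
            if (result.succeeded())
                return result.value;
            if (!result.retriable)
                throw result.error;
            try {
                Thread.sleep(backoffMs); // back off before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while backing off", e);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice with a retriable error, then succeeds on the third attempt.
        long offset = callWithRetry(() -> ++calls[0] < 3
                ? Attempt.fail(new IllegalStateException("stale metadata"), true)
                : Attempt.ok(42L), 1);
        System.out.println("offset=" + offset + " after " + calls[0] + " attempts");
    }
}
```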
Next, look at the send method itself:
/**
*Send a new request. Note that the request is not actually transmitted on the
*network until one of the {@link #poll(long)} variants is invoked. At this
*point the request will either be transmitted successfully or will fail.
*Use the returned future to obtain the result of the send. Note that there is no
*need to check for disconnects explicitly on the {@link ClientResponse} object;
*instead, the future will be failed with a {@link DisconnectException}.
*@param node The destination of the request
*@param api The Kafka API call
*@param request The request payload
*@return A future which indicates the result of the send.
*/
/**
*The Javadoc above explains that the request is not sent here directly; requests sitting in the unsent list are only processed on the next poll
*/
public RequestFuture<ClientResponse> send(Node node,
ApiKeys api,
AbstractRequest request) {
return send(node, api, ProtoUtils.latestVersion(api.id), request);
}
private RequestFuture<ClientResponse> send(Node node,
ApiKeys api,
short version,
AbstractRequest request) {
long now = time.milliseconds();
//Create the handler that is invoked when the asynchronous request completes
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
RequestHeader header = client.nextRequestHeader(api, version);
RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
//Put the request into the unsent list
put(node, new ClientRequest(now, true, send, completionHandler));
// wakeup the client in case it is blocking in poll so that we can send the queued request
//Wake up the I/O operation if another thread is blocked in it, so that it can return and pick up the new request
client.wakeup();
return completionHandler.future;
}
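To make the "send only queues, poll transmits" split concrete, here is a toy model I put together. It is a sketch, not the Kafka code: `DeferredSendSketch`, `ToyFuture`, `Pending` and the fake "reply-to-" response are all made up for illustration, and everything runs on a single thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class DeferredSendSketch {
    /** Hypothetical stand-in for RequestFuture: holds a result once completed. */
    static final class ToyFuture {
        private String value;
        private boolean done;
        boolean isDone() { return done; }
        String value() { return value; }
        void complete(String v) { value = v; done = true; }
    }

    /** A queued request together with the future that reports its outcome. */
    private static final class Pending {
        final String payload;
        final ToyFuture future;
        Pending(String payload, ToyFuture future) {
            this.payload = payload;
            this.future = future;
        }
    }

    private final Queue<Pending> unsent = new ArrayDeque<>();
    final List<String> wire = new ArrayList<>(); // what has "gone out on the network"

    /** Only queues the request; nothing is transmitted here. */
    ToyFuture send(String payload) {
        ToyFuture future = new ToyFuture();
        unsent.add(new Pending(payload, future));
        return future;
    }

    /** Transmits everything queued and completes each future with a fake reply. */
    void poll() {
        Pending p;
        while ((p = unsent.poll()) != null) {
            wire.add(p.payload);
            p.future.complete("reply-to-" + p.payload);
        }
    }

    public static void main(String[] args) {
        DeferredSendSketch client = new DeferredSendSketch();
        ToyFuture f = client.send("list-offsets");
        System.out.println(f.isDone() + " " + client.wire); // nothing sent yet
        client.poll();
        System.out.println(f.isDone() + " " + f.value());
    }
}
```

The caller gets a future back immediately and can only read a value from it after some call to poll has done the actual work, which is the same contract the send Javadoc describes.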
Now look at the compose method in the call chain:
/**
*Convert from a request future of one type to another type
*@param adapter The adapter which does the conversion
*@param <S> The type of the future adapted to
*@return The new future
*/
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
//Create a new RequestFuture that only holds the converted result
final RequestFuture<S> adapted = new RequestFuture<>();
addListener(new RequestFutureListener<T>() {
@Override
public void onSuccess(T value) {
adapter.onSuccess(value, adapted);
}
@Override
public void onFailure(RuntimeException e) {
adapter.onFailure(e, adapted);
}
});
//Return the newly created future
return adapted;
}
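The main job of this method is to convert a future of one type into a future of another type, here from ClientResponse to Long. The actual conversion happens in the adapter's onSuccess method, which calls the following: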
private void handleListOffsetResponse(TopicPartition topicPartition,
ClientResponse clientResponse,
RequestFuture<Long> future) {
ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
short errorCode = lor.responseData().get(topicPartition).errorCode;
if (errorCode == Errors.NONE.code()) {
List<Long> offsets = lor.responseData().get(topicPartition).offsets;
if (offsets.size() != 1)
throw new IllegalStateException("This should not happen.");
long offset = offsets.get(0);
log.debug("Fetched offset {} for partition {}", offset, topicPartition);
//Complete the RequestFuture that compose returned
future.complete(offset);
} else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
|| errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
topicPartition);
future.raise(Errors.forCode(errorCode));
} else {
log.warn("Attempt to fetch offsets for partition {} failed due to: {}",
topicPartition, Errors.forCode(errorCode).message());
future.raise(new StaleMetadataException());
}
}
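The compose/adapter pair is easier to follow in miniature. The sketch below is a success-only simplification of my own: `MiniFuture` is a hypothetical class, failures are not modelled, and a `BiConsumer` plays the role of RequestFutureAdapter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical, success-only miniature of RequestFuture. */
final class MiniFuture<T> {
    private T value;
    private boolean done;
    private final List<Consumer<T>> listeners = new ArrayList<>();

    boolean isDone() { return done; }
    T value() { return value; }

    void addListener(Consumer<T> listener) {
        if (done)
            listener.accept(value); // already complete: notify immediately
        else
            listeners.add(listener);
    }

    void complete(T v) {
        if (done)
            throw new IllegalStateException("already complete");
        value = v;
        done = true;
        for (Consumer<T> l : listeners)
            l.accept(v);
        listeners.clear();
    }

    /** Returns a new future that the adapter completes once this one completes. */
    <S> MiniFuture<S> compose(BiConsumer<T, MiniFuture<S>> adapter) {
        MiniFuture<S> adapted = new MiniFuture<>();
        addListener(v -> adapter.accept(v, adapted));
        return adapted;
    }
}

final class ComposeSketch {
    public static void main(String[] args) {
        MiniFuture<String> raw = new MiniFuture<>();
        // The adapter parses the raw "response" and completes the adapted future.
        MiniFuture<Long> offset = raw.compose((body, out) -> out.complete(Long.parseLong(body)));
        System.out.println(offset.isDone()); // the adapter has not run yet
        raw.complete("1234");
        System.out.println(offset.value());
    }
}
```

Note that two futures exist: the caller holds the adapted one, while the network layer completes the raw one. The listener registered inside compose is the only link between them.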
At this point the request has been prepared. Next it is actually sent over the socket, and the data returned by the broker is received:
/**
*Block indefinitely until the given request future has finished.
*@param future The request future to await.
*@throws WakeupException if {@link #wakeup()} is called from another thread
*/
public void poll(RequestFuture<?> future) {
//Keep looping until the future has completed
while (!future.isDone())
poll(Long.MAX_VALUE, time.milliseconds(), future);
}
/**
*Block until the provided request future request has finished or the timeout has expired.
*@param future The request future to wait for
*@param timeout The maximum duration (in ms) to wait for the request
*@return true if the future is done, false otherwise
*@throws WakeupException if {@link #wakeup()} is called from another thread
*/
public boolean poll(RequestFuture<?> future, long timeout) {
long begin = time.milliseconds();
long remaining = timeout;
long now = begin;
do {
poll(remaining, now, future);
now = time.milliseconds();
long elapsed = now - begin;
remaining = timeout - elapsed;
} while (!future.isDone() && remaining > 0);
return future.isDone();
}
public void poll(long timeout, long now, PollCondition pollCondition) {
// there may be handlers which need to be invoked if we woke up the previous call to poll
//Before sending anything, fire the handlers of requests that have already completed
firePendingCompletedRequests();
synchronized (this) {
// send all the requests we can send now
trySend(now);
// check whether the poll is still needed by the caller. Note that if the expected completion
// condition becomes satisfied after the call to shouldBlock() (because of a fired completion
// handler), the client will be woken up.
if (pollCondition == null || pollCondition.shouldBlock()) {
client.poll(timeout, now);
now = time.milliseconds();
} else {
client.poll(0, now);
}
// handle any disconnects by failing the active requests. note that disconnects must
// be checked immediately following poll since any subsequent call to client.ready()
// will reset the disconnect status
checkDisconnects(now);
// trigger wakeups after checking for disconnects so that the callbacks will be ready
// to be fired on the next call to poll()
maybeTriggerWakeup();
// try again to send requests since buffer space may have been
// cleared or a connect finished in the poll
trySend(now);
// fail requests that couldn't be sent if they have expired
failExpiredRequests(now);
}
// called without the lock to avoid deadlock potential if handlers need to acquire locks
firePendingCompletedRequests();
}
private void firePendingCompletedRequests() {
boolean completedRequestsFired = false;
for (;;) {
//Take the next pending handler
RequestFutureCompletionHandler completionHandler = pendingCompletion.poll();
if (completionHandler == null)
break;
completionHandler.fireCompletion();
completedRequestsFired = true;
}
// wakeup the client in case it is blocking in poll for this future's completion
if (completedRequestsFired)
client.wakeup();
}
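The timed poll(future, timeout) variant above recomputes the remaining time after every pass. Here is a small sketch of that bookkeeping with an injectable clock so it can be run without real waiting. `TimedPollSketch`, `Step` and `pollUntil` are hypothetical names of my own, not part of the Kafka client.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

final class TimedPollSketch {
    /** One pass of work that is allowed to block for at most remainingMs. */
    interface Step {
        void run(long remainingMs);
    }

    /** Runs steps until done reports true or the timeout has been used up. */
    static boolean pollUntil(BooleanSupplier done, long timeoutMs, LongSupplier clock, Step step) {
        long begin = clock.getAsLong();
        long remaining = timeoutMs;
        do {
            step.run(remaining);
            long elapsed = clock.getAsLong() - begin;
            remaining = timeoutMs - elapsed;
        } while (!done.getAsBoolean() && remaining > 0);
        return done.getAsBoolean();
    }

    public static void main(String[] args) {
        long[] now = {0};    // fake clock, in milliseconds
        int[] passes = {0};
        // Each pass "takes" 40 ms and the condition never becomes true.
        boolean finished = pollUntil(() -> false, 100, () -> now[0], remaining -> {
            passes[0]++;
            now[0] += 40;
        });
        System.out.println("finished=" + finished + " passes=" + passes[0]);
    }
}
```

Because the loop is a do/while, at least one pass always runs, even with a timeout of 0.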
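The trySend(now) step is implemented by the method below. It walks the list of unsent requests and hands each one to the network client, but only when client.ready(node, now) reports that the node can currently accept a request; otherwise the request stays in the list and nothing new is sent to that node on this pass: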
private boolean trySend(long now) {
// send any requests that can be sent now
boolean requestsSent = false;
//Iterate over the unsent requests and send them
for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
Node node = requestEntry.getKey();
Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
if (client.ready(node, now)) {
client.send(request, now);
iterator.remove();
requestsSent = true;
}
}
}
return requestsSent;
}
public void send(ClientRequest request, long now) {
String nodeId = request.request().destination();
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
doSend(request, now);
}
private void doSend(ClientRequest request, long now) {
request.setSendTimeMs(now);
//Add the request being sent to the in-flight queue
this.inFlightRequests.add(request);
selector.send(request.request());
}
public void send(Send send) {
KafkaChannel channel = channelOrFail(send.destination());
try {
channel.setSend(send);
} catch (CancelledKeyException e) {
this.failedSends.add(send.destination());
close(channel);
}
}
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
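The trySend loop shown earlier removes entries while iterating, which is why it uses an explicit Iterator. A stripped-down sketch of the same shape follows. The names are mine, plain strings stand in for Node and ClientRequest, and a Predicate stands in for client.ready.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class TrySendSketch {
    /** Sends every queued request whose node is ready and removes it from the queue. */
    static List<String> trySend(Map<String, List<String>> unsent, Predicate<String> ready) {
        List<String> sent = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : unsent.entrySet()) {
            Iterator<String> iterator = entry.getValue().iterator();
            while (iterator.hasNext()) {
                String request = iterator.next();
                if (ready.test(entry.getKey())) {
                    sent.add(request);
                    iterator.remove(); // safe removal during iteration
                }
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        Map<String, List<String>> unsent = new LinkedHashMap<>();
        unsent.put("node-a", new ArrayList<>(Arrays.asList("r1", "r2")));
        unsent.put("node-b", new ArrayList<>(Arrays.asList("r3")));
        // Only node-a is ready, so r3 stays queued for a later pass.
        System.out.println(trySend(unsent, "node-a"::equals));
        System.out.println(unsent);
    }
}
```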
With the requests staged, the data is now actually sent and received:
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
//Perform the actual sends and receives
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
return responses;
}
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
//Clear the state left over from the previous I/O pass
clear();
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
timeout = 0;
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
//Start the send and receive operations
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}
addToCompletedReceives();
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
maybeCloseOldestConnection(endSelect);
}
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
KafkaChannel channel = channel(key);
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake (either normally or immediately) */
if (isImmediatelyConnected || key.isConnectable()) {
//If the connection is still being established, finish connecting first
if (channel.finishConnect()) {
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
channel.id());
} else
continue;
}
/* if channel is not ready finish prepare */
if (channel.isConnected() && !channel.ready())
channel.prepare();
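//Reads and writes are handled in the same method body; because the caller keeps polling in a loop, a response can be read on a following pass once the send has completed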
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
//Read
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
//Stage the received data in a queue for later processing
addToStagedReceives(channel, networkReceive);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
//Write: send the request that was staged on the channel earlier
Send send = channel.write();
if (send != null) {
//Record the send that has completed
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
/* cancel any defunct sockets */
if (!key.isValid()) {
//The key is no longer valid, so close the channel
close(channel);
//Record the channel whose connection was closed
this.disconnected.add(channel.id());
}
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id());
}
}
}
private void addToCompletedReceives() {
//Move the data that the reads above placed in the staging queue into completedReceives
if (!this.stagedReceives.isEmpty()) {
Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
KafkaChannel channel = entry.getKey();
if (!channel.isMute()) {
Deque<NetworkReceive> deque = entry.getValue();
NetworkReceive networkReceive = deque.poll();
this.completedReceives.add(networkReceive);
this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
if (deque.isEmpty())
iter.remove();
}
}
}
}
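After the sends and receives have completed, the received data is processed:
1. For sends that have completed, requests that expect no response are removed from the in-flight queue and reported as complete right away; requests that do expect a response stay in flight: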
private void handleCompletedSends(List<ClientResponse> responses, long now) {
// if no response is expected then when the send is completed, return it
for (Send send : this.selector.completedSends()) {
ClientRequest request = this.inFlightRequests.lastSent(send.destination());
if (!request.expectResponse()) {
this.inFlightRequests.completeLastSent(send.destination());
responses.add(new ClientResponse(request, now, false, null));
}
}
}
2. Process the data that has been received:
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
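//Get the request this response belongs to. Responses on one connection come back in the order the requests were sent and inFlightRequests is a per-node FIFO queue, so completeNext returns the oldest outstanding request; parseResponse additionally checks that the correlation id in the response header matches the request header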
ClientRequest req = inFlightRequests.completeNext(source);
Struct body = parseResponse(receive.payload(), req.request().header());
if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
}
}
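The pairing of a response with its request can be sketched as a per-node deque plus a correlation id check. This is my own simplified model under that assumption: `InFlightSketch` is a hypothetical class, and it stores bare correlation ids instead of full requests.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical miniature of a per-node in-flight queue; not the Kafka class. */
final class InFlightSketch {
    // node id -> correlation ids of outstanding requests, newest first
    private final Map<String, Deque<Integer>> inFlight = new HashMap<>();

    /** Records a request as sent to the node. */
    void add(String node, int correlationId) {
        inFlight.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(correlationId);
    }

    /** Removes the oldest outstanding request for the node and checks the response against it. */
    int completeNext(String node, int responseCorrelationId) {
        Deque<Integer> queue = inFlight.get(node);
        if (queue == null || queue.isEmpty())
            throw new IllegalStateException("No in-flight request for node " + node);
        int expected = queue.pollLast(); // the oldest entry sits at the tail
        if (expected != responseCorrelationId)
            throw new IllegalStateException("Correlation id mismatch: expected "
                    + expected + ", got " + responseCorrelationId);
        return expected;
    }

    public static void main(String[] args) {
        InFlightSketch requests = new InFlightSketch();
        requests.add("node-1", 7);
        requests.add("node-1", 8);
        // Responses arrive in send order, so each one matches the oldest request.
        System.out.println(requests.completeNext("node-1", 7));
        System.out.println(requests.completeNext("node-1", 8));
    }
}
```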
3. Handle connections that have been dropped:
private void handleDisconnections(List<ClientResponse> responses, long now) {
for (String node : this.selector.disconnected()) {
log.debug("Node {} disconnected.", node);
processDisconnection(responses, node, now);
}
// we got a disconnect so we should probably refresh our metadata and see if that broker is dead
if (this.selector.disconnected().size() > 0)
metadataUpdater.requestUpdate();
}
4. Handle newly established connections:
private void handleConnections() {
for (String node : this.selector.connected()) {
log.debug("Completed connection to node {}", node);
this.connectionStates.connected(node);
}
}
5. Handle requests that have timed out:
private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
for (String nodeId : nodeIds) {
// close connection to the node
this.selector.close(nodeId);
log.debug("Disconnecting from node {} due to request timeout.", nodeId);
processDisconnection(responses, nodeId, now);
}
// we disconnected, so we should probably refresh our metadata
if (nodeIds.size() > 0)
metadataUpdater.requestUpdate();
}
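Finally, the callback of every completed request is invoked, and this is where the asynchronous design shows. The callback is the RequestFutureCompletionHandler that was created and attached to the request when the request was first assembled: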
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
//Invoke onComplete on the callback
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
Here is the definition of RequestFutureCompletionHandler:
public class RequestFutureCompletionHandler implements RequestCompletionHandler {
private final RequestFuture<ClientResponse> future;
private ClientResponse response;
private RuntimeException e;
public RequestFutureCompletionHandler() {
this.future = new RequestFuture<>();
}
public void fireCompletion() {
if (e != null) {
future.raise(e);
} else if (response.wasDisconnected()) {
ClientRequest request = response.request();
RequestSend send = request.request();
ApiKeys api = ApiKeys.forId(send.header().apiKey());
int correlation = send.header().correlationId();
log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
api, request, correlation, send.destination());
future.raise(DisconnectException.INSTANCE);
} else {
//Call RequestFuture.complete
future.complete(response);
}
}
public void onFailure(RuntimeException e) {
this.e = e;
pendingCompletion.add(this);
}
@Override
public void onComplete(ClientResponse response) {
//Store the response in this handler's field
this.response = response;
//Queue this handler so that firePendingCompletedRequests processes it later with the others; it is not fired here in order to avoid deadlock
pendingCompletion.add(this);
}
}
private void firePendingCompletedRequests() {
boolean completedRequestsFired = false;
for (;;) {
RequestFutureCompletionHandler completionHandler = pendingCompletion.poll();
if (completionHandler == null)
break;
//Call RequestFutureCompletionHandler.fireCompletion()
completionHandler.fireCompletion();
completedRequestsFired = true;
}
// wakeup the client in case it is blocking in poll for this future's completion
if (completedRequestsFired)
client.wakeup();
}
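The two-phase completion (queue the handler while holding the lock, fire it after the lock is released) can be reduced to a few lines. The sketch below is a simplified illustration of my own: `DeferredCompletionSketch`, `pollNetwork` and `firePending` are hypothetical names, and a Runnable stands in for the completion handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

final class DeferredCompletionSketch {
    private final ConcurrentLinkedQueue<Runnable> pendingCompletion = new ConcurrentLinkedQueue<>();
    final List<String> log = new ArrayList<>();

    /** Runs under the lock: notes that a response arrived, but only queues the callback. */
    synchronized void pollNetwork(String response) {
        log.add("received " + response);
        pendingCompletion.add(() -> log.add("callback for " + response));
    }

    /** Runs without the lock: drains the queue and fires each callback. */
    int firePending() {
        int fired = 0;
        Runnable callback;
        while ((callback = pendingCompletion.poll()) != null) {
            callback.run();
            fired++;
        }
        return fired;
    }

    public static void main(String[] args) {
        DeferredCompletionSketch client = new DeferredCompletionSketch();
        client.pollNetwork("r1");
        System.out.println(client.log);            // callback not run yet
        System.out.println(client.firePending());  // number of callbacks fired
        System.out.println(client.log);
    }
}
```

Since user callbacks never run while the client's monitor is held, a callback that takes some other lock cannot end up in a lock-ordering cycle with the polling thread.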
Next, look at a few key methods of RequestFuture:
/**
*Complete the request successfully. After this call, {@link #succeeded()} will return true
*and the value can be obtained through {@link #value()}.
*@param value corresponding value (or null if there is none)
*@throws IllegalStateException if the future has already been completed
*@throws IllegalArgumentException if the argument is an instance of {@link RuntimeException}
*/
public void complete(T value) {
if (value instanceof RuntimeException)
throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
//Store value in result
if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
fireSuccess();
}
private void fireSuccess() {
T value = value();
while (true) {
//Call onSuccess on each listener
RequestFutureListener<T> listener = listeners.poll();
if (listener == null)
break;
listener.onSuccess(value);
}
}
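The compareAndSet against a sentinel is what guarantees a future completes exactly once. A minimal sketch of that idea follows. `OnceFuture` is a hypothetical class of my own, and the assumption that one slot holds either the value or the exception is my reading of why complete rejects a RuntimeException argument.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical single-assignment future built on a sentinel and compareAndSet. */
final class OnceFuture<T> {
    private static final Object INCOMPLETE = new Object(); // means "no result yet"
    // Holds INCOMPLETE, the success value, or the failure exception.
    private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE);

    boolean isDone() { return result.get() != INCOMPLETE; }

    boolean succeeded() { return isDone() && !(result.get() instanceof RuntimeException); }

    void complete(T value) {
        if (value instanceof RuntimeException)
            throw new IllegalArgumentException("value must not be a RuntimeException");
        if (!result.compareAndSet(INCOMPLETE, value))
            throw new IllegalStateException("future is already complete");
    }

    void raise(RuntimeException e) {
        if (!result.compareAndSet(INCOMPLETE, e))
            throw new IllegalStateException("future is already complete");
    }

    @SuppressWarnings("unchecked")
    T value() {
        if (!succeeded())
            throw new IllegalStateException("no successful result available");
        return (T) result.get();
    }

    public static void main(String[] args) {
        OnceFuture<Long> future = new OnceFuture<>();
        future.complete(5L);
        System.out.println(future.value());
        try {
            future.complete(6L); // second completion loses the compareAndSet
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```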
You may wonder where the listener comes from. It was registered at the very beginning, in the compose method:
addListener(new RequestFutureListener<T>() {
@Override
public void onSuccess(T value) {
//This calls the adapter's onSuccess method
adapter.onSuccess(value, adapted);
}
@Override
public void onFailure(RuntimeException e) {
adapter.onFailure(e, adapted);
}
});
new RequestFutureAdapter<ClientResponse, Long>() {
@Override
public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
//Produce the final result: parse the offset out of the response and store it as the future's result
handleListOffsetResponse(topicPartition, response, future);
}
}
private void handleListOffsetResponse(TopicPartition topicPartition,
ClientResponse clientResponse,
RequestFuture<Long> future) {
ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
short errorCode = lor.responseData().get(topicPartition).errorCode;
if (errorCode == Errors.NONE.code()) {
List<Long> offsets = lor.responseData().get(topicPartition).offsets;
if (offsets.size() != 1)
throw new IllegalStateException("This should not happen.");
long offset = offsets.get(0);
log.debug("Fetched offset {} for partition {}", offset, topicPartition);
//complete is called again here, but this time no listener's onSuccess runs, because no listener was registered on this future. It simply stores the offset as the future's result
future.complete(offset);
} else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
|| errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
topicPartition);
future.raise(Errors.forCode(errorCode));
} else {
log.warn("Attempt to fetch offsets for partition {} failed due to: {}",
topicPartition, Errors.forCode(errorCode).message());
future.raise(new StaleMetadataException());
}
}
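At this point the whole flow is essentially complete. The caller finally reads the value stored in the future's result through future.value().
The trickiest part is the chain of callbacks inside RequestFuture; reading through it a few times is usually enough to get comfortable with it.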