Asynchronous Communication with a Kafka Cluster

It has been a while since I last wrote anything. I recently re-read the asynchronous communication logic in the newer Kafka client and thought it was worth taking notes. Using the fetching of a partition's offset as the example, let's walk through how the client obtains an offset from the broker. First, the entry point:

private long listOffset(TopicPartition partition, long timestamp) {
    while (true) {
        // Build the list-offset request for the Kafka cluster. Nothing is actually transmitted here; the request is
        // cached in a queue, and the returned RequestFuture is what makes the communication asynchronous
        RequestFuture<Long> future = sendListOffsetRequest(partition, timestamp);
        // The key step: this drives both sending the request and receiving the response
        client.poll(future);

        // The request was sent and a response was received successfully
        if (future.succeeded())
            return future.value();

        if (!future.isRetriable())
            throw future.exception();

        if (future.exception() instanceof InvalidMetadataException)
            client.awaitMetadataUpdate();
        else
            time.sleep(retryBackoffMs);
    }
}

/**
 * Does not actually send the request; it only builds the request object and queues it
 */
private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
    Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<>(1);
    partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
    PartitionInfo info = metadata.fetch().partition(topicPartition);
    if (info == null) {
        metadata.add(topicPartition.topic());
        log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
        return RequestFuture.staleMetadata();
    } else if (info.leader() == null) {
        log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
        return RequestFuture.leaderNotAvailable();
    } else {
        // Look up the partition leader; the leader is the broker that serves this request and returns the data to the client
        Node node = info.leader();
        ListOffsetRequest request = new ListOffsetRequest(-1, partitions);
        return client.send(node, ApiKeys.LIST_OFFSETS, request)
                .compose(new RequestFutureAdapter<ClientResponse, Long>() {
                    @Override
                    public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
                        handleListOffsetResponse(topicPartition, response, future);
                    }
                });
    }
}

Next, the send method itself:

/**
 * Send a new request. Note that the request is not actually transmitted on the
 * network until one of the {@link #poll(long)} variants is invoked. At this
 * point the request will either be transmitted successfully or will fail.
 * Use the returned future to obtain the result of the send. Note that there is no
 * need to check for disconnects explicitly on the {@link ClientResponse} object;
 * instead, the future will be failed with a {@link DisconnectException}.
 * @param node The destination of the request
 * @param api The Kafka API call
 * @param request The request payload
 * @return A future which indicates the result of the send.
 */
// As the Javadoc above notes, nothing is transmitted here: the unsent requests queued in the list are only processed on the next call to poll()
public RequestFuture<ClientResponse> send(Node node,
                                          ApiKeys api,
                                          AbstractRequest request) {
    return send(node, api, ProtoUtils.latestVersion(api.id), request);
}

private RequestFuture<ClientResponse> send(Node node,
                                          ApiKeys api,
                                          short version,
                                          AbstractRequest request) {
    long now = time.milliseconds();
    // Create the completion handler that will be invoked asynchronously
    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
    RequestHeader header = client.nextRequestHeader(api, version);
    RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
    // Put the request into the unsent list
    put(node, new ClientRequest(now, true, send, completionHandler));

    // wakeup the client in case it is blocking in poll so that we can send the queued request
    // i.e. if another thread is currently blocked in poll, wake it so it returns and processes the newly queued request
    client.wakeup();
    return completionHandler.future;
}
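
The hand-off here is worth pausing on: send() only queues the request and then wakes whichever thread may be parked in a blocking poll, so that thread can return and transmit what was queued. Below is a minimal, self-contained sketch of that queue-then-wakeup pattern in plain NIO (my own toy code and names, not Kafka's):

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WakeupSketch {
    public static void main(String[] args) throws IOException, InterruptedException {
        Selector selector = Selector.open();
        Queue<String> unsent = new ConcurrentLinkedQueue<>();

        // The "I/O thread": blocks in select() and drains the queue whenever it is woken up
        Thread ioThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    selector.select();                     // blocks until wakeup() or I/O readiness
                    String request;
                    while ((request = unsent.poll()) != null)
                        System.out.println("sending queued request: " + request);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        ioThread.start();

        unsent.add("ListOffsetRequest");                   // analogous to put(node, clientRequest)
        selector.wakeup();                                 // analogous to client.wakeup()

        Thread.sleep(200);
        ioThread.interrupt();
        selector.wakeup();
        ioThread.join();
        selector.close();
    }
}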

Now let's look at the compose method in the call chain:

/**
 * Convert from a request future of one type to another type
 * @param adapter The adapter which does the conversion
 * @param <S> The type of the future adapted to
 * @return The new future
 */
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
    // Create a new RequestFuture that only holds the converted result
    final RequestFuture<S> adapted = new RequestFuture<>();
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {
            adapter.onSuccess(value, adapted);
        }

        @Override
        public void onFailure(RuntimeException e) {
            adapter.onFailure(e, adapted);
        }
    });
    // Return the newly created future
    return adapted;
}

The main job of this method is to convert a future of one type into a future of another type; here a ClientResponse is converted into a Long. The actual conversion is done by the adapter's onSuccess method, shown below:

private void handleListOffsetResponse(TopicPartition topicPartition,
                                      ClientResponse clientResponse,
                                      RequestFuture<Long> future) {
    ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
    short errorCode = lor.responseData().get(topicPartition).errorCode;
    if (errorCode == Errors.NONE.code()) {
        List<Long> offsets = lor.responseData().get(topicPartition).offsets;
        if (offsets.size() != 1)
            throw new IllegalStateException("This should not happen.");
        long offset = offsets.get(0);
        log.debug("Fetched offset {} for partition {}", offset, topicPartition);
        // Complete the RequestFuture returned by compose with the parsed offset
        future.complete(offset);
    } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
            || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
        log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
                topicPartition);
        future.raise(Errors.forCode(errorCode));
    } else {
        log.warn("Attempt to fetch offsets for partition {} failed due to: {}",
                topicPartition, Errors.forCode(errorCode).message());
        future.raise(new StaleMetadataException());
    }
}
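
Before moving on, the compose pattern is easier to digest in isolation. Here is a minimal toy sketch (my own classes and names, not Kafka's): completing the original future fires a listener, and that listener completes the adapted future with the converted value, which is exactly the relationship between the ClientResponse future and the Long future above.

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class ComposeSketch {

    // A stripped-down, listener-based future; stands in for RequestFuture<T>
    static class ToyFuture<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();
        private T value;
        private boolean done;

        void addListener(Consumer<T> listener) { listeners.add(listener); }

        // like RequestFuture.complete(value) followed by fireSuccess()
        void complete(T v) {
            value = v;
            done = true;
            for (Consumer<T> listener : listeners)
                listener.accept(v);
        }

        // like compose(adapter): the adapted future is only completed from a listener on this one
        <S> ToyFuture<S> compose(BiConsumer<T, ToyFuture<S>> adapter) {
            ToyFuture<S> adapted = new ToyFuture<>();
            addListener(v -> adapter.accept(v, adapted));
            return adapted;
        }

        boolean succeeded() { return done; }
        T value() { return value; }
    }

    public static void main(String[] args) {
        ToyFuture<String> raw = new ToyFuture<>();            // stands in for RequestFuture<ClientResponse>
        ToyFuture<Integer> parsed =                           // stands in for RequestFuture<Long>
                raw.compose((response, future) -> future.complete(response.length()));

        raw.complete("fake-response-body");                   // what fireCompletion() eventually triggers
        System.out.println(parsed.succeeded() + " -> " + parsed.value());   // prints: true -> 18
    }
}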

The request has now been built and queued; next it is actually sent over the socket and the broker's response is received:

/**
 * Block indefinitely until the given request future has finished.
 * @param future The request future to await.
 * @throws WakeupException if {@link #wakeup()} is called from another thread
 */
public void poll(RequestFuture<?> future) {
    // Loop until the future is done, i.e. the data has been received
    while (!future.isDone())
        poll(Long.MAX_VALUE, time.milliseconds(), future);
}

/**
 * Block until the provided request future has finished or the timeout has expired.
 * @param future The request future to wait for
 * @param timeout The maximum duration (in ms) to wait for the request
 * @return true if the future is done, false otherwise
 * @throws WakeupException if {@link #wakeup()} is called from another thread
 */
public boolean poll(RequestFuture<?> future, long timeout) {
    long begin = time.milliseconds();
    long remaining = timeout;
    long now = begin;
    do {
        poll(remaining, now, future);
        now = time.milliseconds();
        long elapsed = now - begin;
        remaining = timeout - elapsed;
    } while (!future.isDone() && remaining > 0);
    return future.isDone();
}

public void poll(long timeout, long now, PollCondition pollCondition) {
    // there may be handlers which need to be invoked if we woke up the previous call to poll
    // Fire handlers for requests that completed before this call
    firePendingCompletedRequests();

    synchronized (this) {
        // send all the requests we can send now
        trySend(now);

        // check whether the poll is still needed by the caller. Note that if the expected completion
        // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
        // handler), the client will be woken up.
        if (pollCondition == null || pollCondition.shouldBlock()) {
            client.poll(timeout, now);
            now = time.milliseconds();
        } else {
            client.poll(0, now);
        }

        // handle any disconnects by failing the active requests. note that disconnects must
        // be checked immediately following poll since any subsequent call to client.ready()
        // will reset the disconnect status
        checkDisconnects(now);

        // trigger wakeups after checking for disconnects so that the callbacks will be ready
        // to be fired on the next call to poll()
        maybeTriggerWakeup();

        // try again to send requests since buffer space may have been
        // cleared or a connect finished in the poll
        trySend(now);

        // fail requests that couldn't be sent if they have expired
        failExpiredRequests(now);
    }

    // called without the lock to avoid deadlock potential if handlers need to acquire locks
    firePendingCompletedRequests();
}

private void firePendingCompletedRequests() {
    boolean completedRequestsFired = false;
    for (;;) {
        // Drain and fire the queued completion handlers
        RequestFutureCompletionHandler completionHandler = pendingCompletion.poll();
        if (completionHandler == null)
            break;

        completionHandler.fireCompletion();
        completedRequestsFired = true;
    }

    // wakeup the client in case it is blocking in poll for this future's completion
    if (completedRequestsFired)
        client.wakeup();
}

The trySend(now) step is handled by the method below. It walks the pending requests in the unsent list and, for each node, hands a request over to the network client only when that node is ready to accept one; if the node still has an outstanding request that has not completed, no new request is sent to it:

private boolean trySend(long now) {
    // send any requests that can be sent now
    boolean requestsSent = false;
    // Iterate over the unsent requests and send the ones we can
    for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
        Node node = requestEntry.getKey();
        Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
        while (iterator.hasNext()) {
            ClientRequest request = iterator.next();
            if (client.ready(node, now)) {
                client.send(request, now);
                iterator.remove();
                requestsSent = true;
            }
        }
    }
    return requestsSent;
}

public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    // Track the request being sent as in-flight
    this.inFlightRequests.add(request);
    selector.send(request.request());
}

public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
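
Note that setSend writes nothing itself: it stages the Send on the channel and registers interest in OP_WRITE, and the actual write happens later in pollSelectionKeys when the selector reports the channel as writable. A minimal, self-contained NIO sketch of that stage-then-write pattern (my own illustration, not Kafka code; a Pipe stands in for the socket):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

public class StagedWriteSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();
        pipe.sink().configureBlocking(false);

        // "setSend": stage the payload and express interest in OP_WRITE
        ByteBuffer staged = ByteBuffer.wrap("pretend this is a serialized request".getBytes(StandardCharsets.UTF_8));
        SelectionKey key = pipe.sink().register(selector, SelectionKey.OP_WRITE);

        // "pollSelectionKeys": write only when the selector says the channel is writable
        while (staged.hasRemaining()) {
            selector.select();
            if (key.isWritable())
                pipe.sink().write(staged);
            selector.selectedKeys().clear();
        }
        key.interestOps(0);                                  // send finished: stop asking for OP_WRITE
        System.out.println("completed send of " + staged.limit() + " bytes");

        pipe.sink().close();
        pipe.source().close();
        selector.close();
    }
}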

Once the request has been staged on the channel, the actual sending and receiving of data begins:

public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        // Perform the actual socket sends and receives
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}

public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    // Clear the results left over from the previous round of I/O
    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // Do the actual reads and writes on the ready keys
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    // we use the time at the end of select to ensure that we don't close any connections that
    // have just been processed in pollSelectionKeys
    maybeCloseOldestConnection(endSelect);
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        try {

            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                // If this key is for a pending connection, finish establishing it
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            channel.id());
                } else
                    continue;
            }

            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            // Reads and writes live in the same method body; because of the enclosing loop,
            // once a send completes the channel can keep being read on subsequent iterations
            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                // Read
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    // Queue the received data for later processing
                    addToStagedReceives(channel, networkReceive);
            }

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                // Write: push the staged request out through the channel
                Send send = channel.write();
                if (send != null) {
                    // Record the completed send
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid()) {
                // The key is no longer valid: close the channel
                close(channel);
                // Record the disconnected channel
                this.disconnected.add(channel.id());
            }

        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }
    }
}

private void addToCompletedReceives() {
    // Walk the receives staged by the read loop above and move them into completedReceives (at most one per channel per call)
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                NetworkReceive networkReceive = deque.poll();
                this.completedReceives.add(networkReceive);
                this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}
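
One detail hidden inside channel.read() above is worth spelling out: Kafka's wire protocol frames every message with a 4-byte size prefix, and a receive only counts as complete once that header plus the full payload have arrived, which is why the read loop may stage zero, one, or several complete receives in a single poll. A small self-contained sketch of the framing idea (my own toy code, not the actual NetworkReceive implementation):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

public class FramedReceiveSketch {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer payload;

    // Returns the complete frame, or null if more bytes are still needed
    ByteBuffer readFrom(ReadableByteChannel channel) throws IOException {
        if (size.hasRemaining()) {
            channel.read(size);
            if (size.hasRemaining())
                return null;                                 // size header not complete yet
            size.flip();
            payload = ByteBuffer.allocate(size.getInt());
        }
        channel.read(payload);
        if (payload.hasRemaining())
            return null;                                     // body not complete yet
        payload.flip();
        return payload;
    }

    public static void main(String[] args) throws IOException {
        byte[] body = "pretend this is a ListOffsetResponse".getBytes(StandardCharsets.UTF_8);
        ByteBuffer wire = ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body);
        ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(wire.array()));

        FramedReceiveSketch receive = new FramedReceiveSketch();
        ByteBuffer frame;
        while ((frame = receive.readFrom(channel)) == null) { /* would normally go back to the selector */ }
        System.out.println(StandardCharsets.UTF_8.decode(frame));
    }
}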

After the sends and receives are done, the received data is processed:

1. First, requests that have been sent successfully are dealt with: a request that expects no response is removed from the in-flight queue as soon as its send completes, so it is neither re-sent nor waited on:

private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // if no response is expected then when the send is completed, return it
    for (Send send : this.selector.completedSends()) {
        ClientRequest request = this.inFlightRequests.lastSent(send.destination());
        if (!request.expectResponse()) {
            this.inFlightRequests.completeLastSent(send.destination());
            responses.add(new ClientResponse(request, now, false, null));
        }
    }
}

2. Handle the data that has been received (a toy sketch of how responses are paired with requests follows after step 5):

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        // Fetch the request that this response answers. Requests on a connection are answered in the order they were
        // sent, and trySend above only sends when the node is ready, so the oldest in-flight request for this source
        // is guaranteed to be the one that produced this response
        ClientRequest req = inFlightRequests.completeNext(source);
        Struct body = parseResponse(receive.payload(), req.request().header());
        if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
            responses.add(new ClientResponse(req, now, false, body));
    }
}

3. Handle connections that have been disconnected:

private void handleDisconnections(List<ClientResponse> responses, long now) {
    for (String node : this.selector.disconnected()) {
        log.debug("Node {} disconnected.", node);
        processDisconnection(responses, node, now);
    }
    // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
    if (this.selector.disconnected().size() > 0)
        metadataUpdater.requestUpdate();
}

4. Handle newly established connections:

private void handleConnections() {
    for (String node : this.selector.connected()) {
        log.debug("Completed connection to node {}", node);
        this.connectionStates.connected(node);
    }
}

5. Handle requests that have timed out:

private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
    List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
    for (String nodeId : nodeIds) {
        // close connection to the node
        this.selector.close(nodeId);
        log.debug("Disconnecting from node {} due to request timeout.", nodeId);
        processDisconnection(responses, nodeId, now);
    }

    // we disconnected, so we should probably refresh our metadata
    if (nodeIds.size() > 0)
        metadataUpdater.requestUpdate();
}
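
As promised under step 2, here is a toy sketch (not Kafka's InFlightRequests class) of why a simple per-node FIFO is enough to pair responses with requests: a broker answers the requests on a single connection in the order they were sent, so completeNext can just pop the oldest in-flight entry (the real client additionally verifies the correlation id when parsing the response):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class InFlightSketch {
    private final Map<String, Deque<String>> inFlight = new HashMap<>();

    // like inFlightRequests.add(request) in doSend
    void add(String node, String request) {
        inFlight.computeIfAbsent(node, n -> new ArrayDeque<>()).addLast(request);
    }

    // like inFlightRequests.completeNext(source) in handleCompletedReceives
    String completeNext(String node) {
        return inFlight.get(node).pollFirst();
    }

    public static void main(String[] args) {
        InFlightSketch sketch = new InFlightSketch();
        sketch.add("node-1", "ListOffsetRequest (sent first)");
        sketch.add("node-1", "FetchRequest (sent second)");

        // responses arrive in send order, so pairing is just queue order
        System.out.println(sketch.completeNext("node-1"));   // ListOffsetRequest (sent first)
        System.out.println(sketch.completeNext("node-1"));   // FetchRequest (sent second)
    }
}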

Finally, the callbacks of all the collected responses are invoked, and this is where the asynchrony comes together: each request's callback is the RequestFutureCompletionHandler that was created back when the request was first assembled:

for (ClientResponse response : responses) {
    if (response.request().hasCallback()) {
        try {
            // Invoke the callback's onComplete method
            response.request().callback().onComplete(response);
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}

Here is the definition of RequestFutureCompletionHandler:

public class RequestFutureCompletionHandler implements RequestCompletionHandler {
    private final RequestFuture<ClientResponse> future;
    private ClientResponse response;
    private RuntimeException e;

    public RequestFutureCompletionHandler() {
        this.future = new RequestFuture<>();
    }

    public void fireCompletion() {
        if (e != null) {
            future.raise(e);
        } else if (response.wasDisconnected()) {
            ClientRequest request = response.request();
            RequestSend send = request.request();
            ApiKeys api = ApiKeys.forId(send.header().apiKey());
            int correlation = send.header().correlationId();
            log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                    api, request, correlation, send.destination());
            future.raise(DisconnectException.INSTANCE);
        } else {
            // Call RequestFuture.complete
            future.complete(response);
        }
    }

    public void onFailure(RuntimeException e) {
        this.e = e;
        pendingCompletion.add(this);
    }

    @Override
    public void onComplete(ClientResponse response) {
        // Store the response on this handler
        this.response = response;
        // Queue this handler so firePendingCompletedRequests can process it later; completing the future right here
        // could deadlock, because onComplete runs while the client's internal lock is held
        pendingCompletion.add(this);
    }
}

private void firePendingCompletedRequests() {
    boolean completedRequestsFired = false;
    for (;;) {
        RequestFutureCompletionHandler completionHandler = pendingCompletion.poll();
        if (completionHandler == null)
            break;
        // Invoke fireCompletion() on each queued RequestFutureCompletionHandler
        completionHandler.fireCompletion();
        completedRequestsFired = true;
    }

    // wakeup the client in case it is blocking in poll for this future's completion
    if (completedRequestsFired)
        client.wakeup();
}
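
The reason onComplete only queues the handler instead of completing the future right away is the deadlock concern noted above: onComplete runs while the client's internal lock is held, and listeners attached to the future may take their own locks or call back into the client. A minimal sketch (my own illustration) of this collect-under-the-lock, fire-after-unlocking pattern:

import java.util.concurrent.ConcurrentLinkedQueue;

public class DeferredCompletionSketch {
    private final ConcurrentLinkedQueue<Runnable> pendingCompletion = new ConcurrentLinkedQueue<>();
    private final Object lock = new Object();

    void poll() {
        synchronized (lock) {
            // ... network I/O happens here; finished responses are only queued, never fired ...
            pendingCompletion.add(() -> System.out.println("offset future completed"));
        }
        // lock released: now it is safe to run arbitrary user callbacks
        Runnable handler;
        while ((handler = pendingCompletion.poll()) != null)
            handler.run();
    }

    public static void main(String[] args) {
        new DeferredCompletionSketch().poll();
    }
}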

Next, let's look at a few key methods of RequestFuture:

/**
 * Complete the request successfully. After this call, {@link #succeeded()} will return true
 * and the value can be obtained through {@link #value()}.
 * @param value corresponding value (or null if there is none)
 * @throws IllegalStateException if the future has already been completed
 * @throws IllegalArgumentException if the argument is an instance of {@link RuntimeException}
 */
public void complete(T value) {
    if (value instanceof RuntimeException)
        throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
    // Atomically store the value as the result
    if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
        throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
    fireSuccess();
}

private void fireSuccess() {
    T value = value();
    while (true) {
        // Invoke each listener's onSuccess method
        RequestFutureListener<T> listener = listeners.poll();
        if (listener == null)
            break;
        listener.onSuccess(value);
    }
}

You might wonder where the listener comes from: it was registered at the very beginning, inside compose:

addListener(new RequestFutureListener<T>() {
    @Override
    public void onSuccess(T value) {
        // This calls the adapter's onSuccess method
        adapter.onSuccess(value, adapted);
    }

    @Override
    public void onFailure(RuntimeException e) {
        adapter.onFailure(e, adapted);
    }
});

new RequestFutureAdapter<ClientResponse, Long>() {
    @Override
    public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
        // Produce the final result: parse the offset out of the response and complete the future with it
        handleListOffsetResponse(topicPartition, response, future);
    }
}

private void handleListOffsetResponse(TopicPartition topicPartition,
                                      ClientResponse clientResponse,
                                      RequestFuture<Long> future) {
    ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
    short errorCode = lor.responseData().get(topicPartition).errorCode;
    if (errorCode == Errors.NONE.code()) {
        List<Long> offsets = lor.responseData().get(topicPartition).offsets;
        if (offsets.size() != 1)
            throw new IllegalStateException("This should not happen.");
        long offset = offsets.get(0);
        log.debug("Fetched offset {} for partition {}", offset, topicPartition);
        // complete is called again here, but this time no listener's onSuccess fires, because no listener was
        // registered on this (adapted) future; it simply stores the offset as the future's result
        future.complete(offset);
    } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
            || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
        log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
                topicPartition);
        future.raise(Errors.forCode(errorCode));
    } else {
        log.warn("Attempt to fetch offsets for partition {} failed due to: {}",
                topicPartition, Errors.forCode(errorCode).message());
        future.raise(new StaleMetadataException());
    }
}

At this point the whole flow is essentially complete; in the end the caller retrieves the value stored in the future's result via future.value().

The trickiest part is the layering of callbacks around RequestFuture; read through it a few times and it falls into place.


