kafka在启动之初会初始化KafkaServer类,当集群中有多个broker时,会实例化多个KafkaServer,该类描述了一个broker的生命周期,除此之外,还会初始化各类Manager,比如与log(数据)相关的:LogManager,OffsetManager,ReplicaManager,这三个Manager都是与kafka的数据打交道的,LogManager主要管理真实的数据(MessageSet)及数据索引(Index),OffsetManager主要管理生产者和消费者生产或消费数据的偏移量以决定下次生产或消费的位置,ReplicaManager主要管理数据在集群当中的复制行为,一份数据可以在集群中有多个备份。除了Manager之外还有几个比较重要的概念:KafkaController(特别重要,处理集群的资源调度),SocketServer(用于处理网络请求,包括生产数据及消费数据等请求)。本文主要讲解SocketServer,KafkaController下篇文章再来讲解。
同理,SocketServer也是每个broker会有一个实例,每个SocketServer实例包含一个Acceptor线程(接收新连接),N个Processor线程(每个Processor线程包含一个Selector用来接收socket请求),M个Handler线程(处理Processor的请求并将处理结果发送给Processor)。
Acceptor线程在创建之后就开始监听OP_ACCEPT事件,一旦接收到ACCEPT请求则将该请求交给一个Processor线程处理,选取Processor的方式是以RoundRobin的方式进行选取,Processor线程处理ACCEPT请求很简单,只是将该请求对应的SocketChannel放入LinkedQueue(每个Processor线程会单独拥有一个这样的LinkedQueue)中并唤醒该Processor线程对应的Selector以便可以接受从SocketChannel发来的请求。
在Processor线程处理完Acceptor线程的ACCEPT事件之后,Processor线程会立即将SocketChannel注册为接受OP_READ事件,并且其对应的Selector会一直监听该事件的发生。一旦接听到READ事件,该线程会将该事件封装成一个Request对象,并放入RequestChannel的requestQueue中,注意这里并不及时处理该Request,KafkaRequestHandler线程会一直阻塞从RequestChannel取出Request并交由KafkaApis(以后会讲)处理,完成了这个动作之后该事件对应的SelectionKey将不再接受OP_READ事件。当KafkaApis处理完这个请求后会将处理结果放入RequestChannel中responseQueues对应的processor id(在封装成Request对象时传递过去的)的BlockingQueue中。
KafkaApis处理完Request后,Processor线程会从responseQueues中取出Response,Response的处理结果有三种形式:NoOpAction,SendAction,CloseConnectionAction,当为NoOpAction时,会把相应的SelectionKey(封装成Request对象时传递的SelectionKey)重新注册为OP_READ事件以便重新接收新的请求,当为SendAction时就说明该Response需要发送给客户端,这时将对应的SelectionKey注册为OP_WRITE事件并将SelectionKey的attach设为该Response,当为CloseConnectionAction则关闭该SelectionKey对应的SocketChannel。
当Processor线程监听到write事件时,即上面的SendAction,则将该SelectionKey对应的attach发送到SelectionKey对应的SocketChannel,当发送完成时将SelectionKey重新注册为OP_READ事件,否则还是注册为OP_WRITE事件以便下次继续发送。