在上一篇文章中我们提到,KafkaProducer会将每次发送的消息,追加到一个RecordAccumulator中,由RecordAccumulator负责消息的实际发送。
本篇文章将对RecordAccumulator进行原理分析。
过长流程的代码,会逐段进行分析。
消息追加
在KafkaProducer#doSend()方法中,我们通过调用RecordAccumulator#append()方法,将消息追加到RecordAccumulator中:
// 累加器的计数器操作
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
// 默认是一个空header
if (headers == null) headers = Record.EMPTY_HEADERS;
首先是全局计数器和当前消息头header的默认设置。
// 校验我们是已有一个在运行中的batch deque
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
// 进行同步操作
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 尝试写入到batch中
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
接下来,我们会获取当前partition的存储消息的双向队列。
如果成功获取存储当前partition消息的存储队列,则尝试将消息追加到此队列中,并返回追加结果。
getOrCreateDeque()
既然需要获取partition在缓冲区存储的消息队列,那么必然存在创建的情况,我们继续看getOrCreateDeque()方法的具体实现:
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
// 从缓存中获取
Deque<ProducerBatch> d = this.batches.get(tp);
if (d != null)
return d;
// 没有指定的分区的batch deque,创建一个新的
d = new ArrayDeque<>();
// 避免出现并发的情况,获取覆盖的值,如果其他线程已经创建了新deque,则使用此deque,否则使用当前线程创建的deque
Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
return previous;
}
- 先从缓存中获取指定partition的消息队列,如果指定的消息队列存在,则直接返回此消息队列。
- 如果不存在指定的消息队列,则需要为指定的partition创建一个新的消息队列。
- 消息队列的类型是ArrayDeque。
- 此时其他生产者线程可能也会往给定的partition发送消息,可能已经创建了一个消息队列,此时做一下兼容:
- 如果已经有线程创建了消息队列,则返回已创建的消息队列。
- 否则,使用当前线程创建的消息队列。
选择batch追加消息
tryAppend()会尝试将给定的消息写入到当前partition的消息队列中:
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque) {
// 获取最后一个ProducerBatch,最后一个是要写入的batch
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
// 如果返回null,证明最后一个deque已经写不下了
if (future == null)
// 关闭当前batch集合的写入
last.closeForRecordAppends();
else
// 组装追加结果
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
}
return null;
}
此时我们发现,其实队列中存放的,并不是每一条消息实体,而是一个batch。
batch相当于批记录,存储了一定数量的record,一个批记录满了,则不会继续追加新record。
- 首先,我们获取最后一个batch。
- 如果当前队列没有batch,则本次无法追加消息,直接返回。
- 如果存在batch,则尝试向此batch中追加消息。
- 如果追加结果返回null,则证明最后一个batch已经满了,无法写入了,返回因为batch已满无法写入的追加结果。
向batch中追加消息
ProducerBatch#tryAppend()用于向给定batch中追加消息:
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
// 校验当前batch还能继续写入
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
// 不能继续写入,返回null
return null;
} else {
// 向MemoryRecords中追加内容
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
// 计算当前batch最大的record的大小
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
// 更新追加时间
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
// 我们必须确保每个返回给开发者的future,以防batch出现需要分隔成为几个新的batch,然后再重新发送
thunks.add(new Thunk(callback, future));
// 操作record计数器
this.recordCount++;
return future;
}
}
- 首先,校验当前batch是否仍有写入空间,如果不能继续写入,则返回null。
- ProducerBatch使用MemoryRecordsBuilder写入实际的消息。
- 计算当前batch中最大的record的大小。
- 更新最后一次追加消息的时间戳。
- 构建消息的发送的异步任务,并添加到异步任务列表中,等待回调执行,此时主要避免因为一个batch过大,导致需要拆分为几个小batch重新发送带来的问题。
- 递增当前batch的record计数器。
向MemoryRecords中追加消息
调用MemoryRecordsBuilder#append()方法,会向MemoryRecords中追加消息:
private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
ByteBuffer value, Header[] headers) {
try {
...
if (magic > RecordBatch.MAGIC_VALUE_V1) {
appendDefaultRecord(offset, timestamp, key, value, headers);
return null;
} else {
return appendLegacyRecord(offset, timestamp, key, value, magic);
}
} catch (IOException e) {
throw new KafkaException("I/O exception when writing to the append stream, closing", e);
}
}
由于Kafka存在KafkaProducer客户端版本不同的问题,此时会分版本进行处理,暂且可以认为:
MAGIC_VALUE_V2:0.10.2左右的版本。
MAGIC_VALUE_V1:0.9.0左右的版本。
MAGIC_VALUE_V0:更老的版本。
此时对于的是MAGIC_VALUE_V2和MAGIC_VALUE_V0、MAGIC_VALUE_V1做了不同的写入处理。
消息版本V2的写入处理
MemoryRecordsBuilder#appendDefaultRecord()用于追加V2版本的消息:
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {
// 确认数据输出流处于开启状态
ensureOpenForRecordAppend();
// 计算相对偏移量
int offsetDelta = (int) (offset - baseOffset);
// 计算时间偏移量
long timestampDelta = timestamp - firstTimestamp;
// 获取写入的字节数量
// 注意:写入的时候,写入的是基于baseOffset的相对偏移量,时间戳也是相对时间戳
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
// 计数,标识位更新(上一次写入的绝对偏移量,时间戳)
recordWritten(offset, timestamp, sizeInBytes);
}
- 首先,确认数据输出流DataOutputStream是否关闭。
- 接着,计算追加的消息,相对于当前batch的第一条消息的相对偏移量和偏移时间。
- 向数据流中写入消息,注意,此时使用的都是相对的偏移量和时间戳。
- 递增当前MemoryRecords中record数量,记录当前最近一次写入的偏移量和时间戳。
我们继续来看如何写入MemoryRecords中的,DefaultRecord#writeTo():
public static int writeTo(DataOutputStream out,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) throws IOException {
// 获取key,value的字节大小,使用Protobuf压缩后的
int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
// 使用varint写入消息key,value
ByteUtils.writeVarint(sizeInBytes, out);
// 现在record attributes还没有使用
byte attributes = 0;
out.write(attributes);
// 相对时间戳,varint类型
ByteUtils.writeVarlong(timestampDelta, out);
// 相对偏移量,varint类型
ByteUtils.writeVarint(offsetDelta, out);
if (key == null) {
// 如果没有key,则写入-1作为占位,也是varint类型
ByteUtils.writeVarint(-1, out);
} else {
// 使用varint写入key的大小,再正常的写入key
int keySize = key.remaining();
ByteUtils.writeVarint(keySize, out);
Utils.writeTo(out, key, keySize);
}
// 写入value
if (value == null) {
ByteUtils.writeVarint(-1, out);
} else {
int valueSize = value.remaining();
ByteUtils.writeVarint(valueSize, out);
Utils.writeTo(out, value, valueSize);
}
if (headers == null)
throw new IllegalArgumentException("Headers cannot be null");
// 写入header的长度
ByteUtils.writeVarint(headers.length, out);
// 写入header信息
for (Header header : headers) {
String headerKey = header.key();
if (headerKey == null)
throw new IllegalArgumentException("Invalid null header key found in headers");
byte[] utf8Bytes = Utils.utf8(headerKey);
ByteUtils.writeVarint(utf8Bytes.length, out);
out.write(utf8Bytes);
byte[] headerValue = header.value();
if (headerValue == null) {
ByteUtils.writeVarint(-1, out);
} else {
ByteUtils.writeVarint(headerValue.length, out);
out.write(headerValue);
}
}
// record的总大小(已经算上了varint)+ sizeInBytes的varint的大小
return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
}
代码很长,有点占篇幅,其实主要将消息大小,消息属性,相对时间戳,相对偏移量,key的大小,key值,value大小,value值,headers依次使用Protobuf写入DataOutputStream中,最后返回消息大小和消息内容压缩后的大小。
MemoryRecordsBuilder中的DataOutputStream
DataOutputStream用于装饰其他输出流,是底层FilterOutputStream的高级封装,可以直接写入Java的基本类型。
MemoryRecordsBuilder的创建的分析,将在下文给出。
更老的消息写入处理
更老的消息写入并非采用了其他序列化手段,而是直接将消息转化为ByteBuffer后,写入数据流中,MemoryRecordsBuilder#appendLegacyRecord():
private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, byte magic) throws IOException {
ensureOpenForRecordAppend();
if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME)
timestamp = logAppendTime;
int size = LegacyRecord.recordSize(magic, key, value);
AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), size);
if (timestampType == TimestampType.LOG_APPEND_TIME)
timestamp = logAppendTime;
long crc = LegacyRecord.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
return crc;
}
- 首先,也需要确认数据流是否关闭,此数据和V2版本使用的数据流是相同的。
- 接着,计算消息版本类型+key大小+value大小的消息总大小。
- 然后,向消息的头部写入相对偏移量和消息大小。
- 如果用于指定了消息追加的类型是LOG_APPEND_TIME类型,则添加时间戳。
- 然后,一次向数据流中写入消息版本类型,消息追加时间,key值,value值,压缩类型和消息追加时间戳类型。
- 递增当前MemoryRecords中record数量,记录当前最近一次写入的偏移量和时间戳。
消息追加成功的结果
在消息追加成功后,会返回一个FutureRecordMetadata对象,此对象实现了Future<RecordMetadata>类型的接口:
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
主要包含如下内容:
- 当前batch的生产消息的请求结果。
- 当前消息的相对偏移量。
- 创建record的时间戳。
- 校验码。
- 序列化后的key的大小。
- 序列化后的value的大小。
- 时间戳。
- 由于可能存在batch过大,被分为多个batch的情况,需要一个分离batch后的record数据的连接点。
这些异步任务会放在当前batch的缓存中,在batch发送后,并获取Kafka服务端的响应后,由专门的线程负责执行。
消息追加失败的结果
消息追加后,如果返回的异步任务是null,证明消息没有追加成功,此时证明当前batch已满,无法继续追加record,需要对当前batch进行关闭:
public void closeForRecordAppends() {
recordsBuilder.closeForRecordAppends();
}
public void closeForRecordAppends() {
if (appendStream != CLOSED_STREAM) {
try {
appendStream.close();
} catch (IOException e) {
throw new KafkaException(e);
} finally {
appendStream = CLOSED_STREAM;
}
}
}
关闭当前batch即为直接将batch的数据流进行关闭。
消息追加失败,并非到此就结束了,KafkaProducer还会继续尝试创建新的batch,用于追加消息。
创建新的batch
计算新batch的大小
我们继续看RecordAccumulator#append()方法的接下来的部分:
// 获取producer最大的batch大小
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
// 估算消息的字节数大小
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
// 申请一块用于追加当前record的对外内存
buffer = free.allocate(size, maxTimeToBlock);
由于之前存在的batch已经不允许追加消息了,那么接下来我们需要创建一个新的,在创建一个新的batch时,首先要做的就是计算新batch的大小。
这里batchSize是我们设定的每个batch的大小,同时我们也要预估当前需要追加的消息大小,取二者之间的一个最大值。
接下来,用这个最大值申请一块堆外内存。
创建新batch
synchronized (dq) {
// 获取batch队列锁之后,需要重新校验producer的状态
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 再度尝试追加record
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// 此时可能是Sender线程将batch发送,多出一个空间
return appendResult;
}
// 此时仍没有可追加的空间
// 则使用给定内存创建一个新的batch,并向新batch中追加record
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callback, time.milliseconds()));
// 将batch添加到当前partition的batch队列中
dq.addLast(batch);
// 此batch为新batch,必定为未发送的batch
incomplete.add(batch);
// 由于在锁中,batch还在使用这块对外内存,所以此时不要进行对外内存的接触分配
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
}
- 由于向当前partition的消息队列中放入新的batch,所以创建新batch的过程将使用同步操作。
- 由于此时可能发送线程将当前partition的消息队列发送到Kafka服务端,所以再度尝试追加一次消息。
- 如果追加成功,直接返回消息追加的异步任务。
- 如果仍然没有当前消息的内存空间,则开始着手创建一个新的batch。
- 我们先使用当前消息的版本号,创建一个MemoryRecordsBuilder。
- 接着使用创建好的MemoryRecordsBuilder创建ProducerBatch。
- 然后尝试向新创建的batch中追加消息。
- 最后将创建好的batch放入到当前partition的消息队列中。
- 由于此batch是新创建的,必定为未发送的batch,则添加到未收到发送响应的缓存中。
消息版本号一般由当前运行的Kafka版本决定。
构建MemoryRecordsBuilder
private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
// Kafka事务版本规则校验
...
return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
}
构建MemoryRecordsBuilder时,使用的是我们刚刚申请的一块堆外内存,消息版本号,压缩类型,创建时间戳类型,和基础偏移量。
构建ProducerBatch
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
this.createdMs = createdMs;
this.lastAttemptMs = createdMs;
this.recordsBuilder = recordsBuilder;
this.topicPartition = tp;
this.lastAppendTime = createdMs;
this.produceFuture = new ProduceRequestResult(topicPartition);
this.retry = false;
this.isSplitBatch = isSplitBatch;
float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
recordsBuilder.compressionType());
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
}
构建ProducerBatch时,使用的是当前topic-partition对象,我们构建好的MemoryRecordsBuilder,以及当前时间戳。
- 初始化ProducerBatch的创建时间,最近一次尝试的时间和最近一次追加的时间均为当前时间戳。
- 根据当前topic-partition信息,构建生产请求结果,生产请求结果用于组装每次追加消息返回的异步任务。
- 由于内存空间不足而创建的新batch,必然不是因为batch过大而分离出的batch。
- 计算并设置当前topic和压缩类型的压缩率。
新batch的消息追加也是调用ProducerBatch#append()方法,也就是和第一次追加时调用的方法的相同,在此不再赘述。
通过上面的原理分析,我们了解到,我们发送的消息经由RecordAccumulator,存储在消息所需要发送的partition的专属消息队列中,消息队列的最小集合单位是batch,使用的是堆外内存进行存储,返回给我们的是一个包含消息存储信息和发送请求结果的异步任务。
消息发送
前半部分对消息的写入缓冲区进行了原理分析,接下来将对消息的发送流程进行分析。
构建发送线程Sender
我们在构建KafkaProducer时,会实例化一个后台线程Sender:
KafkaProducer(Map<String, Object> configs,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors interceptors,
Time time) {
...
this.sender = newSender(logContext, kafkaClient, this.metadata);
}
继续看KafkaProducer#newSender()方法:
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);
// 配置的发送请求超时时间
int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
// 构建
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time);
// 生产者的计数器
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
// 如果已有和Kafka服务端建立连接的客户端,则使用此客户端,否则新创建一个客户端
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder, logContext),
metadata,
clientId,
maxInflightRequests,
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
requestTimeoutMs,
ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
time,
true,
apiVersions,
throttleTimeSensor,
logContext);
// 是否需要进行重试
int retries = configureRetries(producerConfig, transactionManager != null, log);
// 发送请求需要的Kafka服务端副本的响应数量
short acks = configureAcks(producerConfig, transactionManager != null, log);
// 构建发送Sender线程
return new Sender(logContext,
client,
metadata,
this.accumulator,
maxInflightRequests == 1,
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
}
这里包含了很多我们了解Kafka时的配置:
- KafkaProducer每次发送时的超时时间。
- 与Kafka服务端进行交互的KafkaClient对象。
- 如果发送失败,是否需要进行重试。
- 生产消息,Kafka服务端副本的响应数量。
- 构建发送线程。
继续来看Sender的构造方法:
public Sender(LogContext logContext,
KafkaClient client,
ProducerMetadata metadata,
RecordAccumulator accumulator,
boolean guaranteeMessageOrder,
int maxRequestSize,
short acks,
int retries,
SenderMetricsRegistry metricsRegistry,
Time time,
int requestTimeoutMs,
long retryBackoffMs,
TransactionManager transactionManager,
ApiVersions apiVersions) {
this.log = logContext.logger(Sender.class);
// 同Kafka服务端进行交互的客户端
this.client = client;
// 关联的RecordAccumulator
this.accumulator = accumulator;
// KafkaProducer的元数据
this.metadata = metadata;
// 是否需要保证消息的顺序性,与max.in.flight.requests.per.connection参数设置有关
this.guaranteeMessageOrder = guaranteeMessageOrder;
// 请求允许的最大字节数
this.maxRequestSize = maxRequestSize;
// 线程的运行状态
this.running = true;
// 集群副本的必须响应数量
this.acks = acks;
// 消息发送失败的重试次数
this.retries = retries;
// 创建时间戳
this.time = time;
// 发送线程计数器
this.sensors = new SenderMetrics(metricsRegistry, metadata, client, time);
// 发送请求的等待时间
this.requestTimeoutMs = requestTimeoutMs;
// 发送失败后的等待时间
this.retryBackoffMs = retryBackoffMs;
// KafkaProducer的版本
this.apiVersions = apiVersions;
// 如果开启了,则需要传递事务管理器
this.transactionManager = transactionManager;
// 初始化Kafka集群未响应的batch集合
this.inFlightBatches = new HashMap<>();
}
发送线程主要包含了以下参数:
- 请求相关:
- 单次请求设定的最大请求字节数。
- 发送请求的等待时间。
- 发送请求失败后下一次发送请求的等待时间。
- 消息相关:
- 用于存储消息的RecordAccumulator消息累加器。
- 是否需要保证消息的发送顺序。
- 消息发送到Kafka集群后,需要落地的副本数量。
- 消息发送失败的重试次数。
- Sender线程相关的:
- 与Kafka集群进行交互的KafkaClient。
- KafkaProducer的元数据。
- 线程运行状态更新为正在运行中。
- 发送线程专属的计数器。
- 初始化用于存储未收到Kafka集群响应的batch集合。
有关参数设定给大家列一下,方便理解:
参数 | 定义 | 默认值 |
---|---|---|
retries | 消息发送失败的重试次数 | 0 |
max.in.flight.requests.per.connection | 批量发送的batch个数 | 5 |
启动发送线程Sender
在KafkaProducer的构造函数中实例化发送线程Sender后,Kafka会使用KafkaThread将Sender线程包装起来,并启动线程:
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
- 线程名称命名为:”kafka-producer-network-thread” + cliendId。
- 使用KafkaThread包装创建好的Sender线程。
KafkaThread是对Thread类的简单继承,目的在于设定线程的Daemon属性,和对UncaughtExceptionHandler进行处理
clientId如果没有在配置中设定,则会在当前KafkaProducer进程中递增。
Sender线程的运行
调用KafkaThread#start()之后,会执行Thread实现的run()方法,Sender#run():
执行任务
Sender线程正常情况下会执行任务:
// 主要循环,循环到调用#close()方法
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
我们初始化Sender时,就已经设定running=true,也就在线程无中断,running状态不发生变化,主线程未退出等情况下,会一直不断的执行发送任务。
Sender#runOnce()方法用于执行一次发送任务,同时需要对启动事务的发送请求做一些特殊处理。
事务发送前的特殊处理
发送前,主要针对事务做了以下的处理:
if (transactionManager != null) {
try {
// 在需要的情况下,重置producerId
transactionManager.resetProducerIdIfNeeded();
// 如果事务管理器没有处于事务状态
if (!transactionManager.isTransactional()) {
// 此时是一个幂等的producer,确保要有producerId
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
// 如果当前事务管理器中还有未发送完的sequence,并且事务管理器的状态不是FETAL_ERROR,则将事务管理器状态更新值FATAL_ERROR
transactionManager.transitionToFatalError(
new KafkaException("The client hasn't received acknowledgment for " +
"some previously sent messages and can no longer retry them. It isn't safe to continue."));
} else if (maybeSendAndPollTransactionalRequest()) {
// 需要发送和拉取事务请求,则直接返回,等待下一次请求发送
return;
}
// 在事务管理器状态为FATAL_ERROR,或者在开启幂等功能条件下没有producer id,则无法继续发送
if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
RuntimeException lastError = transactionManager.lastError();
// 获取事务管理器最后一次发生的错误
if (lastError != null)
// 如果累加器还有未发送的batch,则中断这些batch的发送
maybeAbortBatches(lastError);
// 拉取Kafka集群返回的响应,并结束本次发送请求
client.poll(retryBackoffMs, time.milliseconds());
return;
} else if (transactionManager.hasAbortableError()) {
// 如果当前事务管理器有中断错误
// 中断RecordAccumulator中还没有发送出去的batch
accumulator.abortUndrainedBatches(transactionManager.lastError());
}
} catch (AuthenticationException e) {
// 已经以error的方式记录,但是传播到这里来进行一些清理
log.trace("Authentication exception while processing transactional request: {}", e);
// 设置事务管理器校验失败及异常信息
transactionManager.authenticationFailed(e);
}
}
- 在需要重置producerId的情况下,重置producerId。
- 如果事务管理器存在,但是没有处于事务状态,证明开启了幂等功能,幂等功能需要producerId。
- 如果当前事务管理器中还有未发送完的sequence序号,并且事务管理器的状态不是FATAL_ERROR,此时事务管理器处于异常状态,需要记录异常,并将事务管理器的状态切换至FATAL_ERROR。
- 如果事务管理器有正在准备发送的请求,则本次发送请求不做任何事情,直接返回。
接下是针对事务管理器处于异常状态下的判断:
- 如果当前事务管理器处于FATAL_ERROR状态,或者开启了幂等功能,则无法继续本次的发送请求,如果事务管理器存在异常信息,则还会中断RecordAccumulator中还未发送的batch,同时执行拉取Kafka集群的请求,结束本次发送请求。
- 如果当前事务管理器中存在中断错误,则需要中断RecordAccumulator中还未发送出去的batch。
构建生产数据
校验完事务管理器的状态和事务的必要参数后,接下来就进入核心阶段,组装生产请求并发送:
// 获取当前时间戳
long currentTimeMs = time.milliseconds();
// 发送生产数据,这里这是构建发送数据请求,然后将请求放入到Channel中,等待执行
long pollTimeout = sendProducerData(currentTimeMs);
获取当前的时间戳之后,接下来继续看Sender#sendProducerData()。
获取已经准备好发送的batch
首先,会从所有的partition中,筛选出已经准备好发送的batch:
// 从连接metadata中获取集群信息
Cluster cluster = metadata.fetch();
// 从累加器中获取处于准备发送状态的分区record累加器
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
继续看RecordAccumulator#ready()方法:
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
// 是否有等待获取内存的线程
boolean exhausted = this.free.queued() > 0;
// 遍历每个partition的消息队列
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
Deque<ProducerBatch> deque = entry.getValue();
// 对双向队列进行同步操作,避免还有追加的操作
synchronized (deque) {
// 当向比较多的分区生产消息时,当前调用是个热区域
// 我们通常校验第一个batch,用于避免更昂贵代价的校验
// 获取队列中第一个封装好的batch
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
// 获取分区信息
TopicPartition part = entry.getKey();
// 从集群信息中获取当前partition的leader节点
Node leader = cluster.leaderFor(part);
if (leader == null) {
// 如果当前分区不存在leader节点,添加到未知leader节点的topic集合中
// leader节点不存在的情况下,也是可以进行消息发送的
// 需要注意的是,即使双向队列为空,也不会将元素从batch中移除
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
// 此时partition存在leader节点
// 如果就绪节点集合中没有当前leader节点,并且当前partition不处于静默状态
// 获取当前batch的已经等待的时间
long waitedTimeMs = batch.waitedTimeMs(nowMs);
// 是否需要进行重试
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
// 判断batch是否处于满的状态,需要消息队列有至少两个batch
boolean full = deque.size() > 1 || batch.isFull();
// 是否超过等待时间
boolean expired = waitedTimeMs >= timeToWaitMs;
// 判断是否可发送
boolean sendable = full || expired || exhausted || closed || flushInProgress();
// 如果可发送,且无需进行重试,则将此partition对应的leader节点置为准备就绪状态
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
// 如果不可发送,或者需要进行重试
// 计算剩余等待时间
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// 注意:这是一个保守的估计结果
// 因为一个不可发送数据的分区可能会有一个临界点,不久之后会发现拥有可发送的数据
// 然而这已经足够好了,因为我们只是唤醒一下,然后就会继续sleep剩余的持续时间
// 所以取剩余等待时间和下一次就绪状态检查时间的最小值
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
// 包装就绪检查结果
// 参数包括,就绪的节点,下一次就绪检查的时间,以及没有首领节点的topic集合
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
- 遍历每一个RecordAccumulator中存储的partition的消息队列,操作每个消息队列时需要进行同步操作。
- 为了节约资源,我们每次只需校验每个partition的第一个batch即可。
- 首先,需要校验partition对应的leader节点是否存活。
- 接着,需要判断当前batch已经等待发送的时间,剩余等待发送的时间, 是否需要进行重试,重试的等待时间,是否已经过了需要发送的时间。
- 然后,根据上一步计算出的各种条件,判断当前是否满足发送条件,如果满足发送条件,则将partition对应的leader节点添加到准备就绪的节点中。
- 否则,不满足条件的情况下,需要计算下一次重试发送的时间。
我们最后发现,其实返回的并不是准备好发送的batch,而是准备就绪的leader节点。
这样做的意义是,由于leader节点存在多个partition的情况,所以我们在和同一个leader节点建立连接后,可以把leader节点上的partition消息队列一次全部传输过去。
尝试更新集群元数据
由于在获取准备就绪的leader节点过程中,存在未知leader节点的问题,所以我们需要重新拉取一次集群的元数据,看剩余的topic是否已经拥有leader节点:
if (!result.unknownLeaderTopics.isEmpty()) {
// 这个集合既包括了正在选举主节点的topic,也包括已经失效的topic,
// 将它们继续添加到metadata中,以确保被metadata包含在内,然后进行一次metadata更新,因为有消息仍需要发送到这些topic上
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
// 进行一次metadata更新
this.metadata.requestUpdate();
}
对leader节点进行筛选
接下来,对已经准备就绪的leader节点进行一次筛选,排除那些可能无法连接的leader节点:
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
汇总每个leader节点需要发送的batch
接下来就将batch的维度从partition转换到leader节点:
// 创建生产请求,汇总每个leader节点需要发送的batch列表
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
// 添加到运行中的batch集合中
addToInflightBatches(batches);
我们继续看RecordAccumulator#drain()方法:
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
// 汇总每个节点需要发送的batch列表
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}
继续RecordAccumulator#drainBatchesForOneNode()方法:
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
int size = 0;
// 获取当前节点的partition信息
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<ProducerBatch> ready = new ArrayList<>();
// 为了减少竞争,循环不从索引0开始
int start = drainIndex = drainIndex % parts.size();
do {
// 获取指定索引的partition
PartitionInfo part = parts.get(drainIndex);
// 创建topic-partition对象,用于存储关联信息
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
// 计算下一个需要进行排空的索引
this.drainIndex = (this.drainIndex + 1) % parts.size();
// 如果当前partition请求处于静默状态,继续遍历下一个partition
if (isMuted(tp, now))
continue;
// 获取当前partition的batch队列
Deque<ProducerBatch> deque = getDeque(tp);
if (deque == null)
// 如果队列为空,继续遍历下一个partition
continue;
// 队列不为空的情况下,需要对队列进行同步操作
synchronized (deque) {
// 获取当前partition batch队列的第一个batch,但不取出
ProducerBatch first = deque.peekFirst();
// 如果不存在第一个batch,继续遍历下一个partition
if (first == null)
continue;
// 如果存在第一个batch
// 判断是否需要进行失败重试
boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
// 需要进行失败重试,继续遍历下一个partition
if (backoff)
continue;
// 如果大小总和已经超过当次发送总大小的阈值
if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
// 不需要继续发送了,跳出循环
break;
} else {
// 如果没有超出发送总大小阈值
// 首先对是否停止排空进行验证,主要是对事务状态进行校验
if (shouldStopDrainBatchesForPartition(first, tp))
break;
// 判断当前partition是否处于事务状态
boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
// 生成Kafka集群需要用到的producer id和epoch版本
ProducerIdAndEpoch producerIdAndEpoch =
transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
// 此时需要获取第一个batch
ProducerBatch batch = deque.pollFirst();
// 如果需要produce id以及epoch,同时batch不需要序列号
if (producerIdAndEpoch != null && !batch.hasSequence()) {
// 如果当前batch已经分配了一个序列号,我们就不要去修改produce id和序列号,因为可能会产生重复的情况
// 尤其是在前一次存在可能被接受的情况,并且如果我们修改了producer id和序列号,本次尝试也可能会被接受,引发了重复
// 除此之外,我们会更新为当前partition更新下一个序列号,同时也可以用事务管理器记录batch,这样即使在接收到顺序之外的请求,我们也能根据序列号继续确保维持序列号的顺序
batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
"{} being sent to partition {}", producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, batch.baseSequence(), tp);
transactionManager.addInFlightBatch(batch);
}
// 关闭batch
batch.close();
// 修改传输的字节数大小
size += batch.records().sizeInBytes();
// 在就绪batch集合中添加当前batch
ready.add(batch);
// 设置当前的排空时间
batch.drained(now);
}
}
} while (start != drainIndex);
return ready;
}
- 此时情况反过来,我们需要以当前leader节点上关联的partition集合,去RecordAccumulator中寻找指定的partition的消息队列。
- 由于可能存在竞争的条件下,则遍历不会从索引0开始,
- 在partition处于正常状态下,才可以继续后续动作。
- 我们尝试获取但不取出消息队列中的第一个batch,用以判断是否需要进行重试,以及计算是否超出请求的最大值。
- 如果满足发送数据大小条件限制,则会准备一些发送请求的参数:
- 事务管理器状态是否满足条件。
- 处于事务状态下,则还需要ProducerIdEpoch的必备参数对象。
- 处于事务状态下,还需要获取当前batch的sequence序号,以及生成下一个batch的sequence序号。
- 获取有关事务的必要条件后,我们关闭当前batch,不允许继续写入batch。
- 最后进行收尾工作,更新当前leader节点需要传输的字节数,并将batch添加到就绪集合中,并设置当前batch的准备就绪的时间。
sequence序号用于Kafka集群验证幂等性。
此时已经获取所有leader节点和每个leader节点需要发送的batch列表。
调整batch的顺序
在需要保证消息顺序的情况下,我们需要将本次将要发送的partition置为静默状态:
// 在需要保证消息顺序的情况下
if (guaranteeMessageOrder) {
// 关闭所有排空的partition
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
处理异常状况的batch
接下来将会对处于已经发送但是等待响应超时,已经失效的batch的进行汇总,合并为失效的batch集合:
// 重置下一个batch的失效时间
accumulator.resetNextBatchExpiryTime();
// 获取已经发送超时的正在运行的batch集合
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
// 获取当前累加器的所有失效的batch集合
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
// 汇总两种情况的失效集合
expiredBatches.addAll(expiredInflightBatches);
对于失效的batch,我们会进行如下处理:
// 遍历所有失效的batch,将它们置为失效
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
// 对失效batch执行失败操作
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
// 在拥有事务管理器的情况下,并且失效的batch进入了重试
if (transactionManager != null && expiredBatch.inRetry()) {
// 标记一下batch还没有发送成功,不允许接着分配sequence序号
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
- 如果是事务型的KafkaProducer,则会抛出异常。
- 如果是非事务型KafkaProducer,但是开启了幂等功能,则需要重置producer id,清空不允许分配sequence序号的缓存。
- 记录错误信息。
- 执行batch的异步回调任务,将错误信息告知开发者。
- 将batch移除正在处理的batch集合中,并释放batch占用的堆外内存。
发送生产请求
处理完之前发送的失效的batch之后,接下来需要发送生产请求:
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
// 等待时间必须大于0
pollTimeout = Math.max(pollTimeout, 0);
// 如果有准备就绪的节点,
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// 如果一些分区已经准备好发送数据了,select时间为0
// 其它情况下,如果一些分区拥有累加的数据,但是还没有就绪,那么select时间为当前时间戳和过期时间之差
// 在上述情况的其它情况下,select时间是当前时间戳和集群metadata过期时间之差
pollTimeout = 0;
}
// 构建生产消息请求,返回获取结果的等待时间
sendProduceRequests(batches, now);
return pollTimeout;
- 首先我们需要计算获取Kafka集群返回响应的等待时间:
- nextReadyCheckDelayMs为所有batch的剩余等待时间的最小值。
- notReadyTimeout当前客户端的处理上一次请求的剩余等待时间。
- RecordAccumulator的剩余失效时间。
- 将取三者之间的最小值,但是等待时间必须≥0。
- 如果有leader节点已经准备就绪,直接将等待时间置为0。
- 发送生产请求。
- 返回需要进行等待的时间。
我们继续看发送生产请求的执行过程,Sender#sendProducerRequests():
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}
发送过程,会对每个leader节点单独执行发送,除了发送的batch列表,还包括当前请求时间,请求的超时时间,acks参数。
那么是如何发送到指定的leader节点的呢?
我们继续看Sender#sendProduceRequest():
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;
// partition以及partition需要传输的batch集合
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
// partition以及partition传输的batch集合
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
// 获取版本号
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
// 向后兼容
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
// 遍历每个batch
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
// 获取每个batch的record
MemoryRecords records = batch.records();
// 通常,在producer构建batch和发送写入请求之间,是允许出现一段延迟的,我们还可能会基于过时的metadata选择消息的格式
// 在最坏的情况下,我们将会乐观的选择去用新的消息格式,但是发现broker并不支持,所以我们就需要向下转换,以便broker能支持
// 向下转换已在处理在集群中中出现的边界case,因为broker可能在不同版本中并不支持同样的消息格式
// 举个例子,如果一个partition从一个支持的版本,迁移到一个不支持的版本,我们就需要进行转换
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
// 向两种集合中添加不同分类的待发送记录
produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
}
// 获取transaction id
String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
// 根据版本,构建request
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
// 构建请求完成后的回调任务
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
String nodeId = Integer.toString(destination);
// 创建请求
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
// 发送请求,请求完成后,会执行回调
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
代码虽然比较多,但是逻辑比较清晰:
- 首先也是要遍历当前leader节点需要发送的batch列表,对其中不符合版本条件的batch进行转换,使其符合发送的版本条件。
- 在遍历过程中会生成两个集合:
- topic-partition=>MemoryRecords集合用于发送给Kafka集群。
- topic-partition=>ProducerBatch用与构建响应的回调任务。
- 然后,根据要发送的数据,acks,请求超时时间,事务ID,当前消息最低版本类型,构建生产消息请求建造者对象。
- 接着,构建当前leader节点写入消息后返回的响应的回调任务。
- 最后,根据生产请求建造者对象和回调任务创建客户端请求,由客户端发送请求。
Sender线程的关闭
当Sender线程的运行状态running置为false后,将不再继续执行runOnce()。
如果我们不是强制关闭,则会有一个关闭的过程:
// 我们停止接收请求,但是可能还会有请求在事务管理器中、累加器中,或者等待ack
// 当前发送线程将一直等待所有请求完成
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
// 所有请求完成之后,再执行一次
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
此时的情况是,RecordAccumulator中仍有需要发送的batch,我们还会在进行一次发送。
针对事务存在的情况:
// 在任何提交或者中断,没有经过事务管理器的队列情况下,中断事务
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
// 事务管理器没有完成的情况下,进行中断
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
// 再执行一次发送
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
如果事务处理器也没有处理完所有的事务,也需要再进行一次发送。
如果调用的是强制关闭:
if (forceClose) {
// 存在事务管理器,则关闭事务管理器
// 此时,我们需要让所有未完成的事务请求或者batch失败,然后唤醒因为异步请求,现在仍然等待的获取结果的线程
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
// 中断没有完成的batch
this.accumulator.abortIncompleteBatches();
}
则中断所有未完成的batch,粗暴的进行关闭。
随后关闭KafkaClient,此后将不会发送任何请求。
try {
// 关闭client
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
总结
在消息发送上,Kafka采取了异步模型,模块各司其职:
- RecordAccumulator,负责缓存需要发送的partition的消息队列。
- Sender,是一个后台线程,生命周期与KafkaProducer线程相同,负责定期将RecordAccumulator中的数据以leader节点=>batch列表的维度,发送给Kafka集群。
- 接收到Kafka集群返回的响应后,会对batch中的每一条record进行回调处理。