ErrorHandlingDeserializer
https://velog.io/@kshired/Spring-Kafka-ErrorHandlingDeserializer
Spring Kafka ErrorHandlingDeserializer
Kafka consumer를 사용하다보면, JsonDeserializer를 많이 사용하게 됩니다.하지만 가끔 정의한 schema와 다른 value가 의도치않게 들어오는 경우가 생기는데 ( 이벤트 수기 발행이라던지, 프로듀서 쪽에서
velog.io
ErrorHandlingDeserializer 의 동작방식은 위 페이지에서 자세하게 잘 정리해주셨습니다.
요번에 해당 옵션을 적용하면서,
갑자기 warn log가 엄청나게 쌓이는 현상이 발생했는데요,
적용했던 내용, 문제 원인과 조치 방법을 공유하려 합니다.
적용 내용
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> consumerKafkaFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setRecordMessageConverter(new StringJsonMessageConverter());
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap-server:9092"));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "auto-offset-reset");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
return props;
}
위 처럼, Deserializer 설정을 추가 한 뒤, Bean 등록을 해주었습니다.
해당 옵션이 켜지게 되면, consumer 는 checkNullKeyForExceptions 옵션이 true로 설정됩니다.
이 옵션은 ErrorHandlingDeserializer 를 사용하는 한, 변경 false 로 변경 할 수 없습니다.
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
consumerProperties, false,
applicationContext == null
? getClass().getClassLoader()
: applicationContext.getClassLoader());
문제 상황
운영 부서 특성상, kafka record 발행 시, key 값을 optional 로 발행해주고 있었는데요,
순서가 중요한 토픽 -> key 값에 id
순서가 중요하지 않은 토픽 -> key 값에 null
ErrorHandlingDeserializer 의 코드 상, null 값 record 가 들어왔을 때 warn 로그를 찍어주고있습니다.
private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {
if (cRecord.value() instanceof DeserializationException ex) {
throw ex;
}
if (cRecord.key() instanceof DeserializationException ex) {
throw ex;
}
if (cRecord.value() == null && this.checkNullValueForExceptions) {
checkDeser(cRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
}
if (cRecord.key() == null && this.checkNullKeyForExceptions) {
checkDeser(cRecord, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
}
doInvokeOnMessage(cRecord);
if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {
ackCurrent(cRecord);
}
}
SereializationUtils 코드 상, header 가 DeserializationExceptionHeader 아닐 시, 로깅합니다.
@Nullable
public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record,
String headerName, LogAccessor logger) {
Header header = record.headers().lastHeader(headerName);
if (!(header instanceof DeserializationExceptionHeader)) {
// 여기 부분이 찍힘
logger.warn(
() -> String.format("Foreign deserialization exception header in (%s) ignored; possible attack?",
KafkaUtils.format(record)));
return null;
}
if (header != null) {
byte[] value = header.value();
DeserializationException exception = byteArrayToDeserializationException(logger, header);
if (exception != null) {
Headers headers = new RecordHeaders(record.headers().toArray());
headers.remove(headerName);
exception.setHeaders(headers);
}
return exception;
}
return null;
}
null 키로 역직렬화 된 레코드 정보가 있지만, commit 과정을 넘어가는 것에 대한 로깅으로 보시면 되겠습니다.
저희 부서에서는 warning 로그 도 저장소에 쌓고있는데요.
이 코드가 배포되면서 급격하게 사용량이 증가하기 시작하였습니다. 😭
해결
따라서 log4j상에서 해당 패키지를 exclude 시킬지, key deserializer 에는 ErrorHandlingDeserializer 를 적용하지 않을지 고민하다가, 후자로 결정하여 적용하였습니다.