백엔드/spring

ErrorHandlingDeserializer

콤비네이션피자라지 2025. 2. 11. 17:36

https://velog.io/@kshired/Spring-Kafka-ErrorHandlingDeserializer

 

Spring Kafka ErrorHandlingDeserializer

Kafka consumer를 사용하다보면, JsonDeserializer를 많이 사용하게 됩니다.하지만 가끔 정의한 schema와 다른 value가 의도치않게 들어오는 경우가 생기는데 ( 이벤트 수기 발행이라던지, 프로듀서 쪽에서

velog.io

 

ErrorHandlingDeserializer 의 동작방식은 위 페이지에서 자세하게 잘 정리해주셨습니다. 
요번에 해당 옵션을 적용하면서, 
갑자기 warn log가 엄청나게 쌓이는 현상이 발생했는데요, 
적용했던 내용, 문제 원인과 조치 방법을 공유하려 합니다. 


적용 내용

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, Object> consumerKafkaFactory() {
		ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
		factory.setRecordMessageConverter(new StringJsonMessageConverter());
		return factory;
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap-server:9092"));
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "auto-offset-reset");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
		props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
		props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
		return props;
	}


위 처럼, Deserializer 설정을 추가 한 뒤, Bean 등록을 해주었습니다. 

해당 옵션이 켜지게 되면, consumer 는 checkNullKeyForExceptions 옵션이 true로 설정됩니다.

이 옵션은 ErrorHandlingDeserializer 를 사용하는 한, 변경 false 로 변경 할 수 없습니다. 

			this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
					|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
							consumerProperties, false,
							applicationContext == null
									? getClass().getClassLoader()
									: applicationContext.getClassLoader());

 

문제 상황

운영 부서 특성상, kafka record 발행 시, key 값을 optional 로 발행해주고 있었는데요, 
순서가 중요한 토픽 -> key 값에 id 

순서가 중요하지 않은 토픽 -> key 값에 null
ErrorHandlingDeserializer 의 코드 상, null 값 record 가 들어왔을 때 warn 로그를 찍어주고있습니다. 

private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {

			if (cRecord.value() instanceof DeserializationException ex) {
				throw ex;
			}
			if (cRecord.key() instanceof DeserializationException ex) {
				throw ex;
			}
			if (cRecord.value() == null && this.checkNullValueForExceptions) {
				checkDeser(cRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
			}
			if (cRecord.key() == null && this.checkNullKeyForExceptions) {
				checkDeser(cRecord, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
			}
			doInvokeOnMessage(cRecord);
			if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {
				ackCurrent(cRecord);
			}
		}

 

SereializationUtils 코드 상, header 가 DeserializationExceptionHeader 아닐 시, 로깅합니다. 

@Nullable
	public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record,
			String headerName, LogAccessor logger) {

		Header header = record.headers().lastHeader(headerName);
		if (!(header instanceof DeserializationExceptionHeader)) {
			// 여기 부분이 찍힘
			logger.warn(
					() -> String.format("Foreign deserialization exception header in (%s) ignored; possible attack?",
							KafkaUtils.format(record)));
			return null;
		}
		if (header != null) {
			byte[] value = header.value();
			DeserializationException exception = byteArrayToDeserializationException(logger, header);
			if (exception != null) {
				Headers headers = new RecordHeaders(record.headers().toArray());
				headers.remove(headerName);
				exception.setHeaders(headers);
			}
			return exception;
		}
		return null;
	}

 

null 키로 역직렬화 된 레코드 정보가 있지만, commit 과정을 넘어가는 것에 대한 로깅으로 보시면 되겠습니다.

저희 부서에서는 warning 로그 도 저장소에 쌓고있는데요. 
이 코드가 배포되면서 급격하게 사용량이 증가하기 시작하였습니다.  😭

 

해결

따라서  log4j상에서 해당 패키지를 exclude 시킬지, key deserializer 에는 ErrorHandlingDeserializer 를 적용하지 않을지 고민하다가, 후자로 결정하여 적용하였습니다.