https://velog.io/@kshired/Spring-Kafka-ErrorHandlingDeserializer

 

Spring Kafka ErrorHandlingDeserializer

Kafka consumer를 사용하다보면, JsonDeserializer를 많이 사용하게 됩니다.하지만 가끔 정의한 schema와 다른 value가 의도치않게 들어오는 경우가 생기는데 ( 이벤트 수기 발행이라던지, 프로듀서 쪽에서

velog.io

 

ErrorHandlingDeserializer 의 동작방식은 위 페이지에서 자세하게 잘 정리해주셨습니다. 
요번에 해당 옵션을 적용하면서, 
갑자기 warn log가 엄청나게 쌓이는 현상이 발생했는데요, 
적용했던 내용, 문제 원인과 조치 방법을 공유하려 합니다. 


적용 내용

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, Object> consumerKafkaFactory() {
		ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
		factory.setRecordMessageConverter(new StringJsonMessageConverter());
		return factory;
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap-server:9092"));
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "auto-offset-reset");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
		props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
		props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
		return props;
	}


위 처럼, Deserializer 설정을 추가 한 뒤, Bean 등록을 해주었습니다. 

해당 옵션이 켜지게 되면, consumer 는 checkNullKeyForExceptions 옵션이 true로 설정됩니다.

이 옵션은 ErrorHandlingDeserializer 를 사용하는 한, 변경 false 로 변경 할 수 없습니다. 

			this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
					|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
							consumerProperties, false,
							applicationContext == null
									? getClass().getClassLoader()
									: applicationContext.getClassLoader());

 

문제 상황

운영 부서 특성상, kafka record 발행 시, key 값을 optional 로 발행해주고 있었는데요, 
순서가 중요한 토픽 -> key 값에 id 

순서가 중요하지 않은 토픽 -> key 값에 null
ErrorHandlingDeserializer 의 코드 상, null 값 record 가 들어왔을 때 warn 로그를 찍어주고있습니다. 

private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {

			if (cRecord.value() instanceof DeserializationException ex) {
				throw ex;
			}
			if (cRecord.key() instanceof DeserializationException ex) {
				throw ex;
			}
			if (cRecord.value() == null && this.checkNullValueForExceptions) {
				checkDeser(cRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
			}
			if (cRecord.key() == null && this.checkNullKeyForExceptions) {
				checkDeser(cRecord, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
			}
			doInvokeOnMessage(cRecord);
			if (this.nackSleepDurationMillis < 0 && !this.isManualImmediateAck) {
				ackCurrent(cRecord);
			}
		}

 

SereializationUtils 코드 상, header 가 DeserializationExceptionHeader 아닐 시, 로깅합니다. 

@Nullable
	public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record,
			String headerName, LogAccessor logger) {

		Header header = record.headers().lastHeader(headerName);
		if (!(header instanceof DeserializationExceptionHeader)) {
			// 여기 부분이 찍힘
			logger.warn(
					() -> String.format("Foreign deserialization exception header in (%s) ignored; possible attack?",
							KafkaUtils.format(record)));
			return null;
		}
		if (header != null) {
			byte[] value = header.value();
			DeserializationException exception = byteArrayToDeserializationException(logger, header);
			if (exception != null) {
				Headers headers = new RecordHeaders(record.headers().toArray());
				headers.remove(headerName);
				exception.setHeaders(headers);
			}
			return exception;
		}
		return null;
	}

 

null 키로 역직렬화 된 레코드 정보가 있지만, commit 과정을 넘어가는 것에 대한 로깅으로 보시면 되겠습니다.

저희 부서에서는 warning 로그 도 저장소에 쌓고있는데요. 
이 코드가 배포되면서 급격하게 사용량이 증가하기 시작하였습니다.  😭

 

해결

따라서  log4j상에서 해당 패키지를 exclude 시킬지, key deserializer 에는 ErrorHandlingDeserializer 를 적용하지 않을지 고민하다가, 후자로 결정하여 적용하였습니다. 

HTTP Transfer-Encoding

HTTP 통신시 인코딩 형태를 지정하는 설정 값. 헤더에 설정할 수 있다.
(HTTP 1.1 부터 지원, 설정 시 Content-Length 헤더는 반드시 없어야 함.)
종류 : chunked, compress, deflate, gzip

 

 

 

테스트 

과제 진행 전, API에 응답값을 추가해야 할 일이 생겼었습니다. 
대략적으로 응답 객체가 5 -> 100개로 추가되는 상황이었는데요. 
데이터 크기가 한 번에 크게 증가하다 보니 어디서든 Limit 이 걸리지 않을까 우려되어 테스트를 진행하게 되었고, 

"옵션(Transfer-Encoding) 으로 인해 이슈 없음" 의 결론을 짓게 되었습니다.
테스트 과정을 말씀드리겠습니다.

 

 

nginx 설치

 

nginx, tomcat 서버 구성

1.  reverse-proxy 서버 구조를 구성하기 위해서 nginx 를 설치해줍니다.

$ brew install nginx

$ nginx -V
nginx version: nginx/1.27.3
built by clang 16.0.0 (clang-1600.0.26.4)
built with OpenSSL 3.4.0 22 Oct 2024
TLS SNI support enabled
configure arguments: --prefix=/opt/homebrew/Cellar/nginx/1.27.3 --sbin-path=/opt/homebrew/Cellar/nginx/1.27.3/bin/nginx --with-cc-opt='-I/opt/homebrew/opt/pcre2/include -I/opt/homebrew/opt/openssl@3/include' --with-ld-opt='-L/opt/homebrew/opt/pcre2/lib -L/opt/homebrew/opt/openssl@3/lib' --conf-path=/opt/homebrew/etc/nginx/nginx.conf --pid-path=/opt/homebrew/var/run/nginx.pid --lock-path=/opt/homebrew/var/run/nginx.lock --http-client-body-temp-path=/opt/homebrew/var/run/nginx/client_body_temp --http-proxy-temp-path=/opt/homebrew/var/run/nginx/proxy_temp --http-fastcgi-temp-path=/opt/homebrew/var/run/nginx/fastcgi_temp --http-uwsgi-temp-path=/opt/homebrew/var/run/nginx/uwsgi_temp --http-scgi-temp-path=/opt/homebrew/var/run/nginx/scgi_temp --http-log-path=/opt/homebrew/var/log/nginx/access.log --error-log-path=/opt/homebrew/var/log/nginx/error.log --with-compat --with-debug --with-http_addition_module --with-http_auth_request_module --with-http_dav_module --with-http_degradation_module --with-http_flv_module --with-http_gunzip_module --with-http_gzip_static_module --with-http_mp4_module --with-http_random_index_module --with-http_realip_module --with-http_secure_link_module --with-http_slice_module --with-http_ssl_module --with-http_stub_status_module --with-http_sub_module --with-http_v2_module --with-http_v3_module --with-ipv6 --with-mail --with-mail_ssl_module --with-pcre --with-pcre-jit --with-stream --with-stream_realip_module --with-stream_ssl_module --with-stream_ssl_preread_module

 

2. 설치가 완료되었으면, nginx.conf 파일을 수정하여 upstream 설정을 추가해줍니다.

#user  nobody;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}

http {

    upstream backend {
        server 127.0.0.1:8080;
    }	


    bytes_sent   1;

    include       mime.types;
    default_type  application/octet-stream;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location / {
	    proxy_pass http://backend;	
            root   html;
            index  index.html index.htm;
        }

        #error_page  404              /404.html;

        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

        # proxy the PHP scripts to Apache listening on 127.0.0.1:80
        #
        #location ~ \.php$ {
        #    proxy_pass   http://127.0.0.1;
        #}

        # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
        #
        #location ~ \.php$ {
        #    root           html;
        #    fastcgi_pass   127.0.0.1:9000;
        #    fastcgi_index  index.php;
        #    fastcgi_param  SCRIPT_FILENAME  /scripts$fastcgi_script_name;
        #    include        fastcgi_params;
        #}

        # deny access to .htaccess files, if Apache's document root
        # concurs with nginx's one
        #
        #location ~ /\.ht {
        #    deny  all;
        #}
    }


    # another virtual host using mix of IP-, name-, and port-based configuration
    #
    #server {
    #    listen       8000;
    #    listen       somename:8080;
    #    server_name  somename  alias  another.alias;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}


    # HTTPS server
    #
    #server {
    #    listen       443 ssl;
    #    server_name  localhost;

    #    ssl_certificate      cert.pem;
    #    ssl_certificate_key  cert.key;

    #    ssl_session_cache    shared:SSL:1m;
    #    ssl_session_timeout  5m;

    #    ssl_ciphers  HIGH:!aNULL:!MD5;
    #    ssl_prefer_server_ciphers  on;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}
    include servers/*;
}

 

3. 설정을 완료 했으면 nginx 재기동  $ nginx -s reload  해줍니다.

 

Spring Boot App

API 서버 역할을 할 MVC 앱을 작성합니다. (기본 port : 8080)

@RestController
public class SimpleApiController {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Response {
        List<String> stringList = new ArrayList<>();
    }

    private void addStringList(List<String> stringList) {
        stringList.add("긴문자열")
	}
    
    @GetMapping(
        value = "/test",
        produces = MediaType.APPLICATION_JSON_VALUE)
    public Response test() {
        Response response = new Response();
        for (int i = 0; i < 1000; i++) {
            addStringList(response.stringList);
        }
        return response;
    }    
}

테스트를 위해 64MB 의 크기를 갖도록 API 를 만들었습니다. 



이제 호출해봅시다. 

 

200 응답을 받았습니다. 데이터의 크기가 64MB가 되는데도 에러가 발생하지 않았습니다. 
응답이 오기까지는 약 5초정도 걸렸는데요. 왜 그런지 응답 값을 분석해보았습니다.

Connection: keep-alive
Transfer-Encoding: chunked

위 설정으로 인하여, 커넥션이 열려있는 상태로 응답을 chunked 단위로 계속 받아오는구나! 추측할 수 있었습니다!허나, 뭔가 찝찝한 기분이 들어서 좀 더 세밀한 분석을 해보기 위하여 WireShark 를 이용해봅시다.

 

WireShark 분석

  • 첫요청 을 시작으로 클라이언트에게 필요한 데이터 인코딩 방법을 응답 헤더로 전달하고있습니다.

  • 그 후 쭉 Chunk를 전달하다

  • 마지막 응답 종료 패킷을 끝으로 통신이 종료됩니다. 

왜 Chunk 사이즈는 16384, 16k 일까요?

nginx 기본 설정 client_body_buffer_size :16k  이어서 입니다.

번역이 헷갈릴 수 있는데, 입력값 중 아무 숫자 한개만 1을 더해서 곱했을 때 최댓값을 구하는 문제이다.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
 
public class CodeForce {
    public static void main(String[] args) throws IOException {
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        int T = Integer.parseInt(br.readLine());
        for (int i = 0; i < T; i++) {
            String s = br.readLine();
            Integer arr[] = Arrays.stream(br.readLine().split(" "))
                .map(Integer::valueOf)
                .toList()
                .toArray(new Integer[0]);
            int maximum = 0;
            for (int j = 0; j < arr.length; j++) {
                int candidate = 1;
                for (int k = 0; k < arr.length; k++) {
                    if (j == k) {
                        candidate = candidate * (arr[k] + 1);
                    } else {
                        candidate = candidate * arr[k];
                    }
                }
 
                maximum = Math.max(maximum, candidate);
            }
 
            System.out.println(maximum);
        }
    }
}

'CodeForce > Round 913' 카테고리의 다른 글

A - Rook  (0) 2025.01.02

 

현재 위치를 d5 라고 가정할 때,
시작점을 기준으로  [d][1~8] , [a~h][5] 순회하면서, 자신의 위치[d5]를 제외한 좌표를 출력해주면 된다.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
 
public class CodeForce {
 
    public static void main(String[] args) throws IOException {
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        int T = Integer.parseInt(br.readLine());
        for (int i = 0; i < T; i++) {
            String s = br.readLine();
            char x = s.charAt(0);
            int y = s.charAt(1) - '0';
 
            for (int j = 1; j <= 8; j++) {
                if (j != y) {
                    System.out.println(String.format("%c%d", x, j));
                }
            }
 
            for (char j = 'a'; j < 'a' + 8; j++) {
                if (j != x) {
                    System.out.println(String.format("%c%d", j, y));
                }
            }
        }
    }
}

'CodeForce > Round 913' 카테고리의 다른 글

B - Good Kid  (0) 2025.01.02

Concept

"Making API clients easier"

Retrofit, JAXRS-2.0, WebScoekt 에 영감을 받아서 제작된 자바 클라이언트 라이브러리 입니다.
Netflix Denominator를 Http API 에 조금더 쉽게 매핑하는것을 시작으로 탄생하게 되었습니다.

Why Feign?

feign은 cxf, jersey를 사용하여 Rest, Soap 서비스를 제공 합니다.
손쉽게 http api code를 작성할 수 있고, 커스텀 인코딩/디코딩, 에러 핸들링을 지원합니다.

 

 

How To Work?

어노테이션 기반으로 동작합니다. 여러 템플릿을 제공하며 request, argument에 사용가능합니다.
텍스트 기반의 API를 만들기는 어렵지만 api작업 진행중 반복되는 코드 작성을 줄여줍니다.
유닛테스트 또한 간단화 할 수 있습니다.

 

 

Support

v10.x : java 8이상 필요, jdk6 이하 버전은 v9.x 를 사용바랍니다.

 

 

 

Use Case

Gradle

implementation 'io.github.openfeign:feign-core:13.5'
implementation 'io.github.openfeign:feign-gson:9.4.0' // decoding 위해 추가

 

Client

import feign.Param;
import feign.RequestLine;


public interface FeignTestClient {

    @RequestLine("GET /feign/{productId}")
    Product getProduct(@Param("productId") String productId);

    @RequestLine("POST /feign")
    void saveProduct(Product product);
}

 

 

Server

@RestController
public class FeignTestController {

    @GetMapping("/feign/{productId}")
    public Product getProducts(@PathVariable String productId) {
        return new Product(1L, BigDecimal.valueOf(350L));
    }

    @PostMapping("/feign")
    public void saveProduct(Product product) {
        System.out.println(product);
    }
}

 

 

Test

    public static void main(String[] args) {
        FeignTestClient feignTestClient = Feign.builder()
            .decoder(new GsonDecoder())
            .target(FeignTestClient.class, "http://127.0.0.1:8080");

        Product product = feignTestClient.getProduct("1");
        System.out.println(product);
    }

 

'백엔드 > java' 카테고리의 다른 글

Full GC가 터지는 서버를 고쳐보자!  (0) 2022.07.31

 

오늘은 24년 12월 둘째주에 업무를 진행하면서 발생했던 문제 상황과 해결한 방법을 공유하려 한다.
특정 유저가 닉네임 ( DB ) 를 업데이트 했는데, 닉네임 API 를 호출하면 이전 닉네임이 응답값으로 오는 현상이 발생했다.
히스토리 상 DB 업데이트도 성공적으로 완료되었기 때문에 문제될만한 지점이 없어보였다.

 


 

문제상황 요약

간단한 닉네임 업데이트

 

유저는 자신의 닉네임을 자유롭게 변경할수 있다!
하지만, 우리 서비스에 닉네임을 조회하는 API 가 있었고, 서비스의 급 성장으로 인해
닉네임 API 에 캐싱을 적용하게 되었었다.

 

 

 

닉네임 API 호출량 증가로 캐싱 12시간 적용!

 

문제가 되는 코드는 아래와 같았다.

  1. 트랜잭션 시작
  2. 유저를 찾는다.
  3. 닉네임을 수정하고 DB 업데이트 한다.
  4. 닉네임 캐싱 API 를 evict 처리 한다.
  5. 트랜잭션이 종료 되고 update 쿼리가 수행된다.
위 상황에서 4번이 완료되면 캐시 서버에서 해당 프로파일은 삭제된다.
이 때 5번 트랜잭션이 종료되기 전에 다시 닉네임 캐싱 API 가 호출되면, 업데이트 전 프로파일 정보로 새로 캐싱하게 된다.
최종적으로 DB와 캐시 값이 일치하지 않는 상황이 발생한다.

 


 

해결 방법

 

@Transactional 과 외부 시스템 통신 코드를 하나로 묶어서 자주 발생하는 문제중에 하나인데, 이럴 경우

Transactional 동작에 맞춰서 외부 시스템 통신코드를 실행해주는게 편리하다.

 

  1. @Transactional 종료 시, 성공적으로 commit 이 발생했다면 캐시를 evict
  2. exception 이 발생했다면 캐시 유지.

 

afterCommit 이벤트 등록

Spring 에서는 현재 상태의 Transaction 에 따라 이벤트를 등록하는 방법을 제공한다.

현재 트랜잭션 상태에 있다면, afterCommit 이벤트 리스너를 등록해서, 커밋이 완료 된 후에만 캐시를 evict 하게 할 수 있다.

annotation based 로 해결하는 방법으로는 TransactionEventListener를 이용하는 방법도 있다.

 

 

깨달은 점 : @Transactional 안에서 외부 시스템을 호출 시 주의하자.

 

 

+ Recent posts