1편에 이어 이제 api 요청을 kafka를 이용해서 로깅해보려 한다.
구상하는 구현 방식은
- api gateway 서버에서 인증 필터 전에 먼저 request 필터를 등록하고
- 해당 필터에서 요청에 대한 정보들을 Kafka에 지정한 요청 토픽에 발행하고
- 응답에 대한 정보도 동일하게 응답 토픽에 발행하려 한다.
Kafka 설치하기
게이트웨이 서버에 카프카를 설치하고 독립 브로커로 실행하기엔 ec2 인스턴스도 힘들어할거같으니..
GCP에서도 무료 계정으로 인스턴스를 하나 더 띄우고 거기에 kafka( 추후 모니터링 서버도 )올려주자
GCP는 400불을 공짜로 무료 지갑에 넣어주고 특정 리전에 생성한 e2 micro에 대해선 프리티어를 제공한다고 한다.
그런데 막상 생성해보면 예상 청구료가 0달러로 뜨진 않는다..;
여튼, GCP는 기본으로 SSH 연결을 위한 키를 제공해주지 않기에 내가 로컬 PC에서 직접 SSH키를 만들고 등록해줘야 한다.
$ ssh-keygen -t rsa -f PATH/FILENAME -C "email@gmail.com"
그 후 공개키를 열어 전부 복사해주고
왼쪽 메뉴 하단에 있는 Metadata에 들어가서 공개키를 붙여넣고 저장해주면 이제 vm에 로컬에서 직접 ssh연결이 가능해진다
인스턴스에 접속했다면 jdk를 설치해주고
카프카 공식 페이지 에서 다운 링크를 복사하고
wget -d 'link'
이런 식으로 카프카를 다운 받아준다.
지난번에 처음 카프카를 공부할 때엔 주키퍼를 이용해서 앙상블과, 카프카 클러스터를 구성했지만
해당 방식은 카프카에서 지향하는 바가 아니기도 하고 이미 해봤기도 했으니 이번에는 KRaft를 통해 카프카를 사용해보려 한다.
KRaft
기존의 zookeeper를 통한 클러스터 메타데이터 관리방식에서의 단점을 제거하기 위해 카프카 자체로 메타데이터를 관리하는 방식
- 주키퍼에 저장된 토픽,파티션에 대한 메타 데이터를 읽는 과정에서 오는 데이터 불일치 가능성, 병목현상을 없앨 수 있다
- 서로 다른 애플리케이션에 대해서 관리할 필요성을 하나 없애기에 운영에 대한 부담을 줄일 수 있다
- 모니터링 시 주키퍼와 카프카 모두를 모니터링 할 필요가 없어지기에 장애 대응을 빠르게 할 수 있다.
주키퍼 방식에서 브로커 중 하나가 컨트롤러의 역할을 주키퍼를 통해 선발하고 다른 브로커가 인식할 수 있게 하는 방식이라면
KRaft모드에선 여러 컨트롤러 후보가 존재하고 해당 후보 중 하나가 active상태로 변경되고 리더 역할을 수행하게 된다.
이제 간단한 설정과 함께 카프카를 실행해보자
export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
계속해서 클러스터 아이디를 유지하기 위해 생성한 uuid를 환경변수로 등록해주자
vi ~/.bashrc
export KAFKA_CLUSTER_ID=uuid
source ~/.bashrc
이런 식으로 환경변수를 저장해준 후
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
storage에 클러스터 id를 등록하면 끝난다.
(물론 로그 디렉토리가 tmp하위로 잡혀서 재부팅 시 삭제되는 불편함은 있지만..)
bin/kafka-server-start.sh config/kraft/server.properties
위의 명령으로 카프카 서버를 켜보면 정상적으로 구동되는 것을 확인할 수 있다.
Kafka 실행하기
우선 게이트웨이 서버에 카프카를 설치해서 브로커 2개로 클러스터를 구성해서 운영을 해볼 생각이다.
마찬가지로 게이트웨이 서버에도 카프카 서버를 설치하고
두 서버 모두에 /config/kraft/server/properties 파일을 열어
순서대로 node.id 를 채번해주고 (1,2)
controller.quorum.voters 에 1@1번노드주소:9093,2@2번노드주소:9093 이렇게 설정해준다
클러스터 내 브로커가 2개이므로 num.partitions, offsets.topic.replication.factor, transaction.state.log.replication.factor, transaction.state.log.min.isr 을 모두 2로 설정해줬다.
그리고 아까 등록했던 클러스터 아이디를 조회한 후
echo $KAFKA_CLUSTER_ID
게이트 웨이 서버에도 환경변수로 등록해주고 스토리지 포맷을 해준다.
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
cluster 연결 오류 해결기
위와 같이 Cluster 내 노드들이 서로에게 연결을 하지 못하는 오류가 발생했다.
액티브 될 컨트롤러를 선택하는 과정에서 서로 통신이 이루어져야 하고 그 후로도 둘 다 브로커의 역할도 하기에 계속 통신이 되어야 한다.
이 둘의 통신이 연결되지 않아 발생하는 문제라고 판단했고 핑을 보내봤다.
node 2 → node 1
node1 → node2
해당 방식으로 핑을 보냈을 때 gcp인스턴스에서 ec2로의 핑이 찍히질 않았다.
뭔가 방화벽에 막혀있다는 뜻이라 생각되어 이리저리 찾아보다보니
ec2에 기본적으로 ICMP가 켜져있지 않다는 걸 알 수 있었고
인바운드로 허용을 해주고 나니
ec2로 핑도 정상적으로 가는 것을 확인할 수 있었다.
문제는.. 그래도 서로 연결이 안된다는 것이다.
보안을 위해 서로의 ip만 허용하고 있었는데 테스트를 위해 이를 모든 ip에 대해서 9092-9093 포트 접근을 허용해보자
다시 두 인스턴스에서 카프카를 올려보면
여전히 문제는 동일하게 발생한다.
그렇다면, 같은 호스트 내에서는 클러스터가 형성되는 지 확인해보자
server.properties 파일을 열어 아래 처럼 로컬에서 두 컨트롤러이자 브로커를 이용해서 클러스터를 형성시키게 설정하자
(log.dirs도 수정해줘야 한다)
그리고, bin/kafka-server.start.sh 를 열어 heap size도 좀 줄여주자
그리고 터미널을 새로열어 server설정 파일을 복사하여 node.id=2, 브로커 포트를 9093, 컨트롤러 포트를 29094로 설정하고
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server2.properties
클러스터 아이디를 잡아준다.
둘 다 켜지기 전까진 똑같은 메세지가 나왔지만, 둘 다 켜지니 바로 카프카 서버가 정상적으로 구동되는 것을 확인할 수 있다.
켜진 김에 ec2 → gcp로 포트 연결을 확인해보면 정상적으로 연결되고 있는 것을 확인할 수 있다.
반대로 돌린 후 gcp → ec2로 포트를 확인해보면
연결이 되지 않는다.
즉, gcp는 인,아웃이 정상적으로 뚫려있는데 ec2가 인바운드를 받지 못하는 것으로 보인다..
뭔가 이상하다고 생각되어 이미 열려있던 8080포트로 서버를 하나 띄워놓고 연결을 시도해봤는데
이것도 연결이 되지 않는다..
그렇다면 기존 도메인으로 연결하던 포트로는 연결이 될까 ?
도메인으로는 정상적으로 연결이 된다.
그렇다는 뜻은 연결이 완전히 차단된 요상한? 상태는 아니라는 뜻이다.
뭔가 Nginx가 모든 포트로 향하는 요청을 잡아먹고 있는건가 싶어 nginx를 끄고 확인해봐도 동일하게 응답을 받지 못했다.
그러다 문득 ec2인스턴스와 gcp에 차이점을 생각하다보니 내가 ec2에 ufw를 켜놨던 걸 기억해냈다..;
ufw를 끄고 재부팅했더니.. 바로 된다………
설정을 다시 돌리고 서버를 띄워보면
.. 언제 그랬냐는듯이 서로 잘 찾아낸다.
그럼 이제 마지막으로 다시 9092,9093 포트를 서로의 ip에서만 접속되게끔 변경해주자
충분히 했으니 사진은 스킵하고 두 서버를 다시 껐다가 다시 켜보면
잘 연결된다.
-끝-
이제 오류도 잡아냈고 클러스터 연결도 잘 수행되고 있으니 다음으론 게이트웨이 로깅용 토픽을 만들고 gateway에서 카프카 메세지를 발행하고 모니터링 서버(gcp)에서 이걸 읽어보자
Gateway에 Kafka 설정하기
먼저 gateway에 카프카 프로듀서를 등록해보자
gradle에 카프카 의존성을 추가해주고
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
value로 넣어줄 카프카 서버 주소도 application.yml에 넣어준 후
spring:
kafka:
bootstrap-servers: kafka-server-address
KafkaConfig파일을 만들어주자
@Configuration
public class KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootStrapAddress;
@Bean
public KafkaAdmin kafkaAdmin(){
Map<String,Object> configs=new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1(){
return TopicBuilder.name("GatewayAPI")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ProducerFactory<String,String> producerFactory(){
Map<String,Object> props=new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
return new KafkaTemplate<String,String>(producerFactory());
}
}
간단하게 살펴보면 카프카 설정을 관리하는 KafkaAdmin을 빈으로 등록하고
사용할 토픽도 빈으로 등록해준다. (없으면 생성)
그리고 ProducerFactory에 우선 String으로 Serializer를 등록해주고
KafkaTemplate에 등록한다.
카프카 설정까지 완료했으니 이제 발행할 로그 메세지 (request, response) 를 만들어보자.
Request, Response 로깅하기
요청과 응답에 대한 로깅은 전역으로 사용할 것 이기에 GlobalFilter를 사용해보려한다.
@Slf4j
@Component
public class LoggingGlobalFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Map<String, Object> requestMap = new HashMap<>();
Map<String, Object> responseMap = new HashMap<>();
ServerHttpRequest request = getDecoratedRequest(exchange, requestMap);
ServerHttpResponse response = getDecoratedResponse(exchange, responseMap);
return chain.filter(exchange.mutate().request(request).response(response).build())
.then(Mono.fromRunnable(() -> {
StringBuilder sb = new StringBuilder();
sb.append("[").append(exchange.getRequest().getId()).append("] request info : { ");
for (String key : requestMap.keySet()) {
sb.append(key).append(" : ").append(requestMap.get(key)).append(" ");
}
sb.append("}");
log.info(sb.toString());
sb = new StringBuilder();
sb.append("[").append(exchange.getRequest().getId()).append("] response status : ").append(response.getStatusCode()).append(" response info : { ");
for (String key : responseMap.keySet()) {
sb.append(key).append(" : ").append(responseMap.get(key)).append(" ");
}
sb.append("}");
log.info(sb.toString());
}));
}
@Override
public int getOrder() {
return -1;
}
private ServerHttpRequest getDecoratedRequest(ServerWebExchange exchange, Map<String, Object> requestMap) {
ServerHttpRequest request = exchange.getRequest();
requestMap.put("method", request.getMethod());
requestMap.put("uri",request.getURI());
if(!request.getQueryParams().isEmpty())
requestMap.put("query", request.getQueryParams());
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
if (route != null) {
requestMap.put("routeId", route.getId());
requestMap.put("routeUri", route.getUri());
}
return new ServerHttpRequestDecorator(request) {
@Override
public Flux<DataBuffer> getBody() {
return super.getBody().publishOn(Schedulers.boundedElastic()).doOnNext(dataBuffer -> {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
Channels.newChannel(byteArrayOutputStream).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
String requestBody = removeWhiteSpacesFromJson(byteArrayOutputStream.toString(StandardCharsets.UTF_8));
requestMap.put("body", requestBody);
} catch (Exception e) {
log.error(e.getMessage());
}
});
}
};
}
private ServerHttpResponseDecorator getDecoratedResponse(ServerWebExchange exchange,
Map<String, Object> responseMap) {
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory dataBufferFactory = response.bufferFactory();
return new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
DefaultDataBuffer joinedBuffers = new DefaultDataBufferFactory().join(dataBuffers);
byte[] content = new byte[joinedBuffers.readableByteCount()];
joinedBuffers.read(content);
responseMap.put("body", new String(content, StandardCharsets.UTF_8));
return dataBufferFactory.wrap(content);
})).onErrorResume(err -> {
log.error(err.getMessage());
return Mono.empty();
});
}else{
log.warn(exchange.getRequest().getId()+ "'s response has no body");
}
return super.writeWith(body);
}
};
}
private String removeWhiteSpacesFromJson(String json) {
ObjectMapper om = new ObjectMapper();
try {
JsonNode jsonNode = om.readTree(json);
return om.writeValueAsString(jsonNode);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
기본적으로 요청 자체가 Reactive 하기에 요청의 Body를 바로 꺼낼 수 없고 직접 조회하면 안된다.
(그러면 컨트롤러 단에서 못읽는다.)
그렇기에 위와 같이 Decorator를 이용해서 단순히 소비하는 것이 아닌 내용만 읽어보고 다시 포장해서? 넘겨주는 방식을 사용하여 요청과 응답을 소비하지 않고 로깅할 수 있게 된다.
이제 로그를 확인해보면
plain api 인 경우
2024-07-14T11:14:35.076+09:00 INFO 93558 --- [ctor-http-nio-3] c.g.gateway.filters.LoggingGlobalFilter : [deca9173-3] request info : { routeUri : <http://localhost:8088> routeId : backend-route method : GET uri : <http://localhost:8080/test/nothing> }
2024-07-14T11:14:35.077+09:00 INFO 93558 --- [ctor-http-nio-3] c.g.gateway.filters.LoggingGlobalFilter : [deca9173-3] response status : 200 OK response info : { body : nothingReturn }
queryParam이 존재하는 경우
2024-07-14T11:11:59.323+09:00 INFO 93558 --- [ctor-http-nio-3] c.g.gateway.filters.LoggingGlobalFilter : [deca9173-2] request info : { routeUri : <http://localhost:8088> routeId : backend-route method : GET query : {name=[qoqo12], age=[1234]} uri : <http://localhost:8080/test/query?name=qoqo12&age=1234> }
2024-07-14T11:11:59.325+09:00 INFO 93558 --- [ctor-http-nio-3] c.g.gateway.filters.LoggingGlobalFilter : [deca9173-2] response status : 200 OK response info : { body : qoqo121234 }
body가 존재하는 경우
2024-07-14T11:04:25.211+09:00 INFO 93558 --- [ctor-http-nio-3] c.g.gateway.filters.LoggingGlobalFilter : [deca9173-1] request info : { routeUri : <http://localhost:8088> routeId : backend-route method : POST body : {"username":"wowo","age":"444"} uri : <http://localhost:8080/test/body> }
2024-07-14T11:04:25.211+09:00 INFO 93558 --- [ctor-http-nio-3] c.g.gateway.filters.LoggingGlobalFilter : [deca9173-1] response status : 200 OK response info : { body : {"username":"wowo","age":444} }
잘 나오는 걸 확인할 수 있다.
추가적으로 400과 500 에러를 던져보면
2024-07-14T11:18:16.060+09:00 INFO 93558 --- [ctor-http-nio-3] c.g.gateway.filters.LoggingGlobalFilter : [deca9173-4] request info : { routeUri : <http://localhost:8088> routeId : backend-route method : GET query : {code=[400]} uri : <http://localhost:8080/test/makeError?code=400> }
2024-07-14T11:18:16.061+09:00 INFO 93558 --- [ctor-http-nio-3] c.g.gateway.filters.LoggingGlobalFilter : [deca9173-4] response status : 400 BAD_REQUEST response info : { body : 400Error }
2024-07-14T11:18:56.065+09:00 INFO 93558 --- [ctor-http-nio-3] c.g.gateway.filters.LoggingGlobalFilter : [deca9173-6] request info : { routeUri : <http://localhost:8088> routeId : backend-route method : GET query : {code=[500]} uri : <http://localhost:8080/test/makeError?code=500> }
2024-07-14T11:18:56.065+09:00 INFO 93558 --- [ctor-http-nio-3] c.g.gateway.filters.LoggingGlobalFilter : [deca9173-6] response status : 500 INTERNAL_SERVER_ERROR response info : { body : 500Error }
잘 로깅되는 모습을 확인할 수 있다.
이제 로그를 카프카에 발행해보자
Kafka Message Produce & Consume
우선 메세지를 생산하는건 gateway에서 수행하고, 해당 메세지들을 소비하는건 백엔드에서 하는 구조로 구현해보자.
메세지 발행을 필터에서 동기적으로 수행하게 되면 그만큼 api의 응답속도에 지연이 걸리게 되니 이걸 비동기 이벤트로 따로 빼서 구현해보자.
KafkaMEssageProduceEvent
@Getter
@Builder
public class KafkaMessageProduceEvent {
private String topic;
private String context;
}
KafkaMessageProduceEventPublisher
@Component
@RequiredArgsConstructor
public class KafkaMessageProduceEventPublisher {
private final ApplicationEventPublisher applicationEventPublisher;
public void publishKafkaMessage(KafkaMessageProduceEvent event){
applicationEventPublisher.publishEvent(event);
}
}
KafkaMessageProduceEventListener
@Component
@RequiredArgsConstructor
public class KafkaMessageProduceEventListener {
private final KafkaService kafkaService;
@Async
@EventListener
public void onKafkaMessageHandler(KafkaMessageProduceEvent event){
kafkaService.produceMessageToTopic(event.getTopic(), event.getContext());
}
}
KafkaService
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaService {
private final KafkaTemplate<String, String> kafkaTemplate;
public void produceMessageToTopic(String topic, String message){
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.whenComplete((result,ex)->{
if(ex==null){
log.debug("[producer] sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());
}
else log.warn("[producer] unable to send message.. {}",ex.getMessage());
});
}
}
간단하게 topic에 StringSerializer로 메세지를 발행하는 프로듀서를 만들었다. 여기서, 로그 토픽의 경우 requestId가 중요하지 그 순서는 중요하지 않다고 생각하여 key를 굳이 사용하지 않았다
(로그를 소비하는 시점에서 로그는 request id에 따라 request로그, response로그 여부만 묶어서 순서 상관 없이 추가해주면 되기에 굳이 Key값을 넣어줄 필요가 없다고 생각했다.)
그리고, 추후 카프카를 이용한 다른 기능을 구현할 수 있으니 토픽 하나에 대해서만 구체적으로 구현하지 않고 메세지를 특정 토픽에 보내는 이벤트 형식으로 만들었다.
BE 서버에서 메세지를 consume하는 부분을 이어서 간단하게 만들어보자
@Configuration
@Slf4j
public class KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootStrapAddress;
@Bean
public ConsumerFactory<String,String> consumerFactory(){
Map<String,Object> props=new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG,"be");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> factory=new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
위와 같이 consumer설정을 해준다. 나는 여러 인스턴스에서 컨슈머가 동일 토픽을 바라보기에 (be 서버 자체가 2개의 oracle 인스턴스 모두에 실행되기 때문) 컨슈머 그룹을 동일하게 묶어주었다.
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumeService {
@KafkaListener(topics = "${kafka.topic.log}",groupId = "be")
public void consumeString(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) String partition){
log.info("[consumer] msg:{} from partition {}",message,partition);
}
}
실제 메세지 리스너의 경우 간단하게 메세지를 받아 그대로 로그에 찍어보는 방식으로 구현했다.
이제 api 요청을 보내고 메세지를 발행 및 소비해보면
gateway, be, kafka server 모두에서 정상적으로 메세지가 발행되고 소비되는 것을 확인할 수 있다.
- 카프카 서버 오류 해결기
- 카프카에 메세지를 써도 확인이 안되길래 뭐가 이상하다 싶어 찾아보다 보니..
- When you operate Apache Kafka® in KRaft mode, you must set the process.roles property. This property specifies whether the server acts as a controller, broker, or both, although currently both is not supported for production workloads.
- https://docs.confluent.io/platform/current/kafka-metadata/config-kraft.html
- 이런 말이 적혀있어 기존 2개의 인스턴스에서 combine모드로 실행하던 카프카를 broker, controller를 분리해서 각각 실행하는 방식으로 변경했다.
- 연결은 잘 된 것으로 확인됨..
- 2024-07-14T15:32:42.776+09:00 INFO 34605 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Metadata update failed
- org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support METADATA
- 근데, 위와 같은 오류가 발생했다..
- 뭔가 카프카 서버 자체에 문제가 생기는 것 같아 보여 ec2인스턴스와 gcp 인스턴스 모두에 대해서 테스트가 필요하다고 생각했고 자주 재부팅을 할 예정이니
- rc.local 파일 실행으로 스토리지 포맷 과정을 단축시켜주자
- 현재 상황을 정리하면 ec2에 controller 1, broker 1 gcp에도 controller 1 , broker 1 이렇게 구성되어 있고 이들을 켜고 난 후 메세지를 발행해보면 아무런 반응이 없고 모니터링 지표상 부하가 100%까지 치솟는걸 알 수 있다.
- 그렇다면 이 Controller 2 - broker 2의 구성 자체가 문제일까 ?
- kafka kraft 공식 문서에도 나와있듯 여러개의 컨트롤러와 브로커로 이루어진 구조는 내구성이 좋아지는 구조이지 불가능한 구조는 아니라고 생각이 들었고, ec2 - gcp 인스턴스간 통신하는 부분에서 성능상 부하가 심할 수 있다고 생각해서 하나의 인스턴스에서 동일한 환경을 만들어 실험해보려 한다
- EC2 내에서 controller2 -broker2 구성하기
- controller를 2개 켜고 broker를 2개켜는 순간 먼저 켜진 브로커가 꺼지거나 두번째 실행한 브로커가 실행되지 않는 현상이 발생했다.
- 또한, 컨트롤러가 처음 실행 시 서로를 찾아 연결을 하지만 오류가 터지면 갑자기 서로에게서 연결을 해제하는 순환이 반복되었다.
- 인스턴스 성능의 문제일 수 있다 생각하여 ec2 인스턴스 내 로그 파일들을 지워가며 저장공간도 많이 확보했으나.. 문제는 여전히 발생했다.
- EC2 내에서 server (combine) 2개로 구성하기
- 동일했다.. 문제가 지속적으로 발생했고 부하가 100%에 도달하며 인스턴스가 뻗어버렸다
- EC2 내에서 controller1 - broker 1로 구성하기
- 정상적으로 토픽 생성도 되었고 메세지 생산,소비도 되었다.
- EC2 내에서 server (combine) 1개로 구성하기
- 정상적으로 수행되는 것 같았는데 이상하게 topic 생성은 되지만 producer나 consumer까지 실행하니 뻗어버렸다.
- 원인을 찾다보니
- ‣
- 여기서 extraConfig에 replication을 1로 주라는 이야기를 보고 줘보니 문제가 사라졌다..
- 이런 부분에서 더욱 성능에 대한 문제가 있다고 생각이 들었고 ec2 인스턴스의 경우 gateway역할을 수행하고 있기에 추가적인 부하를 주는 것은 좋지 않아 보여 gcp인스턴스에 kafka를 1개의 서버로 돌리는게 맞다고 판단헀다.
- GCP
- controller2 - broker2, server2 두 방식은 ec2와 같이 작동하지 않고 인스턴스가 뻗어버렸다.
- 단일 server로 시작하니 ec2와 마찬가지로 잘 작동했다..
- 카프카에 메세지를 써도 확인이 안되길래 뭐가 이상하다 싶어 찾아보다 보니..
회사일도 계속 해야 하다보니 균팡에 꽤나 소홀해져있었고 그 결과 프로젝트를 밀도있게 진행하지는 못했던 것 같다.. 그래서 그런지 유난히 트러블 슈팅에 애를 많이 썼던 것 같다..! 어쨌든 이제 gateway에서 요청과 응답에 대한 정보를 kafka를 통해 나타낼 수 있게 되었으니 다음 단계로 넘어가서 이제 백엔드 서버 자체의 상태를 모니터링해보자 ..!
참고
https://jybaek.gitbook.io/with-gcp/appendix/gce_to_ssh
https://kafka.apache.org/quickstart
https://www.geuni.tech/ko/kafka/kafka_introduce_install_cluster/
https://presentlee.tistory.com/12
https://aeong-dev.tistory.com/10
https://www.ipaddressguide.com/ping
https://inpa.tistory.com/entry/WEB-🌐-CIDR-이-무얼-말하는거야-⇛-개념-정리-계산법#192.168.10.70/26_풀이법_▼
https://blog.naver.com/anysecure3/221621485933
https://kafka.apache.org/35/documentation/#listener_configuration
https://stackoverflow.com/questions/76539641/kafka-without-zookeeper-brokers-failed-after-couple-min
'Gyunpang' 카테고리의 다른 글
9. pinpoint로 서버를 모니터링해보자 (0) | 2024.08.17 |
---|---|
프로젝트 구조 중간점검 (0) | 2024.07.27 |
7. Gateway 로깅 및 인증 기능 구현하기(1) (1) | 2024.07.23 |
(번외) docker container scale 조정 시 github action에서만 recreate된다.. (0) | 2024.04.13 |
6. 여러대의 인스턴스에 무중단 배포하기 (0) | 2024.04.06 |