티스토리 뷰
목차
1. 트랜잭셔널 메시징
2. Amazon SQS
3. SQSListener
Github OAuth2 인증을 통해 테이블에 Account 계정을 생성하고 Github 계정 정보가 변경된 경우 Member 마이크로 서비스의 Account 레플리카를 업데이트 하는 기능을 개발한다. 트랜잭셔널 아웃박스 패턴과 폴링 발행기 패턴을 활용해 DB 업데이트와 메시지 발행에서 마이크로 서비스간의 데이터 일관성이 깨지는 문제를 해결한다.
DB 업데이트와 메시지 전송을 한 트랜잭션으로 묶지 않으면 마이크로 서비스간 데이터 일관성이 깨지는 문제가 발생할 수 있다. 예를 들어, DB 업데이트 후 메시지를 발행했지만 트랜잭션을 커밋하기 전에 서비스에 문제가 생겨 서버가 종료되는 경우 메시지는 전송되었지만 DB에는 반영되지 않는 문제가 생길 수 있다.
DB를 업데이트하는 트랜잭션의 일부로 메시지를 전송하도록 하더라도 메시지 전송이 실패하는 경우 트랜잭션이 롤백 되도록 할 수는 있지만 메시지는 전송되고 DB 업데이트에는 실패하는 경우가 발생할 수 있다.
따라서, 변경되는 DB의 트랜잭션으로 메시지를 함께 저장하면 데이터 일관성을 유지할 수 있다.
1. 트랜잭셔널 메시징
1) 트랜잭셔널 아웃박스 패턴 (Transactional Outbox Pattern)
DB 업데이트와 이벤트 생성을 트랜잭션으로 묶고 이벤트 리스너에서 도메인 이벤트를 받아 Outbox 테이블에 저장하도록 작성한다.
[ CustomOAuth2UserService ]
@RequiredArgsConstructor
@Service
public class CustomOAuth2UserService implements OAuth2UserService<OAuth2UserRequest, OAuth2User> {
private final OAuth2UserDetailsService oAuth2UserDetailsService;
private final ObjectMapper objectMapper;
@Override
public OAuth2User loadUser(OAuth2UserRequest userRequest) throws OAuth2AuthenticationException {
// ...
OAuth2UserDetails userDetails = oAuth2UserDetailsService
.saveOrUpdateAccount(objectMapper.convertValue(attributes, Map.class));
}
}
- Github AOuth2 로그인이 성공하면 CustomOAuth2UserService에서 Github 계정 정보를 가져와 LoginAccountService.saveOrUpdateAccount()를 호출해 DB에 저장한다.
[ LoginAccountService, AccountEventPublisher ]
@RequiredArgsConstructor
@Transactional
@Service
public class LoginAccountService implements OAuth2UserDetailsService {
private final AccountRepository accountRepository;
private final AccountEventPublisher accountEventPublisher;
@Override
public LoginAccount saveOrUpdateAccount(Map<String, Object> accountAttributes) {
LoginAccount loginAccount = LoginAccount.of(accountAttributes);
Optional<Account> optional = accountRepository.findByUserName(loginAccount.getUserName());
Account account = null;
if (optional.isPresent()) {
account = optional.get();
updateAccount(account, loginAccount);
accountEventPublisher.publishUpdatedEvent(account);
}
if (optional.isEmpty()) {
account = createAccount(loginAccount);
accountRepository.save(account);
accountEventPublisher.publishCreatedEvent(account);
}
return LoginAccount.of(account);
}
}
@RequiredArgsConstructor
@Transactional
@Service
public class AccountEventPublisher {
private final ApplicationEventPublisher eventPublisher;
public void publishEvent(Account account, EventType eventType) {
AccountEventMapper eventMapper = new AccountEventMapper(account, eventType);
eventPublisher.publishEvent(eventMapper.toAccountEvent());
}
public void publishUpdatedEvent(Account account) {
publishEvent(account, EventType.UPDATED);
}
public void publishCreatedEvent(Account account) {
publishEvent(account, EventType.CREATED);
}
}
- .LoginAccountService는 DB의 Account 테이블을 변경하고 AccountCreated 혹은 AccountUpdated 이벤트를 생성한다.
- Account 테이블의 변경과 ApplicationEventPublisher의 publishEvent()는 한 트랜잭션으로 묶여있다.
[ AccountOutboxService ]
@Slf4j
@RequiredArgsConstructor
@Transactional
@Service
public class AccountOutboxService {
private final AccountOutboxRepository accountOutboxRepository;
@EventListener
public void saveOutbox(AccountEvent accountEvent) {
OutboxMapper outboxMapper = new OutboxMapper();
AccountOutbox outbox = outboxMapper.createOutbox(AccountOutbox.class, accountEvent);
accountOutboxRepository.save(outbox);
}
}
- OutboxService에서 발행한 Account 이벤트를 받아 Outbox 테이블에 저장한다.
- @Transactional, @EventListener
- @Transactional 로 트랜잭션을 확장해 이벤트 발행과 Outbox 저장을 하나의 트랜잭션으로 묶는다.
- @saveOutbox에서 ObjectMapper가 동작하거나 Outbox 테이블을 변경하는 중에 예외가 발생하면 Account의 DB 변경 트랜잭션도 롤백된다.
2) 메시지 발행
폴링 발행기 패턴 (Polling Publisher Pattern)
- Outbox 테이블을 지속적으로 Polling 해서 메시지 브로커로 전송한다.
- DB를 자주 폴링하면 비용이 발생하고 NoSQL DB는 쿼리 능력에 따라 사용 가능 여부가 결정된다.
- 따라서 규모가 작을 경우 사용할 수 있고 DB 트랜잭션 로그 테일링이 좀 더 정교하고 성능이 좋은 방법이다.
로그 테일링 패턴 (Log Tailing Pattern)
- 트랜잭션 로그 마이너로 DB 트랜잭션 로그를 읽어 Outbox 테이블의 변경분을 하나씩 메시지 브로커로 전송한다.
- 트랜잭션 로그 마이너는 디비지움, DynamoDB 스트림즈 등이 있다.
예제에서는 메시지 발행에 단순하게 적용할 수 있는 방법인 폴링 발행기 패턴을 활용한다.
[ MessageRelayService ]
@RequiredArgsConstructor
@Transactional
@Service
public class MessageRelayService {
private final AccountOutboxRepository accountOutboxRepository;
private final AccountProducer accountProducer;
@Value("${cdc.batch_size}")
private int batchSize;
@Scheduled(fixedDelayString = "${cdc.polling_ms}")
public void sendAccountEvent() {
List<AccountOutbox> outboxes = accountOutboxRepository.
findAllByPublishedIsFalseOrderByIdAsc(Pageable.ofSize(batchSize))
.toList();
accountProducer.send(outboxes);
outboxes.forEach(Outbox::publish);
}
}
@EnableScheduling
@SpringBootApplication
public class PapacoAuthServiceApplication {
public static void main(String[] args) {
SpringApplication.run(PapacoAuthServiceApplication.class, args);
}
}
- 간단하게 스프링의 Scheduling 기능을 사용해 Outbox 테이블을 폴링하도록 했다.
- @Scheduled(fixedDelayString = "${cdc.polling_ms}")
- application.properties에 작성한 시간을 주기로 동작한다.
- findAllByPublishedIsFalseOrderByIdAsc()
- published 값이 false인 (전송되지 않은) 데이터만 조회한다.
- accountProducer.send(outboxes);
- AccountProducer는 인터페이스다.
- 인터페이스 구현체인 AccountSQSProducer를 통해 Amazon SQS로 메시지를 발행한다.
- outboxes.forEach(Outbox::publish);
- 전송한 Outbox 는 필드를 업데이트한다.
[ Outbox, AccountOutbox ]
@Getter
@MappedSuperclass
@EntityListeners(AuditingEntityListener.class)
public abstract class Outbox {
@CreatedDate
LocalDateTime createdAt;
boolean published;
LocalDateTime publishedAt;
public void publish() {
this.published = true;
this.publishedAt = LocalDateTime.now();
}
public abstract Outbox create(DomainEvent event, String payload);
}
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Entity
public class AccountOutbox extends Outbox {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private Long aggregateId;
@Column(nullable = false)
private String aggregateType;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
private EventType eventType;
@Lob
@Column(nullable = false)
private String payload;
public AccountOutbox(Long aggregateId, String aggregateType, EventType eventType, String payload) {
this.aggregateId = aggregateId;
this.aggregateType = aggregateType;
this.eventType = eventType;
this.payload = payload;
}
@Override
public Outbox create(DomainEvent event, String payload) {
return new AccountOutbox(event.getAggregateId(), event.getClass().getSimpleName(), event.getEventType(), payload);
}
}
- createdAt, published, publishedAt
- 이벤트 재발행을 위한 필드
- 이벤트 타입, 시간, 애그리거트 속성으로 조회해 재발행할 수 있다.
- 이벤트 재발행은 배치를 통해 이루어진다.
[ application.properties ]
cdc.polling_ms=5000
cdc.batch_size=50
- 5초마다 Outbox 테이블을 폴링한다.
account 테이블 업데이트와 outbox 테이블 저장이 같은 DB 트랜잭션으로 묶여있기 때문에 메시지는 전송되었지만 account 만 업데이트되는 문제가 해결되었고, 메시지 전송과 outbox 테이블 삭제가 한 트랜잭션으로 묶여있기 때문에 메시지 전송에 실패하면 outbox 테이블도 삭제되지 않는다.
2. Amazon SQS
SQS 접근을 위한 IAM 사용자와 SQS를 생성해 애플리케이션에서 메시지를 전송한다.
MemberService에서는 SQS를 폴링해 Account 레플리카를 업데이트한다.
1) IAM 사용자 생성
SQS 서비스에 접근하기 위한 IAM 사용자그룹을 다음과 같이 두 개의 권한을 갖도록 생성한다.
- AmazonEC2FullAccess
- AdministratorAccess
사용자 그룹에 IAM 사용자를 생성한다.
액세스 유형 프로그래밍 방식 액세스로 선택한다.
생성된 사용자의 액세스 키 ID와 비밀 액세스 키는 애플리케이션에서 SQS에 메시지를 전송하는데 사용한다.
2) Amazon SQS 생성
fifo 방식의 SQS를 생성한다.
2) 애플리케이션 적용
[ build.gradle ]
dependencyManagement {
imports {
mavenBom "io.awspring.cloud:spring-cloud-aws-dependencies:2.3.3"
}
}
dependencies {
// aws
implementation 'io.awspring.cloud:spring-cloud-aws-context'
implementation 'io.awspring.cloud:spring-cloud-starter-aws-messaging'
}
스프링에서 aws를 사용하기 위해 의존성을 추가한다.
- io.awspring.cloud:spring-cloud-aws-context
- io.awspring.cloud:spring-cloud-starter-aws-messaging
[ application.properties ]
cloud.aws.region.static=ap-northeast-2
cloud.aws.region.auto=false
cloud.aws.stack.auto=false
cloud.aws.credentials.access-key=[access-key]
cloud.aws.credentials.secret-key=[secret-key]
cloud.aws.end-point.uri=[queue-end-point-uri]
- access-key : IAM 사용자 access key
- secret-key : IAM 사용자 secret key
- queue-end-point-uri : SQS URL
[ SQSConfig ]
@Configuration
public class SQSConfig {
@Value("${cloud.aws.credentials.access-key}")
private String accessKey;
@Value("${cloud.aws.credentials.secret-key}")
private String secretKey;
@Value("${cloud.aws.region.static}")
private String region;
@Bean
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync){
return new QueueMessagingTemplate(amazonSQSAsync());
}
private AmazonSQSAsync amazonSQSAsync() {
BasicAWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
return AmazonSQSAsyncClientBuilder
.standard()
.withRegion(region)
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.build();
}
}
- QueueMessagingTemplate을 스프링 빈으로 등록한다.
- IAM 정보와 국가 정보를 함께 넣어준다.
[ AccountSQSProducer ]
@Slf4j
@RequiredArgsConstructor
@Component
public class AccountSQSProducer implements AccountProducer {
private final QueueMessagingTemplate queueMessagingTemplate;
@Value("${cloud.aws.end-point.uri}")
private String endpoint;
@Transactional
@Override
public void send(List<AccountOutbox> outboxes) {
outboxes.forEach(outbox -> {
Message<String> message = MessageBuilder.withPayload(outbox.getPayload())
.setHeader(SqsMessageHeaders.SQS_GROUP_ID_HEADER, String.valueOf(outbox.getAggregateId()))
.setHeader(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, String.valueOf(outbox.getId()))
.setHeader(OutboxMessageHeaders.AGGREGATE_TYPE, outbox.getAggregateType())
.build();
queueMessagingTemplate.send(endpoint, message);
log.info("Message: {}, {}, {}", outbox.getAggregateId(), outbox.getId(), message);
});
}
}
- FIFO 방식으로 메시지를 전달하기 위해 두 개의 헤더가 필요하다.
- SQS_GROUP_ID_HEADER : account의 id, 메시지 그룹을 만들어 메시지가 순서대로 처리되도록 한다.
- SQS_DEDUPLICATION_ID_HEADER : outbox 의 id, 중복 메시지 전송을 5분간 방지한다.
3. SQSListener
Member Service에서 SQS를 폴링해 Auth Service에서 전송한 메시지를 가져온다.
Account 이벤트를 읽어 Member Service 내의 Account 레플리카를 업데이트한다.
[ build.gradle, application.properties ]
Auth Service의 설정과 동일하다.
[ AccountSQSConsumer ]
메시지는 전송했는데 outbox 테이블을 삭제하는 과정에서 문제가 발생하면 이 후에 메시지를 중복 전송하는 문제가 생길 수 있다.
Amazon SQS의 FIFO 방식으로 메시지를 전송할 때 SQS_DEDUPLICATION_ID_HEADER 정보를 함께 넣어 주는데 이 값은 메시지가 중복인지를 확인하는데 사용된다. 이는 중복 메시지를 5분 동안 전송하지 않는데 사용되기 때문에 이 후에는 중복 메시지가 전송될 수 있다. 따라서 확실한 처리는 메시지를 받는 Member Service에서 해주어야 한다.
@Slf4j
@RequiredArgsConstructor
@Transactional
@Service
public class AccountSQSConsumer implements AccountConsumer {
private final ProcessedEventRepository eventRepository;
private final AccountRepository accountRepository;
private final ObjectMapper objectMapper;
@Override
@SqsListener(value = "sqs-login-account-update.fifo", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receive(String payload, @Headers Map<String, String> headers) {
Long messageId = Long.getLong(headers.get(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER));
Long aggregateId = Long.getLong(headers.get(SqsMessageHeaders.SQS_GROUP_ID_HEADER));
String aggregateType = headers.get(OutboxMessageHeaders.AGGREGATE_TYPE);
checkDuplicate(messageId, aggregateType);
process(payload, headers);
saveProcessedMessage(messageId, aggregateId, aggregateType);
}
@Override
public void checkDuplicate(Long duplicateId, String aggregateType) {
if (eventRepository.findByIdAndAggregateType(duplicateId, aggregateType).isPresent()) {
throw new MessageReceiveDuplicatedException(duplicateId);
}
}
@Override
public void process(String payload, Map<String, String> headers) {
try {
AccountEvent accountEvent = objectMapper.readValue(payload, AccountEvent.class);
accountRepository.findById(accountEvent.getAggregateId())
.ifPresentOrElse(
entity -> entity.update(accountEvent.getName(), accountEvent.getEmail()),
() -> saveAccount(accountEvent)
);
} catch (JsonProcessingException | IllegalArgumentException e) {
log.error("payload={}, headers={}", payload, headers, e);
throw new MessageProcessingFailedException(e);
}
}
private void saveAccount(AccountEvent accountEvent) {
Account account = accountEvent.toAccount();
accountRepository.save(account);
}
@Override
public void saveProcessedMessage(Long messageId, Long aggregateId, String aggregateType) {
eventRepository.save(new ProcessedMessage(messageId, aggregateId, aggregateType));
}
}
- @SqsListener
- value: SQS 큐 이름
- deletionPolicy: 메시지 삭제 방식 정의
- SqsMessageDeletionPolicy.ON_SUCCESS : receive() 메소드에서 예외가 발생하지 않으면 삭제
- checkDuplicate() : 메시지를 중복 수신했는지 확인한다.
- process() : 메시지를 받아 처리한다.
- SQSMessageProcessingFaildException
- 다음 두 개의 예외가 발생했을 때 잡아서 명시적으로 커스텀 예외로 처리한다.
- objectMapper.readValue(payload, AccountEvent.class) → JsonProcessingException
- accountRepository.save(account) → IllegalArgumentException
- 다음 두 개의 예외가 발생했을 때 잡아서 명시적으로 커스텀 예외로 처리한다.
- 예외가 발생하지 않으면 Account를 업데이트 한다.
- SQSMessageProcessingFaildException
- saveProcessedMessage() : 메시지 처리가 완료되었으면 수신한 Account 이벤트를 저장한다
- 메시지 중복 체크에 활용한다.
참고
'MSA' 카테고리의 다른 글
비동기 메시징(SNS-SQS Fanout) (0) | 2022.12.02 |
---|
- Total
- Today
- Yesterday
- 이벤트 스토밍
- 학습 테스트
- 클린코드
- JPA
- Stream
- MySQL
- 스프링 카프카 컨슈머
- H2
- 스프링 예외 추상화
- Spring Boot
- Git
- 트랜잭셔널 아웃박스 패턴
- named query
- HTTP 헤더
- TDD
- http
- spring rest docs
- 육각형 아키텍처
- 폴링 발행기 패턴
- clean code
- ATDD
- java8
- Spring
- mockito
- 도메인 모델링
- Ubiquitous Language
- 계층형 아키텍처
- Spring Data JPA
- 마이크로서비스 패턴
- kafka
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |