안녕하세요 회사와 함께 성장하고 싶은 KOSE입니다.

 

Redis는 인메모리 데이터베이스로 Key-Value 형태로 데이터를 저장하고 조회할 수 있습니다.

Spring에서는 CrudRepository나 RedisTemplate 등으로 Redis에 데이터를 저장할 수 있습니다.

저는 토이 프로젝트에서 간편하게 값을 저장 및 조회할 수 있고 어노테이션으로 인덱스를 간편하게 생성할 수 있는 CrudRepository를 사용하였습니다. 하지만 프로젝트 규모가 커지다보니 MSA 아키텍처 간에 객체에 대한 유기적인 사용이 필요하였습니다.

 

그 예로 AccessToken과 같은 인증 객체가 있습니다. 인증 객체는 회원이 로그인에 성공하면 인증 토큰을 발급합니다. 인증 토큰을 발급하는 주체가 A 서버라고 한다면, 다른 B 혹은 C 등의 서버에서 인증 토큰 객체를 조회하여야 할 수 있습니다.  이 경우 " _class" 형태로 저장된 값은 다른 B, C 서버에서 객체로 역직렬화를 어렵게 합니다. 일반적으로 역직렬화는 클래스 정보가 매핑되어야 객체로의 변환이 가능합니다. 하지만 " _class"로 저장된 값을 서버별로 상이한 class로 매핑하는 과정은 어려운 일이고 해당 객체를 위해 통일된 객체를 선언하는 것도 한계가 있습니다.

 

따라서, 이번 글은 제가 겪었던 CrudRepository에서의 객체 저장 방식과 역직렬화의 한계를 살펴보고, Redistemplate로 수정하면서 자유롭게 객체로 역직렬화를 할 수 있었던 점, 나아가 분산락과 트랜잭션을 적용하고 ThreadLocal로 생명주기를 관리한 것에 대해서 정리하도록 하겠습니다.

 

 

 

1. 직렬화/역직렬화

 

직렬화(Serialization)는 자바 객체를 Byte 형태로 변환화는 과정을 의미합니다. Byte 형태로 객체를 데이터베이스에 저장하기 때문에 용량을 줄일 수 있고 데이터가 문자열 형태로 저장되기 때문에 Redis, MongoDB 등 다양한 저장소에 저장할 수 있습니다.

 

역직렬화는 직렬화 형태로 저장된 데이터를 자바(여기서는 스프링을 사용하므로 자바라고 하겠습니다) 객체로의 변환하는 것을 의미합니다. 직렬화된 데이터를 역직렬화할 때 장점은 데이터를 유연하게 객체화할 수 있다는 점입니다.

직렬화하는 대상에 사용되는 클래스 타입에 제약을 받지 않기에 필요하다면 에러가 발생하지 않는 선에서 유기적으로 객체를 수정할 수 있습니다.

 

 

 

2. CrudRepository의 저장방식

 

CrudRepository를 사용하면 객체를 데이터로 저장할 때 RedisHash 형태로 객체를 저장합니다. RedisHash는 Hash의 Field와 Value로 매핑되는 자료구조로 객체를 Hash화 하여 저장하는 개념입니다. 즉 key-value 구조 안에서 해시 구조를 만들어 저장하는데, 이러한 해시 구조를 활용할 때 장점은 객체의 field를 hash의 field로 저장하기 때문에 데이터를 빠르게 읽어올 수 있습니다.

 

<AccessToken.java>

 

@Getter
@NoArgsConstructor
public class AccessToken extends Token implements Serializable {

    private AccessToken(String id, String userId, long expiration) {
        super(id, userId, expiration);
    }
}

 

 

<AccessTokenRedisRepository.java>

 

public interface AccessTokenRedisRepository extends CrudRepository<AccessToken, String> {
}

 

<Redis-cli 결과>

 

 

객체가 Redis에 저장될 때, 각 필드에 매핑되는 값을 확인할 수 있습니다. CrudRepository를 활용할 때, @Indexed 어노테이션으로 간단하게 Index를 생성하여 값을 저장할 수 있습니다.

 

<Token.java>

 

@Getter
@NoArgsConstructor
@AllArgsConstructor
public abstract class Token {

    @Id
    private String id;

    @Indexed
    private String userId;

    @TimeToLive(unit = TimeUnit.MILLISECONDS)
    private long expiration;
}

 

인덱스는 RDBMS 데이터베이스에서 테이블 내 데이터를 빠른 검색과 정렬을 위한 데이터 구조입니다. 특정 열에 대한 인덱스를 생성하면 해당 열의 값들이 저장되어 빠르게 검색할 수 있습니다. 덱스가 아닌 컬럼에 대한 where 절의 검색 조건은 쿼리 실행 계획을 확인하면 전체 검색으로 실행되는 것을 볼 수 있습니다. 하지만, 인덱스로 선언된 컬럼을 검색하게 되면 값을 Hash화 하여 인덱스의 해시테이블에서 값을 가져오는 개념이기 때문에 빠르고 효율적인 검색이 가능합니다.

 

Redis에서 인덱스는 어떠한 개념일까요 ? RDBMS와 다르게 Redis는 Key-value 형태로 값을 저장합니다. 따로 인덱스 열을 생성하는 개념이 아니라 key-value로 인덱스 값과 해당 key가 참조하고 있는 객체의 Id를 value로 참조하고 있는 형태입니다. Sorted set, Set, Hash는 Redis에서 활용할 수 있는 인덱스의 자료구조 형태입니다.

 

AccessToken은 userId라는 필드를 가지고 있고 해당 필드가 @Indexed로 선언되어 있습니다. accessToken은 해당 토큰의 발급 주체가 누구이냐가 보안성에서 중요한 역할을 수행합니다. 뿐만 아니라 예상하지 못한 문제로 동일 userId로 여러 개의 토큰이 생성될 수 있으므로 이를 제어하기 위해 해당 값을 index 화해서 사용하였습니다.

 

이처럼 CrudRepository는 객체를 해시화하여 Redis에 저장하는 것과 Index를 활용하는 것을 용이하게 해주기 때문에 효율적으로 활용할 수 있습니다.

 

 

 

3. CrudRepository의 한계

 

CrudRepository의 단점은 객체를 해시화하여 저장하기 때문에 "_class"라는 값이 생성되게 됩니다. 이는 곧 유연한 확장성에 어려움을 줄 수 있습니다.  "_class"는  해시화된 데이터를 객체로 변환하는 과정에서 매핑 정보를 담고 있는 class 정보입니다. 만약 A 서버에서 Redis에 객체와 인덱스 정보까지 모두 저장하였다 하더라도 "_class"가 매핑되지 않게 구현된 B Server에서 해당 데이터를 CrudRepository.findById or findByUserId를 활용했을 때 객체를 찾을 수 없는 문제가 발생할 수 있습니다.

 

B 서버는 A 서버의 클래스 정보와 같지 않을 가능성이 매우 높으며 상황에 따라, 해시화되어 있는 필드에 정확하게 일치하는 필드가 없을 수도 있습니다. 즉 유동적으로 변경될 수 있는데, RedisHash로 저장된 값은 이러한 유연한 변경이 어렵습니다.

 

그렇다면 현재 주어진 프로젝트는 객체를 redis에 저장 가능해야하고, "_class"정보가 다르더라도 다른 객체에 매핑이 가능해야 하며, Index까지 활용할 수 있어야 한다면 어떠한 방식으로 객체를 저장해야 할까요?

 

 

 

4. Redistemplate 활용하기

 

Redistemplate를 활용하면 객체가 직렬화를 거치면서 클래스 매핑정보가 저장되지 않게 비교적 쉽게 구현할 수 있습니다. 그리고 역직렬화할 때는 매핑할 객체에 대한 class 정보를 추가할 수 있기에 유연하게 객체를 저장 및 조회할 수 있습니다. 또한 Redistemplate의 key-value를 활용하여 커스텀한 인덱스 형태로 값을 저장할 수 있습니다.

 

이제 코드로 적용해 보도록 하겠습니다.


<RedisConfig.java>

 

@Configuration
public class RedisConfig {

    @Value("${spring.data.redis.host}")
    private String host;

    @Value("${spring.data.redis.port}")
    private int port;

	-- 중략 --
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        return redisTemplate;
    }
}

 

먼저 RedisTemplate를 활용하여 String의 key값으로 Object를 저장하도록 설정합니다.

 

redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());

 

Redis는 기본적으로 Byte 형태로 Redis에 저장합니다. 하지만 가독성을 위해, 사용자의 입장에서는 문자열로 보이는게 좋습니다. 따라서, redistemplate는 다음과 같이 StringRedisSerializer()를 제공합니다. 이는 바이트 형태로 보여지는 문자열을 사용자가 쉽게 확인할 수 있도록 보여줍니다.

 

StringRedisSerializer()를 사용하지 않은 경우

 

StringRedisSerializer()를 사용한 경우

 

 

만약 사용하지 않더라도, RedisTemplate는 defaultSerializer로 JdkSerializationRedisSerializer를 제공합니다. 따라서 필수값은 아니지만, 디버깅할 때 매우 유용하게 사용할 수 있으므로 StringRedisSerializer를 적용하였습니다.

 

 

<TokenRedissonRepository.java>

 

@Component
public interface TokenRedisTemplateRepository<T extends Token> {

    void saveToken(String key, T t) throws JsonProcessingException;

    T findTokenByKey(String key, Class<T> clazz) throws JsonProcessingException;

    void saveTokenIdx(String key, T t) throws JsonProcessingException;

    String findTokenIdxValue(String key, Class<T> clazz) throws JsonProcessingException;

    T findTokenByIdx(String key, Class<T> clazz) throws JsonProcessingException;

    void deleteToken(String key, Class<T> clazz) throws JsonProcessingException;
    
    void deleteTokenIdx(String key, Class<T> clazz) throws JsonProcessingException;

}

 

먼저, 제네릭으로 선언하여 Token을 상속하는 AccessToken과 RefreshToken이 TokenRedisRepository를 활용할 수 있도록 선언하였습니다. Crud에 필요한 save, find, delete 등의 기본적인 메서드와 인덱스를 활용하기 위해 인덱스를 저장하고 인덱스로 값을 검색하는 커스텀 메서드를 추가하였습니다.

 

이후, 구현체에서 필요한 코드를 작성하면 다음과 같습니다.

 

@Slf4j
@Component
@RequiredArgsConstructor
public class TokenRedisTemplateRepositoryImpl<T extends Token> implements TokenRedisTemplateRepository<Token> {
    private final RedisTemplate<String, Object> redisTemplate;

    private final ObjectMapper objectMapper;

    private final static String IDX = "idx";

    @Override
    public void saveToken(String key, Token token) throws JsonProcessingException {
        key = getKey(token.getClass().getSimpleName(), key);
        redisTemplate.opsForValue().set(key, objectMapper.writeValueAsString(token));
        redisTemplate.expire(key, token.getExpiration(), TimeUnit.MILLISECONDS);
    }

    @Override
    public Token findTokenByKey(String key, Class<Token> clazz) throws JsonProcessingException {
        key = getKey(clazz.getSimpleName(), key);
        return getObjectValue(key, clazz);
    }

    @Override
    public void saveTokenIdx(String key, Token token) throws JsonProcessingException {
        key = getKey(token.getClass().getSimpleName(), key, IDX);
        redisTemplate.opsForValue().set(key, objectMapper.writeValueAsString(token.getId()));
        redisTemplate.expire(key, token.getExpiration(), TimeUnit.MILLISECONDS);
    }

    @Override
    public String findTokenIdxValue(String key, Class<Token> clazz) throws JsonProcessingException {
        key = getKey(clazz.getSimpleName(), key, IDX);
        return getIdxValue(key);
    }

    @Override
    public Token findTokenByIdx(String key, Class<Token> clazz) throws JsonProcessingException {

        key = findTokenIdxValue(key, clazz);
        if (key == null) return null;

        return findTokenByKey(key, clazz);
    }

    @Override
    public void deleteToken(String key, Class<Token> clazz) throws JsonProcessingException {
        key = getKey(clazz.getSimpleName(), key);
        redisTemplate.delete(key);
    }

    @Override
    public void deleteTokenIdx(String key, Class<Token> clazz) throws JsonProcessingException {
        key = getKey(clazz.getSimpleName(), key, IDX);
        redisTemplate.delete(key);
    }

    private Token getObjectValue(String key, Class<Token> clazz) throws JsonProcessingException {
        String value = (String) redisTemplate.opsForValue().get(key);

        if (value == null || value.isEmpty()) return null;
        return objectMapper.readValue(value, clazz);
    }

    private String getKey(String... keys) {
        return String.join(":", keys);
    }

    private String getIdxValue(String key) throws JsonProcessingException {
        String value = (String) redisTemplate.opsForValue().get(key);
        if (ObjectUtils.isEmpty(value)) return null;
        return objectMapper.readValue(value, String.class);
    }
}

 

redisTemplate.opsForValue(). set()은 key와 객체를 redis에 직렬화하여 값을 저장하는 역할을 수행합니다.

class정보는 objectMapper.writeValueAsString() 단계에서 제거되며 직렬화된 바이트로 redis에 저장됩니다.

값을 가져온 후 필요한 매핑 정보를 제공하기 위해 class 타입을 명시하여 캐스팅할 수 있도록 하였습니다.

redistemplate.delete() 메서드는 key 값에 저장된 value를 지우는 역할을 수행합니다.

 

 

 

6. RedisTemplate 트랜잭션과 동시성 제어하기 

 

accessToken과 accessToken의 index, refreshToken과 refreshToken의 index를 저장 혹은 삭제하는 메서드는 반드시 동일 트랜잭션 내에서 처리되어야 합니다.

뿐만 아니라 동시성 문제도 해결되어야 합니다. 만약 데이터 일관성이 깨지거나 동시성 문제가 발생한다면, index로 token의 key를 찾았지만 key의 값이 없는 문제가 발생하거나 accessToken 혹은 refreshToken만 저장되는 문제가 발생할 수 있습니다. 따라서, redistemplate의 트랜잭션 제어 메서드와 redissonClient를 활용하여 분산락을 적용하였습니다.

 

주의할 점은, 분산락과 트랜잭션의 순서입니다.

 

 

분산락으로 먼저 요청 토큰에 대한 락을 획득하고, redistemplate로 redis에 저장하기 전에 트랜잭션을 시작합니다.
이후, 값이 모두 온전히 저장되면 커밋하고 실패할 경우 롤백합니다 이후 분산락이 해제되는 순서가 보장되어야 합니다.

 

보통 이러한 과정은 횡단 관심사에 속합니다. 메인 핵심 비즈니스 로직은 token 정보를 redis에 저장하는 것입니다.

따라서, 분산락과 트랜잭션의 시작과 종료는 AOP로 적용할 수 있습니다.

 

AOP는 pointcuts과 aspect가 존재하며, pointcuts으로 aop에 적용할 프로젝트 경로, 클래스 혹은 메서드를 적용할 수 있습니다. 이후 aspect에서 부가 기능들을 작성하며 프록시 객체를 호출하여 처리할 수 있습니다.

 

저는 token 정보를 저장하기 위해 CustomAnnotation을 생성하였고 aspect 적용이 필요한 메서드는 어노테이션을 추가하여 aop를 적용하였습니다.

 

<RedisTransactional.java>

 

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisTransactional {
}

 

<Pointcuts.java> 

 

@Pointcut("@annotation(liar.memberservice.common.aop.anno.RedisTransactional)")
public void transactionMethod() {};

 

pointcut에서 해당 어노테이션이 저장된 프로젝트 경로를 설정합니다.
이후, Aspect는 transactionMethod()의 경로를 참조하여 aop를 적용할 수 있습니다.

 

여기서 앞 서 정의한 분산락과 트랜잭션의 순서를 고려하여 AOP를 적용해야 합니다.

Aspect는 클래스 단위로 @Order() 어노테이션으로 aop의 순서를 정의할 수 있습니다. 

먼저 시작되어야 할 분산락에 aop를 먼저 적용한 후, 이어서 트랜잭션 aop가 실행되도록 @Order로 순서를 정의하였습니다.

 

<RedissonClientLockAspect.java>

 

@Around("liar.memberservice.common.aop.Pointcuts.transactionMethod() && args(userId, authorities)")
    public AuthTokenDto saveToken(ProceedingJoinPoint joinPoint, String userId, List<Authority> authorities)
            throws Throwable {
        String lockKey = "SaveUserToken: " + userId;
        return getTokenIfExistOrSetAndGetPointTransactionRedissonRLock(joinPoint, lockKey);
    }


    private AuthTokenDto getTokenIfExistOrSetAndGetPointTransactionRedissonRLock(ProceedingJoinPoint joinPoint,
                                                                                 String lockKey) throws Throwable {
        RLock lock = redissonClient.getLock(lockKey);
        try {
            boolean isLocked = lock.tryLock(60, TimeUnit.SECONDS);
            if (!isLocked) throw new RedisLockException();
            Object proceed = joinPoint.proceed();
            if (proceed == null) {
                return null;
            }
            return (AuthTokenDto) proceed;
        } finally {
            if (lock.isHeldByCurrentThread()) lock.unlock();
        }
    }

 

먼저 userId로 락을 설정한 후 joinPoint.proceed()로 JDK dynamic proxy 혹은 CGLIB proxy 객체를 호출하여 로직을 수행합니다.

 

이어서 트랜잭션이 적용될 수 있도록 aop를 추가합니다.

 

	@Around("liar.memberservice.common.aop.Pointcuts.transactionMethod() && args(userId, authorities)")
    public AuthTokenDto runWithTx(ProceedingJoinPoint joinPoint, String userId, List<Authority> authorities) throws Throwable {
        if (!txActive.get()) {
            redisTemplate.multi();
            txActive.set(true);
        }

        try {
            Object proceed = joinPoint.proceed();
            if (proceed != null) {
                return (AuthTokenDto) proceed;
            }
        } catch (Throwable throwable) {
            throw throwable;
        }
        return null;
    }

    @AfterReturning("liar.memberservice.common.aop.Pointcuts.transactionMethod()")
    public void commitTx() {
        if (txActive.get()) {
            try {
                redisTemplate.exec();
            } catch (Exception e) {
                redisTemplate.discard();
            } finally {
                txActive.remove();
            }
        }
    }
    
    @AfterThrowing("liar.memberservice.common.aop.Pointcuts.transactionMethod()")
    public void rollback() {
        try {
            redisTemplate.discard();
        }  catch (Exception e) {
            e.printStackTrace();
        } finally {
            txActive.remove();
        }
    }

 

txActive는 현재 트랜잭션을 나타내는 변수입니다. ThreadLocal로 스레드-세이프한 구조로 생성할 수 있습니다.

ThreadLocal.withInitial(() -> false)로 초기값을 false로 설정한 후 tx.Active가 아직 트랜잭션이 처리되지 않은 경우에
트랜잭션을 시작합니다.


이후 join.proceed()의 프록시 객체를 호출하며 문제가 없다면 @AfterReturning이 실행되며 트랜잭션을 종료하고 
메모리 누수를 막기 위해 생성한 ThreadLocal을 제거합니다. 

 

만약 이 과정에서 예외가 발생하면 rollback()메서드가 호출되거나 try-catch-finally 구문의

redistemplate.discard()로 요청을 롤백한 후 ThreadLocal을 제거합니다.

마지막으로 프록시 객체가 종료되면 먼저 요청했던 RedissonClient 분산락이 해제되며 메서드가 종료됩니다.

 

이제, 마지막으로 Service 로직을 작성하겠습니다.

 

@RedisTransactional
public AuthDto getTokenIfExistOrSetAndGet(String userId, List<Authority> authorities) throws JsonProcessingException {

    AuthDto authDto = findOrDeleteToken(userId);
    if (authDto != null) return authDto;

    return saveAndGetToken(userId, authorities);
}

private AuthDto findOrDeleteToken(String userId) throws JsonProcessingException {
    Token savedAccessToken = tokenRepository.findTokenByIdx(userId, AccessToken.class);
    Token savedRefreshToken = tokenRepository.findTokenByIdx(userId, RefreshToken.class);

    if (savedAccessToken != null && savedRefreshToken != null) {
        return new AuthDto(savedAccessToken.getId(), savedRefreshToken.getId(), userId);
    }

    else if (savedAccessToken != null) {
        tokenRepository.deleteToken(savedAccessToken.getId(), AccessToken.class);
        tokenRepository.deleteTokenIdx(savedAccessToken.getUserId(), AccessToken.class);
    }

    else if (savedRefreshToken != null) {
        tokenRepository.deleteToken(savedRefreshToken.getId(), RefreshToken.class);
        tokenRepository.deleteTokenIdx(savedRefreshToken.getUserId(), RefreshToken.class);
    }
    return null;
}

private AuthDto saveAndGetToken(String userId, List<Authority> authorities) throws JsonProcessingException {
    String accessToken = tokenProviderPolicy.createAccessToken(userId, authorities);
    String refreshToken = tokenProviderPolicy.createRefreshToken(userId, authorities);

    saveTokens(AccessToken.of(accessToken, userId, tokenProviderPolicy.getRemainingTimeFromToken(accessToken)),
            RefreshToken.of(refreshToken, userId, tokenProviderPolicy.getRemainingTimeFromToken(refreshToken)));

    return new AuthDto(accessToken, refreshToken, userId);
}

private void saveTokens(AccessToken accessToken, RefreshToken refreshToken) throws JsonProcessingException {
    tokenRepository.saveToken(accessToken.getId(), accessToken);
    tokenRepository.saveTokenIdx(accessToken.getUserId(), accessToken);
    tokenRepository.saveToken(refreshToken.getId(), refreshToken);
    tokenRepository.saveTokenIdx(refreshToken.getUserId(), refreshToken);
}

 

 

 

7. 테스트

 

먼저 싱글 스레드 방식에서 해당 코드가 문제가 없는지 테스트하고 멀티 스레드 환경으로 테스트를 마무리하도록 하겠습니다.

 

@Test
@DisplayName("토큰이 모두 없다면 생성하여 저장한다.")
public void getTokenIfExistOrSetAndGet_st() throws Exception {
    //given
    AuthTokenDto[] results = new AuthTokenDto[count];
    createMemberAndAuthorities();

    //when
    for (int i = 0; i < count; i++) {
        results[i] = tokenRedisService
                .getTokenIfExistOrSetAndGet(members[i].getUserId(), authorities.get(i));
    }

    //then
    assertionsGetTokenIfNotExists_And_notSameAllTokens(results);

}

private void assertionsGetTokenIfNotExists_And_notSameAllTokens(AuthTokenDto[] results) {
    Set<String> accessTokenSet = new HashSet<>();
    Set<String> refreshTokenSet = new HashSet<>();

    for (int i = 0; i < count; i++) {
        assertThat(results[i]).isNotNull();
        accessTokenSet.add(results[i].getAccessToken());
        refreshTokenSet.add(results[i].getRefreshToken());
    }
    assertThat(accessTokenSet.size()).isEqualTo(count);
    assertThat(refreshTokenSet.size()).isEqualTo(count);

}

private void createMemberAndAuthorities() {
    for (int i = 0; i < count; i++) {
        members[i] = memberRepository.save(Member.builder()
                        .email("kose" + i + "@naver.com")
                        .userId(UUID.randomUUID().toString())
                        .password(UUID.randomUUID().toString())
                .build());
        authorities
                .add(Arrays.asList(authorityRepository.save(new Authority(members[i], Authorities.ROLE_USER))));
    }
}

 

테스트를 진행한 결과, transaction에서 에러가 발생합니다. ㅜㅜ

눈물을 머금고 에러 로그를 분석해 보겠습니다. 

 

 

분명 aop에서 redisTemplate.multi() 실행 후, rollback 혹은 commit에서 redistemplate.discard() , redistemplate.exec()를 선언했는데 무슨 이유일까요?

 

스프링 Redis 공식문서에서 해당 문제를 해결할 수 있었습니다. 
redis에서 트랜잭션 지원을 받기 위해서는 명시적으로 redistemplate를 초기화하여 빈을 등록할 때, setEnableTransactionSupport(true)로 선언해주어야 한다고 합니다. 

 

 

 

 

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
    RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(connectionFactory);
    redisTemplate.setEnableTransactionSupport(true);

    redisTemplate.setKeySerializer(new StringRedisSerializer());
    redisTemplate.setValueSerializer(new StringRedisSerializer());
    redisTemplate.setHashKeySerializer(new StringRedisSerializer());
    redisTemplate.setHashValueSerializer(new StringRedisSerializer());
    return redisTemplate;
}

 

 

따라서, 코드를 수정하고 테스트를 다시 실행하면 분산락 실행 -> 트랜잭션 실행 -> 트랜잭션 커밋 or 롤백 -> 분산락 해제 순서로 적용된 것을 확인할 수 있습니다.

 

여러 개의 에러 테스트도 진행해야 하지만, 분량으로 인해 모든 테스트를 글로 남기기는 한계가 있어서 이 포스팅에서는 동시성 테스트 결과만 남기도록 하겠습니다.

 

@Test
@DisplayName("같은 아이디로 생성요청이 오면, 1회 토큰을 발급하고 나머지 토큰은 동일해야한다. mt")
public void getTokenIfExistOrSetAndGet_mt() throws Exception {
    //given
    ExecutorService executorService = Executors.newFixedThreadPool(32);
    CountDownLatch latch = new CountDownLatch(count);

    AuthTokenDto[] results = new AuthTokenDto[count];
    createMemberAndAuthorities();

    //when
    for (int i = 0; i < count; i++) {
        int finalIdx = i;
        executorService.submit(() -> {
            try {
                results[finalIdx] = tokenRedisService
                        .getTokenIfExistOrSetAndGet(members[0].getUserId(), authorities.get(0));
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            finally {
                latch.countDown();
            }
        });
    }

    latch.await();
    //then

    assertionsCreateTokenOnlyFirstRequestAtSameUserId_and_sameTokensAtSameUserId(results);

}

private void assertionsCreateTokenOnlyFirstRequestAtSameUserId_and_sameTokensAtSameUserId(AuthTokenDto[] results) {
    Set<String> accessTokenSet = new HashSet<>();
    Set<String> refreshTokenSet = new HashSet<>();

    for (int i = 0; i < count; i++) {
        assertThat(results[i]).isNotNull();
        accessTokenSet.add(results[i].getAccessToken());
        refreshTokenSet.add(results[i].getRefreshToken());
    }
    assertThat(accessTokenSet.size()).isEqualTo(1);
    assertThat(refreshTokenSet.size()).isEqualTo(1);
}

 

 

 

동시성 테스트 결과 싱글 스레드에서 발생했던 에러가 동일하게 적용되었습니다. transaction 적용을 위해 redistemplate 초기화 과정에서 명시적으로 트랜잭션을 적용한다고 선언했지만 왜 이번에도 트랜잭션이 유효하지 않다는 문제가 발생한 것일까요?

 

아래부터는 제가 문제를 해결하는 과정에서 생각한 내용입니다. 사실과 다를 수 있습니다.!

 

이 문제는 매우 복잡한 문제로, 멀티 스레드 환경에서 발생하는 문제의 주요 원인은 스레드 간 공유되는 리소스에 대한 동시 접근으로 부터 기인합니다.

 

저는 ThreadLocal로 txActive 변수를 스레드별로 독립적으로 적용하고 있습니다. 하지만, redistemplate는 싱글톤으로 선언되었기 때문에 스레드 스위칭 과정에서 여전히 redistemplate가 공유되어 사용되고 있습니다. 따라서, 스레드 스위칭 과정에서 트랜잭션이 유효하지 않은 스레드가 다음 요청을 수행하고 있기 때문에 커밋이나 롤백되는 과정에서 트랜잭션이 유효하지 않다는 결과를 받은 것입니다.

 

그렇다면 RDBMS의 데이터 저장 기술을 제공하는 JPA는 어떻게 스레드 간 트랜잭션을 공유할 수 있는 것일까요?

Jpa로 RDBMS로 트랜잭션을 관리하면 스프링에서 제공하는 트랜잭션 기능을 사용하는데, 스프링 트랜잭션 매니저가 스레드에 대해서 개별적인 트랜잭션이 적용될 수 있도록 보장합니다. 따라서, 스레드 스위칭이 발생하더라도 이전 트랜잭션이 유효하기 때문에 커밋 및 롤백 시 유효한 데이터 관리가 가능한 것입니다.

 

하지만, Redis는 multi()로 명령어를 시작하는데, Jpa transaction과 다르게 스레드간 트랜잭션이 유효하지 않습니다. 따라서 스레드 스위칭이 발생하는 시점에 이전 트랜잭션이 유효하지 않기 때문에 이러한 에러가 발생합니다.

 

이 문제를 해결하기 위해서는 스레드간 개별적인 redistemplate 연결이 필요하고 개별적인 인스턴스 별로 각각 트랜잭션 연결이 유지되어야 했습니다.

 

 

 

8. ThreadLocal 범위 수정하기 & 테스트

 

멀티 스레드 환경에서 redistemplate에 적용되는 connection을 개별 스레드별로 관리되도록 처리하기 위해서는 ThreadLocal을 활용할 수 있습니다. 스레드 로컬은 주로 로깅이나 트랜잭션과 같이 스레드 간 변수 공유가 있으면 안되는 상황에 적용합니다. 즉 요청을 수행하는 스레드가 해당 요청을 마칠 때까지 단일 스레드 형식으로 처리됩니다. 이 기능을 활용한다면 멀티 스레드에서 발생하는 트랜잭션 공유 문제를 ThreadLocal의 개별 커넥션으로 해결할 수 있습니다.

 

 

 

이제 코드를 다시 수정하겠습니다.

 

@Slf4j
@Aspect
@Component
@Order(2)
@RequiredArgsConstructor
public class RedisTransactionCustom2Aspect {

    private final RedisConnectionFactory connectionFactory;
    private final ThreadLocal<RedisConnection> threadLocal = new ThreadLocal<>();

    private RedisConnection getRedisConnection() {
        RedisConnection redisConnection = threadLocal.get();
        if (redisConnection == null) {
            redisConnection = connectionFactory.getConnection();
            threadLocal.set(redisConnection);
        }

        return redisConnection;
    }

    @Around("liar.memberservice.common.aop.Pointcuts.transactionMethod() && args(userId, authorities)")
    public AuthTokenDto runWithTx(ProceedingJoinPoint joinPoint, String userId, List<Authority> authorities) throws Throwable {
        getRedisConnection().multi();

        try {
            Object proceed = joinPoint.proceed();

            if (proceed != null) {
                return (AuthTokenDto) proceed;
            }
        } catch (Throwable throwable) {
            throw throwable;
        }
        return null;
    }

    @AfterReturning("liar.memberservice.common.aop.Pointcuts.transactionMethod()")
    public void commitTx() {
        try {
            getRedisConnection().exec();
        } catch (Exception e) {
            getRedisConnection().discard();
        } finally {
            threadLocal.remove();
        }
    }
    
    @AfterThrowing("liar.memberservice.common.aop.Pointcuts.transactionMethod()")
    public void rollbackTx() {
        try {
            getRedisConnection().discard();
        }  catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadLocal.remove();
        }
    }
}

 

먼저 ThreadLocal로 RedisConnection의 생명주기를 관리합니다. 현재 ThreadLocal이 비어있다면, RedisConnectionFactory에서 connection을 가져옵니다, 그리고 현재 ThreadLocal에 커넥션을 연결합니다. 이후, 트랜잭션이 시작되는 aop 프록시가 실행될 때, 개별적인 connection이 진행됩니다. 각 스레드는 개별적인 커넥션으로 트랜잭션을 유지하므로 스레드 간 스위칭이 발생하더라도 threadLocal을 획득한 스레드가 해당 요청을 전적으로 관리할 수 있게 됩니다

 

정리하면, 이전 코드는 커넥션은 그대로 유지하되, 스레드 별로 개별적인 트랜잭션을 적용하여 싱글 스레드에서는 트랜잭션이 유지되었지만 멀티 스레드에서는 스레드간 데이터 공유로 인해, 트랜잭션이 적용되지 않는 스레드가 접근하여 커밋이나 롤백을 수행하므로 트랜잭션이 유효하지 않기에 에러가 발생한 것입니다.

 

하지만 수정한 코드는 커넥션 자체부터 스레드 로컬을 적용하였으므로 redistemplate이 생성되는 시점에서 각 커넥션은 개별 스레드만 접근할 수 있도록 처리된 것입니다.

 

 

드디어 길고 길었던 테스트가 성공합니다 ㅠㅠ!

 

 

 

9. 느낀 점

 

백엔드에서 동시성 문제와 트랜잭션 관리는 고질적인 문제이며 가장 어려운 문제임이 틀림없습니다. 이번 문제를 해결하며, 거의 20시간 가까이 소모했던 것 같습니다. 


먼저 crudRepository -> redistemplate로 리펙토링 -> 분산락 트랜잭션 관리 -> 동시성 문제를 위한 ThreadLocal 생명주기 관리까지 하나의 에러를 해결하는 과정에서 정말 많은 부분을 수정하고 배울 수 있었던 것 같습니다.

 

사실 토이프로젝트에서 동시성 문제를 고려하지 않고 작성되는 코드도 많이 있습니다. 하지만, 소비자 입장에서 생각해 봤을 때, 내 중요한 데이터가 원자성이나 무결성을 보장받지 못한다면 그 서버는 경쟁사회에서 도태될 것입니다.

 

백엔드는 화려하게 보이는 것은 없지만 소비자의 신뢰도를 좌지우지하는 정말 중요한 역할을 수행하는 것 같습니다.

이번 문제를 해결하며, 정말 포기하고 싶다는 머리 끝까지 들었습니다. 하지만, 시간이 걸리더라도 반드시 해결하겠다는 마음이 있었고, 이 기회로 저 스스로 한층 더 성장할 수 있으리라 확신을 할 수 있었습니다.

 

나아가 제가 입사하게 될 회사에서 만약 비슷한 문제가 발생한다면, 이번 경험을 토대로 빠른 문제 해결에 이바지할 수 있을 것이라는 생각을 하게 되었습니다.

 

아직 해결되지 않은 문제도 많이 있습니다. ThreadLocal로 매번 커넥션을 연결하는 과정은 서버 내부에서 많은 부하를 일으킬 수 있습니다. 이 문제를 해결하기 위해 또 많은 연구를 할 예정이고 글로 남기도록 하겠습니다.

 

길었지만 읽어주셔서 감사드립니다.!!!

 

 

+ Recent posts