Java & Spring/Java

Java - 비동기처리(CompletableFuture)

DJ.Kang 2025. 3. 6. 22:14

CompletableFuture 정의

  • 비동기 작업을 처리하는 클래스로, 기존 Future의 단점을 해결하며, 논블로킹(Non-blocking) 방식으로 실행된다.

□ 단점

  • 비즈니스 로직과 쓰레드관리 로직이 병합되어 관심사분리가 되지않고있다.

□ 실습코드

- 전역변수

static final long startTime = System.currentTimeMillis();

 


- 간단한 연산 메서드

private static double plus(double a, double b) {
        double plus = a + b;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("쓰레드 인터럽트 발생", e);
        }
        log.info("{} + {} = {}, 소요시간: {}",a, b, plus, System.currentTimeMillis() - startTime);
        return plus;
    }

 

- thenAccept()

public static void main(String[] args) throws InterruptedException {
        final ExecutorService executorService = Executors.newCachedThreadPool();

        for(int i = 0; i < 5; i++){
            CompletableFuture.supplyAsync(
                    () -> plus(Math.round(random() * 10), Math.round(random() * 10)), executorService)
                            .thenAccept(t -> log.info(String.valueOf(t)));

        }
        log.info("메인 스레드 실행 소요시간: {}", System.currentTimeMillis() - startTime);
        Thread.sleep(3000);
        executorService.shutdown();
    }

  • 바로 메인 스레드가 실행
  • 각 작업이 비동기로 실행되고 결과값을 기다리지 않고 실행이 완료된 후 결과값을 반환해줌

- thenCompose()

비동기 작업을 이어서 실행할 때 사용하는 메서드로, 연속적인 비동기 작업을 순차적으로 처리하는 데 유용

public static void main(String[] args) throws InterruptedException {
        final ExecutorService executorService = Executors.newCachedThreadPool();

        CompletableFuture<Double> completableFuture1 = CompletableFuture.supplyAsync(
                () -> plus(Math.round(random() * 10), Math.round(random() * 10)), executorService);

        completableFuture1.thenCompose(
                t -> CompletableFuture.supplyAsync(
                        () -> plus(t, Math.round(random() * 10)), executorService
                )
        ).thenAccept(t -> log.info(String.valueOf(t)));

        log.info("메인 스레드 실행 소요시간: {}", System.currentTimeMillis() - startTime);
        Thread.sleep(3000);
        executorService.shutdown();
    }

  • completableFuture이 비동기로 실행전달하고 메인 스레드가 먼저 실행됨
  • 처음 plus()가 실행되어 11이란 값을 반환
  • 처음 반환된 값을 첫번째 매개변수로 전달하여 두번째 plus()실행

- thenCombine()

public static void main(String[] args) throws InterruptedException {
        final ExecutorService executorService = Executors.newCachedThreadPool();

        CompletableFuture<Double> completableFuture1 = CompletableFuture.supplyAsync(
                () -> plus(Math.round(random() * 10), Math.round(random() * 10)), executorService);

        CompletableFuture<Double> completableFuture2 = CompletableFuture.supplyAsync(
                () -> plus(Math.round(random() * 10), Math.round(random() * 10)), executorService);

        completableFuture1.thenCombine(
                completableFuture2,
                (t1, t2) -> plus(t1, t2)
        ).thenAccept(t -> log.info(String.valueOf(t)));

        log.info("메인 스레드 실행 소요시간: {}", System.currentTimeMillis() - startTime);
        Thread.sleep(3000);
        executorService.shutdown();
    }

  • completableFuture1과 completableFuture2 비동기 실행
  • completableFuture1의 결과와 completableFuture2의 결과를 combine하여 결과 도출