기록하며 성장한다 - 개발, 회고

개발 회고록

[ 개발 회고록 ] Thread 와 Thread의 데이터 공유

전대홍 2024. 1. 2. 15:33

 

 

개별 TIL 블로그에도 정리한 내용이다.

https://jeondaehong.github.io/TIL/jekyll/2023-11-29-cs-12.html

 

CS - Process vs Thread Deep · Daehong TIL Blog

No results matching ""

jeondaehong.github.io

 


 

Thread 와 Thread 사이에 데이터를 공유해야 하는 일이 생겼다.

하나의 공유 데이터를 동기화 시키는 것이 아니라, A Thread 가 가진 데이터를 B Thread 로 옮겨주는 일이었다.

이 때 의문이 들었다.

어떻게 옮길 수 있을까?

방법은 여러가지가 있을 수 있다.

 

하나는 공유할 수 있는 static 변수를 활용하는 방법이고,

다른 하나는 Queue 를 활용하는 방법이었다.

물론 현업에서는 둘 다 잘 안쓰고, Kafka 를 통해 데이터를 넣고, 컨슈머가 가져가는 방법을 많이 사용한다고 한다.

그러나 의문이 들어서 개발 도중에 Queue 를 활용하여 데이터를 전달해보고 싶었다.

정확히는 는 Thread 도 1개, 쓰는 Thread 도 1개라면 Lock 을 통한 동시성 이슈를 처리해 주어야 할까? 에 대한 의문이 들었던 것이다.

 

읽는 Thread 와, 쓰는 Thread 가 많을 경우에는 당연히 동시성 이슈를 처리해주어야 한다.

그러나 읽는 Thread 도 1개, 쓰는 Thread 도 1개라면 Lock 을 통한 동시성 이슈를 처리해 주어야 할까?

 

다음은 일반적인 Queue 로 데이터를 전달한 테스트 코드이다.

public class ThreadTest {
    private static final Logger log = Logger.getGlobal();
    private static final int MAX_ITEMS = 200;
    private static final Queue<Integer> sharedQueue = new LinkedList<>();
    private static final ExecutorService executorService = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        CompletableFuture<Void> producerFuture = CompletableFuture.runAsync(() -> {
            for (int i = 1; i <= MAX_ITEMS; i++) {
                sharedQueue.add(i);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, executorService);

        CompletableFuture<Void> consumerFuture = CompletableFuture.runAsync(() -> {
            for (int i = 0; i < MAX_ITEMS; i++) {
                if (!sharedQueue.isEmpty()) {
                    int item = sharedQueue.poll();
                    log.info("Popped: " + item);
                    // Simulate some work being done
                } else {
                    log.info("Empty");
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, executorService);

        CompletableFuture<Void> allOf = CompletableFuture.allOf(producerFuture, consumerFuture);

        // Block until both tasks are completed
        allOf.join();

        executorService.shutdown();
    }
}

 

결과는 아래와 같이 나왔다.

1월 04, 2024 9:57:08 오후 ThreadTest lambda$main$1
INFO: Popped: 1
1월 04, 2024 9:57:08 오후 ThreadTest lambda$main$1
INFO: Popped: 2
1월 04, 2024 9:57:08 오후 ThreadTest lambda$main$1
INFO: Popped: 3
.
.
.
1월 04, 2024 9:57:11 오후 ThreadTest lambda$main$1
INFO: Popped: 196
1월 04, 2024 9:57:11 오후 ThreadTest lambda$main$1
INFO: Popped: 197
1월 04, 2024 9:57:11 오후 ThreadTest lambda$main$1
INFO: Popped: 198
1월 04, 2024 9:57:11 오후 ThreadTest lambda$main$1
INFO: Empty
1월 04, 2024 9:57:11 오후 ThreadTest lambda$main$1
INFO: Empty

왜 이런 문제가 발생한 걸까?

예를들어, int b = a + 1 에만 해도 연산이 4번이 들어간다.

Queue 에 대한 push poll 에도 수많은 연산이 들어간다.

그래서 push와 poll 각각 스레드가 1개씩이어도, 누가 먼저 시작하는가, 그리고 연산의 속도 등의 문제로 인해 값이 유실되는 문제가 발생하게 된다. ( push 보다 poll 이 먼저 동작하여 Empty 가 나오는 등 )

참고로 System.out.println 을 사용하면 출력 자체에 Blocking 이 걸려버리기 때문에, 더 안좋은 결과가 나와버린다.

필자는 log 를 활용하였다.

 

 

그래서 Lock 을 걸어보기로 하였다.

다음은 Queue 자체게 Lock 을 걸어서 활용한 코드이다.

public class ThreadTest {
    private static final Logger log = Logger.getGlobal();
    private static final int MAX_ITEMS = 200;
    private static final Queue<Integer> sharedQueue = new LinkedList<>();
    private static final ExecutorService executorService = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        CompletableFuture<Void> producerFuture = CompletableFuture.runAsync(() -> {
            for (int i = 1; i <= MAX_ITEMS; i++) {
                synchronized (sharedQueue) {
                    sharedQueue.add(i);
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, executorService);

        CompletableFuture<Void> consumerFuture = CompletableFuture.runAsync(() -> {
            for (int i = 0; i < MAX_ITEMS; i++) {
                synchronized (sharedQueue) {
                    if (!sharedQueue.isEmpty()) {
                        int item = sharedQueue.poll();
                        log.info("Popped: " + item);
                        // Simulate some work being done
                    } else {
                        log.info("Empty");
                    }
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, executorService);

        CompletableFuture<Void> allOf = CompletableFuture.allOf(producerFuture, consumerFuture);

        // Block until both tasks are completed
        allOf.join();

        executorService.shutdown();
    }
}
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
INFO: Popped: 1
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
INFO: Popped: 2
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
INFO: Popped: 3
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
INFO: Popped: 4
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
INFO: Popped: 5
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
.
.
.
INFO: Popped: 197
1월 04, 2024 9:53:54 오후 ThreadTest lambda$main$1
INFO: Popped: 198
1월 04, 2024 9:53:54 오후 ThreadTest lambda$main$1
INFO: Popped: 199
1월 04, 2024 9:53:54 오후 ThreadTest lambda$main$1
INFO: Popped: 200

이렇게 하였더니 문제는 해결되었다.

해당 Queue 에 들어갈 수 있는 스레드가 syncronized 로 인해 하나씩만 가능하기 때문이다.

물론, poll 이 먼저 동작을 하면 처음부터 Empty 가 나오는 문제가 생길 수 있다.

그러나 CPU 는 무조건 순차로 진행하기 때문에, 병렬 코드더라도 add 코드가 위에 있기 때문에, 미세하게 먼저 시작을 하게되고, 거기서 Lock 을 걸면서 add -> poll -> add -> poll 이렇게 들어가게 된다.

 

그럼 다른 방법도 있을까를 고민하고, 찾아보던 중동기화를 보장해주는 BigQueue 와 LonkedBlockingQueue 를 알게 되었다.

아래 코드로도 테스트를 진행하였다.

public class ThreadTest {
    private static final Logger log = Logger.getGlobal();
    private static final int MAX_ITEMS = 200;
    private static final BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();
    private static final ExecutorService executorService = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        CompletableFuture<Void> producerFuture = CompletableFuture.runAsync(() -> {
            for (int i = 1; i <= MAX_ITEMS; i++) {
                sharedQueue.add(i);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, executorService);

        CompletableFuture<Void> consumerFuture = CompletableFuture.runAsync(() -> {
            for (int i = 0; i < MAX_ITEMS; i++) {
                if (!sharedQueue.isEmpty()) {
                    int item = sharedQueue.poll();
                    log.info("Popped: " + item);
                    // Simulate some work being done
                } else {
                    log.info("Empty");
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, executorService);

        CompletableFuture<Void> allOf = CompletableFuture.allOf(producerFuture, consumerFuture);

        // Block until both tasks are completed
        allOf.join();

        executorService.shutdown();
    }
}

 

결론은, 읽는 스레드와 쓰는 스레드가 하나씩만 있어도 Lock 이나 BigQueue 를 통한 이슈를 해결해주어야 한다는 것이다.

그리고 Queue 를 통하여 Thread 끼리 데이터를 주고 받을 수 있다는 것도 알 수 있었다.

물론 Kafka 를 공부하고, 그것을 활용하는 것이 현업에서는 좋겠지만

이렇게 원초적인 방법으로 개념을 잡는 것도 중요하다고 생각한다.