
개별 TIL 블로그에도 정리한 내용이다.
https://jeondaehong.github.io/TIL/jekyll/2023-11-29-cs-12.html
CS - Process vs Thread Deep · Daehong TIL Blog
No results matching ""
jeondaehong.github.io
Thread 와 Thread 사이에 데이터를 공유해야 하는 일이 생겼다.
하나의 공유 데이터를 동기화 시키는 것이 아니라, A Thread 가 가진 데이터를 B Thread 로 옮겨주는 일이었다.
이 때 의문이 들었다.
어떻게 옮길 수 있을까?
방법은 여러가지가 있을 수 있다.
하나는 공유할 수 있는 static 변수를 활용하는 방법이고,
다른 하나는 Queue 를 활용하는 방법이었다.
물론 현업에서는 둘 다 잘 안쓰고, Kafka 를 통해 데이터를 넣고, 컨슈머가 가져가는 방법을 많이 사용한다고 한다.
그러나 의문이 들어서 개발 도중에 Queue 를 활용하여 데이터를 전달해보고 싶었다.
정확히는 는 Thread 도 1개, 쓰는 Thread 도 1개라면 Lock 을 통한 동시성 이슈를 처리해 주어야 할까? 에 대한 의문이 들었던 것이다.
읽는 Thread 와, 쓰는 Thread 가 많을 경우에는 당연히 동시성 이슈를 처리해주어야 한다.
그러나 읽는 Thread 도 1개, 쓰는 Thread 도 1개라면 Lock 을 통한 동시성 이슈를 처리해 주어야 할까?
다음은 일반적인 Queue 로 데이터를 전달한 테스트 코드이다.
public class ThreadTest {
private static final Logger log = Logger.getGlobal();
private static final int MAX_ITEMS = 200;
private static final Queue<Integer> sharedQueue = new LinkedList<>();
private static final ExecutorService executorService = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
CompletableFuture<Void> producerFuture = CompletableFuture.runAsync(() -> {
for (int i = 1; i <= MAX_ITEMS; i++) {
sharedQueue.add(i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, executorService);
CompletableFuture<Void> consumerFuture = CompletableFuture.runAsync(() -> {
for (int i = 0; i < MAX_ITEMS; i++) {
if (!sharedQueue.isEmpty()) {
int item = sharedQueue.poll();
log.info("Popped: " + item);
// Simulate some work being done
} else {
log.info("Empty");
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, executorService);
CompletableFuture<Void> allOf = CompletableFuture.allOf(producerFuture, consumerFuture);
// Block until both tasks are completed
allOf.join();
executorService.shutdown();
}
}
결과는 아래와 같이 나왔다.
1월 04, 2024 9:57:08 오후 ThreadTest lambda$main$1
INFO: Popped: 1
1월 04, 2024 9:57:08 오후 ThreadTest lambda$main$1
INFO: Popped: 2
1월 04, 2024 9:57:08 오후 ThreadTest lambda$main$1
INFO: Popped: 3
.
.
.
1월 04, 2024 9:57:11 오후 ThreadTest lambda$main$1
INFO: Popped: 196
1월 04, 2024 9:57:11 오후 ThreadTest lambda$main$1
INFO: Popped: 197
1월 04, 2024 9:57:11 오후 ThreadTest lambda$main$1
INFO: Popped: 198
1월 04, 2024 9:57:11 오후 ThreadTest lambda$main$1
INFO: Empty
1월 04, 2024 9:57:11 오후 ThreadTest lambda$main$1
INFO: Empty
왜 이런 문제가 발생한 걸까?
예를들어, int b = a + 1 에만 해도 연산이 4번이 들어간다.
Queue 에 대한 push poll 에도 수많은 연산이 들어간다.
그래서 push와 poll 각각 스레드가 1개씩이어도, 누가 먼저 시작하는가, 그리고 연산의 속도 등의 문제로 인해 값이 유실되는 문제가 발생하게 된다. ( push 보다 poll 이 먼저 동작하여 Empty 가 나오는 등 )
참고로 System.out.println 을 사용하면 출력 자체에 Blocking 이 걸려버리기 때문에, 더 안좋은 결과가 나와버린다.
필자는 log 를 활용하였다.
그래서 Lock 을 걸어보기로 하였다.
다음은 Queue 자체게 Lock 을 걸어서 활용한 코드이다.
public class ThreadTest {
private static final Logger log = Logger.getGlobal();
private static final int MAX_ITEMS = 200;
private static final Queue<Integer> sharedQueue = new LinkedList<>();
private static final ExecutorService executorService = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
CompletableFuture<Void> producerFuture = CompletableFuture.runAsync(() -> {
for (int i = 1; i <= MAX_ITEMS; i++) {
synchronized (sharedQueue) {
sharedQueue.add(i);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, executorService);
CompletableFuture<Void> consumerFuture = CompletableFuture.runAsync(() -> {
for (int i = 0; i < MAX_ITEMS; i++) {
synchronized (sharedQueue) {
if (!sharedQueue.isEmpty()) {
int item = sharedQueue.poll();
log.info("Popped: " + item);
// Simulate some work being done
} else {
log.info("Empty");
}
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, executorService);
CompletableFuture<Void> allOf = CompletableFuture.allOf(producerFuture, consumerFuture);
// Block until both tasks are completed
allOf.join();
executorService.shutdown();
}
}
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
INFO: Popped: 1
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
INFO: Popped: 2
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
INFO: Popped: 3
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
INFO: Popped: 4
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
INFO: Popped: 5
1월 04, 2024 9:53:51 오후 ThreadTest lambda$main$1
.
.
.
INFO: Popped: 197
1월 04, 2024 9:53:54 오후 ThreadTest lambda$main$1
INFO: Popped: 198
1월 04, 2024 9:53:54 오후 ThreadTest lambda$main$1
INFO: Popped: 199
1월 04, 2024 9:53:54 오후 ThreadTest lambda$main$1
INFO: Popped: 200
이렇게 하였더니 문제는 해결되었다.
해당 Queue 에 들어갈 수 있는 스레드가 syncronized 로 인해 하나씩만 가능하기 때문이다.
물론, poll 이 먼저 동작을 하면 처음부터 Empty 가 나오는 문제가 생길 수 있다.
그러나 CPU 는 무조건 순차로 진행하기 때문에, 병렬 코드더라도 add 코드가 위에 있기 때문에, 미세하게 먼저 시작을 하게되고, 거기서 Lock 을 걸면서 add -> poll -> add -> poll 이렇게 들어가게 된다.
그럼 다른 방법도 있을까를 고민하고, 찾아보던 중동기화를 보장해주는 BigQueue 와 LonkedBlockingQueue 를 알게 되었다.
아래 코드로도 테스트를 진행하였다.
public class ThreadTest {
private static final Logger log = Logger.getGlobal();
private static final int MAX_ITEMS = 200;
private static final BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();
private static final ExecutorService executorService = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
CompletableFuture<Void> producerFuture = CompletableFuture.runAsync(() -> {
for (int i = 1; i <= MAX_ITEMS; i++) {
sharedQueue.add(i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, executorService);
CompletableFuture<Void> consumerFuture = CompletableFuture.runAsync(() -> {
for (int i = 0; i < MAX_ITEMS; i++) {
if (!sharedQueue.isEmpty()) {
int item = sharedQueue.poll();
log.info("Popped: " + item);
// Simulate some work being done
} else {
log.info("Empty");
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, executorService);
CompletableFuture<Void> allOf = CompletableFuture.allOf(producerFuture, consumerFuture);
// Block until both tasks are completed
allOf.join();
executorService.shutdown();
}
}
결론은, 읽는 스레드와 쓰는 스레드가 하나씩만 있어도 Lock 이나 BigQueue 를 통한 이슈를 해결해주어야 한다는 것이다.
그리고 Queue 를 통하여 Thread 끼리 데이터를 주고 받을 수 있다는 것도 알 수 있었다.
물론 Kafka 를 공부하고, 그것을 활용하는 것이 현업에서는 좋겠지만
이렇게 원초적인 방법으로 개념을 잡는 것도 중요하다고 생각한다.
'개발 회고록' 카테고리의 다른 글
[ 개발 회고록 ] 여러 대의 서버를 위한, AWS S3 활용 이미지 관리 (2) | 2024.01.02 |
---|---|
[ 개발 회고록 ] JPA를 활용한, 생성 및 수정 날짜 자동 처리를 위한 공통 Entity 개발. (1) | 2024.01.02 |
[ 개발 회고록 ] 비동기 메서드 동시성 문제 발견 및 수정 (0) | 2023.12.04 |
[ 개발 회고록 ] 병렬 프로그래밍과 비동기 구현 (0) | 2023.11.29 |
[ 개발 회고록 ] 게시글 평균 점수 기능을 구현해보자 (0) | 2023.11.27 |