티스토리 뷰
ThreadPoolExecutor RejectedExecutionException 오류 - SynchronousQueue
rangrangerang 2023. 7. 11. 09:24우리 프로젝트에서는 여러개의 쓰레드를 이용하여 작업을 실행하기 위해 ThreadPoolExecutor를 이용하고 있다 쓰레드 풀의 개수는 고정적으로 사용하고 있으며, 해당 쓰레드를 실행하는 부분에서 쓰레드의 개수를 관리하며 쓰레드 풀 이상의 요청은 하지 않기 때문에 이론적으로는 쓰레드 풀이 터지는 경우는 없어야 했다. 하지만 여러 프로젝트를 진행하며 종종 쓰레드 풀이 터지곤 했는데 이번에 조금 더 깊이 파봤다
발생한 오류 는 아래와 같다
RejectedExecutionException: Task cohttp://m.xxx.xx.TrainWorker rejected from java.util.concurrent.ThreadPoolExecutor[Running, pool size = 16, active threads = 2, queued tasks = 1, completed tasks = 167]
오류가 발생한 스레드 풀을 만드는 코드는 아래와 같다 우리는 corePoolSize와 maximumPoolSize를 같은 값으로 사용하고 있다
-> 풀 사이즈는 고정이다(생성되거나 줄어들지 않는다)
workerExecutor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1));
workerExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 작업큐가 꽉 찼을 때 Reject된 task가 RejectedExecutionException을 던짐
ThreadPool을 생성하는 코드를 파악해보자
>> workerExecutor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1));
인수의 의미를 모른다면 밑에 더보기를 클릭하자
인수의 의미(순서대로)
- corePoolSize – 유휴 상태인 경우에도 풀에 유지할 스레드 수입니다.
- maximumPoolSize – 풀에서 허용할 최대 스레드 수입니다.
- keepAliveTime – 스레드 수가 코어보다 많은 경우 유휴 스레드가 새 작업을 기다리는 최대 시간입니다.
- unit– keepAliveTime 인수 의 시간 단위 .
- workQueue– Runnable 작업이 실행되기 전에 보관하는 데 사용할 대기열 .
인수의 의미만 봤을 때는 pool이 차는 순서가 아래와 같을꺼라고 오해하기 쉽다.
corePoolSize -> maximumPoolSize -> workQueue
하지만 실제로는 아래와 같이 workQueue를 먼저 채운다.
corePoolSize -> workQueue -> maximumPoolSize
이전에도 동일한 오류가 터졌었는데 이 코드를 디버깅하면서 acative threds 가 아직 꽉 차지 않았는데 queued tasks가 채워졌다는게 의문이었다 오류의 원인은 execute를 동시에 해서 그런가보다! 하고 execute 하는 메서드에 synchronized를 걸었다.
-> 잘못된 선택..
ThreadPoolExecutor.execute() 함수 분석
아래 함수는 threaPoolExecuter의 execute 함수이다. 대충 확인해보면 새로운 command가 들어왔을 때 아래와 같은 로직으로 진행된다.
1. worker의 개수가 corePoolSize 보다 작으면 thread를 그냥 추가한다.
2. 아니라면 워커큐에 저장하려고 command를 저장하려고 시도한다
-> 우리 코드의 경우 워커큐 사이즈 이상의 요청은 하지 않는다 따라서 이 코드는 무조건 성공해야 한다
3. 만약 2의 작업이 실패하면 최대 Thread 수 보다 작을 때 Thread를 추가
-> 이 코드는 타는일이 없어야 한다
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //최저 Thread 수 보다 작다면 Thread 추가 -> pass
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //Queue 에 저장(offer)하려고 시도 -> 여기서 실패하고 아래로 넘어감 왜?
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//위의 offer가 false를 리턴해야만, 최대 Thread 수 보다 작을때 Thread 추가한다.
// ***-> 우리는 core의 개수와 max의 개수가 동일하기 때문에 쓰레드 추가하려면 무조건 터짐!! 여기로 넘어오면 안됨
else if (!addWorker(command, false))
reject(command);
}
1) if (isRunning(c) && workQueue.offer(command)) { //Queue 에 저장(offer)하려고 시도한다.
-> 여기서 실패하고 아래로 넘어간다 왜?
위 코드가 문제같음 이 코드에서 넘어가면 안됨 우리는 corePoolSize 이상의 요청은 하지 않기 때문에 workerQueue(대기열)는 항상 비어있어야하지 않나?
디버깅 해보니 위 코드에서는 workerQueue의 개수가 1인데 다음로직인 아래 코드로 내려가니 workerQueue가 0이다
2) else if (!addWorker(command, false)) //위의 offer가 false를 리턴해야만, 최대 Thread 수 보다 작을때 Thread 추가한다.
-> 우리는 corePoolSize = maximumPoolSize = poolSize로 같기 때문에 queue를 사용하게 되면 maximumPoolSize를 초과하기 때문에 무조건 터진다!! 여기로 넘어오면 안됨
-> workerQueue가 1일 때 또 넣으려고 하니까 터지나보다!?
그래서 예상 시나리오를 아래처럼 생각했다
1. 1) 코드 실행할 때는workerQueue에 1개 있음(곧 쓰레드에서 가져갈 것)
2. workerQueue에서 빠짐
3. 2) 코드 실행 중 터짐(지금은 workerQueue가 비어있음)
-> 사실 여기까지 내려오면 안됨
해결 방법 : SynchronousQueue 전략
1. LinkedBlockingQueue(우리가 사용하던 전략)
LinkedBlockingQueue(1) : 1개의 큐를 두고 사용
2. SynchronousQueue
task를 큐에 들고있지 않고, 바로 스레드로 넘겨버린다
-> 대기하는 쓰레드와 매칭만 사켜줌. 내부적으로 저장하는 공간이 없음
=> 우리가 원하는 전략
✔ 우리는 사실 SynchronousQueue 를 써야 했던 상황이었다!
따라서 코드를 아래와 같이 바꿨고, 한달에도 여러번 터지던 쓰레드풀은 더 이상 터지지 않는다
workerExecutor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.MINUTES, new SynchronousQueue<>());
workerExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 작업큐가 꽉 찼을 때 Reject된 task가 RejectedExecutionException을 던짐
'ㄴspring boot' 카테고리의 다른 글
[Spring Boot] Multiple Datasource with Clickhouse (0) | 2024.06.17 |
---|---|
[spring boot] mybatis + jpa multi datasource 설정하기 (7) | 2021.02.15 |
- Total
- Today
- Yesterday
- 제우스8
- 쓰레드 변수
- 보관주기
- kafka without zookeeper
- volatile
- jeus8
- jeus8.5
- cleanup policies
- multiple datasource
- 카프카
- s3
- SynchronousQueue
- 도커
- 스레드 동기화
- 제우스 로그
- 제우스8.5
- 네트워크
- php
- docker
- 넥서스 보관주기
- 다중 데이터소스
- kafka with raft
- 주키퍼 없는 카프카
- 넥서스 파일 보관주기
- 티스토리챌린지
- 오블완
- cleanup policy
- API Gateway
- AWS
- db 두개
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |