티스토리 뷰

우리 프로젝트에서는 여러개의 쓰레드를 이용하여 작업을 실행하기 위해 ThreadPoolExecutor를 이용하고 있다 쓰레드 풀의 개수는 고정적으로 사용하고 있으며, 해당 쓰레드를 실행하는 부분에서 쓰레드의 개수를 관리하며 쓰레드 풀 이상의 요청은 하지 않기 때문에 이론적으로는 쓰레드 풀이 터지는 경우는 없어야 했다. 하지만 여러 프로젝트를 진행하며 종종 쓰레드 풀이 터지곤 했는데 이번에 조금 더 깊이 파봤다

발생한 오류 는 아래와 같다

RejectedExecutionException: Task cohttp://m.xxx.xx.TrainWorker rejected from java.util.concurrent.ThreadPoolExecutor[Running, pool size = 16, active threads = 2, queued tasks = 1, completed tasks = 167]

 

 

오류가 발생한 스레드 풀을 만드는 코드는 아래와 같다 우리는 corePoolSize와 maximumPoolSize를 같은 값으로 사용하고 있다
-> 풀 사이즈는 고정이다(생성되거나 줄어들지 않는다)

    workerExecutor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1));
    workerExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());   // 작업큐가 꽉 찼을 때 Reject된 task가 RejectedExecutionException을 던짐

ThreadPool을 생성하는 코드를 파악해보자

>> workerExecutor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<>(1));

인수의 의미를 모른다면 밑에 더보기를 클릭하자

더보기

인수의 의미(순서대로)

  • corePoolSize – 유휴 상태인 경우에도 풀에 유지할 스레드 수입니다.
  • maximumPoolSize – 풀에서 허용할 최대 스레드 수입니다.
  • keepAliveTime – 스레드 수가 코어보다 많은 경우 유휴 스레드가 새 작업을 기다리는 최대 시간입니다.
  • unit– keepAliveTime 인수  의 시간 단위   .
  • workQueue– Runnable 작업이 실행되기 전에  보관하는 데 사용할 대기열 .

인수의 의미만 봤을 때는 pool이 차는 순서가 아래와 같을꺼라고 오해하기 쉽다.

corePoolSize -> maximumPoolSize -> workQueue

하지만 실제로는 아래와 같이 workQueue를 먼저 채운다.

corePoolSize -> workQueue -> maximumPoolSize 
공식 문서 참고

이전에도 동일한 오류가 터졌었는데 이 코드를 디버깅하면서 acative threds 가 아직 꽉 차지 않았는데 queued tasks가 채워졌다는게 의문이었다 오류의 원인은 execute를 동시에 해서 그런가보다! 하고 execute 하는 메서드에 synchronized를 걸었다.
-> 잘못된 선택..

 

ThreadPoolExecutor.execute() 함수 분석

아래 함수는 threaPoolExecuter의 execute 함수이다. 대충 확인해보면 새로운 command가 들어왔을 때 아래와 같은 로직으로 진행된다.

1. worker의 개수가 corePoolSize 보다 작으면 thread를 그냥 추가한다.

2. 아니라면 워커큐에 저장하려고 command를 저장하려고 시도한다
-> 우리 코드의 경우 워커큐 사이즈 이상의 요청은 하지 않는다 따라서 이 코드는 무조건 성공해야 한다

3. 만약 2의 작업이 실패하면 최대 Thread 수 보다 작을 때 Thread를 추가
-> 이 코드는 타는일이 없어야 한다

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
            /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
            
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {	//최저 Thread 수 보다 작다면 Thread 추가 -> pass
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {	//Queue 에 저장(offer)하려고 시도 -> 여기서 실패하고 아래로 넘어감 왜? 
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //위의 offer가  false를 리턴해야만, 최대 Thread 수 보다 작을때 Thread 추가한다. 
        // ***-> 우리는 core의 개수와 max의 개수가 동일하기 때문에 쓰레드 추가하려면 무조건 터짐!! 여기로 넘어오면 안됨
        else if (!addWorker(command, false))
            reject(command);
    }

1)            if (isRunning(c) && workQueue.offer(command)) {  //Queue 에 저장(offer)하려고 시도한다.
-> 여기서 실패하고 아래로 넘어간다 왜? 

위 코드가 문제같음 이 코드에서 넘어가면 안됨 우리는 corePoolSize 이상의 요청은 하지 않기 때문에 workerQueue(대기열)는 항상 비어있어야하지 않나?

디버깅 해보니 위 코드에서는 workerQueue의 개수가 1인데 다음로직인 아래 코드로 내려가니 workerQueue가 0이다

 2)           else if (!addWorker(command, false)) //위의 offer가  false를 리턴해야만, 최대 Thread 수 보다 작을때 Thread 추가한다.
-> 우리는 corePoolSize = maximumPoolSize = poolSize로 같기 때문에 queue를 사용하게 되면 maximumPoolSize를 초과하기 때문에 무조건 터진다!! 여기로 넘어오면 안됨

 

-> workerQueue가 1일 때 또 넣으려고 하니까 터지나보다!?

그래서 예상 시나리오를 아래처럼 생각했다

1. 1) 코드 실행할 때는workerQueue에 1개 있음(곧 쓰레드에서 가져갈 것)
2. workerQueue에서 빠짐
3. 2) 코드 실행 중 터짐(지금은 workerQueue가 비어있음)
-> 사실 여기까지 내려오면 안됨

 

 

해결 방법 : SynchronousQueue 전략

1. LinkedBlockingQueue(우리가 사용하던 전략)

LinkedBlockingQueue(1) : 1개의 큐를 두고 사용

2. SynchronousQueue

task를 큐에 들고있지 않고, 바로 스레드로 넘겨버린다
-> 대기하는 쓰레드와 매칭만 사켜줌. 내부적으로 저장하는 공간이 없음
=> 우리가 원하는 전략

✔ 우리는 사실 SynchronousQueue 를 써야 했던 상황이었다!

따라서 코드를 아래와 같이 바꿨고, 한달에도 여러번 터지던 쓰레드풀은 더 이상 터지지 않는다

    workerExecutor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.MINUTES, new SynchronousQueue<>());
    workerExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());   // 작업큐가 꽉 찼을 때 Reject된 task가 RejectedExecutionException을 던짐

 

 

댓글