자바, 스레드풀
주의 사항!
- 이 글은 제가 직접 공부하는 중에 작성되고 있습니다.
- 따라서 제가 이해하는 그대로의 내용이 포함됩니다.
- 따라서 이 글은 사실과는 다른 내용이 포함될 수 있습니다.
병렬 작업 처리가 많아지면 스레드 개수가 증가되고 그에 따른 스레드 생성과 스케줄링으로 인해 CPU가 바빠져 메모리 사용량이 늘어납니다. 따라서 애플리케이션의 성능이 저하됩니다. 갑작스러운 병렬 작업의 폭증으로 인한 스레드의 폭증을 막으려면 스레드 풀(Thread Pool)을 사용해야 합니다.
스레드 풀은 작업 처리에 사용되는 스레드를 제한된 개수만큼 정해 놓고 작업 큐에 들어오는 작업들을 하나씩 스레드가 많아 처리합니다. 작업 처리가 끝난 스레드는 다시 작업 큐에서 새로운 작업을 가져와 처리합니다. 그렇기 때문에 작업 처리 요청이 폭증되어도 스레드의 전체 개수가 늘어나지 않으므로 애플리케이션의 성능이 급격히 저하되지 않습니다.
자바는 스레드 풀을 생성하고 사용할 수 있도록 java.util.concurrent 패키지에서 ExecutorService 인터페이스와 Executors 클래스를 제공하고 있습니다. Exxecutors의 다양한 정적 메서드를 이용해서 ExecutorService 구현 객체를 만들 수 있는데, 이것이 바로 스레드 풀입니다.
1. 스레드 풀 생성
ExecutorService 구현 객체는 Executors 클래스의 다음 두 가지 메서드 중 하나를 이용해서 간편하게 생성할 수 있습니다.
메서드(매개 변수) | 초기 스레드 수 | 코어 스레드 수 | 최대 스레드 수 |
newCachedThreadPool() | 0 | 0 | Integer.MAX-VALUE |
newFixedThreadPool(int nThreads) | 0 | nThreads | nThreads |
초기 스레드 수는 ExecutorService 구현 객체가 생성될 때 기본적으로 생성되는 스레드 수를 말하고, 코어 스레드 수는 스레드 수가 증가된 후 사용되지 않는 스레드를 스레드 풀에서 제거할 때 최소한 유지해야 할 스레드 수를 말합니다. 최대 스레드 수는 스레드 풀에서 관리하는 최대 스레드 수입니다.
newCachedThreadPool() 메서드로 생성된 스레드 풀은 초기 스레드 개수와 코어 스레드 개수는 0개이고, 스레드 개수보다 작업 개수가 많으면 새 스레드를 생성시켜 작업을 처리합니다. 이론적으로는 int 값이 가질 수 있는 최댓값만큼 스레드가 추가되지만, 운영체제의 성능과 상황에 따라 달라집니다. 1개 이상의 스레드가 추가되었을 경우 60초 동안 추가된 스레드가 아무 작업을 하지 않으면 추가된 스레드를 종료하고 풀에서 제거합니다. 다음은 newCachedThreadPool()을 호출해서 ExecutorService 구현 객체를 얻는 코드입니다.
ExecutorService executorService = Executors.newCachedThreadPool();
newFixedThreadPool(int nThreads) 메서드로 생성된 스레드 풀의 초기 스레드 개수는 0개이고, 코어 스레드 수는 nThreads입니다. 스레드 개수보다 작업 개수가 많으면 새 스레드를 생성시키고 작업을 처리합니다. 최대 스레드 개수는 매개 값으로 준 nThreads입니다. 이 스레드 풀은 스레드가 작업을 처리하지 않고 놀고 있더라도 스레드 개수가 줄지 않습니다. 다음은 CPU 코어의 수만큼 최대 스레드를 사용하는 스레드 풀을 생성합니다.
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
newCachedThreadPool() 메서드와 newFixedThreadPool() 메서드를 사용하지 않고 코어 스레드 개수와 최대 스레드 개수를 설정하고 싶다면 직접 ThreadPoolExecutor 객체를 생성하면 됩니다. 사실 위 두 가지 메서드도 내부적으로 ThreadPoolExecutor객체를 생성해서 리턴합니다. 다음은 초기 스레드 개수가 0개, 코어 스레드 개수가 3개, 최대 스레드 개수가 100개인 스레드 풀을 생성합니다. 그리고 코어 스레드 3개를 제외한 나머지 추가된 스레드가 120초 동안 놀고 있을 경우 해당 스레드를 제거해서 스레드 수를 관리합니다.
ExecutorService threadPool = new ThreadPoolExecutor(
3, //코어 스레드 개수
100, //최대 스레드 개수
120L, //놀고 있는 시간
TimeUnit.SECONDS, //놀고 있는 시간 단위
new SynchronousQueue<Runnable>() //작업 큐
);
2. 스레드 풀 종료
스레드 풀의 스레드는 기본적으로 데몬 스레드가 아니기 때문에 main 스레드가 종료되더라도 작업을 처리하기 위해 계속 실행 상태로 남아있습니다. 그래서 main() 메서드가 실행이 끝나도 애플리케이션 프로세스는 종료되지 않습니다. 애플리케이션을 종료하려면 스레드 풀을 종료시켜 스레드들이 종료 상태가 되도록 처리해주어야 합니다. ExecutorService는 종료와 관련해서 다음 세 개의 메서드를 제공하고 있습니다.
리턴 타입 | 메서드(매개 변수) | 설명 |
void | shutdown() | 현재 처리 중인 작업뿐만 아니라 작업 큐에 대기하고 있는 모든 작업을 처리한 뒤에 스레드풀을 종료시킵니다. |
List<Runnable> | shutdownNow() | 현재 작업 처리 중인 스레드를 interrupt해서 작업 중지를 시도하고 스레드풀을 종료시킵니다. 리턴 값은 작업 큐에 있는 미처리된 작업(Runnable)의 목록입니다. |
boolean | awaitTermination(long timeout, TimeUnit unit) | shutdown() 메서드를 호출한 이후, 모든 작업 처리를 timeout 시간 내에 완료하면 true를 리턴하고, 완료하지 못하면 작업 처리 중인 스레드를 interrupt하고 false를 리턴합니다. |
남아 있는 작업을 마무리하고 스레드 풀을 종료할 때는 shutdown()을 일반적으로 호출하고, 남아있는 작업과는 상관없이 강제로 종료할 때에는 shutdownNow()를 호출합니다.
executorService.shutdown(); //남은 작업을 모두 처리하고 종료
executorService.shutdownNow(); //남은 작업 상관없이 강제 종료
3. 작업 생성
하나의 작업은 Runnable 또는 Callable 구현 클래스로 표현합니다. Runnable과 Callable의 차이점은 작업 처리 완료 후 리턴 값이 있느냐 없느냐입니다. 다음은 작업을 정의하기 위해 Runnable 구현 클래스를 작성하는 방법입니다.
Runnable task = new Runnable() {
@Override
public void run() {
//스레드가 처리할 작업 내용
}
}
다음은 작업을 정의하기 위해 Callable 구현 클래스를 작성하는 방법입니다.
Callable<T> task = new Callable<T>() {
@Override
public T call() throws Exception {
//스레드가 처리할 작업 내용
return T;
}
}
Runnable의 run() 메서드는 리턴 값이 없고, Callable의 call() 메서드는 리턴 값이 있습니다. call()의 리턴 타입은 implements Callable <T>에서 지정한 T 타입입니다. 스레드 풀의 스레드는 작업 큐에서 Runnable 또는 Callable 객체를 가져와 run()과 call() 메서드를 실행합니다.
4. 작업 처리 요청
작업 처리 요청이란 ExecutorService의 작업 큐에 Runnable 또는 Callable 객체를 넣는 행위를 말합니다. ExecutorService는 작업 처리 요청을 위해 다음 두 가지 종류의 메서드를 제공합니다.
리턴 타입 | 메서드(매개 변수) | 설명 |
void | execute(Runnable command) | Runnable을 작업 큐에 저장합니다. 작업 처리 결과는 얻을 수 없습니다. |
Future<?> Future<V> Future<V> |
submit(Runnable task) submit(Runnable task, V result) submit(Callable<V> task) |
Runnable 또는 Callable을 작업 큐에 저장합니다. 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있습니다. |
execute() 메서드와 submit() 메서드의 차이점은 두 가지입니다. 하나는 execute() 메서드는 작업 처리 결과를 받지 못하지만 submit() 메서드는 작업 처리 결과를 받을 수 있도록 Future를 리턴한다는 것입니다.
또 다른 차이점은 다음과 같습니다. execute() 메서드는 작업 처리 도중 예외가 발생하면 스레드가 종료되고 해당 스레드는 스레드 풀에서 제거됩니다. 따라서 스레드 풀은 다른 작업 처리를 위해 새로운 스레드를 생성합니다. 반면에 submit() 메서드는 작업 처리 도중 예외가 발생하더라도 스레드는 종료되지 않고 다음 작업을 위해 재사용됩니다. 그렇기 때문에 가급적이면 스레드의 생성 오버 헤더를 줄이기 위해서 submit()을 사용하는 것이 좋습니다.
다음 예제는 Runnable 작업을 정의할 때 Integer.parseInt("삼")을 넣어 NumberFormatException이 발생하도록 유도했습니다. 10개의 작업을 execute()와 submit() 메서드로 각각 처리 요청했을 경우 스레드 풀의 상태를 살펴보겠습니다. 먼저 execute() 메서드로 작업 처리를 요청한 경우입니다.
//Main.java
package Example;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class Main {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for(int i = 0; i < 10; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
//스레드 총 개수 및 작업 스레드 이름 출력
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
int poolSize = threadPoolExecutor.getPoolSize();
String threadName = Thread.currentThread().getName();
System.out.println("[총 스레드 개수 : " + poolSize + "] 작업 스레드 이름 : " + threadName);
//예외 발생시킴
int value = Integer.parseInt("삼");
}
};
executorService.execute(runnable);
//executorService.submit(runnable);
try {
Thread.sleep(10); //콘솔에 출력 시간을 주기 위해 0.01초 일시 정지시킴
} catch(InterruptedException e) {
e.printStackTrace();
}
}
executorService.shutdown();
}
}
/*
실행결과
[총 스레드 개수 : 1] 작업 스레드 이름 : pool-1-thread-1
Exception in thread "pool-1-thread-1" java.lang.NumberFormatException: For input string: "삼"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at ExampleJava/Example.Main$1.run(Main.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-3
Exception in thread "pool-1-thread-3" java.lang.NumberFormatException: For input string: "삼"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at ExampleJava/Example.Main$1.run(Main.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-2
Exception in thread "pool-1-thread-2" java.lang.NumberFormatException: For input string: "삼"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at ExampleJava/Example.Main$1.run(Main.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-4
Exception in thread "pool-1-thread-4" java.lang.NumberFormatException: For input string: "삼"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at ExampleJava/Example.Main$1.run(Main.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-5
Exception in thread "pool-1-thread-5" java.lang.NumberFormatException: For input string: "삼"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at ExampleJava/Example.Main$1.run(Main.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-6
Exception in thread "pool-1-thread-6" java.lang.NumberFormatException: For input string: "삼"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at ExampleJava/Example.Main$1.run(Main.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-7
Exception in thread "pool-1-thread-7" java.lang.NumberFormatException: For input string: "삼"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at ExampleJava/Example.Main$1.run(Main.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-8
Exception in thread "pool-1-thread-8" java.lang.NumberFormatException: For input string: "삼"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at ExampleJava/Example.Main$1.run(Main.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-9
Exception in thread "pool-1-thread-9" java.lang.NumberFormatException: For input string: "삼"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at ExampleJava/Example.Main$1.run(Main.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-10
Exception in thread "pool-1-thread-10" java.lang.NumberFormatException: For input string: "삼"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Integer.parseInt(Integer.java:652)
at java.base/java.lang.Integer.parseInt(Integer.java:770)
at ExampleJava/Example.Main$1.run(Main.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
*/
스레드 풀의 스레드 최대 개수 2는 변함이 없지만, 실행 스레드의 이름을 보면 모두 다른 스레드가 작업을 처리하고 있습니다. 이것은 작업 처리 도중 예외가 발생했기 때문에 해당 스레드는 제거되고 새 스레드가 계속 생성되기 때문입니다.
이번에는 submit() 메서드로 작업 처리를 요청한 경우입니다.
//Main.java
package Example;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class Main {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for(int i = 0; i < 10; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
//스레드 총 개수 및 작업 스레드 이름 출력
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
int poolSize = threadPoolExecutor.getPoolSize();
String threadName = Thread.currentThread().getName();
System.out.println("[총 스레드 개수 : " + poolSize + "] 작업 스레드 이름 : " + threadName);
//예외 발생시킴
int value = Integer.parseInt("삼");
}
};
//executorService.execute(runnable);
executorService.submit(runnable); //submit()으로 작업 요청
try {
Thread.sleep(10); //콘솔에 출력 시간을 주기 위해 0.01초 일시 정지시킴
} catch(InterruptedException e) {
e.printStackTrace();
}
}
executorService.shutdown();
}
}
/*
실행결과
[총 스레드 개수 : 1] 작업 스레드 이름 : pool-1-thread-1
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-2
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-1
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-2
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-1
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-2
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-1
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-2
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-1
[총 스레드 개수 : 2] 작업 스레드 이름 : pool-1-thread-2
*/
실행결과를 보면 예외가 발생하더라도 스레드가 종료되지 않고 계속 재사용되어 다른 작업을 처리하고 있는 것을 볼 수 있습니다.
5. 블로킹 방식의 작업 완료 통보
ExecutorService의 submit() 메서드는 매개 값으로 준 Runnable 또는 Callable 작업을 스레드 풀의 작업 큐에 저장하고 즉시 Future 객체를 리턴합니다.
리턴 타입 | 메서드(매개 변수) | 설명 |
Future<?> | submit(Runnable task) | Runnable 또는 Callable을 작업 큐에 저장합니다. 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있습니다. |
Future<V> | submit(Runnable task, V result) | |
Future<V> | submit(Callable<V> task) |
Future 객체는 작업 결과가 아니라 작업이 완료될 때까지 기다렸다가(지연했다가 = 블로킹되었다가) 최종 결과를 얻는 데 사용됩니다. 그래서 Future를 지연 완료(Pending completion) 객체라고 합니다. Future의 get() 메서드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴합니다. 이것이 블로킹을 사용하는 작업 완료 통보 방식입니다. 다음은 Future가 가지고 있는 get() 메서드를 설명한 표입니다.
리턴 타입 | 메서드(매개 변수) | 설명 |
V | get() | 작업이 완료될 때까지 블로킹되었다가 처리 결과 V를 리턴합니다. |
V | get(long timeout, TimeUnit unit) | timeout 시간 전에 작업이 완료되면 결과 V를 리턴하지만, 작업이 완료되지 않으면 TimeoutException을 발생시킵니다. |
리턴 타입인 V는 submit(Runnable task, V result)의 두 번째 매개 값인 V 타입이거나 submit(Callable <V> task)의 타입 파라미터 V 타입입니다. 다음은 세 가지 submit() 메서드 별로 Future의 get() 메서드가 리턴하는 값이 무엇인지 보여줍니다.
메서드 | 작업 처리 완료 후 리턴 타입 | 작업 처리 도중 예외 발생 |
submit(Runnable task) | future.get() -> null | future.get() -> 예외 발생 |
submit(Runnable task, Integer result) | future.get() -> int 타입 값 | future.get() -> 예외 발생 |
submit(Callable<String> task) | future.get() -> String 타입 값 | future.get() -> 예외 발생 |
future를 이용한 블로킹 방식의 작업 완료 통보에서 주의할 점이 있습니다. 작업을 처리하는 스레드가 작업을 완료하기 전까지는 get() 메서드가 블로킹되므로 다른 코드를 실행할 수 없습니다. 만약 UI를 변경하고 이벤트를 처리하는 스레드가 get() 메서드를 호출하면 작업을 완료하기 전까지 UI를 변경할 수도 없고 이벤트를 처리할 수도 없게 됩니다. 그렇기 때문에 get() 메서드를 호출하는 스레드는 새로운 스레드이거나 스레드 풀의 또 다른 스레드가 되어야 합니다. 다음은 새로운 스레드를 생성해서 호출하는 예입니다.
new Thread(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
그리고 다음은 스레드 풀의 또 다른 스레드가 호출하는 예입니다.
executorService.submit(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
});
Future 객체는 작업 결과를 얻기 위한 get() 메서드 이외에도 다음과 같은 메서드를 제공합니다.
리턴 타입 | 메서드(매개 변수) | 설명 |
boolean | cancel(boolean mayInterruptIfRunning) | 작업 처리가 진행 중일 경우 취소시킵니다. |
boolean | isCancelled() | 작업이 취소되었는지 여부를 리턴합니다. |
boolean | isDone() | 작업 처리가 완료되었는지 여부를 리턴합니다. |
cancel() 메서드는 작업을 취소하고 싶을 경우 호출할 수 있습니다. 작업이 시작되기 전이라면 mayInterruptIfRunning 매개 값과는 상관없이 작업 취소 후 true를 리턴 하지만, 작업이 진행 중이라면 mayInterruptIfRunning 매개 값이 true일 경우에만 작업 스레드를 interrupt 합니다. 작업이 완료되었을 경우 또는 어떤 이유로 인해 취소될 수 없다면 cancel() 메서드는 false를 리턴합니다. isCancelled() 메서드는 작업이 완료되기 전에 취소되었을 경우에만 true를 리턴합니다. isDone() 메서드는 작업이 정상적, 예외, 취소, 등 어떤 이유에서건 작업이 완료되었다면 true를 리턴합니다.
5. 1. 리턴 값이 없는 작업 완료 통보
리턴 값이 없는 작업일 경우는 Runnable 객체로 생성하면 됩니다. 결과 값이 없는 작업 처리 요청은 submit(Runnable task) 메서드를 이용하면 됩니다. 결과 값이 없음에도 불구하고 다음과 같이 Future 객체를 리턴하는데, 이것은 스레드가 작업 처리를 정상적으로 완료했는지, 아니면 작업 처리 도중에 예외가 발생했는지 확인하기 위함입니다.
Future future = executorService.submit(task);
작업 처리가 정상적으로 완료되었다면 Future의 get() 메서드는 null을 리턴 하지만 스레드가 작업 처리 도중 interrupt 되면 InterruptedException 예외를 발생시키고, 작업 처리 도중 예외가 발생하면 ExecutionException 예외를 발생시킵니다. 따라서 다음과 같이 예외 처리 코드가 필요합니다.
try {
future.get();
} catch (InterruptedException e) {
//작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드
} catch (ExecutionException e) {
//작업 처리 도중 예외가 발생할 경우 실행할 코드
}
다음 예제는 리턴 값이 없고 단순히 1부터 10까지의 합을 출력하는 작업을 Runnable 객체로 생성하고, 스레드 풀의 스레드가 처리하도록 요청한 것입니다.
//Main.java
package Example;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
System.out.println("[작업 처리 요청]");
Runnable runnable = new Runnable() {
@Override
public void run() {
int sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
System.out.println("[처리 결과] " + sum);
}
};
Future<?> future = executorService.submit(runnable);
try {
future.get();
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
System.out.println("[실행 예외 발생함] " + e.getMessage());
}
executorService.shutdown();
}
}
/*
실행결과
[작업 처리 요청]
[처리 결과] 55
[작업 처리 완료]
*/
5. 2. 리턴 값이 있는 작업 완료 통보
스레드 풀의 스레드가 작업을 완료한 후에 애플리케이션이 처리 결과를 얻어야 된다면 작업 객체를 Callable로 생성하면 됩니다. 다음은 Callable 객체를 생성하는 코드인데, 주의할 점은 제네릭 타입 파라미터 T는 call() 메서드가 리턴하는 타입이 되도록 해야 합니다.
Callable<T> task = new Callable<T>() {
@Override
public T call() throws Exception {
//스레드가 처리할 작업 내용
return T;
}
};
Callable 작업의 처리 요청은 Runnable 작업과 마찬가지로 ExecutorService의 submit() 메서드를 호출하면 됩니다. submit() 메서드는 작업 큐에 Callable 객체를 저장하고 즉시 Future <T>를 리턴합니다. 이때 T는 call() 메서드가 리턴하는 타입입니다.
Future<T> future = executorService.submit(task);
스레드 풀의 스레드가 Callable 객체의 call() 메서드를 모두 실행하고 T 타입의 값을 리턴하면, Future <T>의 get() 메서드는 블로킹이 해제되고 T 타입의 값을 리턴하게 됩니다.
try {
T result = future.get();
} catch(InterruptedException e) {
//작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드
} catch(ExecutionException e) {
//작업 처리 도중 예외가 발생된 경우 실행할 코드
}
다음 예제는 1부터 10까지의 합을 리턴하는 작업을 Callable 객체로 생성하고, 스레드 풀의 스레드가 처리하도록 요청한 것입니다.
//Main.java
package Example;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
System.out.println("[작업 처리 요청]");
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() {
int sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
return sum;
}
};
Future<Integer> future = executorService.submit(callable);
try {
int sum = future.get();
System.out.println("[처리 결과] " + sum);
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
System.out.println("[실행 예외 발생함] " + e.getMessage());
}
executorService.shutdown();
}
}
/*
실행결과
[작업 처리 요청]
[처리 결과] 55
[작업 처리 완료]
*/
5. 3. 작업 처리 결과를 외부 객체에 저장
상황에 따라서 스레드가 작업한 결과를 외부 객체에 저장해야 할 경우도 있습니다. 예를 들어 스레드가 작업 처리를 완료하고 외부 Result 객체에 작업 결과를 저장하면, 애플리케이션이 Result 객체를 사용해서 어떤 작업을 진행할 수 있을 것입니다. 대게 Result 객체는 공유 객체가 되어, 두 개 이상의 스레드 작업을 취합할 목적으로 이용됩니다.
이런 작업을 하기 위해서 ExecutorService의 submit(Runnable task, V result) 메서드를 사용할 수 있는데, V가 바로 Result 타입이 됩니다. 메서드를 호출하면 즉시 Future <V>가 리턴되는데 Future의 get() 메서드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 V 타입 객체를 리턴합니다. 리턴된 객체는 submit()의 두 번째 매개 값으로 준 객체와 동일한데, 차이점은 스레드 처리 결과가 내부에 저장되어 있다는 것입니다.
Result result = new Result();
Runnable task = new Task(result);
Future<Result> future = executorService.submit(task, result);
result = future.get();
작업 객체는 Runnable 구현 클래스로 생성합니다. 그런데 주의해야 할 점이 있습니다. 스레드에서 결과를 저장하기 위해 외부 Result 객체를 사용해야 하므로 생성자를 통해 Result 객체를 주입받도록 해야 합니다.
class Task implements Runnable {
Result result;
Task(Result result) {
this.result = result;
}
@Override
public void run() {
//작업 코드
//처리 결과를 result로 저장
}
}
다음 예제는 1부터 10까지의 합을 계산하는 두 개의 작업을 스레드 풀에 처리 요청하고, 각각의 스레드가 작업 처리를 완료한 후 산출된 값을 외부 Result 객체에 누적하도록 했습니다.
//Main.java
package Example;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
System.out.println("[작업 처리 요청]");
class Task implements Runnable {
Result result;
Task(Result result) {
this.result = result;
}
@Override
public void run() {
int sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
result.addValue(sum);
}
}
Result result = new Result();
Runnable task1 = new Task(result);
Runnable task2 = new Task(result);
Future<Result> future1 = executorService.submit(task1, result);
Future<Result> future2 = executorService.submit(task2, result);
try {
result = future1.get();
result = future2.get();
System.out.println("[처리 결과] " + result.accumValue);
System.out.println("[작업 처리 완료]");
} catch (Exception e) {
e.printStackTrace();
System.out.println("[실행 예외 발생함] " + e.getMessage());
}
executorService.shutdown();
}
}
class Result {
int accumValue;
synchronized void addValue(int value) {
accumValue += value;
}
}
/*
실행결과
[작업 처리 요청]
[처리 결과] 110
[작업 처리 완료]
*/
5. 4. 작업 완료 순으로 통보
작업 요청 순서대로 작업 처리가 완료되는 것은 아닙니다. 작업의 양과 스레드 스케줄링에 따라서 먼저 요청한 작업이 나중에 완료되는 경우도 발생합니다. 여러 개의 작업들이 순차적으로 처리도리 필요성이 없고, 처리 결과도 순차적으로 이용할 필요가 없다면 작업 처리가 완료된 것부터 결과를 얻어 이용하면 됩니다. 스레드 풀에서 작업 처리가 완료된 것만 통보받는 방법이 있는데, CompletionService를 이용하는 것입니다. CompletionService는 처리 완료된 작업을 가져오는 poll()과 take() 메서드를 제공합니다.
리턴 타입 | 메서드(매개 변수) | 설명 |
Future<V> | poll() | 완료된 작업의 Future를 가져옵니다. 완료된 작업이 없다면 즉시 null을 리턴합니다. |
Future<V> | poll(long timeout, TimeUnit unit) | 완료된 작업의 Future를 가져옵니다. 완료된 작업이 없다면 timeout까지 블로킹합니다. |
Future<V> | take() | 완료된 작업의 Future를 가져옵니다. 완료된 작업이 없다면 있을 때까지 블로킹합니다. |
Future<V> | submit(Callable<V> task) | 스레드풀에 Callable 작업 처리를 요청합니다. |
Future<V> | submit(Runnable task, V result) | 스레드풀에 Runnable 작업 처리를 요청합니다. |
CompletionService 구현 클래스는 ExecutorCompletionService <V>입니다. 객체를 생성할 때 생성자 매개 값으로 ExecutorService를 제공하면 됩니다.
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
CompletionService<V> completionService = new ExecutorCompletionService<V>(
executorService
);
poll() 메서드와 take() 메서드를 이용해서 처리 완료된 작업의 Future를 얻으려면 CompletionService의 submit() 메서드로 작업 처리 요청을 해야 합니다.
completionService.submit(Callable<V> task);
completionService.submit(Runtime task, V result);
다음은 take() 메서드를 호출하여 완료된 Callable 작업이 있을 때까지 블로킹되었다가 완료된 작업의 Future를 얻고, get() 메서드로 결괏값을 얻어내는 코드입니다. while문은 애플리케이션이 종료될 때까지 반복 실행해야 하므로 스레드 풀의 스레드에서 실행하는 것이 좋습니다.
executorService.submit(new Runnable() {
@Override
public void run() {
while(true) {
try {
Future<Integer> future = completionService.take();
int value = future.get();
System.out.println("[처리 결과] " + value);
} catch (Exception e) {
break;
}
}
}
});
tak() 메서드가 리턴하는 완료된 작업은 submit()으로 처리 요청된 작업의 순서가 아님을 명심해야 합니다. 더 이상 완료된 작업을 가져올 필요가 없다면 take() 블로킹에서 빠져나와 while문을 종료해야 합니다. ExecutorService의 shutdownNow() 메서드를 호출하면 take()에서 InterruptedException 예외가 발생하고 catch 절에서 break가 되어 while문을 종료하게 됩니다.
다음 예제는 3개의 Callable 작업을 처리 요청하고 처리가 완료되는 순으로 작업의 결괏값을 콘솔에 출력하도록 했습니다.
//Main.java
package Example;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executorService);
System.out.println("[작업 처리 요청]");
for(int i = 0; i < 3; i++) {
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for(int i = 1; i<= 10; i++) {
sum += i;
}
return sum;
}
});
}
System.out.println("[처리 완료된 작업 확인]");
executorService.submit(new Runnable() {
@Override
public void run() {
while(true) {
try {
Future<Integer> future = completionService.take();
int value = future.get();
System.out.println("[처리 결과] " + value);
} catch (Exception e) {
break;
}
}
}
});
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("[실행 예외 발생함] " + e.getMessage());
}
executorService.shutdown();
}
}
/*
실행결과
[작업 처리 요청]
[처리 완료된 작업 확인]
[처리 결과] 55
[처리 결과] 55
[처리 결과] 55
*/
6. 콜백 방식의 작업 완료 통보
콜백이란 애플리케이션이 스레드에게 작업 처리를 요청한 후, 스레드가 작업을 완료하면 특정 메서드를 자동 실행하는 기법을 말합니다. 이때 자동 실행되는 메서드를 콜백 메서드라고 합니다.
블로킹 방식의 진행 순서는 이렇습니다.
- 메인 스레드에서 스레드 풀에 작업을 요청한다.
- 작업 요청한 스레드 풀에서 Future를 리턴한다.
- 메인 스레드에서 Future.get() 메서드를 호출한다.
- 스레드 풀의 작업이 완료될 때까지 메인 스레드는 블로킹된다.
- 스레드 풀의 작업이 완료되면 메인 스레드의 블로킹이 해제된다.
하지만 콜백 방식의 진행 순서는 이렇습니다.
- 메인 스레드에서 스레드 풀에 작업을 요청한다.
- 작업 요청한 스레드 풀에서 Future를 리턴한다.
- 메인 스레드와 스레드 풀 각자 독립적으로 자기 작업을 처리한다.
- 스레드 풀의 작업이 완료되면 메인 스레드에서 콜백 메서드가 실행된다.
블로킹 방식은 작업 처리를 요청한 후 작업이 완료될 때까지 블로킹되지만, 콜백 방식은 작업 처리를 요청한 후 결과를 기다릴 필요 없이 다른 기능을 수행할 수 있습니다. 그 이유는 작업 처리가 완료되면 자동적으로 콜백 메서드가 실행되어 결과를 알 수 있기 때문입니다.
하지만 아쉽게도 ExecutorService는 콜백을 위한 별도의 기능을 제공하지 않습니다. 하지만 Runnable 구현 클래스를 작성할 때 콜백 기능을 구현할 수 있습니다. 먼저 콜백 메서드를 가진 클래스가 있어야 하는데, 직접 정의해도 좋고 java.nio.channels.CompletionHandler를 이용해도 좋습니다. 이 인터페이스는 NIO 패키지에 포함되어 있는데 비동기 통신에서 콜백 객체를 만들 때 사용됩니다.
다음은 CompletionHandler 객체를 생성하는 코드입니다.
CompletionHandler<V, A> callback = new CompletionHandler<V, A>() {
@Override
public void completed(V result, A attachment) {}
@Override
public void failed(Throwable exc, A attachment) {}
};
CompletionHandler는 completed() 메서드와 failed() 메서드가 있습니다. completed() 메서드는 작업을 정상 처리 완료했을 때 호출되는 콜백 메서드이고, failed() 메서드는 작업 처리 도중 예외가 발생했을 때 호출되는 콜백 메서드입니다. CompletionHandler의 V 타입 파라미터는 결괏값의 타입이고, A는 첨붓값의 타입입니다. 첨붓값은 콜백 메서드에 결괏값 이외에 추가적으로 전달하는 객체라고 생각하면 됩니다. 만약 첨붓값이 필요 없다면 A는 Void로 지정해주면 됩니다. 다음은 작업 처리 결과에 따라 콜백 메서드를 호출하는 Runnable 객체입니다.
Runnable task = new Runnable() {
@Override
public void run() {
try {
//작업 처리
V result = ...;
callback.completed(result, null);
} catch (Exception e) {
callback.failed(e, null);
}
}
};
작업 처리가 정상적으로 완료되면 completed() 콜백 메서드를 호출해서 결괏값을 전달하고, 예외가 발생하면 failed() 콜백 메서드를 호출해서 예외 객체를 전달합니다.
다음은 두 개의 문자열을 정수화해서 더하는 작업을 처리하고 결과를 콜백 방식으로 통보합니다. 첫 번째 작업은 "3", "3"을 주었고 두 번째 작업은 "3", "삼"을 주었습니다. 첫 번째 작업은 정상적으로 처리되기 때문에 completed()가 자동으로 호출되고, 두 번째 작업은 NumberFormatException이 발생되어 failed() 메서드가 호출됩니다.
//Callback.java
package Example;
import java.nio.channels.CompletionHandler;
public class Callback implements CompletionHandler<Integer, Void>{
@Override
public void completed(Integer result, Void attachment) {
System.out.println("completed() 실행 : " + result);
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("failed() 실행 : " + exc.toString());
}
}
//Main.java
package Example;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Callback callback = new Callback();
executorService.submit(doWorkTask("3", "3", callback));
executorService.submit(doWorkTask("3", "삼", callback));
executorService.shutdown();
}
public static Runnable doWorkTask(final String x, final String y, Callback callback) {
Runnable task = new Runnable() {
@Override
public void run() {
try {
int intX = Integer.parseInt(x);
int intY = Integer.parseInt(y);
int result = intX + intY;
callback.completed(result, null);
} catch(NumberFormatException e) {
callback.failed(e, null);
}
}
};
return task;
}
}
/*
실행 결과
completed() 실행 : 6
failed() 실행 : java.lang.NumberFormatException: For input string: "삼"
*/