Jednym z pierwszych tematów, który omawiany jest na szkoleniu EIwS jest współbieżność. Zanim przedstawię sposoby w jakie Spring realizuje wielowątkowość proponuję zapoznać się z tym jak radzi sobie z tematem Java.
Spójrzmy najpierw na to co zawiera pakiet java.util.concurrent. Jest tam wiele interfejsów oraz klas. Podstawowa jednostka wykonywanej pracy - interfejs Runnable - znajduje się jednak w innym pakiecie: java.lang. Jedyne co ona definiuje to metodę (void run()), która wywoływana jest przez wątki realizujące zadanie.
Executor - jest to interfejs, którego implementacje odpowiadają za przyjęcie do realizacji zadania. Oddziela on mechanizm przyjmujący zadanie od mechanizmu wykonującego zlecone zadanie. Implementacje interfejsu muszą dostarczać jedną metodę: void execute(Runnable command).
ExecutorService rozszerza omówiony powyżej interfejs Executor o dodatkowe metody pozwalające na zarządzanie stanem mechanizmu wątkowego.
- awaitTermination oznacza wstrzymanie bieżącego wątku do momentu zakończenia realizacji zleconych do wykonania executor'owi zadań.
- shutdown zakazuje executor'owi przyjmować do realizacji nowe zadania.
- isShutdown informuje o tym, czy executor został zamknięty czy nie.
- isTerminated informuje o tym, czy wszystkie zadanie po zleceniu zamknięcia executor'a zostały przez niego zrealizowane.
Wprawne oko zauważy tam również dodatkowe metody, w obecnej chwili ich znajomość nie jest nam potrzebna - przedstawię je w dalszej części artykułu.
Executors jest fabryką odpowiedzialną za dostarczenie implementacji wyżej wymienionych interfejsów.
- newCachedThreadPool zwraca egzekutora, który powołuje wątki realizujące zadania na bieżąco z tym, że jeżeli jakiś wątek wcześniej powołany jest wolny to ma on pierwszeństwo przy realizacji zlecanego zadania. Ten egzekutor nie kolejkuje w żaden sposób zadań, w jego wypadku istnieje ryzyko, że przy ciągłym zlecaniu dużej liczby, długi zadań zostanie powołanych do życia bardzo dużo wątków.
- newFixedThreadPool zwraca egzekutora, który ma z góry ustaloną liczbę wątków w puli, które zajmują się realizacją zleconych zadań. Jeżeli liczba zadań przekracza liczbę wątków w puli to kolejne napływające są kolejkowane.
- newSingleThreadExecutor zwraca egzekutora, który realizuje zadania za pomocą jednego wątku. W przypadku, gdy wątek egzekutora jest zajęty to napływające zadania są kolejkowane.
Każda z wymienionych wyżej metod ma swój odpowiednik, który pozwala przekazać fabrykę odpowiedzialną za tworzenie wątków egzekutora (ThreadFactory).
Spójrzmy na prosty przykład:
Powyższy przykład w pełnej wersji udostępniony jest na Google Project Hosting w projekcie Enterprise Integration with Spring Examples - jest to jedna z metod testowych klasy: JavaConcurrencyImplTest. W linii 4 korzystając z fabryki egzekutorów pobierana jest instancja egzekutora ze stałą liczbą wątków (10 - wątków obsługujących zadania zlecone do wykonania egzekutorowi). Następnie od linii 7 do 10 zlecana jest realizacja 20 zadań DoNothingTaskRunnable, które opakowują DoNothingTask. Zadania to nie wykonują nic poza wstrzymaniem obsługujące je wątki na losowy czas od 0 do 999 milisekund. Następnie w linii 15 za pomocą metody shutdown wywoływanej na egzekutorze zamykamy możliwość przyjmowania nowych zadań. Linie 20-25 pokazują, że zamkniętemu egzekutorowi nie można zlecić wykonania kolejnego zadania. 26 linia wstrzymuje wykonywanie bieżącego wątku do momentu zakończenia realizacji zleconych zadań przez wątki egzekutora.
1: @Test
2: public void testRunnableForFixedThreadPool() throws InterruptedException {
3: // given
4: ExecutorService executorService = Executors.newFixedThreadPool(10);
5:
6: // when
7: int i = 0;
8: for (i = 0; i < 20; i++) {
9: executorService.execute(new DoNothingTaskRunnable(i + 1, new DoNothingTaskImpl()));
10: }
11:
12: Assert.assertFalse(executorService.isShutdown());
13: Assert.assertFalse(executorService.isTerminated());
14:
15: executorService.shutdown();
16:
17: Assert.assertTrue(executorService.isShutdown());
18: Assert.assertFalse(executorService.isTerminated());
19:
20: try {
21: executorService.execute(new DoNothingTaskRunnable(i + 1, new DoNothingTaskImpl()));
22: Assert.fail();
23: } catch (RejectedExecutionException e) {
24:
25: }
26: executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
27:
28: // then
29:
30: Assert.assertTrue(executorService.isShutdown());
31: Assert.assertTrue(executorService.isTerminated());
32: }
Powyższy przykład w pełnej wersji udostępniony jest na Google Project Hosting w projekcie Enterprise Integration with Spring Examples - jest to jedna z metod testowych klasy: JavaConcurrencyImplTest. W linii 4 korzystając z fabryki egzekutorów pobierana jest instancja egzekutora ze stałą liczbą wątków (10 - wątków obsługujących zadania zlecone do wykonania egzekutorowi). Następnie od linii 7 do 10 zlecana jest realizacja 20 zadań DoNothingTaskRunnable, które opakowują DoNothingTask. Zadania to nie wykonują nic poza wstrzymaniem obsługujące je wątki na losowy czas od 0 do 999 milisekund. Następnie w linii 15 za pomocą metody shutdown wywoływanej na egzekutorze zamykamy możliwość przyjmowania nowych zadań. Linie 20-25 pokazują, że zamkniętemu egzekutorowi nie można zlecić wykonania kolejnego zadania. 26 linia wstrzymuje wykonywanie bieżącego wątku do momentu zakończenia realizacji zleconych zadań przez wątki egzekutora.
Warto zwrócić uwagę na stan egzekutora w poszczególnych etapach przetwarzania zadań. Zaraz po zleceniu zadań do realizacji (linie12-13) egzekutor jest otwarty, a przetwarzanie nie jest zakończone. Następnie (po zamknięciu możliwości zlecania kolejnych zadań - shutdown()) egzekutor jest zamknięty, a przetwarzanie wciąż nie jest zakończone. W końcu, po zrealizowaniu przez wątki wszystkich zadań (wstrzymanie bieżącego wątku za pomocą awaitTermination) egzekutor osiąga stan zamknięty i zakończony - linie 30-31.
Przyjrzyjmy się jeszcze jednej metodzie testowej:
1: @Test
2: public void testRunnableForSingleThreadPool() throws InterruptedException {
3: // given
4: ExecutorService executorService = Executors.newSingleThreadExecutor();
5:
6: // when
7: for (int i = 0; i < 5; i++) {
8: executorService.execute(new DoNothingTaskRunnable(i + 1, new DoNothingTaskImpl()));
9: }
10: executorService.shutdown();
11: executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
12:
13: // then
14: }
0 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable - Runnable start:[1] ...
31 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable - ... finish:[1]
31 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable - Runnable start:[2] ...
218 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable - ... finish:[2]
218 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable - Runnable start:[3] ...
625 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable - ... finish:[3]
625 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable - Runnable start:[4] ...
1359 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable - ... finish:[4]
1359 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable - Runnable start:[5] ...
1375 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable - ... finish:[5]
Jak widać wszystkie 5 zleconych zadań zostało zrealizowanych przez jeden wątek o nazwie: pool-1-thread-1.
Zdarzają się sytuacje w których wątek zlecający realizację zadania egzekutorowi musi dowiedzieć się o wyniku przetwarzania tego zadania. Aby zrealizować to wymaganie zadanie, zamiast implementować interfejs Runnable, musi zaimplementować interfejs Callable
ExecutorService deklaruje kilka metod obsługujących Callable
- cancel - metoda próbuje anulować zadanie z którym dany Future
jest powiązany. - isCancelled - zwraca informacje o tym czy zadanie zostało anulowane czy nie.
- isDone - zwraca informacje o tym czy zadanie zostało poprawnie zakończone.
- get - najważniejsza metoda interfejsu, zwraca wynik wykonania zadania. Jeżeli zadanie w momencie wywołania metody get jest jeszcze wykonywane przez inny wątek to wątek odpytujący się o wynik jest wstrzymywany do momentu pojawienia się wyniku.
- get - druga metoda get dzięki której można określić czas oczekiwania na wynik, w przypadku jego przekroczenia metoda wyrzuca wyjątek TimeoutException.
1: @Test
2: public void testCallback() throws InterruptedException, ExecutionException {
3: // given
4: Random random = new Random();
5: final Integer callbackResult = random.nextInt();
6:
7: ExecutorService executorService = Executors.newSingleThreadExecutor();
8:
9: // when
10: Future<Integer> resultHolder = executorService.submit(new Callable<Integer>() {
11: @Override
12: public Integer call() throws Exception {
13: Thread.sleep(1000);
14: return callbackResult;
15: }
16: });
17:
18: Assert.assertFalse(resultHolder.isCancelled());
19: Assert.assertFalse(resultHolder.isDone());
20:
21: executorService.shutdown();
22:
23: Integer actualResult = resultHolder.get();
24:
25: // then
26: Assert.assertEquals(callbackResult, actualResult);
27:
28: Assert.assertTrue(resultHolder.isDone());
29: Assert.assertFalse(resultHolder.isCancelled());
30: }
W liniach od 10 do 16 widać zlecany do realizacji Callable, który ma zwrócić wynik klasy Integer. Zadanie (linia 13-14) nie robi nic poza wstrzymaniem przetwarzania na 1 sekundę i zwróceniem wcześniej wyliczonego wyniku (linia 5). Wątek zlecający w tym czasie weryfikuje stan wyniku (linia 18 i 19) - zadanie nie zostało anulowane i nie zostało wykonane. Następnie w linii 21 zamykany jest egzekutor, a w linii 23 następuje próba pobrania wyniku przetwarzania. Wątek wstrzymywany jest do momentu, aż zadanie zostanie zakończone. Kiedy wynik zostanie pobrany w linii 26 jest on porównywany z wartością referencyjną. Na samym końcu weryfikowany jest stan zadania - zostało ono wykonane (linia 28) i nie zostało anulowane (linia 29).
Jeszcze jeden przykład, którego celem jest zaprezentowanie pobierania wyniku z określonym czasem oczekiwania.
1: @Test
2: public void testCallbackResultWaitTimeout() throws InterruptedException, ExecutionException {
3: // given
4: Random random = new Random();
5: final Integer callbackResult = random.nextInt();
6:
7: ExecutorService executorService = Executors.newSingleThreadExecutor();
8:
9: // when
10: Future<Integer> resultHolder = executorService.submit(new Callable<Integer>() {
11: @Override
12: public Integer call() throws Exception {
13: Thread.sleep(1000);
14: return callbackResult;
15: }
16: });
17:
18: Assert.assertFalse(resultHolder.isCancelled());
19: Assert.assertFalse(resultHolder.isDone());
20:
21: executorService.shutdown();
22:
23: try {
24: resultHolder.get(100, TimeUnit.MILLISECONDS);
25: Assert.fail();
26: } catch (TimeoutException e) {
27:
28: }
29:
30: // then
31: Assert.assertFalse(resultHolder.isDone());
32: Assert.assertFalse(resultHolder.isCancelled());
33:
34: Integer actualResult = resultHolder.get();
35:
36: Assert.assertEquals(callbackResult, actualResult);
37:
38: Assert.assertTrue(resultHolder.isDone());
39: Assert.assertFalse(resultHolder.isCancelled());
40: }
Interfejs ExecutorService udostępnia jeszcze kilka innych metod, jednak nie są one tak istotne jak te które przedstawiłem.
Brak komentarzy:
Prześlij komentarz