piątek, 20 stycznia 2012

EIwS-Cert - współbieżność w Java (cz. 1)

Jednym z pierwszych tematów, który omawiany jest na szkoleniu EIwS jest współbieżność. Zanim przedstawię sposoby w jakie Spring realizuje wielowątkowość proponuję zapoznać się z tym jak radzi sobie z tematem Java.

Spójrzmy najpierw na to co zawiera pakiet java.util.concurrent. Jest tam wiele interfejsów oraz klas.  Podstawowa jednostka wykonywanej pracy - interfejs Runnable - znajduje się jednak w innym pakiecie: java.lang. Jedyne co ona definiuje to metodę (void run()), która wywoływana jest przez wątki realizujące zadanie.

Executor - jest to interfejs, którego implementacje odpowiadają za przyjęcie do realizacji zadania. Oddziela on mechanizm przyjmujący zadanie od mechanizmu wykonującego zlecone zadanie. Implementacje interfejsu muszą dostarczać jedną metodę: void execute(Runnable command).

ExecutorService rozszerza omówiony powyżej interfejs Executor o dodatkowe metody pozwalające na zarządzanie stanem mechanizmu wątkowego. 
  • awaitTermination oznacza wstrzymanie bieżącego wątku do momentu zakończenia realizacji zleconych do wykonania executor'owi zadań. 
  • shutdown zakazuje executor'owi przyjmować do realizacji nowe zadania. 
  • isShutdown informuje o tym, czy executor został zamknięty czy nie. 
  • isTerminated informuje o tym, czy wszystkie zadanie po zleceniu zamknięcia executor'a zostały przez niego zrealizowane. 
Wprawne oko zauważy tam również dodatkowe metody, w obecnej chwili ich znajomość nie jest nam potrzebna - przedstawię je w dalszej części artykułu.

Executors jest fabryką odpowiedzialną za dostarczenie implementacji wyżej wymienionych interfejsów. 
  • newCachedThreadPool zwraca egzekutora, który powołuje wątki realizujące zadania na bieżąco z tym, że jeżeli jakiś wątek wcześniej powołany jest wolny to ma on pierwszeństwo przy realizacji zlecanego zadania. Ten egzekutor nie kolejkuje w żaden sposób zadań, w jego wypadku istnieje ryzyko, że przy ciągłym zlecaniu dużej liczby, długi zadań zostanie powołanych do życia bardzo dużo wątków.
  • newFixedThreadPool zwraca egzekutora, który ma z góry ustaloną liczbę wątków w puli, które zajmują się realizacją zleconych zadań. Jeżeli liczba zadań przekracza liczbę wątków w puli to kolejne napływające są kolejkowane.
  • newSingleThreadExecutor zwraca egzekutora, który realizuje zadania za pomocą jednego wątku. W przypadku, gdy wątek egzekutora jest zajęty to napływające zadania są kolejkowane.
Każda z wymienionych wyżej metod ma swój odpowiednik, który pozwala przekazać fabrykę odpowiedzialną za tworzenie wątków egzekutora (ThreadFactory).

Spójrzmy na prosty przykład:


   1:  @Test
   2:  public void testRunnableForFixedThreadPool() throws InterruptedException {
   3:      // given
   4:      ExecutorService executorService = Executors.newFixedThreadPool(10);
   5:   
   6:      // when
   7:      int i = 0;
   8:      for (i = 0; i < 20; i++) {
   9:          executorService.execute(new DoNothingTaskRunnable(i + 1, new DoNothingTaskImpl()));
  10:      }
  11:      
  12:      Assert.assertFalse(executorService.isShutdown());
  13:      Assert.assertFalse(executorService.isTerminated());
  14:      
  15:      executorService.shutdown();
  16:      
  17:      Assert.assertTrue(executorService.isShutdown());
  18:      Assert.assertFalse(executorService.isTerminated());
  19:   
  20:      try {
  21:          executorService.execute(new DoNothingTaskRunnable(i + 1, new DoNothingTaskImpl()));
  22:          Assert.fail();
  23:      } catch (RejectedExecutionException e) {
  24:   
  25:      }
  26:      executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
  27:   
  28:      // then
  29:      
  30:      Assert.assertTrue(executorService.isShutdown());
  31:      Assert.assertTrue(executorService.isTerminated());
  32:  }

Powyższy przykład w pełnej wersji udostępniony jest na Google Project Hosting w projekcie Enterprise Integration with Spring Examples - jest to jedna z metod testowych klasy: JavaConcurrencyImplTest. W linii 4 korzystając z fabryki egzekutorów pobierana jest instancja egzekutora ze stałą liczbą wątków (10 - wątków obsługujących zadania zlecone do wykonania egzekutorowi). Następnie od linii 7 do 10 zlecana jest realizacja 20 zadań DoNothingTaskRunnable, które opakowują DoNothingTask. Zadania to nie wykonują nic poza wstrzymaniem obsługujące je wątki na losowy czas od 0 do 999 milisekund. Następnie w linii 15 za pomocą metody shutdown wywoływanej na egzekutorze zamykamy możliwość przyjmowania nowych zadań. Linie 20-25 pokazują, że zamkniętemu egzekutorowi nie można zlecić wykonania kolejnego zadania. 26 linia wstrzymuje wykonywanie bieżącego wątku do momentu zakończenia realizacji zleconych zadań przez wątki egzekutora.

Warto zwrócić uwagę na stan egzekutora w poszczególnych etapach przetwarzania zadań. Zaraz po zleceniu zadań do realizacji (linie12-13) egzekutor jest otwarty, a przetwarzanie nie jest zakończone. Następnie (po zamknięciu możliwości zlecania kolejnych zadań - shutdown()) egzekutor jest zamknięty, a przetwarzanie wciąż nie jest zakończone. W końcu, po zrealizowaniu przez wątki wszystkich zadań (wstrzymanie bieżącego wątku za pomocą awaitTermination) egzekutor osiąga stan zamknięty i zakończony - linie 30-31.

Przyjrzyjmy się jeszcze jednej metodzie testowej:


   1:  @Test
   2:  public void testRunnableForSingleThreadPool() throws InterruptedException {
   3:      // given
   4:      ExecutorService executorService = Executors.newSingleThreadExecutor();
   5:   
   6:      // when
   7:      for (int i = 0; i < 5; i++) {
   8:          executorService.execute(new DoNothingTaskRunnable(i + 1, new DoNothingTaskImpl()));
   9:      }
  10:      executorService.shutdown();
  11:      executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
  12:   
  13:      // then
  14:  }
W tym wypadku z fabryki pobieranych jest egzekutor (newSingleThreadExecutor), który powiązany jest wyłącznie z jednym wątkiem realizującym zlecone zadania. Przykładowe uruchomienie testu wygląda w następujący sposób:


0    [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable  - Runnable start:[1] ...
31   [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable  - ... finish:[1] 
31   [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable  - Runnable start:[2] ...
218  [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable  - ... finish:[2] 
218  [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable  - Runnable start:[3] ...
625  [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable  - ... finish:[3] 
625  [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable  - Runnable start:[4] ...
1359 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable  - ... finish:[4] 
1359 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable  - Runnable start:[5] ...
1375 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.DoNothingTaskRunnable  - ... finish:[5] 

Jak widać wszystkie 5 zleconych zadań zostało zrealizowanych przez jeden wątek o nazwie: pool-1-thread-1.

Zdarzają się sytuacje w których wątek zlecający realizację zadania egzekutorowi musi dowiedzieć się o wyniku przetwarzania tego zadania. Aby zrealizować to wymaganie zadanie, zamiast implementować interfejs Runnable, musi zaimplementować interfejs Callable. Interfejs ten wymaga dostarczenia metody V call() throws Exception.
ExecutorService deklaruje kilka metod obsługujących Callable. Wszystkie te metody zwracają w odpowiedzi uchwyt do miejsca w którym zapisany zostanie wynik realizacji  zadania - obiekt implementujący interfejs  Future. Interfejs ten dostarcza kilka przydatnych metod:

  • cancel - metoda próbuje anulować zadanie z którym dany Future jest powiązany.
  • isCancelled - zwraca informacje o tym czy zadanie zostało anulowane czy nie.
  • isDone - zwraca informacje o tym czy zadanie zostało poprawnie zakończone.
  • get - najważniejsza metoda interfejsu, zwraca wynik wykonania zadania. Jeżeli zadanie w momencie wywołania metody get jest jeszcze wykonywane przez inny wątek to wątek odpytujący się o wynik jest wstrzymywany do momentu pojawienia się wyniku.
  • get - druga metoda get dzięki której można określić czas oczekiwania na wynik, w przypadku jego przekroczenia metoda wyrzuca wyjątek TimeoutException.
Przyjrzyjmy się teraz poniższym przykładom - kolejnym metodą testowym z klasy JavaConcurrencyImplTest.

   1:  @Test
   2:  public void testCallback() throws InterruptedException, ExecutionException {
   3:      // given
   4:      Random random = new Random();
   5:      final Integer callbackResult = random.nextInt();
   6:   
   7:      ExecutorService executorService = Executors.newSingleThreadExecutor();
   8:   
   9:      // when
  10:      Future<Integer> resultHolder = executorService.submit(new Callable<Integer>() {
  11:          @Override
  12:          public Integer call() throws Exception {
  13:              Thread.sleep(1000);
  14:              return callbackResult;
  15:          }
  16:      });
  17:   
  18:      Assert.assertFalse(resultHolder.isCancelled());
  19:      Assert.assertFalse(resultHolder.isDone());
  20:   
  21:      executorService.shutdown();
  22:   
  23:      Integer actualResult = resultHolder.get();
  24:   
  25:      // then
  26:      Assert.assertEquals(callbackResult, actualResult);
  27:   
  28:      Assert.assertTrue(resultHolder.isDone());
  29:      Assert.assertFalse(resultHolder.isCancelled());
  30:  }
W liniach od 10 do 16 widać zlecany do realizacji Callable, który ma zwrócić wynik klasy Integer. Zadanie (linia 13-14) nie robi nic poza wstrzymaniem przetwarzania na 1 sekundę i zwróceniem wcześniej wyliczonego wyniku (linia 5). Wątek zlecający w tym czasie weryfikuje stan wyniku (linia 18 i 19) - zadanie nie zostało anulowane i nie zostało wykonane. Następnie w linii 21 zamykany jest egzekutor, a w linii 23 następuje próba pobrania wyniku przetwarzania. Wątek wstrzymywany jest do momentu, aż zadanie zostanie zakończone. Kiedy wynik zostanie pobrany w linii 26 jest on porównywany z wartością referencyjną. Na samym końcu weryfikowany jest stan zadania - zostało ono wykonane (linia 28) i nie zostało anulowane (linia 29).

Jeszcze jeden przykład, którego celem jest zaprezentowanie pobierania wyniku z określonym czasem oczekiwania.

   1:  @Test
   2:  public void testCallbackResultWaitTimeout() throws InterruptedException, ExecutionException {
   3:      // given
   4:      Random random = new Random();
   5:      final Integer callbackResult = random.nextInt();
   6:   
   7:      ExecutorService executorService = Executors.newSingleThreadExecutor();
   8:   
   9:      // when
  10:      Future<Integer> resultHolder = executorService.submit(new Callable<Integer>() {
  11:          @Override
  12:          public Integer call() throws Exception {
  13:              Thread.sleep(1000);
  14:              return callbackResult;
  15:          }
  16:      });
  17:   
  18:      Assert.assertFalse(resultHolder.isCancelled());
  19:      Assert.assertFalse(resultHolder.isDone());
  20:   
  21:      executorService.shutdown();
  22:   
  23:      try {
  24:          resultHolder.get(100, TimeUnit.MILLISECONDS);
  25:          Assert.fail();
  26:      } catch (TimeoutException e) {
  27:   
  28:      }
  29:   
  30:      // then
  31:      Assert.assertFalse(resultHolder.isDone());
  32:      Assert.assertFalse(resultHolder.isCancelled());
  33:      
  34:      Integer actualResult = resultHolder.get();
  35:      
  36:      Assert.assertEquals(callbackResult, actualResult);
  37:   
  38:      Assert.assertTrue(resultHolder.isDone());
  39:      Assert.assertFalse(resultHolder.isCancelled());
  40:  }
Poprzedni przykład został rozszerzony o próbę pobrania wyniku (linie 23-28). Bieżący wątek czeka na wynik jedynie przez 100 milisekund. Wynik nie będzie dostarczony ponieważ wątek wykonujący zadanie jest wstrzymany na 1000 milisekund (oczywiście przy pewnych warunkach ten test nie przejdzie, jest to jednak mało-prawdopodobne).

Interfejs ExecutorService udostępnia jeszcze kilka innych metod, jednak nie są one tak istotne jak te które przedstawiłem.

Brak komentarzy:

Prześlij komentarz