poniedziałek, 23 stycznia 2012

EIwS-Cert - współbieżność w Java (cz. 2)

Współbieżna realizacja zleconych zadań nie ogranicza się jedynie do wykonania ich przez egzekutor, który posiada wolny wątek (więcej o tym traktuje poprzedni post). Realizację zadań można również zaplanować! Aby skorzystać z tej możliwości należy użyć dedykowany do tego rodzaju zadań egzekutor - ScheduledExecutorService

Implementacja wymienionego interfejsu dostarczana jest za pomocą: Executors.newScheduledThreadPool. Sam ScheduledExecutorService rozszerza ExecutorService dodając nowe metody pozwalające określić kiedy zadanie ma zostać wykonane, jak często ma być wykonywane lub w jakich odstępach czasu.
  • schedule(Callable, long, TimeUnit) - rejestruje do wykonania zadanie zwracające wynik (Callable), które staje się aktywne (gotowe do wykonania) po określonym czasie.
  • schedule(Runnable, logn, TimeUnit) - podobnie jak metoda powyżej rejestruje zwykłe zadanie (Runnable), które staje się aktywne po określonym czasie.
  • scheduleAtFixRate - rejestruje zadanie (Runnable), które będzie wykonywane cyklicznie co określony czas, a pierwsze wykonanie zostaje opóźnione o czas podany w parametrach wejściowych.
  • scheduleWithWixDelay - rejestruje zadanie (Runnable), które będzie wykonywane co określony czas od momentu zakończenia poprzedniego wykonania, metoda pozwala sparametryzować opóźnienie pierwszego wykonania.
Przykład planowanego wykonania zadania (wszystkie przykład dostępne są na Google Project Hosting):

   1:  @Test
   2:  public void testScheduledExecutorServiceCallback() throws InterruptedException, ExecutionException {
   3:      // given
   4:      Random random = new Random();
   5:      final Integer expectedValue = random.nextInt();
   6:      ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
   7:   
   8:      // when
   9:      ScheduledFuture<Integer> resultHolder = executorService.schedule(new Callable<Integer>() {
  10:          @Override
  11:          public Integer call() throws Exception {
  12:              return expectedValue;
  13:          }
  14:      }, 1, TimeUnit.SECONDS);
  15:      Date taskScheduleDate = new Date();
  16:      executorService.shutdown();
  17:      Integer currentValue = resultHolder.get();
  18:      Date taskExecutionFinishedDate = new Date();
  19:      long timeDifference = taskExecutionFinishedDate.getTime() - taskScheduleDate.getTime();
  20:   
  21:      // then
  22:      Assert.assertTrue(timeDifference < 2000 && timeDifference > 0);
  23:      Assert.assertEquals(expectedValue, currentValue);
  24:   
  25:      Assert.assertTrue(resultHolder.isDone());
  26:      Assert.assertFalse(resultHolder.isCancelled());
  27:  }
W liniach 9-14 następuje zlecenie realizacji zadania aktywowanego po jednej sekundzie, którego jedynym celem jest zwrócenie liczby. Po zleceniu zadania rejestrujemy czas wykonania (linia 15), natomiast po pobraniu wyniku (linia 17 - przypominam, że jest to operacja blokująca bieżący wątek dopóki  zadanie nie zostanie zrealizowane) rejestrowany jest czas końca przetwarzania zadania (linia 18). Z racji tego, że nie ma gwarancji na uruchomienie i wykonanie zadania w jedną sekundę od zlecenia sprawdzamy czas przybliżony, który jest większy o 0 milisekund i jednocześnie mniejszy od 2000 milisekund (linia 22).

Przykład anulowania zaplanowanego zadania:

   1:  @Test
   2:  public void testScheduledExecutorServiceCallbackTaskCancellation() throws InterruptedException, ExecutionException {
   3:      // given
   4:      Random random = new Random();
   5:      final Integer expectedValue = random.nextInt();
   6:      ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
   7:   
   8:      // when
   9:      ScheduledFuture<Integer> resultHolder = executorService.schedule(new Callable<Integer>() {
  10:          @Override
  11:          public Integer call() throws Exception {
  12:              return expectedValue;
  13:          }
  14:      }, 1, TimeUnit.SECONDS);
  15:      executorService.shutdown();
  16:      boolean cancellationResult = resultHolder.cancel(true);
  17:      executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  18:      
  19:      // then
  20:      try {
  21:          resultHolder.get();
  22:          Assert.fail();
  23:      } catch (CancellationException e) {
  24:      }
  25:      Assert.assertTrue(cancellationResult);
  26:  }
Zaplanowanie realizacji zadania następuje w linii 9. W linii 16 dokonywane jest anulowanie zleconego zadania z wymuszeniem jego przerwania jeżeli przetwarzanie już się rozpoczęło. Kolejna linia służy do zaprezentowania, że w egzekutorze nie ma oczekujących i wykonywanych zadań. Następnie weryfikowana jest próba pobrania wyniku na uchwycie zleconego zadania. Próba ta nie może się powieść, ponieważ zadanie zostało już anulowane, w związku z tym wywołanie metody get wyrzuca wyjątek CancellationException. Ostatnim testem jest weryfikacja tego czy rzeczywiście anulowanie wykonywane w linii 16 zakończyło się poprawnie.

Kolejny przykład przedstawia zlecenie zadania wykonywanego dokładnie co 500 milisekund.

   1:  @Test
   2:  public void testScheduledExecutorServiceRepeatAtFixRate() throws InterruptedException, ExecutionException {
   3:      // given
   4:      Random random = new Random();
   5:      ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
   6:      // when
   7:      executorService.scheduleAtFixedRate(new Runnable() {
   8:   
   9:          private int i = 1;
  10:   
  11:          @Override
  12:          public void run() {
  13:              log.debug("Iteration[{}]", i++);
  14:              try {
  15:                  Thread.sleep(300);
  16:              } catch (InterruptedException e) {
  17:   
  18:              }
  19:          }
  20:      }, 200, 500, TimeUnit.MILLISECONDS);
  21:      log.debug("Starting...");
  22:   
  23:      Thread.sleep(4000);
  24:   
  25:      executorService.shutdown();
  26:      log.debug("... finished!");
  27:      executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  28:  }
Wywołanie metody scheduleAtFixedRate (linia 7) rejestruje zadanie wykonywane co określony czas (w tym  wypadku 500 milisekund - linia 20) z ustawionym czasem opóźnienia pierwszego uruchomienia (200 milisekund - linia 20). Wątek główny jest następnie usypiany na 4 sekundy - linia 23.

Intrygujące jest to - jak długo zadanie będzie ponawiane? Nie znalazłem odpowiedzi na to pytanie w API. Podejrzewałem, że powtórzenia będą tak długo wykonywane, aż nie zostanie zakończony proces Java. Okazuje się jednak, że metoda shutdown egzekutora oprócz zamknięcia możliwości rejestracji nowych zadań, uniemożliwia rejestrowanie wykonywania kolejnych cykli już zleconych zadań.

Poniżej przykładowe wywołanie testu:

   1:  0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Starting...
   2:  203  [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[1]
   3:  703  [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[2]
   4:  1203 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[3]
   5:  1703 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[4]
   6:  2203 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[5]
   7:  2703 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[6]
   8:  3203 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[7]
   9:  3703 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[8]
  10:  4000 [main] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - ... finished!
Pierwsze uruchomienie zadania następuje po 200 milisekundach od jego zlecenia, natomiast każde kolejne jest uruchamiane po 500 milisekundach od uruchomienia poprzedniego cyklu - zachowanie takie zdefiniowano wywołując odpowiednio metodę scheduleAtFixedRate.

Kolejny przykład przedstawia zlecenie zadania wykonywanego cyklicznie w odstępie 500 od zakończenia poprzedniego wykonania.

   1:  @Test
   2:  public void testScheduledExecutorServiceRepeatWithFixedDelay() throws InterruptedException, ExecutionException {
   3:      // given
   4:      Random random = new Random();
   5:      ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
   6:      // when
   7:      executorService.scheduleWithFixedDelay(new Runnable() {
   8:   
   9:          private int i = 1;
  10:   
  11:          @Override
  12:          public void run() {
  13:              log.debug("Iteration[{}]", i++);
  14:              try {
  15:                  Thread.sleep(300);
  16:              } catch (InterruptedException e) {
  17:   
  18:              }
  19:          }
  20:      }, 200, 500, TimeUnit.MILLISECONDS);
  21:      log.debug("Starting...");
  22:   
  23:      Thread.sleep(4000);
  24:   
  25:      executorService.shutdown();
  26:      log.debug("... finished!");
  27:      executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  28:  }
Najważniejsza zmiana w stosunku do poprzedniego przykładu to wywoływanie metody scheduleWithFixedDelay zamiast scheduleAtFixedRate. Należy zwrócić uwagę na zadanie, które jest wstrzymywane na 300 milisekund - linia 15. Spójrzmy zatem na przykładowe wykonanie:

0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Starting...
203  [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[1]
1015 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[2]
1812 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[3]
2609 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[4]
3406 [pool-1-thread-1] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - Iteration[5]
4000 [main] DEBUG pl.com.mbsoftware.eiws.concurrency.JavaConcurrencyImplTest  - ... finished!
Jak widać pierwsze wykonanie zadania następuje po 200 milisekundach od zlecenia. Natomiast kolejne zadanie wykonywane jest po 500 milisekundach od zakończenia wykonywania poprzedniego. Należy zatem zsumować czas wykonywania samego zadania (300 milisekund) z czasem kolejnego uruchomienia (500 milisekund) - czyli 800 milisekund.

W następnej części omówiona zostanie współbieżność realizowana za pomocą Spring'a. Zanim to jednak nastąpi proponuję przyjrzenie się jeszcze implementacjom interfejsów: Executor, ExecutorService oraz ScheduledExecutorService.

Java dostarcza dwie implementacje wyżej wymienionych interfejsów. Pierwszą jest ThreadPoolExecutor, natomiast drugą ScheduledThreadPoolExecutor. Należy zwrócić uwagę, że ScheduledThreadPoolExecutor rozszerza ThreadPoolExecutor dostarczając metody zgodne z interfejsem ScheduledExecutorService, dlatego skoncentrujemy się jedynie na ogólnej implementacji.
Bezpośrednie wykorzystanie powyższej implementacji zalecane jest jedynie wtedy, kiedy egzekutory dostarczane przez klasę Executors nie zapewniają określonych wymagań (np. polityki odrzucania zadań).
Tworząc egzekutor należy podać kilka ważny parametrów charakteryzujących jego działanie.
  • corePoolSize - podstawowa liczba wątków, określa ile wątków jest trzymanych w puli nawet jeżeli są one niewykorzystywane. Jeżeli liczba działający wątków jest mniejsza o tej liczby to każde zlecenie zadania tworzy nowe wątek. 
  • maximumPoolSize - maksymalna liczba wątków w puli, po jej osiągnięciu każde nowe zadanie zostaje odrzucone.
  • keepAliveTime (razem z unit) - czas po którym wątek, który został utworzony po przekroczeniu corePoolSize zostaje usunięty z puli.
  • workQueue - kolejka, która przechowuje zadania zlecone do wykonania. Kolejka oraz parametry określające liczbę wątków w puli oddziałują na to kiedy nowe wątki są powoływane do życia. Jeżeli liczba działający wątków jest mniejsza od corePoolSize to dla nowozleconego zadania tworzony jest nowy wątek. Jeżeli liczba działający wątków jest równa corePoolSize to nowozlecone zadania trafia do kolejki. Jeżeli dodatkowo kolejka jest przepełniona to z każdym nowozleconym zadaniem powoływany jest nowy wątek - dopóki liczba wątków nie zrówna się z maximumPoolSize. Jeżeli liczba wątków jest równa maximuPoolSize oraz kolejka jest przepełniona to każde nowozlecone zadanie zostaje odrzucone zgodnie z polityką odrzucania zadań (domyślna polityka wyrzuca wyjątek RejectedExecutionException)
  • threadFactory - fabryka odpowiedzialna za tworzenie nowych wątków - dzięki niej można skonfigurować tworzone wątki.
  • handler - RejectedExecutionHandler - uchwyt odpowiedzialny za zachowanie w przypadku, gdy zlecane jest nowe zadanie i jednocześnie wszystkie wątki (maximumPoolSize) są zajęte a kolejka jest przepełniona. Istnieje kilka predefiniowanych polityk dla obsługi takiego przypadku:
    • AbortPolicy - wyrzuca wyjątek RejectedExecutionException, jest to domyślna polityka odrzucania zadań.
    • CallerRunsPolicy - uruchamia przetwarzanie w wątku zlecającym zadanie, chyba że egzekutor został zamknięty wtedy zadanie zostaje nieobsłużone i usunięte.
    • DiscardOldestPolicy - usuwa najstarsze nieobsłużone zadanie i w jego miejsce wprowadza nowoprzekazane zlecenie.
    • DiscardPolicy - usuwa zlecane zadanie.
Istnieje możliwość definiowania własnego handlera odrzucanych zadań - wystarczy zaimplementować RejectedExecutionHandler. Można utworzyć implementację, która np. zapisuje takie zadania do persystentej kolejki.

Implementacja posiada o wiele większe możliwości konfiguracyjne - zainteresowanych odsyłam do wskazanego już API.

Brak komentarzy:

Prześlij komentarz