sobota, 28 stycznia 2012

EIwS-Cert - współbieżność w Springu (cz. 1)

Kontynuując przygotowania do certyfikatu z Enterprise Integration with Spring chciałbym przedstawić sposób w jaki w Springu można zapewnić równoległe wykonywanie wielu zadań - współbieżność. W poprzednich wpisach z serii (wpis I i wpis II) przybliżono temat współbieżności w Javie. Spring na bazie tego co udostępnia Java (ale nie tylko) utworzył swoją warstwę bean'ów pozwalających zrealizować przetwarzanie równoległe.

Podstawowym interfejsem, który udostępnia funkcjonalność wykonywania zadań jest TaskExecutor. Deklaruje on jedną metodę execute i co więcej, od Springa w wersji 3.0, rozszerza interfejs Executor'a z Javy.
Hierarchia dziedziczenia i implementacji interfejsu TaskExecutor
Powyżej zaprezentowana jest hierarchia dziedziczenia oraz implementacji interfejsu TaskExecutor - będzie ona punktem startowym do przybliżenia poszczególnych implementacji.
  1. SyncTaskExecutor - prosta implementacja, która uruchamia zadanie w ramach wątku zlecającego.  Nie ma tutaj żadnej asynchroniczności. Jaki ma cel taka implementacja - przede wszystkim została utworzona dla przypadków testowych. W Javie nie ma Executor'a odpowiadającego tej implementacji Spring'a. Jednak utworzenie jej nie jest trudne - wystarczy w przeciążonej metodzie execute wywoływać metodę run na przekazanym obiekcie Runnable. Właśnie w taki sposób działa SyncTaskExecutor.
  2. Interfejs AsyncTaskExecutor - deklaruje asynchroniczność implementacji, to znaczy - wprowadza metody, które obsługują uruchamianie zadań przez inne wątki i pozwalają wyciągnąć wynik realizacji tych zadań.
    1. execute - rozszerzona metoda o timeout uruchomienia podawany w milisekundach.
    2. submit(Runnable) - metoda analogiczna do execute z interfejsu TaskExecutor z tym, że zwraca uchwyt do zadania, pozwalający na odczytanie stanu zadania (czy zadanie jest anulowane, czy zostało wykonane itp.)
    3. submit(Callable) - metoda zwraca uchwyt do obiektu pozwalającego na wydobycie deklarowanego wyniku zadania.
  3. SimpleAsyncTaskExecutor - klasa implementuje wymieniony wyżej interfejs. Jej cechą szczególną jest to, że dla każdego zleconego zadania tworzy nowy wątek, który ma to zadanie wykorzystać - nie reużywa utworzonych wcześniej wątków. Udostępnia również możliwość wstrzymywania realizacji zadań w momencie przekroczenia pewnego limitu (właściwość concurrencyLimit), jednak wykorzystywanie tego rozwiązania nie jest zalecane. O wiele lepiej wykorzystać TaskExecutor'y obsługujące pule wątków. W tym wypadku Java również nie dostarcza odpowiedniej implementacji. Można ją jednak uzyskać w niemal równie prosty sposób co odpowiednik SyncTaskExecutor'a - z każdym wywołaniem metod execute lub submit należy pobrać z fabryki kolejną instancję wątku i zlecić jej uruchomienie przekazanego zadania.
  4. TaskExecutorAdapter - jest adapterem, który pod Springowym interfejsem AsyncTaskExecutor ukrywa implementacje dowolnego Executor'a z Javy.
  5. WorkManagerTaskExecutor z pakietu jca - omówiony zostanie na samym końcu.
  6. Interfejs SchedulingTaskExecutor - oznacza implementacje jako potrafiące poradzić sobie z planowaniem wykonywania zleconych zadań. Nie deklaruje żadnych (ważnych z punktu widzenia klienta) metod.
  7. ConcurrentTaskExecutor - to jest kolejny adapter, który pod Springowym interfejsem SchedulingTaskExecutor, ukrywa implementacje dowolnego Executor'a z Javy. Dodatkowo implementacja ta umożliwia dostęp do samego Executor'a.
  8. ConcurrentTaskScheduler - kolejny adapter, który pod Springowym interfejsem SchedulingTaskExecutor, ukrywa implementację dowolnego SchedulerExecutorService z Javy. Tak jak w poprzedniej implementacji, ta również umożliwia dostęp do wykorzystywanego Executor'a.
  9. ConcurrentTaskExecutor oraz ThreadPooTaskExecutor z pakietu backportconcurrent - kolejne adaptery, które obsługują swoje odpowiedniki w Javie z pakietu: edu.emory.mathcs.backport.java.util.concurrent.
  10. ThreadPoolTaskExecutor - adapter dla Javowej wersji ThreadPoolExecutor. Udostępnia jako właściwości parametry adaptowanego Executor'a.
  11. ThreadPoolTaskScheduler - adapter dla Javowej wersji ScheduledThreadPoolExecutor. Udostępnia jako właściwości parametry adaptowanego SchedulingTaskExecutor'a.
  12. TimerTaskExecutor - oznaczony jako deprecated. Nie będzie omawiany.
  13. WorkManagerTaskExecutor z pakietu jca - deleguje wywołania do WorkManager'a dostępnego w ramach JCA 1.5.
Czas na przykłady, które zaprezentują działanie kilku wyżej wymienionych implementacji - konkretnie: SyncTaskExecutor, SimpleAsyncTaskExecutor oraz ThreadPoolTaskExecutor. Pełen kod przykładów dostępny jest w projekcie: Enterprise Integration with Spring Examples.

Poniższy listing przedstawia konfigurację Spring'ową wykorzystaną w testach jednostkowych - TaskExecutorTestSpringConfiguration.

   1:  package pl.com.mbsoftware.eiws.concurrency;
   2:   
   3:  import org.springframework.context.annotation.Bean;
   4:  import org.springframework.context.annotation.Configuration;
   5:  import org.springframework.core.task.SimpleAsyncTaskExecutor;
   6:  import org.springframework.core.task.SyncTaskExecutor;
   7:  import org.springframework.core.task.TaskExecutor;
   8:  import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
   9:   
  10:  @Configuration
  11:  public class TaskExecutorTestSpringConfiguration {
  12:   
  13:      @Bean
  14:      public TaskExecutor firstExecutor() {
  15:          return new SyncTaskExecutor();
  16:      }
  17:   
  18:      @Bean
  19:      public TaskExecutor secondExecutor() {
  20:          return new SimpleAsyncTaskExecutor();
  21:      }
  22:   
  23:      @Bean
  24:      public ThreadPoolTaskExecutor thirdExecutor() {
  25:          ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  26:          executor.setCorePoolSize(3);
  27:          executor.setQueueCapacity(5);
  28:          executor.setWaitForTasksToCompleteOnShutdown(true);
  29:          return executor;
  30:      }
  31:   
  32:  }
Konfiguracja definiuje spring'owe bean'y trzech TaskExecutor'ów:

  • linie: 13-16 opisują bean'a firstExecutor będącego implementacją SyncTaskExecutor.
  • linie: 18-21 opisują bean'a secondExecutor będącego implementacją SimpleAsyncTaskExecutor.
  • linie: 23-30 opisują bean'a thirdExecutor będącego implementacją ThreadPoolTaskExecutor.
Wszystkie powyższe bean'y są wykorzystane w kolejnych metodach klasy testowej TaskExecutorTest.

   1:  @Test
   2:  public void testSyncTaskExecutor() {
   3:      final String currentThreadName = Thread.currentThread().getName();
   4:      log.debug("Starting execution in thread: [{}]", currentThreadName);
   5:      for (int i = 0; i < 5; i++) {
   6:          firstExecutor.execute(new Runnable() {
   7:   
   8:              @Override
   9:              public void run() {
  10:                  Assert.assertEquals(currentThreadName, Thread.currentThread().getName());
  11:                  log.debug("Run by thread: [{}]", Thread.currentThread().getName());
  12:              }
  13:          });
  14:      }
  15:      log.debug("... execution has finished in thrad: [{}]!", currentThreadName);
  16:  }
Powyższa metoda wykorzystuje implementację SyncTaskExecutor'a i zleca wykonanie 5 zadań. W linii 3 zapisywana jest informacja o nazwie bieżącego wątku, który obsługuje wykonanie metody. Natomiast w linii 10 sprawdzane jest czy wątek wykonujący zadanie to ten sam wątek, który to zadanie zlecał.
Poniżej przykładowy wynik wykonania testu.

0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Starting execution in thread: [main]
0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run by thread: [main]
0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run by thread: [main]
0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run by thread: [main]
0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run by thread: [main]
0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run by thread: [main]
0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - ... execution has finished in thrad: [main]!
Jak widać na powyższym listingu wszystkie zadania zostały wykonane przez wątek main - ten sam, który te zadania zlecił.

Kolejny przykład prezentuje wykorzystanie implementacji SimpleAsyncTaskExecutor:

   1:  @Test
   2:  public void testSimpleAsyncTaskExecutor() {
   3:      final String currentThreadName = Thread.currentThread().getName();
   4:      log.debug("Starting execution in thread: [{}]", currentThreadName);
   5:      for (int i = 0; i < 5; i++) {
   6:          secondExecutor.execute(new Runnable() {
   7:   
   8:              @Override
   9:              public void run() {
  10:                  Assert.assertFalse(currentThreadName.equals(Thread.currentThread().getName()));
  11:                  log.debug("Run by thread: [{}]", Thread.currentThread().getName());
  12:              }
  13:          });
  14:      }
  15:   
  16:      log.debug("... execution has finished in thrad: [{}]!", currentThreadName);
  17:  }
Przykład bardzo podobny do poprzedniego. Różnica występuje w linii 10, gdzie sprawdzane jest czy wątek wykonujący zadanie jest różny od wątku, który to zadanie zlecił do wykonania. Poniżej wywołanie metody testowej:

0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Starting execution in thread: [main]
0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - ... execution has finished in thrad: [main]!
15   [SimpleAsyncTaskExecutor-1] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run by thread: [SimpleAsyncTaskExecutor-1]
15   [SimpleAsyncTaskExecutor-5] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run by thread: [SimpleAsyncTaskExecutor-5]
15   [SimpleAsyncTaskExecutor-4] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run by thread: [SimpleAsyncTaskExecutor-4]
15   [SimpleAsyncTaskExecutor-3] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run by thread: [SimpleAsyncTaskExecutor-3]
15   [SimpleAsyncTaskExecutor-2] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run by thread: [SimpleAsyncTaskExecutor-2]
Jak widać, każde zadanie zostało wykonane przez inny wątek, różny od tego zlecającego zdania do realizacji - main.

Ostatni przykład wykorzystuje implementację ThreadPoolTaskExecutor'a:

   1:  @Test
   2:  public void testThreadPoolTaskExecutor() throws Exception {
   3:      final String currentThreadName = Thread.currentThread().getName();
   4:      log.debug("Starting execution in thread: [{}]", currentThreadName);
   5:      for (int i = 0; i < 5; i++) {
   6:          thirdExecutor.execute(new Runnable() {
   7:   
   8:              @Override
   9:              public void run() {
  10:                  Assert.assertFalse(currentThreadName.equals(Thread.currentThread().getName()));
  11:                  log.debug("Run byn thread: [{}]", Thread.currentThread().getName());
  12:                  try {
  13:                      Thread.sleep(random.nextInt(1000));
  14:                  } catch (InterruptedException e) {
  15:                      throw new RuntimeException(e);
  16:                  }
  17:              }
  18:          });
  19:      }
  20:      Thread.sleep(2000);
  21:      log.debug("... execution has finished in thrad: [{}]!", currentThreadName);
  22:  }
TaskExecutor skonfigurowany jest w taki sposób, że posiada 3 wątki do wykonywania zleconych zadań. Zadania (zlecanych jest 5) wstrzymywane są na okres losowy (maksymalnie 1 sekundę - linia 13). Na samym końcu wątek główny czeka 2 sekundy dopóki wszystkie zadania nie zostaną przetworzone przez wątki egzekutora. Przykładowy wynik wykonania testu zamieszczono poniżej:

0    [main] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Starting execution in thread: [main]
0    [thirdExecutor-3] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run byn thread: [thirdExecutor-3]
0    [thirdExecutor-2] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run byn thread: [thirdExecutor-2]
0    [thirdExecutor-1] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run byn thread: [thirdExecutor-1]
172  [thirdExecutor-2] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run byn thread: [thirdExecutor-2]
719  [thirdExecutor-1] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - Run byn thread: [thirdExecutor-1]
2000 [main] DEBUG pl.com.mbsoftware.eiws.concurrency.TaskExecutorTest  - ... execution has finished in thrad: [main]!
W powyższym listingu można zauważyć, że zadania wykonywane są przez wątki inne niż wątek zlecający zadanie. Dodatkowo są tylko 3 wątki i dwa z nich wykonują po dwa zadania z puli 5 zleconych.

W następnym wpisie serii przedstawiony zostanie drugi główny interfejs Spring'owy dostarczający funkcjonalność planowania zadania.

Brak komentarzy:

Prześlij komentarz