Tworząc aplikację rozbitą na wiele wątków chcemy uzyskać jak największe przyśpieszenie poprzez zrównoleglenie wykonywania części programu. Niekiedy jednak trzeba zapewnić aby poszczególne wątki komunikowały się ze sobą lub koordynowały swoje działania. W najprostszym przypadku można użyć bloków lub metod synchronizowanych. Niekiedy potrzeba jednak użyć bardziej zaawansowanych metod, w takim przypadku możemy skorzystać z kolejki FIFO (first in, first out)

Wyobraźmy sobie taki przykład przepływu danych, często spotykanych w systemach przetwarzających dane:

  • mamy dostawców danych, którzy umieszczają dane w buforze
  • odbiorca danych przetwarza je i umieszcza w buforze wynikowym
  • z bufora wynikowego dane są przetwarzane już przez jeden wątek

przypomina to taki lejek przetwarzania danych, przy czym są narzucone pewne ograniczenia:

  • z bufora kolejkującego w danej chwili może korzystać jeden dostawca lub odbiorca danych
  • dostawca może umieścić dane jeżeli kolejka nie przekroczyła rozmiaru X, inaczej ma czekać na zwolnienie miejsca
  • odbiorca pobiera dane jeżeli kolejka nie jest pusta, inaczej ma czekać aż kolejka zapełni się

W takim przypadku w możemy napisać własną implementację bufora lub skorzystać z implementacji ArrayBlockingQueue, LinkedBlockingQueue lub PriorityBlockingQueue itd.

W przypadku rozproszonych systemów ten sposób komunikacji pomiędzy serwisami (mikro serwisami) możemy zrealizować przy pomocy wymiany komunikatów, baz danych SQL i NoSQL oraz na kilka innych. Przypadku aplikacji monolitycznej możemy skorzystać również z frameworka Akka.

Przykład implementacji ArrayBlockingQueuem, po uruchomieniu otrzymujemy

Thread: P3 send: 139
Thread: P1 send: 137
Thread: P2 send: 136
Result: {P1=137, P2=136, P3=139}

czyli działa prawidłowo, 4 odbiorców przetworzyło dane dostarczone przez 3 producentów danych.

Temat wielowątkowości jest podzielony na parę części:

Przykładowa implementacja:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
 
/**
 * Bufor wyjściowy zliczający ilość procesowanych danych
 */
class ResultBuffer{
 
   private Map<String, Integer> map = new HashMap<>(10);
 
   public synchronized void process(String name){
 
      int counter = 0;
      if (map.containsKey(name)){
         counter = map.get(name);
      }
      counter++;
      map.put(name, counter);
   }
 
   /**
    * Drukowanie zawartości bufora, ilość przetworzonych danych
    */
   public void printResult(){
 
      System.out.printf("Result: " + map.toString());
   }
}
 
/**
 * Klasa generująca dane umieszczane w buforze
 */
class Producer implements Runnable{
 
   private BlockingQueue<String> queue;
   private int numOfSendData = 0;
   private String name;
 
   public Producer(BlockingQueue<String> queue, String name){
 
      this.queue = queue;
      this.name = name;
   }
 
   @Override
   public void run(){
 
      try{
         do{
            numOfSendData++;
            queue.put(name);
            Thread.sleep((long) (10 + Math.random() * 10));
 
         } while (true);
      } catch (InterruptedException e){
         e.printStackTrace();
      } finally {
         System.out.println("Thread: " + Thread.currentThread().getName() + " send: " + numOfSendData);
      }
   }
}
 
/**
 * Klasa odbiorcy danych pobira je z bufora przetwarza
 * i umieszcza w buforze odbiorczym
 * w przypadku gdy przez 100 milisek nie ma danych następuje
 * wyście z danego wątku roboczego
 */
class Consumer implements Runnable{
 
   private BlockingQueue<String> queue;
   private ResultBuffer resultBuffer;
 
   private int numOfProcessData = 0;
   private Map<String, Integer> result = new HashMap<>(10);
 
   public Consumer(BlockingQueue<String> queue, ResultBuffer resultBuffer){
 
      this.queue = queue;
      this.resultBuffer = resultBuffer;
   }
 
   @Override
   public void run(){
 
      try{
         do{
            String element = queue.poll(100, TimeUnit.MILLISECONDS);
            if (element != null){
               resultBuffer.process(element);
               numOfProcessData++;
 
               //
               // Symulacja wykonującej się długo metody
               //
               Thread.sleep(20);
            }
 
         } while (true);
 
      } catch (InterruptedException e){
         e.printStackTrace();
      } finally {
         System.out.println("thread: " + Thread.currentThread().getName() + " process data: " + numOfProcessData);
      }
   }
}
 
/**
 *
 */
public class Thread4BlockingQueue{
   public static void main(String[] args){
 
      BlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<String>(20);
      ResultBuffer resultBuffer = new ResultBuffer();
 
      Thread producer1 = new Thread(new Producer(arrayBlockingQueue, "P1"));
      producer1.setName("P1");
      Thread producer2 = new Thread(new Producer(arrayBlockingQueue, "P2"));
      producer2.setName("P2");
      Thread producer3 = new Thread(new Producer(arrayBlockingQueue, "P3"));
      producer3.setName("P3");
 
      Thread consumer1 = new Thread(new Consumer(arrayBlockingQueue, resultBuffer));
      consumer1.setName("C1");
      Thread consumer2 = new Thread(new Consumer(arrayBlockingQueue, resultBuffer));
      consumer2.setName("C2");
      Thread consumer3 = new Thread(new Consumer(arrayBlockingQueue, resultBuffer));
      consumer3.setName("C3");
      Thread consumer4 = new Thread(new Consumer(arrayBlockingQueue, resultBuffer));
      consumer4.setName("C4");
 
 
      //
      // Uruchamiania wątków roboczych
      //
      producer1.start();
      producer2.start();
      producer3.start();
 
      consumer1.start();
      consumer2.start();
      consumer3.start();
      consumer4.start();
 
      try{
         Thread.sleep(2000);
      } catch (InterruptedException e){
         e.printStackTrace();
      }
 
      //
      // Przerwanie wysyłania danych
      //
      producer1.interrupt();
      producer2.interrupt();
      producer3.interrupt();
 
      //
      // Oczekiwanie na zakączenie wątków przetwarzających dane
      // Jest tutaj zastosowane złe rozwiązanie, bo powinno się zastosować
      // pule wątków i oczekiwanie na zakończenie pracy
      //
      try{
         Thread.sleep(100);
      } catch (InterruptedException e){
         e.printStackTrace();
      }
 
      resultBuffer.printResult();
   }
 
}