Programowanie Reaktywne - Tworzymy dane - Własna klasa publikująca.

14.02.2018


Artykuł ten jest częścią serii artykułów na temat Programowania reaktywnego.

Zapraszam na GitHub-a.

Tematy

  1. Wstęp
  2. Zabawa z czasem - Timer
  3. Kto za tym stoi? - Scheduler
  4. Nie zapominaj - Subscribe
  5. Zabawa z czasem - Interval
  6. Zabawa z czasem - Buffer
  7. Zabawa z czasem - Delay
  8. Zabawa z czasem - Sample
  9. Zabawa z czasem - Throttle
  10. Zabawa z czasem - Timestamp/TimeInterval
  11. Tworzymy dane - Generators
  12. Tworzymy dane - Własna klasa publikująca
  13. Marudzimy - Skip
  14. Marudzimy - Take
  15. Łap To! - ConsoleKey
  16. Kombinatorzy - Concat
  17. Kombinatorzy - Repeat
  18. Kombinatorzy - Start With
  19. Kombinatorzy - Ambiguous
  20. Kombinatorzy - Merge
  21. Kombinatorzy - Zip
  22. Kombinatorzy - Switch
  23. Kombinatorzy - When-And-Then
  24. Kombinatorzy - Combine Latest
  25. Transformers - Select
  26. Transformers - OfType and Cast
  27. Transformers - Metadata
  28. Bileciki do kontroli - Unit Tests of Interval
  29. Bileciki do kontroli - Unit Tests of Observer Interval
  30. Bileciki do kontroli - Unit Tests of Create Cold/Hot Observable
  31. Szpryca - AutoFac

Wstęp

Reactive Extensions - Custom observable class Z okazji Walentynek dzisiaj pojawi się głównie w kodzie coś ekstra. Zapraszam do kompilacji i obserwacji. Natomiast post poświęcony będzie budowaniu własnej klasy publikującej dane. W tym celu idąc krok dalej stworzyłem bibliotekę RXlib z jakiej będziemy jeszcze korzystać. Idea jest dość prosta. Główny strumień dystrybuujący dane to Interval.

Co 100ms będzie on powiadamiał podpiętych do niego odbiorców. Natomiast to właśnie do tych odbiorców będziemy się subskrybować i odbierać dane.

IObservable

Dysponując takim generycznym interfejsem (IObservable). Możemy stworzyć własną klasę dystrybuującą dane na strumień. W tym konkretnym przypadku stworzymy licznik czasu. I jako typ rozsyłany wykorzystamy klasę **Time**{:.color_1}.

Klasa będzie przechowywała sekundy, minuty, godziny. Dodatkowo by łatwo wyświetlać nadpisałem ToString.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Time
{
	public Time()
	{
		Seconds = 0;
		Minutes = 0;
		Hours = 0;
	}

	public short Seconds { get; set; }
	public short Minutes { get; set; }
	public long Hours { get; set; }

	public override string ToString()
	{
		return $"{Hours}:{Minutes:00}:{Seconds:00}";
	}
}

Skąd tutaj IDisposable? Przyda się gdy już wszystko będziemy zwijać i kończyć działanie programu. Najważniejsze co jest wymagane w przypadku budowania własnej klasy do obserwowania, to lista obserwujących: _observers.

Do konstruktora wstrzykujemy obiekt obserwowalny powstały przy użyciu Observable.Interval. Będzie wyzwalał publikację danych ma strumień zapisanych na naszą listę _observers obserwatorów.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ObservableTimeCounter : IObservable<Time>, IDisposable
{
	private const int MinutesLimit = 59;
	private const int HourLimit = 59;

	private List<IObserver<Time>> _observers;
	private Time _time;
	private IDisposable _tickerSubscription;

	public ObservableTimeCounter(IObservable<long> ticker)
	{
		_observers = new List<IObserver<Time>>();
		_time = new Time();

		SubscribeOnTicker(ticker);
	}

	public IDisposable Subscribe(IObserver<Time> observer)
	{
		if (!_observers.Contains(observer))
		{
			_observers.Add(observer);
		}

		return new Unsubscriber<Time>(_observers, observer);
	}

Reactive Extensions - Własna klasa publikująca

Do zapisu subskrybentów służy metoda Subscribe. Jest ona narzucana przez interfejs i musimy ją zaimplementować (a jak inaczej subskrybować…).

Działanie proste jak obiekt obserwujący nie został jeszcze dodany do listy to dodajemy.

Zwracamy interfejs IDisposable, dzięki takiej konstrukcji mogliśmy wielokrotnie niszczyć obiekty obserwatorów w poprzednich postach.

Ale skąd tutaj u diabła wziąć typ IDisposable? Trzeba wykazać się sprytem i stworzyć kolejny obiekt: Unsubscriber.

Przez implementację interfejsu IDisposable, możemy zwrócić stworzony tak obiekt w Subscribe. A nowy obiekt zawiera odwołanie do obserwatora, oraz listy obserwatorów zawartych w obiekcie jaki będzie obserwowany. Dzięki takiemu zabiegowi będzie mogli się wypisać z _observers.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Unsubscriber<T> : IDisposable
{
	private List<IObserver<T>> _observers;
	private IObserver<T> _observer;

	public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
	{
		_observers = observers;
		_observer = observer;
	}

	public void Dispose()
	{
		if (_observer != null && _observers.Contains(_observer))
		{
			_observers.Remove(_observer);

Typ generyczny zastosowany został po to by nie tworzyć do każdej kolejnej klasy publikującej osobnych klas do odpisywania z listy. Generyczny typo powoduje uniwersalność tego rozwiązania. Czyli mniej kodu:).

A co z naszym ticker-em. Otóż będzie on służył akurat w tym przypadku do kalkulacji zmiennej _time co 1s. Służy do tego metoda CalculateTime. Nie jest ona zbyt ciekawa, zwiera w sobie po prostu zwiększanie pól: sekund, minut, godzin. I zerowanie odpowiedniego pola po przekroczeniu granicznej wartości. Czyli: 60 s => 1 min, 60 min => 1 h.

1
2
3
4
5
6
7
8
9
10
11
12
13
private void SubscribeOnTicker(IObservable<long> ticker)
{
	_tickerSubscription = ticker.Subscribe(
		index =>
		{
			if (index % 10 != 0) return;

			CalculateTime();
		},
		Console.WriteLine,
		OnComplete
	);
}

Na samym końcu metody CalculateTime znajduje się wywołanie metody Publish odpowiedzialnej za wysłanie na strumień aktualnej zmiennej _time.

A dzieje się to bardzo prosto, skoro mamy listę zapisanych obserwatorów _observers to dzięki dobrodziejstwu LINQ wyciąga do każdego obiektu łapę i dusi na publiczne metody OnNext.

1
2
3
4
5
6
7
8
9
10
11
12
private void Publish()
{
	_observers.ForEach(observer => observer.OnNext(_time));
}

Na koniec jeżeli zajdzie potrzeba, i strumień inicjujący (Interval) przejdzie w stan **Completed**. To użyje poniższej metody  **OnComplete**{:.color_1} i dokona żywota wszystkich subskrybentów.
private void OnComplete()
{
	_observers.ForEach(observer => observer.OnCompleted());

	_observers.Clear();
}

ObservableProvider

Reactive Extensions - Własna klasa publikująca Ten post to też początek życia dostawcy obiektów do obserwowania. W tym celu zacząłem pisać klasę pozwalającą na korzystanie z różnych dystrybutorów.

Obecnie zawiera w sobie główny obiekt _ticker. Będzie on wyzwalaczem dla pozostałych obiektów.

ObservableProvider dostarcza na chwilę obecną dwa obiekty do subskrybowania. Pierwszy z nich to opisywany wyżej ObservableTimeCounter. Natomiast drugi (ObservableValentinesDay) został dopisany specjalnie z okazji Walentynek. Zapraszam do przetestowania:).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ObservableProvider : IDisposable
{
	public ObservableTimeCounter TimeCounter { get; private set; }
	public ObservableValentinesDay ValentinesDay { get; private set; }

	private IObservable<long> _ticker;
	
	public ObservableProvider()
	{
		InitializeTicker();

		Initialize();
	}

	private void Initialize()
	{
		TimeCounter = new ObservableTimeCounter(_ticker);
		ValentinesDay = new ObservableValentinesDay(_ticker);
	}

Na bazie klasy należy stworzyć obiekt dostawcy, i korzystać z pól dostawców treści.

Zakończenie

Na koniec warto by w końcu coś zrobić z tą biblioteką. Po dowiązaniu do projektu OwnObservable. Tworzymy obiekt dostawcy i zapisujemy się do dwóch dostępnych dostawców treści: TimeCounter, ValentinesDay.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void Main(string[] args)
{
	var observableProvider = new ObservableProvider();

	observableProvider.TimeCounter.Subscribe(time =>
	{
		Console.ForegroundColor = ConsoleColor.White;
		Console.WriteLine(time);
	});

	observableProvider.ValentinesDay.Subscribe(valentinesDayMessage =>
	{
		Console.ForegroundColor = valentinesDayMessage.Color;
		Console.WriteLine(valentinesDayMessage.Message);
	});

Voilà.

Szczęśliwego dnia Walentego. Dziękuję i zapraszam na GitHub.


Jest to post wchodzący w skład podjętego wyzwania ogłoszonego przez MIROBURN we vlogu z dnia 3 lutego 2018 roku.

Celem wyzwania jest systematyczne działanie w ciągu 30 dni.

Postanowiłem pisać post dziennie o tematyce Programowania Reaktywnego dla platformy .NET.

Wszelkie źródła związane z postami znajdują się na repozytorium GitHub.

Stan obecny wyzwania: 30 z 30 dni.


Referencje:


Wcześniejszy: Programowanie Reaktywne - Tworzymy dane - Generators

Następny: Programowanie Reaktywne - Marudzimy - Skip


Zapisz się na listę :)