Nirvana .NET : Publishing Data Groups with Reactive Extensions

So a while ago, I demonstrated how you can easily query DataGroups using Rx. What I didn’t discuss was how we could use Rx to model DataGroups on the service / data publishing side. We also had a scenario that you may not want in practice – where the data publishing service was always “hot”. It continually published messages, even when no clients were listening.

In my previous post, we had a something like this;

image

In this post we are going to replace the interactive code with a reactive query. Additionally we will monitor the data group membership and only activate the subscription when someone registers interest. At the end I’ll supply you with a solution you can download and play with.

image

What we will end up with is a system based on the publisher / subscriber, with which we will be able to publish an observable sequence of events over your local LAN, WAN, or Internet, to clients written in client applications written in a whole host of languages (e.g. .NET, Silverlight, Java, C++, Objective C, Flex & JavaScript).

Designing the Nirvana Realm Server

First thing we need to do is create a queue that the clients will use to subscribe / unsubscribe from the data groups they are interested in. Open up enterprise manager so that we can setup the realm.

Right-click on the realm, and select the “Create Queue” option.

image

I’m calling our queue “Commands” as it will be used for the incoming subscribe / unsubscribe commands. At this stage I’m just going to use a transient queue, if the system fails, I’m not worried if these messages go missing.

image

OK, the Nirvana realm should now look like this.

image

I’m not going to configure any Data Groups. I’m going to leave the data publishing service to create these dynamically.

Data Publisher Project

Lets create a .NET 4 console application. I’m going to call it “Publisher”.

image

We need add references to three assemblies;

Nirvana DotNet.dll
Nirvana.Reactive.dll
System.Reactive.dll

I’m using the official stable release of Rx; http://www.microsoft.com/download/en/details.aspx?id=26649

The latest Nirvana.NET API is available as part of the source code bundle at the bottom of this blog.

If you’re still with me, you should have something like this;

image

Data Publisher Code

Our data publisher is going to have a method that knows how to price the cross rate for a given currency pair.

IObservable<IMessage> GetCrossRate(string from, string to)

For the purpose of the experiment, I’m going to use Rx’s Generate method to create an observable that simulates a stream of prices. I’m also going to need a function that transforms the price we’re generating into a message that can be published to the Data Group.

Additionally I’m going to decorate this price stream with some side effects so that we can monitor what’s happening. This is the code I’ve ended up with.

		static IObservable GetCrossRate(string from, string to)
		{
			return Observable.Create(o =>
			{
				Console.WriteLine("{0}{1} Active", from, to);

				var rand = new Random();
				var prices = Observable.Generate(
					5d,
					i => true,
					i => i + rand.NextDouble() - 0.5, CreateMesage,
					i => TimeSpan.FromSeconds(0.1));

				var disposable = prices.Subscribe(o);

				return () =>
				{
					Console.WriteLine("{0}{1} Disposed", from, to);
					disposable.Dispose();
				};
			});
		}

		static IMessage CreateMesage(double price)
		{
			var m = new Message();
			m.Properties["Price"] = price;
			return m;
		}

I’m then going to create 3 instances of the observable and register them as DataGroups.


// Create some observables.
var eurusd = GetCrossRate("EUR", "USD");
var gbpaud = GetCrossRate("GBP", "AUD");
var audnzd = GetCrossRate("AUD", "NZD");

// Register them as data groups.
session.DataGroups.CreateDataGroup(eurusd, "EURUSD");
session.DataGroups.CreateDataGroup(gbpaud, "GBPAUD");
session.DataGroups.CreateDataGroup(audnzd, "AUDNZD");

There is one thing left to do on the data publishing side. We need to handle the incoming commands that tell us which subscribers are interested in which Data Groups. In fact we’ve not even defined what that command is going to look like yet. I think we’re going to need three fields.

image

This will tell the publisher;

  • Add or Remove a subscriber from a Data Group
  • Which Data Group
  • Which subscriber

I’ve added some code to our main method to process these incoming commands;

				// Query commands queue
				var commands =
					from args in session.Queues.CreateConsumer("Commands").ToObservable()
					select new
					{
						IsSubscribed = args.Message.Properties.GetBoolean("IsSubscribed"),
						DataGroupName = args.Message.Properties.GetString("DataGroupName"),
						StreamId = args.Message.Properties.GetString("StreamId"),
					};

				// Process incoming commandsB
				commands.Subscribe(command =>
				{
					var group = session.DataGroups.Root.Children
						.Where(g => g.Name == command.DataGroupName)
						.Single();
					if (command.IsSubscribed) group.AddStream(command.StreamId);
					else group.RemoveStream(command.StreamId);
				});

Here is the publisher code in its entirety.

using System;
using System.Linq;
using System.Reactive.Linq;
using MyChannels.Nirvana;

namespace Publisher
{
	class Program
	{
		static void Main()
		{
			using (var session = new Session("nsp://localhost"))
            {
				session.DataGroups.EnableAdmin = true;
				session.Initialize();

				// Query commands queue
				var commands =
					from args in session.Queues.CreateConsumer("Commands").ToObservable()
					select new
					{
						IsSubscribed = args.Message.Properties.GetBoolean("IsSubscribed"),
						DataGroupName = args.Message.Properties.GetString("DataGroupName"),
						StreamId = args.Message.Properties.GetString("StreamId"),
					};

				// Process incoming commandsB
				commands.Subscribe(command =>
				{
					var group = session.DataGroups.Root.Children
						.Where(g => g.Name == command.DataGroupName)
						.Single();
					if (command.IsSubscribed) group.AddStream(command.StreamId);
					else group.RemoveStream(command.StreamId);
				});

				// Create some observables.
				var eurusd = GetCrossRate("EUR", "USD");
				var gbpaud = GetCrossRate("GBP", "AUD");
				var audnzd = GetCrossRate("AUD", "NZD");

				// Register them as data groups.
				session.DataGroups.CreateDataGroup(eurusd, "EURUSD");
				session.DataGroups.CreateDataGroup(gbpaud, "GBPAUD");
				session.DataGroups.CreateDataGroup(audnzd, "AUDNZD");

				Console.WriteLine("Initialized...");
				Console.ReadLine();
			}
		}

		static IObservable GetCrossRate(string from, string to)
		{
			return Observable.Create(o =>
			{
				Console.WriteLine("{0}{1} Active", from, to);

				var rand = new Random();
				var prices = Observable.Generate(
					5d,
					i => true,
					i => i + rand.NextDouble() - 0.5, CreateMesage,
					i => TimeSpan.FromSeconds(0.1));

				var disposable = prices.Subscribe(o);

				return () =>
				{
					Console.WriteLine("{0}{1} Disposed", from, to);
					disposable.Dispose();
				};
			});
		}

		static IMessage CreateMesage(double price)
		{
			var m = new Message();
			m.Properties["Price"] = price;
			return m;
		}
	}
}

You can run the Publisher now. You should see something like this;

image

That not very interesting, but if you open up enterprise manager again, you will noticed that three DataGroups have been created.

image

Subscriber Code

We need to create another console application. I’m calling it “Subscriber”. Again we will need to add references to Rx & Nirvana.NET.

image

Our subscriber code is fairly straight forward. There are two parts to it. Firstly we need to query the data groups for incoming prices.

				// query data groups for incoming prices
				var prices = from args in session.DataGroups.ToObservable()
							 select new
							 {
								 args.Message.Destination,
								 Price = args.Message.Properties.GetDouble("Price"),
							 };
				prices.Subscribe(Console.WriteLine);

Then we need some code that manages our subscriptions. I’ve just created a loop that allows the user to subscribe to EURUSD.

				// producer used to send commands to the publisher
				var queue = session.Queues.CreateProducer("Commands");

				Console.WriteLine("Initialized...");
				while (true)
				{
					Console.WriteLine("Press enter to subscribe");
					Console.ReadLine();
					var message = new Message();
					message.Properties["IsSubscribed"] = true;
					message.Properties["DataGroupName"] = "EURUSD";
					message.Properties["StreamId"] = session.DataGroups.StreamId;
					// Subscribe
					queue.Send(message);

					Console.WriteLine("Press enter to unsubscribe");
					Console.ReadLine();
					message.Properties["IsSubscribed"] = false;
					// Unsubscribe
					queue.Send(message);
				}

Here is the subscriber code in it’s entirety.

using System;
using MyChannels.Nirvana;

namespace Subscriber
{
	using System.Reactive.Linq;

	class Program
	{
		static void Main()
		{
            using (var session = new Session("nsp://localhost"))
			{
				session.DataGroups.Enable = true;
				session.Initialize();

				// query data groups for incoming prices
				var prices = from args in session.DataGroups.ToObservable()
							 select new
							 {
								 args.Message.Destination,
								 Price = args.Message.Properties.GetDouble("Price"),
							 };
				prices.Subscribe(Console.WriteLine);

				// producer used to send commands to the publisher
				var queue = session.Queues.CreateProducer("Commands");

				Console.WriteLine("Initialized...");
				while (true)
				{
					Console.WriteLine("Press enter to subscribe");
					Console.ReadLine();
					var message = new Message();
					message.Properties["IsSubscribed"] = true;
					message.Properties["DataGroupName"] = "EURUSD";
					message.Properties["StreamId"] = session.DataGroups.StreamId;
					// Subscribe
					queue.Send(message);

					Console.WriteLine("Press enter to unsubscribe");
					Console.ReadLine();
					message.Properties["IsSubscribed"] = false;
					// Unsubscribe
					queue.Send(message);
				}
			}
		}
	}
}

Now run both projects.

image

You should see something like this.

image

Notice that when you press enter on the subscriber end, the observable on the publisher wakes up!

image

And then cools back down again!

image

Pretty simple stuff huh?

So there you have it. Publisher / Subscriber model using Nirvana. Reactive on both sides of the equation. I’ve package up the working solution with code, which also includes the latest binaries of the Nirvana .Net and Nirvana.Reactive APIs. You will also need to have downloaded Nirvana 6.0 from the main download area of our site.

Download

Source Code And Binaries

This entry was posted in .NET, DataGroups, Middleware, Rx Extensions. Bookmark the permalink.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s