XAML Playground
about XAML and other Amenities

Consume a Socket using Reactive Extension

2011-06-29T09:43:23+01:00 by Andrea Boschin

In my last post I wrote about writing a TCP server using the Reactive Extensions library. It was not a Silverlight related post, but it simply introduces the server side component that I will use in this post to show you how to use the same library to consume the incoming data and present it in Silverlight. The interesting part is to understand how rx can simplify the programming model when dealing with asynchronicity both in Silverlight and in server side full .NET programming.

The server I presented was a very simple socket listener that waits for connections and then, when the channel has been established, it push to the client a continuous stream of data that represent the current state of the CPU and memory of the server. With this huge amount of informations incoming we can create a little application that is able to show a chart and some gauges, updated almost in realtime.

The application, developed following the MVVM pattern, is made of a simple view containing the controls used to present the informations. These controls are feeded by a couple of properties in the ViewModel that are updated with the incoming data. So the most of the work is done by the ViewModel that is responsible of connecting to the socket and read the information stream to update the properties. In a real world solution probably you will have some kind of layer between the ViewModel and the socket, but for the sake of the post we will keep it simple as much as we need to understand how it works.

Consuming a socket in Silverlight means using an instance of the Socket class that represents the connection and a SocketAsyncEventArgs that is used when you call the methods of the connection to make the requests and receive responses. So, as an example, when you have to establish the connection you have to create the instance of the Socket class and the call the ConnectAsync method providing the SocketAsyncEventArgs initialized with the address of the endpoint to connect to. When the connection has been established the SocketAsyncEventArgs class will raise a Completed event that notify about the result of the operation. Doing it with Reactive Extensions mean something like this:

   1: protected void Connect()
   2: {
   3:     this.Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
   4:  
   5:     SocketAsyncEventArgs arguments = new SocketAsyncEventArgs();
   6:     arguments.RemoteEndPoint = new DnsEndPoint(Application.Current.Host.Source.DnsSafeHost, 4530);
   7:  
   8:     var socketAsObservable = from args in Observable.FromEvent<SocketAsyncEventArgs>(
   9:                                  ev => arguments.Completed += ev,
  10:                                  ev => arguments.Completed -= ev)
  11:                              select args.EventArgs;
  12:  
  13:     socketAsObservable
  14:         .Where(args => args.LastOperation == SocketAsyncOperation.Connect)
  15:         .Subscribe(
  16:         args =>
  17:         {
  18:             args.Dispose();
  19:             this.Receive();
  20:         });
  21:  
  22:     this.Socket.ConnectAsync(arguments);
  23: }

In the line #3 it is created the Socket with the common parameter that is used in Silverlight for a TCP channel. Then an instance of the SocketAsyncEventArgs is initialized and its property RemoteEndPoint is provided with and instance of DnsEndPoint that represente the address of the server to connect to. The port is 4530 like we defined in the previour article.

At this point it is created a stream, called socketAsObservable, from the Completed event of the SocketAsyncEventArgs. the stream filter the LastOperation to be "Connect", as we expect and in the Subscribe method disposes the SocketAsyncEventArgs instance and starts to receive data. The SocketAsyncEventArgs instance can be used only once so we have to carefully dispose it to avoid memory leaks.

Once the connection has successfully established you have to put your socket in Receive. When incoming data is detected you will be notified and you can fetch it from the SocketAsyncEventArgs and put again the socket in Receive. Here is how it appear with Reactive Extensions:

   1: protected void Receive()
   2: {
   3:     SocketAsyncEventArgs arguments = new SocketAsyncEventArgs();
   4:     arguments.SetBuffer(new byte[1024], 0, 1024);
   5:  
   6:     var socketAsObservable = from args in Observable.FromEvent<SocketAsyncEventArgs>(
   7:                                  ev => arguments.Completed += ev,
   8:                                  ev => arguments.Completed -= ev)
   9:                              select args.EventArgs;
  10:  
  11:     socketAsObservable
  12:         .Where(args => args.LastOperation == SocketAsyncOperation.Receive)
  13:         .Throttle(TimeSpan.FromMilliseconds(500))
  14:         .ObserveOnDispatcher()
  15:         .Subscribe(OnReceive);
  16:  
  17:     if (this.Socket.Connected)
  18:         this.Socket.ReceiveAsync(arguments);
  19: }

Once again we create the instance of the SocketAsyncEventArgs. It is now initialized with a SetBuffer that allocates a buffer of 1 KByte that is used by the socket to compy the incoming data. Then the socketAsObservable is created using the FromEvent; This method is important because it is in charge of attaching and detaching the Completed event so we avoid to have unwanted delegates around. Again the socketAsObservable is filtered selecting only when LastOperation equals to Receive and we also apply a Throttling of about 500 milliseconds. It is made to discart updated when they are too fast. One update every half a second suffice to say that the UI is up to date with the server.

Fially we marshal the stream to the UI Thread using the ObserveOnDispatcher and the received events are forwarded to the OnReceive method that is responsible of parsing the received data and updating the UI.

   1: protected void OnReceive(SocketAsyncEventArgs args)
   2: {
   3:     string data = Encoding.UTF8.GetString(args.Buffer, 0, args.BytesTransferred);
   4:  
   5:     IEnumerable<Sample> samples = this.GetSamples(ref data);
   6:  
   7:     Array.Clear(args.Buffer, 0, 1024);
   8:  
   9:     if (data.Length > 0)
  10:     {
  11:         byte[] bytes = Encoding.UTF8.GetBytes(data);
  12:         Array.Copy(bytes, args.Buffer, bytes.Length);
  13:         args.SetBuffer(bytes.Length, 1024 - bytes.Length);
  14:     }
  15:     else
  16:         args.SetBuffer(0, 1024);
  17:  
  18:     if (this.Socket.Connected)
  19:         this.Socket.ReceiveAsync(args);
  20:  
  21:     this.Update(samples);
  22: }

The method peek up the received data and then calls again the ReceiveAsync method of the connection to make again the channel ready to receive other informations. Then the Update method is called to update the properties binded to the view.

The code is simple and obviously it need some additional check, as an example it needs to verify the communication errors that here are swallowed. I hope it shows, once again, how reactive extension can help you to simplify the consumption of asynchronous streams.

A TCP Server with Reactive Extensions

2011-06-17T14:17:05+01:00 by codeblock

I know, from the title of this post you may understand that the topic I’m about to cover is not really Silverlight related. Please be patient and it will become clear that there is a relation with Silverlight in the next part when I will complete my exposition.

After a long pause, today I return to the Reactive Extensions, I’ve temporarily shelved for lack of time, and I want to present the implementation of a little server that uses a socket and takes advantage of the Extensions. A socket server usually listens to a TCP port and it can be easily consumed by a Silverlight application. This is not a usual scenario because often it is hard to get rid of firewalls and security people in a company, so the use of HTTP is way better in most of the cases. But, sometimes, tipically when you need a reactive communication that let the application to be always up to date with events on the server, the use of a Socket is an effective soluzione

For such people that are not aware of how a socket server works, a brief explanation. A regular socket server usually opens a port on the address it is binded. The port is represented by a number between 1024 and 65535 (numbers lower than 1024 are reserved) and it is expressed after the ip or dns name separated by a colon.

The server initially reserves the port and starts to listen for incoming connections. Until the arrive of a connection the server is substantially idle but when the connection is detected it has to "accept" the request and initialize the channel that will be used by both the parts to exchange the informations.

The acceptance phase of the server must handle the new connection like it is a completely new channel, so the problem is that the server will have a main channel where it accept connections and a number of channels, one for each client that has requested the connection. Usually this means that the server must spawn a new thread for each connection it accepts and the thread will separately handle its channel leaving the main thread free of returning to the listening phase.

Using Reactive Extension properly makes these operations very easy and the code required to handle the listening and acceptance is really compact. In my sample I use a Console Application that starts the listening ad finally remain available until someone hit enter and definitely closes the application. In the following box the main function:

   1: static CancellationTokenSource cts = new CancellationTokenSource();
   2:  
   3: /// <summary>
   4: /// Mains the specified args.
   5: /// </summary>
   6: /// <param name="args">The args.</param>
   7: static void Main(string[] args)
   8: {
   9:     if (IPAddress.Loopback != null)
  10:     {
  11:         TcpListener listener = new TcpListener(IPAddress.Loopback, 4530);
  12:         listener.Start();
  13:  
  14:         Console.WriteLine("Service is listening on {0} port {1}.", IPAddress.Loopback, 4530);
  15:         Console.WriteLine("Press enter to stop");
  16:  
  17:         Func<IObservable<TcpClient>> accept =
  18:             Observable.FromAsyncPattern<TcpClient>(listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient);
  19:  
  20:         accept()
  21:             .Where(o => o.Connected)
  22:             .ObserveOn(Scheduler.NewThread)
  23:             .Subscribe(acceptedClient => OnAccept(acceptedClient, accept));
  24:  
  25:         Console.ReadLine();
  26:         cts.Cancel();
  27:     }
  28: }

In the first lines the server binds to the port 4530 of the localhost ip (127.0.0.1) and it starts to listen using the Start method. Then using the FromAsyncPattern method it is created an observable source that will notify once a new connection is detected. The observable will give the channel ready to communicate. So, the Subcribe method forward the channel to the OnAccept method that is responsible to feed the channel with informations retrieved by the computer (in this case if gives cpu usage and memory size in realtime). Obviously this is the operation we should spawn in a separate thread and the trick here is done by the ObserveOn(Scheduler.NewThread) that automatically initialize and start the thread using the body of the OnAccept method. Let me continue with the OnAccept method:

   1: static void OnAccept(TcpClient client, Func<IObservable<TcpClient>> accept)
   2: {
   3:     accept()
   4:         .Where(o => o.Connected)
   5:         .ObserveOn(Scheduler.NewThread)
   6:         .Subscribe(acceptedClient => OnAccept(acceptedClient, accept));
   7:  
   8:     Console.WriteLine("channel opened");
   9:  
  10:     while (!cts.Token.WaitHandle.WaitOne(100))
  11:     {
  12:         try
  13:         {
  14:             if (client.Connected)
  15:             {
  16:                 byte[] message = GetMessage();
  17:                 client.Client.Send(message);
  18:             }
  19:         }
  20:         catch (Exception ex)
  21:         {
  22:             Console.WriteLine(ex.Message);
  23:             break;
  24:         }
  25:     }
  26:  
  27:     Console.WriteLine("channel closed");
  28:     client.Close();
  29: }

As you can see the OnAccept method receives the connected TcpClient side by side with the instance of the observable and in the very first lines puts the main thread again on listen, just before starting its work. This operation closes the loop because the listener will continue to wait for new connections and the new thread will send the informations along the channel.

To gracefully handle the closure of the server, once you hit the enter key a CancellationTokenSource is used to forward to alle the threads the request of exit. The method GetMessage simply reads the informations of cpu and memory and translates them to a byte array that is sent on the channel.

   1: const ulong Mega = 1048576;
   2: static PerformanceCounter cpuCounter = new PerformanceCounter("Processor", "% Processor Time", "_Total");
   3:  
   4: static byte[] GetMessage()
   5: {
   6:     ulong totalPhisicalMemory = 0;
   7:     ulong availablePhisicalMemory = 0;
   8:     ulong totalVirtualMemory = 0;
   9:     ulong availableVirtualMemory = 0;
  10:     float cpu = cpuCounter.NextValue();
  11:  
  12:     NativeMethods.MEMORYSTATUSEX memStatus = new NativeMethods.MEMORYSTATUSEX();
  13:  
  14:     if (NativeMethods.GlobalMemoryStatusEx(memStatus))
  15:     {
  16:         totalPhisicalMemory = memStatus.ulTotalPhys / Mega;
  17:         availablePhisicalMemory = memStatus.ulAvailPhys / Mega;
  18:         totalVirtualMemory = memStatus.ulTotalVirtual / Mega;
  19:         availableVirtualMemory = memStatus.ulAvailVirtual / Mega;
  20:     }
  21:  
  22:     return Encoding.UTF8.GetBytes(
  23:         string.Format(
  24:             CultureInfo.InvariantCulture,
  25:             "{0:0.0}:{1}:{2}:{3}:{4}{5}",
  26:             cpu,
  27:             totalPhisicalMemory,
  28:             availablePhisicalMemory,
  29:             totalVirtualMemory,
  30:             availableVirtualMemory,
  31:             Environment.NewLine));
  32: }

To have a try with the server please runs the code and then open a command prompt and type the following

telnet 127.0.0.1 4530
 

You will see a continuous stream of informations displayed on the console and you can also try to run multiple command prompt with the same instruction and all the connections will be connected. This example shows the powerfulness of the reactive extensions. In the next post I will show you how to use the library to consume this socket.