XAML Playground
about XAML and other Amenities

A TCP Server with Reactive Extensions

2011-06-17T14:17:05+01:00 by codeblock

I know, from the title of this post you may think that the topic I'm about to cover is not really Silverlight related. Please be patient: the relation with Silverlight will become clear in the next part, when I complete the discussion.

After a long pause, today I return to the Reactive Extensions, which I had temporarily shelved for lack of time. I want to present the implementation of a small server that uses a socket and takes advantage of the Extensions. A socket server usually listens on a TCP port and can easily be consumed by a Silverlight application. This is not a common scenario, because in a company it is often hard to get past firewalls and security staff, so HTTP is the better choice in most cases. But sometimes, typically when you need push-style communication that keeps the application up to date with events on the server, a socket is an effective solution.

For those who are not aware of how a socket server works, here is a brief explanation. A socket server opens a port on the address it is bound to. The port is a number between 0 and 65535; values below 1024 are reserved for well-known services, so a custom server normally picks one between 1024 and 65535. The port is written after the IP address or DNS name, separated by a colon (for example 127.0.0.1:4530).

The server first reserves the port and starts listening for incoming connections. Until a connection arrives the server is essentially idle, but when one is detected it has to "accept" the request and initialize the channel that both parties will use to exchange information.

The acceptance phase must treat each new connection as a completely new channel, so the server ends up with a main channel where it accepts connections plus one channel for each connected client. Usually this means that the server spawns a new thread for each connection it accepts; that thread handles its own channel, leaving the main thread free to return to listening.
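For comparison, here is a minimal sketch of that classic thread-per-connection model written without the Reactive Extensions. It is not the code of this post: the class name, the `HandleClient` helper and the greeting it sends are hypothetical, and port 4530 on the loopback address is just an arbitrary choice.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

static class ThreadPerConnectionServer
{
    static void Main()
    {
        // Assumption: loopback address and port 4530, chosen only for this sketch.
        TcpListener listener = new TcpListener(IPAddress.Loopback, 4530);
        listener.Start();

        while (true)
        {
            // Blocks the main thread until a client connects.
            TcpClient client = listener.AcceptTcpClient();

            // Hand the new channel to a dedicated thread so that the loop
            // can immediately go back to listening.
            Thread worker = new Thread(() => HandleClient(client));
            worker.IsBackground = true;
            worker.Start();
        }
    }

    // Hypothetical handler: sends one line of text, then closes the channel.
    static void HandleClient(TcpClient client)
    {
        try
        {
            byte[] hello = Encoding.UTF8.GetBytes("hello" + Environment.NewLine);
            client.GetStream().Write(hello, 0, hello.Length);
        }
        finally
        {
            client.Close();
        }
    }
}
```

The accept loop and the thread management are exactly the plumbing that the reactive version tries to keep out of sight.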

Used properly, the Reactive Extensions make these operations very easy, and the code required to handle listening and acceptance is really compact. My sample is a console application that starts listening and then stays available until someone hits Enter, which closes the application for good. The main function is shown below:

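    // Namespaces needed by the snippets in this post. The two System.Reactive
    // entries assume the Rx 1.0 naming; adjust them to the Rx build you reference.
    using System;
    using System.Diagnostics;
    using System.Globalization;
    using System.Net;
    using System.Net.Sockets;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Text;
    using System.Threading;

    // Shared by every connection thread to signal that the server is shutting down.
    static CancellationTokenSource cts = new CancellationTokenSource();

    /// <summary>
    /// Starts the listener and waits for Enter before shutting down.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        // IPAddress.Loopback is never null, so no guard is needed around this code.
        TcpListener listener = new TcpListener(IPAddress.Loopback, 4530);
        listener.Start();

        Console.WriteLine("Service is listening on {0} port {1}.", IPAddress.Loopback, 4530);
        Console.WriteLine("Press enter to stop");

        // Each call to accept() starts one asynchronous accept and returns an
        // observable that notifies the single TcpClient it produces.
        Func<IObservable<TcpClient>> accept =
            Observable.FromAsyncPattern<TcpClient>(listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient);

        accept()
            .Where(o => o.Connected)
            .ObserveOn(Scheduler.NewThread) // run OnAccept on a dedicated thread
            .Subscribe(acceptedClient => OnAccept(acceptedClient, accept));

        Console.ReadLine();
        cts.Cancel(); // ask every connection thread to exit
    }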

In the first lines the server binds to port 4530 of the localhost IP (127.0.0.1) and starts listening with the Start method. Then the FromAsyncPattern method creates an observable source that notifies as soon as a new connection is detected, handing over a channel that is ready to communicate. The Subscribe method forwards that channel to the OnAccept method, which is responsible for feeding it with information retrieved from the computer (in this case CPU usage and memory size in real time). Obviously this is the operation we should run on a separate thread, and the trick here is done by ObserveOn(Scheduler.NewThread): it delivers the notification on a new thread, so the body of OnAccept runs there. Let me continue with the OnAccept method:

    static void OnAccept(TcpClient client, Func<IObservable<TcpClient>> accept)
    {
        // The previous observable has delivered its only connection, so start
        // a new asynchronous accept before serving this client.
        accept()
            .Where(o => o.Connected)
            .ObserveOn(Scheduler.NewThread)
            .Subscribe(acceptedClient => OnAccept(acceptedClient, accept));

        Console.WriteLine("channel opened");

        // WaitOne(100) returns false every 100 ms until cancellation is requested.
        while (!cts.Token.WaitHandle.WaitOne(100))
        {
            try
            {
                if (client.Connected)
                {
                    byte[] message = GetMessage();
                    client.Client.Send(message);
                }
            }
            catch (Exception ex)
            {
                // Typically the client has dropped the connection: stop serving it.
                Console.WriteLine(ex.Message);
                break;
            }
        }

        Console.WriteLine("channel closed");
        client.Close();
    }

As you can see, the OnAccept method receives the connected TcpClient together with the accept function that creates the observable. Each observable produced by FromAsyncPattern notifies a single connection, so in its very first lines OnAccept calls accept() again to put the listener back in waiting, just before starting its own work. This closes the loop: the listener keeps waiting for new connections while the new thread sends the information along its channel.

To handle the shutdown of the server gracefully, a CancellationTokenSource forwards the exit request to all the threads once you hit the Enter key. The GetMessage method simply reads the CPU and memory information and translates it to a byte array that is sent on the channel.
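The shutdown mechanism can be tried in isolation. The following is a minimal sketch, with no sockets involved, of a worker thread that polls the token's wait handle the same way OnAccept does; the class name and the `Work` method are hypothetical and exist only for illustration.

```csharp
using System;
using System.Threading;

static class CancellationSketch
{
    static readonly CancellationTokenSource cts = new CancellationTokenSource();

    static void Main()
    {
        Thread worker = new Thread(Work);
        worker.Start();

        Console.WriteLine("Press enter to stop");
        Console.ReadLine();

        cts.Cancel();   // signals the wait handle exposed by the token
        worker.Join();  // wait for the worker to leave its loop
    }

    static void Work()
    {
        // WaitOne(100) returns false when 100 ms elapse without a signal
        // and true as soon as Cancel() has been called.
        while (!cts.Token.WaitHandle.WaitOne(100))
        {
            Console.WriteLine("working...");
        }

        Console.WriteLine("worker stopped");
    }
}
```

Using the wait handle as a timer gives both the 100 ms pacing between messages and a prompt exit when cancellation is requested.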

    const ulong Mega = 1048576; // bytes in one megabyte

    // Total CPU usage across all cores. The first NextValue() call returns 0.
    static PerformanceCounter cpuCounter = new PerformanceCounter("Processor", "% Processor Time", "_Total");

    static byte[] GetMessage()
    {
        ulong totalPhysicalMemory = 0;
        ulong availablePhysicalMemory = 0;
        ulong totalVirtualMemory = 0;
        ulong availableVirtualMemory = 0;
        float cpu = cpuCounter.NextValue();

        // P/Invoke wrapper around the Win32 GlobalMemoryStatusEx function.
        NativeMethods.MEMORYSTATUSEX memStatus = new NativeMethods.MEMORYSTATUSEX();

        if (NativeMethods.GlobalMemoryStatusEx(memStatus))
        {
            // Convert bytes to megabytes.
            totalPhysicalMemory = memStatus.ulTotalPhys / Mega;
            availablePhysicalMemory = memStatus.ulAvailPhys / Mega;
            totalVirtualMemory = memStatus.ulTotalVirtual / Mega;
            availableVirtualMemory = memStatus.ulAvailVirtual / Mega;
        }

        // One text line per message: cpu:totalPhys:availPhys:totalVirt:availVirt
        return Encoding.UTF8.GetBytes(
            string.Format(
                CultureInfo.InvariantCulture,
                "{0:0.0}:{1}:{2}:{3}:{4}{5}",
                cpu,
                totalPhysicalMemory,
                availablePhysicalMemory,
                totalVirtualMemory,
                availableVirtualMemory,
                Environment.NewLine));
    }
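GetMessage relies on a NativeMethods class that is not listed here. What follows is only a sketch of how such a wrapper could be declared, assuming the field names used above (ulTotalPhys and so on) and the layout of the Win32 MEMORYSTATUSEX structure; the actual declaration used by the sample may differ.

```csharp
using System.Runtime.InteropServices;

internal static class NativeMethods
{
    // Mirrors the Win32 MEMORYSTATUSEX structure: two DWORDs followed by
    // seven 64-bit values. Field names are assumed from the GetMessage code.
    [StructLayout(LayoutKind.Sequential)]
    internal class MEMORYSTATUSEX
    {
        public uint dwLength;
        public uint dwMemoryLoad;
        public ulong ulTotalPhys;
        public ulong ulAvailPhys;
        public ulong ulTotalPageFile;
        public ulong ulAvailPageFile;
        public ulong ulTotalVirtual;
        public ulong ulAvailVirtual;
        public ulong ulAvailExtendedVirtual;

        public MEMORYSTATUSEX()
        {
            // The API requires dwLength to hold the structure size before the call.
            dwLength = (uint)Marshal.SizeOf(typeof(MEMORYSTATUSEX));
        }
    }

    // Declared as a class parameter with [In, Out] so that the values
    // written by the API are copied back into the managed object.
    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    internal static extern bool GlobalMemoryStatusEx([In, Out] MEMORYSTATUSEX lpBuffer);
}
```

Declaring MEMORYSTATUSEX as a class rather than a struct is what allows the call site to pass memStatus without the ref keyword.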

To try the server, run the code, then open a command prompt and type the following:

telnet 127.0.0.1 4530
 

You will see a continuous stream of information displayed on the console. You can also run the same command from several command prompts at once, and every connection will be served. This example shows the power of the Reactive Extensions. In the next post I will show you how to use the library to consume this socket.
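Until then, a plain console client can stand in for telnet. The sketch below does not use the Reactive Extensions and is not part of the sample: the class name is hypothetical, and it only assumes the address, the port and the colon-separated line format produced by GetMessage.

```csharp
using System;
using System.Globalization;
using System.IO;
using System.Net.Sockets;
using System.Text;

static class MonitorClient
{
    static void Main()
    {
        // Address and port match the server described in this post.
        using (TcpClient client = new TcpClient("127.0.0.1", 4530))
        using (StreamReader reader = new StreamReader(client.GetStream(), Encoding.UTF8))
        {
            string line;

            // ReadLine returns null when the server closes the channel.
            while ((line = reader.ReadLine()) != null)
            {
                // Expected format: cpu:totalPhys:availPhys:totalVirt:availVirt
                string[] parts = line.Split(':');
                if (parts.Length != 5)
                {
                    continue; // skip anything that does not match the format
                }

                float cpu = float.Parse(parts[0], CultureInfo.InvariantCulture);
                ulong availablePhysical = ulong.Parse(parts[2], CultureInfo.InvariantCulture);

                Console.WriteLine("CPU {0:0.0}%  free RAM {1} MB", cpu, availablePhysical);
            }
        }
    }
}
```

Parsing with the invariant culture matters here because the server formats the CPU value with a dot as decimal separator regardless of the machine's regional settings.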