Daily Archives: January 28, 2014

Protocol Buffers (part 2, advanced), TCP Networking

There was no post yesterday. As usual, I prepare the source code for you in my spare time, which is not easy in my current environment. I will have to slow down the posting frequency as the source code is getting longer and more complex.

So, what is on the agenda? Today we are going to send serialized binary data across the network. I am using localhost so that the example runs on a single machine. Simply change the IP address “127.0.0.1” to whatever suits you.

I wrote a small class (“ProtoType”) to identify objects quickly during serialization and deserialization. It is abstract and forms our base class. The classes “Book” and “Fable” are nearly unchanged from the previous posts. I just added a “ToString” override to “Fable” for nicer output. Both classes must inherit from the abstract base class “ProtoType”; this avoids implementing an interface, which would hurt code legibility more than needed.

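using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using ProtoBuf;

public class ProtoBufExample {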

    public enum eType { Unknown = 0, eBook = 1, eFable };
    public abstract class ProtoType {
        public readonly eType objectId;
        public readonly byte[] objectIdAsBytes;

        public ProtoType() {
            Type t = this.GetType();
            if (t == typeof(ProtoBufExample.Book)) objectId = eType.eBook; // to identify the object before deserialization
            else if (t == typeof(ProtoBufExample.Fable)) objectId = eType.eFable; // to identify the object before deserialization
            else throw new Exception("object type unknown");
            objectIdAsBytes = BitConverter.GetBytes((Int16)objectId);
        } // constructor
    } // class 

    [ProtoContract]
    public class Book : ProtoType {            
        [ProtoMember(1)]
        public string author;
        [ProtoMember(2)]
        public List<Fable> stories;
        [ProtoMember(3)]
        public DateTime edition;
        [ProtoMember(4)]
        public int pages;
        [ProtoMember(5)]
        public double price;
        [ProtoMember(6)]
        public bool isEbook;

        public override string ToString() {
            StringBuilder s = new StringBuilder();
            s.Append("by "); s.Append(author);
            s.Append(", edition "); s.Append(edition.ToString("dd MMM yyyy"));
            s.Append(", pages "); s.Append(pages);
            s.Append(", price "); s.Append(price);
            s.Append(", isEbook "); s.Append(isEbook);
            s.AppendLine();
            if (stories != null) foreach (Fable lFable in stories) {
                    s.Append("title "); s.Append(lFable.title);
                    s.Append(", rating "); s.Append(lFable.customerRatings.Average()); // Average() is an extension method of "using System.Linq;"
                    s.AppendLine();
                }

            return s.ToString();
        } //
    } // class

    [ProtoContract]
    public class Fable : ProtoType {
        [ProtoMember(1)]
        public string title;
        [ProtoMember(2)]
        public double[] customerRatings;

        public override string ToString() {
            return "title " + title + ", rating " + customerRatings.Average();
        } //
    } // class

    public static Book GetData() {
        return new Book {
            author = "Aesop",
            price = 1.99,
            isEbook = false,
            edition = new DateTime(1975, 03, 13),
            pages = 203,
            stories = new List<Fable>(new Fable[] {
                new Fable{ title = "The Fox & the Grapes", customerRatings = new double[]{ 0.7, 0.7, 0.8} },
                new Fable{ title = "The Goose that Laid the Golden Eggs", customerRatings = new double[]{ 0.6, 0.75, 0.5, 1.0} },
                new Fable{ title = "The Cat & the Mice", customerRatings = new double[]{ 0.1, 0.0, 0.3} },
                new Fable{ title = "The Mischievous Dog", customerRatings = new double[]{ 0.45, 0.5, 0.4, 0.0, 0.5} }
            })
        };
    } //
} // class
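
To make the two-byte type header more tangible, here is a minimal sketch of the round trip that “ProtoType” and the listener perform between them. The class name “TypeHeaderDemo” and the methods “Encode” and “Decode” are made up for this illustration; only the BitConverter calls are taken from the code in this post.

```csharp
using System;

public static class TypeHeaderDemo {
    public enum eType { Unknown = 0, eBook = 1, eFable }

    // encode the type id the same way the ProtoType constructor does
    public static byte[] Encode(eType xType) {
        return BitConverter.GetBytes((Int16)xType);
    }

    // decode it the same way the listener does after reading the header
    public static eType Decode(byte[] xHeader) {
        return (eType)BitConverter.ToInt16(xHeader, 0);
    }

    public static void Main() {
        byte[] lHeader = Encode(eType.eFable);
        Console.WriteLine(lHeader.Length);   // 2
        Console.WriteLine(Decode(lHeader));  // eFable
    }
} // class
```

BitConverter uses the byte order of the machine it runs on, so this header only works as long as both ends of the connection share the same endianness. On localhost that is trivially the case.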

The server side always needs to be started before the client; otherwise the client's connection attempt fails because nothing is listening yet. We set up a listener on localhost port 65432 (an arbitrary choice) and wait for a client connection. I chose TCP. It is easy to use and very reliable: the protocol takes care of lost and out-of-order packets for us. The downside is the overhead, which makes it comparatively slow. I will cover that in coming posts.
The deserializer on the server side cannot run right away. We need to determine the object type first, and this is where our “ProtoType” class comes in: the first two bytes of each transmission tell us the type. This matters for deserialization speed. As you can see, the ProtoBuf.Serializer.DeserializeWithLengthPrefix method is generic, so knowing the type up front keeps type casting to a minimum.

public class NetworkListener {
                
    private volatile bool _ExitLoop = true; // volatile: written and read by different threads
    private TcpListener _Listener;
    public delegate void dOnMessage(object xSender, ProtoBufExample.Book xBook);
    public event dOnMessage OnMessage;
    private NetworkStream _NetworkStream = null;

    public int Port { get; private set; }
    public string IpAddress { get; private set; }
    public string ThreadName { get; private set; }

    public NetworkListener(string xIpAddress, int xPort, string xThreadName) {
        Port = xPort;
        IpAddress = xIpAddress;
        ThreadName = xThreadName;
    } //

    public bool Connect() {
        if (!_ExitLoop) {
            Console.WriteLine("Listener running already");
            return false;
        }
        _ExitLoop = false;

        try {
            _Listener = new TcpListener(IPAddress.Parse(IpAddress), Port);
            _Listener.Start();

            Thread lThread = new Thread(new ThreadStart(Loop));
            lThread.IsBackground = true;
            lThread.Name = ThreadName;
            lThread.Start();

            return true;
        }
        catch (Exception ex) { Console.WriteLine(ex.Message); }
        return false;
    } //

    public void Disconnect() {
        _ExitLoop = true;
        if (_NetworkStream != null) _NetworkStream.WriteTimeout = 5; // short timeout (5 ms); the stream is null if no client has connected yet
    } //

    private void Loop() {
        try {
            while (!_ExitLoop) {
                Console.WriteLine("Waiting for a client");
                using (TcpClient lClient = _Listener.AcceptTcpClient()) {
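                    string lClientIpAddress = lClient.Client.LocalEndPoint.ToString(); // note: LocalEndPoint is the listener's own address; RemoteEndPoint would give the client's address and port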
                    Console.WriteLine("New client connecting: " + lClientIpAddress);
                    using (_NetworkStream = lClient.GetStream()) {

                        while (!_ExitLoop) {
                            try {
                                byte[] lHeader = new byte[2];    // to identify the object
                                if (_NetworkStream.Read(lHeader, 0, 2) != 2) break;
                                int lObjectType = BitConverter.ToInt16(lHeader, 0);
                                ProtoBufExample.eType lType = (ProtoBufExample.eType)lObjectType;
                                switch (lType) {
                                    case ProtoBufExample.eType.Unknown:
                                        break;
                                    case ProtoBufExample.eType.eBook:
                                        ProtoBufExample.Book lBook = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.Book>(_NetworkStream, ProtoBuf.PrefixStyle.Fixed32);
                                        Console.WriteLine(Environment.NewLine + "received a book: ");
                                        Console.WriteLine(lBook.ToString());

                                        // raise an event
                                        dOnMessage lEvent = OnMessage;
                                        if (lEvent == null) continue;
                                        lEvent(lClient, lBook);

                                        break;
                                    case ProtoBufExample.eType.eFable:
                                        ProtoBufExample.Fable lFable = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.Fable>(_NetworkStream, ProtoBuf.PrefixStyle.Fixed32);
                                        Console.WriteLine(Environment.NewLine + "received a fable: ");
                                        Console.WriteLine(lFable.ToString());
                                        break;
                                    default:
                                        Console.WriteLine("Mayday, mayday, we are in big trouble.");
                                        break;
                                }
                            }
                            catch (System.IO.IOException) {
                                if (_ExitLoop) Console.WriteLine("user requested TcpClient shutdown");
                                else Console.WriteLine("disconnected");
                                break; // the connection is gone; stop reading from this stream
                            }
                            catch (Exception ex) { Console.WriteLine(ex.Message); }
                        }
                        Console.WriteLine(Environment.NewLine + "server/listener: shutting down");
                    }
                }
            }
        }
        catch (Exception ex) {
            Console.WriteLine(ex.Message);
        }
        finally {
            _ExitLoop = true;
            if (_Listener != null) _Listener.Stop();
        }
    } // 

} // class
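
One thing to keep in mind with TCP: a single Read call may return fewer bytes than requested, even for a two-byte header. The following sketch shows a loop that keeps reading until the requested count is complete, and uses it to take one frame apart. “FrameReader” and “ReadFully” are hypothetical names, and a MemoryStream stands in for the NetworkStream. The four-byte length field mirrors my understanding of PrefixStyle.Fixed32 (a little-endian 32-bit length); treat that layout as an assumption. In the real listener, DeserializeWithLengthPrefix consumes the length and the payload itself.

```csharp
using System;
using System.IO;

public static class FrameReader {
    // Reads exactly xCount bytes; returns false if the stream ends first.
    public static bool ReadFully(Stream xStream, byte[] xBuffer, int xCount) {
        int lOffset = 0;
        while (lOffset < xCount) {
            int lRead = xStream.Read(xBuffer, lOffset, xCount - lOffset);
            if (lRead == 0) return false; // the other side closed the connection
            lOffset += lRead;
        }
        return true;
    }

    public static void Main() {
        // simulate one frame: 2-byte type id, 4-byte payload length, payload
        byte[] lPayload = { 10, 20, 30 };
        using (MemoryStream lStream = new MemoryStream()) {
            lStream.Write(BitConverter.GetBytes((Int16)1), 0, 2);
            lStream.Write(BitConverter.GetBytes(lPayload.Length), 0, 4);
            lStream.Write(lPayload, 0, lPayload.Length);
            lStream.Position = 0;

            byte[] lHeader = new byte[2];
            byte[] lLength = new byte[4];
            if (!ReadFully(lStream, lHeader, 2) || !ReadFully(lStream, lLength, 4)) return;
            byte[] lBody = new byte[BitConverter.ToInt32(lLength, 0)];
            if (!ReadFully(lStream, lBody, lBody.Length)) return;

            // prints "type 1, payload bytes 3"
            Console.WriteLine("type " + BitConverter.ToInt16(lHeader, 0) + ", payload bytes " + lBody.Length);
        }
    }
} // class
```

On localhost a two-byte read will practically always arrive in one piece, which is why the listener gets away with a single Read. Across a real network I would not rely on it.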

The client side is straightforward. There is not much processing besides choosing the correct generic type for serialization. I used a BlockingCollection to make the hand-over between threads easier to read. It is not the fastest solution, but it definitely makes passing objects into a thread loop easy. Personally, I am no fan of the concurrent collections; they are not as predictable as a custom solution. _Queue.Take(); blocks and waits for data to arrive. It is thread-safe and does not require any object locking.

public class NetworkClient {
    public int Port { get; private set; }
    public string IpAddress { get; private set; }
    public string ThreadName { get; private set; }
    private NetworkStream _NetworkStream = null;
    private volatile bool _ExitLoop = true; // volatile: written and read by different threads
    private BlockingCollection<ProtoBufExample.ProtoType> _Queue = new BlockingCollection<ProtoBufExample.ProtoType>();

    public NetworkClient(string xIpAddress, int xPort, string xThreadName) {
        Port = xPort;
        IpAddress = xIpAddress;
        ThreadName = xThreadName;
    } //

    public void Connect() {
        if (!_ExitLoop) return; // running already
        _ExitLoop = false;

        Thread lThread = new Thread(new ThreadStart(Loop));
        lThread.IsBackground = true;
        lThread.Name = ThreadName;
        lThread.Start();
    } //

    public void Disconnect() {
        _ExitLoop = true;
        _Queue.Add(null);
        if (_NetworkStream != null) _NetworkStream.ReadTimeout = 100;
    } //

    public void Send(ProtoBufExample.ProtoType xObject) {
        if (xObject == null) return;
        _Queue.Add(xObject);
    } //

    private void Loop() {
        try {
            using (TcpClient lClient = new TcpClient()) {
                lClient.Connect(IpAddress, Port);
                using (_NetworkStream = lClient.GetStream()) {

                    while (!_ExitLoop) {
                        try {
                            ProtoBufExample.ProtoType lObject = _Queue.Take();
                            if (lObject == null) break;

                            switch (lObject.objectId) {
                                case ProtoBufExample.eType.eBook:
                                    _NetworkStream.Write(lObject.objectIdAsBytes, 0, 2);
                                    ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.Book>(_NetworkStream, (ProtoBufExample.Book)lObject, ProtoBuf.PrefixStyle.Fixed32);
                                    break;
                                case ProtoBufExample.eType.eFable:
                                    _NetworkStream.Write(lObject.objectIdAsBytes, 0, 2);
                                    ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.Fable>(_NetworkStream, (ProtoBufExample.Fable)lObject, ProtoBuf.PrefixStyle.Fixed32);
                                    break;
                                default:
                                    break;
                            }
                        }
                        catch (System.IO.IOException) {
                            if (_ExitLoop) Console.WriteLine("user requested TcpClient shutdown.");
                            else Console.WriteLine("disconnected");
                            break; // the connection is gone; stop sending on this stream
                        }
                        catch (Exception ex) { Console.WriteLine(ex.Message); }
                    }
                    _ExitLoop = true;
                    Console.WriteLine(Environment.NewLine + "client: shutting down");
                }
            }
        }
        catch (Exception ex) { Console.WriteLine(ex.Message); }
    } // 

} // class
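
The client above wakes up its loop by adding null to the queue. As a possible alternative, BlockingCollection can also signal the end of the data by itself via CompleteAdding() and GetConsumingEnumerable(). The sketch below is not what the client does; it only illustrates that variation, and “QueueDemo” with its “Run” method is a made-up name. Strings stand in for the “ProtoType” objects.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class QueueDemo {
    public static List<string> Run() {
        BlockingCollection<string> lQueue = new BlockingCollection<string>();
        List<string> lReceived = new List<string>();

        Thread lConsumer = new Thread(() => {
            // blocks until an item arrives; the loop ends once
            // CompleteAdding() was called and the queue is empty
            foreach (string lItem in lQueue.GetConsumingEnumerable()) lReceived.Add(lItem);
        });
        lConsumer.Start();

        lQueue.Add("book");
        lQueue.Add("fable");
        lQueue.CompleteAdding(); // takes the place of the null sentinel
        lConsumer.Join();        // wait until the consumer has drained the queue
        return lReceived;
    }

    public static void Main() {
        Console.WriteLine(string.Join(", ", Run())); // book, fable
    }
} // class
```

The advantage is that null never travels through the queue, so the consumer needs no special case for it. The price is that the collection cannot be reused after CompleteAdding().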

The main program is pretty neat. We get the “Book” data and send it across localhost. Then we take a single story from the same book and send that as well to see whether the program deals with the different types properly. Et voilà, it does.
The framing of the serialized data is very important. If you do not frame it, the receiver cannot determine the type. There are weird examples on the internet. My quick Google research showed that people generally do not implement this centerpiece, leaving beginners in the middle of nowhere.

public static class NetworkTest {

    public static void Test() {
        NetworkListener lServer = new NetworkListener("127.0.0.1", 65432, "Server");
        NetworkClient lClient = new NetworkClient("127.0.0.1", 65432, "Client");

        lServer.Connect();
        lServer.OnMessage += new NetworkListener.dOnMessage(OnBook);

        lClient.Connect();

        // send a book across the network
        ProtoBufExample.Book lBook = ProtoBufExample.GetData();
        lClient.Send(lBook);

        // send a fable across the network
        lClient.Send(lBook.stories[1]);

        System.Threading.Thread.Sleep(1000);

        lClient.Disconnect();
        lServer.Disconnect();

        Console.ReadLine();
    }

    static void OnBook(object xSender, ProtoBufExample.Book xBook) {
        Console.WriteLine("Book event was raised");
    } //
} //

example output:
Waiting for a client
New client connecting: 127.0.0.1:65432

received a book:
by Aesop, edition 13 Mar 1975, pages 203, price 1.99, isEbook False
title The Fox & the Grapes, rating 0.733333333333333
title The Goose that Laid the Golden Eggs, rating 0.7125
title The Cat & the Mice, rating 0.133333333333333
title The Mischievous Dog, rating 0.37

Book event was raised

received a fable:
title The Goose that Laid the Golden Eggs, rating 0.7125

client: shutting down

server/listener: shutting down

There is one small issue with the OnBook event. We receive data on a thread and block that thread for as long as it takes to process the event. To avoid bottlenecks, you should think about forwarding “Book” events to tasks and letting them do whatever needs to be done asynchronously. I have posted about tasks before; the posts on ThreadPools and the Parallel class might also be interesting in this context.
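
A rough sketch of that idea, assuming Task.Run is available (.NET 4.5 or later): the handler only starts a task and returns immediately, so the receiving thread can go back to reading from the stream. “AsyncHandlerDemo” and “ProcessBook” are hypothetical, a string stands in for the “Book” object, and the handler returns the Task only so that the demo can wait for it. The real dOnMessage delegate returns void.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncHandlerDemo {
    // hypothetical slow processing step, e.g. writing the book to a database
    static void ProcessBook(string xAuthor) {
        Thread.Sleep(200);
        Console.WriteLine("processed book by " + xAuthor);
    }

    // the event handler hands the work to the thread pool and returns at once
    public static Task OnBook(object xSender, string xAuthor) {
        return Task.Run(() => ProcessBook(xAuthor));
    }

    public static void Main() {
        Task lTask = OnBook(null, "Aesop");
        Console.WriteLine("receiver thread is free again");
        lTask.Wait(); // only so that the demo does not exit before the task finishes
    }
} // class
```

Keep in mind that tasks started this way may finish in a different order than the messages arrived. If the order matters, a single consumer fed from a queue is the safer choice.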