migration C#, Java, C++ (day 8), TCP, Qt

The last two days were characterized by incompatibilities. There is no TCP support in the C++ standard library, so a third-party library is needed. Now I remember why I had tried to avoid Boost and Qt. Visual Studio 2013 has its problems with Boost: you get really weird error messages that I could not fix at all. But then, why should Microsoft actively support Boost? They have their own libraries, which are pretty cool.
I was fighting with several compilers and tools that generated roughly 50 GB of library data in total. Finally I gave up, as it really is as mad as some web pages describe it. I ended up installing Qt Creator and had to rewrite the entire C++ code. I have to say that Qt is structured in a much better way than Boost. Overall the C++ coding took roughly 20 times longer than the C# coding, which was completed within 20 minutes. Most of that time was due to the incompatibility fight.
C++ executes faster, but let’s take a company’s point of view. If someone invests 250,000 USD in extra production time, wouldn’t it be smarter to buy 10x faster computers instead?
In the stock market, many traders will tell you that it is all about speed. Not really! Data comes in as a sequence, and the order of the messages matters. I know many situations where you have to slow the system down to solve specific sequence problems. Most market participants aren’t even aware of this.
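
To illustrate the sequence point, here is a minimal Java sketch. It assumes that every message carries a sequence number; the class name Resequencer and the numbering scheme are made up for this illustration and are not part of the chat programs in this post. A message that arrives too early is held back until the gap before it is filled, so the consumer is slowed down on purpose in favour of the correct order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical helper: releases messages strictly in sequence order.
final class Resequencer {
  private final TreeMap<Long, String> _Pending = new TreeMap<>(); // early arrivals, sorted by sequence number
  private long _NextSequence;                                     // the sequence number we are waiting for

  Resequencer(long xFirstSequence) {
    _NextSequence = xFirstSequence;
  } // constructor

  // Accepts one message and returns all messages that can be released now (possibly none).
  synchronized List<String> accept(long xSequence, String xMessage) {
    List<String> lReleased = new ArrayList<>();
    if (xSequence < _NextSequence) return lReleased; // duplicate or outdated message, ignore it
    _Pending.put(xSequence, xMessage);
    while (!_Pending.isEmpty() && _Pending.firstKey() == _NextSequence) {
      lReleased.add(_Pending.pollFirstEntry().getValue());
      _NextSequence++;
    }
    return lReleased;
  } //

  public static void main(String[] args) {
    Resequencer lResequencer = new Resequencer(1);
    System.out.println(lResequencer.accept(2, "second")); // prints [] because message 1 is still missing
    System.out.println(lResequencer.accept(1, "first"));  // prints [first, second]
  } //
} // class
```

The price is latency: message 2 is ready early, but it is only passed on once message 1 has arrived.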

Today I compare two TCP chat programs. The C# one is straightforward. The C++ side looks a bit weird, because the Qt Q_OBJECT macro expects the class declaration in a header file. I did not want to split the code into many pieces, so I kept it short and put everything into the headers.

ADDENDUM 21 May 2014

The Java code has been added today. The code structure is better than the C# example. The short Java test program deals with 1 server and 2 clients.

Below the Java code you can find a second C# example that uses the improved object approach similar to the Java code.
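
Before the full listings, it may help to see the core idea that all of them share: a listener accepts a connection and one line of text travels across the socket. The following is only a minimal Java sketch of that idea, not one of the programs below. It assumes a loopback connection on a port chosen by the operating system, and the class name LoopbackSketch is made up for this illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class LoopbackSketch {

  // Sends one line from a client socket to a server socket on the same machine
  // and returns what the server side received.
  static String sendLine(String xText) {
    InetAddress lLoopback = InetAddress.getLoopbackAddress();
    // port 0 lets the operating system pick a free port
    try (ServerSocket lServerSocket = new ServerSocket(0, 1, lLoopback);
         Socket lClientSocket = new Socket(lLoopback, lServerSocket.getLocalPort());
         Socket lAccepted = lServerSocket.accept()) {
      PrintWriter lWriter = new PrintWriter(lClientSocket.getOutputStream(), true); // true = flush on println
      lWriter.println(xText);
      BufferedReader lReader = new BufferedReader(new InputStreamReader(lAccepted.getInputStream()));
      return lReader.readLine();
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
  } //

  public static void main(String[] args) {
    System.out.println("server received: " + sendLine("Hello World!"));
  } //
} // class
```

Everything else in the listings (threads, client lists, queues, shutdown handling) is there to keep several such connections alive at the same time.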

TCP

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace DemoApp.ToCpp {
   public class Day8 {

      public class NetworkListener {

         private volatile bool _ExitLoop = true;
         private TcpListener _Listener;

         public int Port { get; private set; }
         public string IpAddress { get; private set; }
         public string ThreadName { get; private set; }
         private List<Client> _AllClients = new List<Client>();

         public NetworkListener(string xIpAddress, int xPort, string xThreadName) {
            Port = xPort;
            IpAddress = xIpAddress;
            ThreadName = xThreadName;
         } //

         private class Client {
            private TcpClient _TcpClient;
            private NetworkStream _NetworkStream;
            public delegate void dOnShutDown(Client xClient);
            public event dOnShutDown OnShutDown;

            private volatile bool _ExitLoop = false;
            public readonly string IpAddress;

            public Client(TcpClient xTcpClient) {
               _TcpClient = xTcpClient;
               IpAddress = xTcpClient.Client.LocalEndPoint.ToString(); // note: this is the server-side endpoint; RemoteEndPoint would identify the client
               Console.WriteLine("SERVER MESSAGE -> " + IpAddress + " new client connected.");
            } //

            public void Disconnect() {
               _ExitLoop = true;
               dOnShutDown lEvent = OnShutDown;
               if (lEvent != null) lEvent(this);
               if (_NetworkStream != null) _NetworkStream.WriteTimeout = 5; // immediate timeout
            } //

            public void ListenAsync() {
               Thread lThread = new Thread(new ThreadStart(Listen));
               lThread.IsBackground = true;
               lThread.Name = "Client_" + IpAddress;
               lThread.Start();
            } //

            private void Listen() {
               using (_NetworkStream = _TcpClient.GetStream()) {
                  using (StreamReader lStreamReader = new StreamReader(_NetworkStream)) {
                     while (!_ExitLoop) {
                        try {
                           string s = lStreamReader.ReadLine();
                           if (s == null) break;
                           Console.WriteLine("SERVER MESSAGE -> " + IpAddress + " Client said: " + s);
                        }
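                        catch (IOException) {
                           if (_ExitLoop) Console.WriteLine("SERVER MESSAGE -> user requested TcpClient shutdown");
                           else Console.WriteLine("SERVER MESSAGE -> disconnected");
                           break; // the stream is unusable after an IOException; without the break the loop would spin on a dead connection
                        }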
                        catch (Exception ex) { Console.WriteLine(ex.Message); }
                     }
                  }
                  Console.WriteLine("SERVER MESSAGE -> " + IpAddress + " Listener is shutting down");
               }
               _NetworkStream = null;
            } //
         } // class

         public bool WaitForClientsAsync() {
            if (!_ExitLoop) {
               Console.WriteLine("SERVER MESSAGE -> Listener running already");
               return false;
            }
            _ExitLoop = false;

            try {
               _Listener = new TcpListener(IPAddress.Parse(IpAddress), Port);
               _Listener.Start();

               Thread lThread = new Thread(new ThreadStart(WaitForAClient));
               lThread.IsBackground = true;
               lThread.Name = ThreadName;
               lThread.Start();

               return true;
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
            return false;
         } //

         public void Disconnect() {
            _ExitLoop = true;
            List<Client> lClone; // we need a clone, because the _AllClients collection will be manipulated during the foreach loop
            lock (_AllClients) { lClone = new List<Client>(_AllClients); }
            foreach (Client lClient in lClone) lClient.Disconnect();
            _Listener.Stop();
         } //

         private void WaitForAClient() {
            try {
               while (!_ExitLoop) {
                  int lClientCount;
                  lock (_AllClients) lClientCount = _AllClients.Count;
                  Console.WriteLine("SERVER MESSAGE -> Waiting for client number " + lClientCount);
                  TcpClient lTcpClient = _Listener.AcceptTcpClient();
                  Client lClient = new Client(lTcpClient);
                  lClient.OnShutDown += OnClientShutDown;
                  lock (_AllClients) _AllClients.Add(lClient);
                  lClient.ListenAsync(); // starts a background thread
               }
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
         }

         void OnClientShutDown(NetworkListener.Client xClient) {
            lock (_AllClients) { _AllClients.Remove(xClient); }
         } //         

      } // class

      public class NetworkClient {
         public int Port { get; private set; }
         public string IpAddress { get; private set; }
         public string ThreadName { get; private set; }

         private NetworkStream _NetworkStream = null;
         private volatile bool _ExitLoop = true; // volatile: written by the caller, read by the background thread
         private BlockingCollection<string> _Queue = new BlockingCollection<string>();

         public NetworkClient(string xIpAddress, int xPort, string xThreadName) {
            Port = xPort;
            IpAddress = xIpAddress;
            ThreadName = xThreadName;
         } //

         public void ConnectAsync() {
            if (!_ExitLoop) return; // running already
            _ExitLoop = false;

            Thread lThread = new Thread(new ThreadStart(Loop));
            lThread.IsBackground = true;
            lThread.Name = ThreadName;
            lThread.Start();
         } //

         public void Disconnect() {
            _ExitLoop = true;
            _Queue.Add(null);
            if (_NetworkStream != null) _NetworkStream.ReadTimeout = 100;
         } //

         public void Send(string xText) {
            if (string.IsNullOrEmpty(xText)) return;
            _Queue.Add(xText); // BlockingCollection is thread-safe; the background thread picks the message up and does the actual write
         } //

         private void Loop() {
            try {
               using (TcpClient lClient = new TcpClient()) {
                  lClient.Connect(IpAddress, Port);
                  using (_NetworkStream = lClient.GetStream()) {
                     using (StreamWriter lStreamWriter = new StreamWriter(_NetworkStream)) {
                        while (!_ExitLoop) {
                           try {
                              string s = _Queue.Take();
                              if (string.IsNullOrEmpty(s)) break;
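                              lStreamWriter.WriteLine(s);
                              lStreamWriter.Flush(); // send now; otherwise the text stays in the buffer until the writer is disposed
                           }
                           catch (System.IO.IOException) {
                              if (_ExitLoop) Console.WriteLine("CLIENT -> User requested shutdown.");
                              else Console.WriteLine("CLIENT -> disconnected");
                              break; // the connection is gone, there is nothing left to write to
                           }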
                           catch (Exception ex) { Console.WriteLine(ex.Message); }
                        }
                     }
                     _ExitLoop = true;
                     Console.WriteLine(Environment.NewLine + "CLIENT -> shutting down");
                  }
               }
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
         } //

      } // class


      public static void Test() {
         NetworkListener lServer = new NetworkListener("127.0.0.1", 65432, "Server");
         NetworkClient lClient = new NetworkClient("127.0.0.1", 65432, "Client");

         lServer.WaitForClientsAsync();

         lClient.ConnectAsync();
         lClient.Send("Hello World!"); // send a text message across the network

         System.Threading.Thread.Sleep(1000);

         lClient.Disconnect();
         lServer.Disconnect();

         Console.ReadLine();
      } //

   } // class
} // namespace

example output:
SERVER MESSAGE -> Waiting for client number 0
SERVER MESSAGE -> 127.0.0.1:65432 new client connected.
SERVER MESSAGE -> Waiting for client number 1
SERVER MESSAGE -> 127.0.0.1:65432 Client said: Hello World!

CLIENT -> shutting down
SERVER MESSAGE -> 127.0.0.1:65432 Listener is shutting down
A blocking operation was interrupted by a call to WSACancelBlockingCall
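
The last line of the output is not a failure. It is the exception message of the blocking AcceptTcpClient() call, which gets cancelled when _Listener.Stop() is called during shutdown. The same pattern exists in Java. The following is a hedged sketch, not part of the listings in this post: a thread that blocks in accept() is released with a SocketException as soon as another thread closes the ServerSocket. The class and method names are made up for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class AcceptShutdownSketch {

  // Starts a thread that blocks in accept(), closes the listener from the calling thread
  // and reports whether the accept call ended with a SocketException.
  static boolean closeUnblocksAccept() {
    try {
      ServerSocket lServerSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
      CountDownLatch lFinished = new CountDownLatch(1);
      boolean[] lReleased = { false };

      Thread lThread = new Thread(() -> {
        try {
          lServerSocket.accept(); // blocks, because no client ever connects
        } catch (SocketException ex) {
          lReleased[0] = true;    // expected: the socket was closed by another thread
        } catch (IOException ex) {
          // any other I/O problem is not the shutdown we are looking for
        } finally {
          lFinished.countDown();
        }
      });
      lThread.setDaemon(true);
      lThread.start();

      Thread.sleep(200);      // give the thread time to reach accept()
      lServerSocket.close();  // plays the role of _Listener.Stop()
      boolean lDone = lFinished.await(5, TimeUnit.SECONDS);
      return lDone && lReleased[0];
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return false;
    }
  } //

  public static void main(String[] args) {
    System.out.println("accept() released by close(): " + closeUnblocksAccept());
  } //
} // class
```

In both languages the exception is the normal way out of a blocking accept call, so the shutdown code can catch it quietly.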

#pragma once

#include <iostream>
#include <QtNetwork>
#include <QObject>
#include <QString>
#include <QTcpSocket>
#include <string>

using namespace std;

class Client : public QObject { // Qt expects QObject to be inherited publicly
    Q_OBJECT

private:
  int _Port = 0;
  QString _IpAddress;
  QTcpSocket _TcpSocket;

private slots:
    void Connected() { cout << "CLIENT MESSAGE -> connection established " << _IpAddress.toStdString() << ":" << _Port << endl; }

public:
   inline int getPort() { return _Port; }
   inline QString getIpAddress() { return _IpAddress; }

   Client(QString xIpAddress, int xPort) : QObject() {
    _IpAddress = xIpAddress;
    _Port = xPort;
   }
   ~Client() { cout << "deconstructing Client class " << _IpAddress.toStdString() << endl; }
   Client(QObject* lParent): QObject(lParent) { cout << "Client :)" << endl; }

  void Connect() {
      connect(&_TcpSocket, SIGNAL(connected()), this, SLOT(Connected()));
      QHostAddress lHostAddress(_IpAddress);
      _TcpSocket.connectToHost(lHostAddress, _Port);
      while (!_TcpSocket.waitForConnected( 2000 )) {
        cout << "CLIENT MESSAGE -> waiting for a feedback" << endl;
      }
  }

  void Disconnect() {
    _TcpSocket.close();
  } //

  void Send(const string &xText) {
    if (xText == "") return;
    const char *lText = xText.c_str();
    _TcpSocket.write(lText, xText.length() + 1);
  } //

};

//----------------------------------------------------------------------------------------------------------------------

#pragma once

#include <QTcpSocket>
#include <QHostAddress>
#include <thread>
#include <set>
#include <iostream>
#include <QtNetwork>

using namespace std;

class Connection : public QObject { // Qt expects QObject to be inherited publicly
    Q_OBJECT

  private:
    QTcpSocket *_TcpSocket = nullptr; // stays null when the QObject* constructor is used
    string _IpAddress;

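  private slots: // Read() is connected via SLOT(Read()), so it has to be declared as a slot
    void Read() {
      try {
        char lBuffer[1024] = {0};
        // read at most sizeof(lBuffer) - 1 bytes: the buffer cannot overflow and stays null-terminated
        _TcpSocket->read(lBuffer, sizeof(lBuffer) - 1);
        cout << "SERVER MESSAGE -> " << _IpAddress << " Client said: " << lBuffer << endl;
      }
      catch (exception &ex) { cout << ex.what() << endl; }
    } //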


  public:
    const string &IpAddress() const { return _IpAddress; }

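    void Disconnect() {
      if (_TcpSocket == nullptr) return; // already disconnected
      _TcpSocket->close();
      delete _TcpSocket;
      _TcpSocket = nullptr; // avoids a double delete when Disconnect() is called again, e.g. by the destructor
    } //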

    Connection(QTcpSocket *xTcpSocket) : _TcpSocket(xTcpSocket) {
      QHostAddress lAddress = xTcpSocket->localAddress();
      QString s = lAddress.toString();
      _IpAddress = s.toLocal8Bit().constData();
      connect(xTcpSocket, SIGNAL(readyRead()), this, SLOT(Read()));  // setup event
      cout << "SERVER MESSAGE -> " + _IpAddress << " new client connected." << endl;
    } //
    Connection(QObject* lParent): QObject(lParent) { cout << "Connection :)"; }
    ~Connection() { Disconnect(); cout << "deconstructing Connection class " << _IpAddress << endl; }

 }; // class

//----------------------------------------------------------------------------------------------------------------------

#pragma once

#include <iostream>
#include <QtNetwork>
#include <QObject>
#include <QString>
#include <QTcpSocket>
#include <QTcpServer>
#include <string>
#include <list>
#include "Connection.h"
#include <mutex>


using namespace std;

class Server : public QObject { // Qt expects QObject to be inherited publicly
    Q_OBJECT

private:
    list<Connection*> _AllClients;
    mutex mutex_AllClients;
    int _Port;
    string _IpAddress;
    QTcpServer _TcpServer;

    void OnClientShutDown(Connection *xClient) {
        cout << "SERVER MESSAGE -> Connection shutting down" << endl;
        {
            lock_guard<mutex> lLock(mutex_AllClients); // unlocks automatically at the end of the block
            _AllClients.remove(xClient);
        }
        xClient->Disconnect();
    } //

private slots:
    void NewConnection() {
        cout << "SERVER MESSAGE -> new connection" << endl;
        QTcpSocket *lSocket = _TcpServer.nextPendingConnection();
        Connection *lClient = new Connection(lSocket);
        lock_guard<mutex> lLock(mutex_AllClients); // released when the method returns
        _AllClients.push_back(lClient);
    } //

public:    
    Server(QObject* lParent): QObject(lParent) { cout << "Server :)"; }
    ~Server() { cout << "deconstructing Server class " << endl; }
    inline int getPort() { return _Port; }
    inline string getIpAddress() { return _IpAddress; }    

    Server(string xIpAddress, int xPort) : QObject() {
      _Port = xPort;
      _IpAddress = xIpAddress;
      connect(&_TcpServer, SIGNAL(newConnection()), this, SLOT(NewConnection())); // setup event
      QString lIpAddress(xIpAddress.c_str());
      //QHostAddress lHostAddress(lIpAddress);
      if (_TcpServer.listen(QHostAddress::Any, xPort)) {
      //if(_TcpServer.listen(lHostAddress, xPort)) {
        cout << "SERVER MESSAGE -> listener up and running" << endl;
      }
    } //

    void Disconnect() {
      list<Connection*> lClone; // we need a clone, because the _AllClients collection may be manipulated while we iterate
      {
        lock_guard<mutex> lLock(mutex_AllClients);
        lClone = _AllClients; // copy under the lock
      }

      for (Connection *lClient : lClone) lClient->Disconnect();
      _TcpServer.close();
    } //

}; // class

//----------------------------------------------------------------------------------------------------------------------

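#include <QApplication>
#include <QEventLoop>
#include <QTimer>
#include <iostream>
#include "Server.h"
#include "Client.h"

// Keeps the Qt event loop running for xMilliseconds. A plain sleep would block the thread,
// and the server's newConnection()/readyRead() signals are only delivered while an event loop runs.
static void ProcessEventsFor(int xMilliseconds) {
  QEventLoop lLoop;
  QTimer::singleShot(xMilliseconds, &lLoop, SLOT(quit()));
  lLoop.exec();
} //

int main(int argc, char** argv)
{
  QApplication app(argc, argv);
  try {

    Server lServer("127.0.0.1", 65432);
    Client lClient("127.0.0.1", 65432);

    lClient.Connect();

    ProcessEventsFor(1000);
    lClient.Send("Hello World!"); // send a text message across the network
    ProcessEventsFor(1000);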

    lClient.Disconnect();
    lServer.Disconnect();

    cout << "press enter to exit" << endl;
    cin.get();
  }
  catch (exception &ex) { cerr << ex.what() << endl; }
  return app.exec();
} //
package DemoApp;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;

public final class Server extends Thread {

  private final ArrayList<Client> _Clients = new ArrayList<>();
  public final int port;
  ServerSocket _ServerSocket;

  public Server(String xName, int xPort) throws IOException {
    super(xName);
    port = xPort;
    setDaemon(true);
    _ServerSocket = new ServerSocket(xPort);
  } // constructor

  public void close() throws IOException {
    this.interrupt(); // note: this does not unblock accept(); closing the server socket below ends the loop
    synchronized (_Clients) {
      for (Client lClient : _Clients) lClient.close();
      _Clients.clear();
    }

    _ServerSocket.close();
    System.out.println(_ServerSocket.getLocalSocketAddress().toString() + " server closed down");
  } //

  public void broadcast(String xMessage) {
    synchronized (_Clients) {
      for (Client lClient : _Clients)
        lClient.getWriter().send(xMessage);
    }
  } //

  @Override
  public void run() {
    System.out.println(_ServerSocket.getLocalSocketAddress().toString() + " server is waiting for clients");

    try {
      while (true) {
        Socket lSocket = _ServerSocket.accept();  // wait for a client to connect
        Client lClient = new Client(lSocket);
        synchronized (_Clients) {
          _Clients.add(lClient);
        }
        lClient.connect();
      }
    } catch (IOException ex) {
      // expected when close() closes the server socket while accept() is blocking
      //System.out.println(ex.getMessage());
    }
  } //

} // class

// -----------------------------------------------------------------------------------------------------------------------

package DemoApp;

import java.io.IOException;
import java.net.Socket;

public class Client {

  private Socket _Socket = null;
  public final String IpAddressHost;
  public final String IpAddressLocal;
  public final String IpConnectionString;
  private Reader _Reader = null;
  private Writer _Writer = null;

  public Socket getSocket() {
    return _Socket;
  } //

  public Reader getReader() {
    return _Reader;
  } //

  public Writer getWriter() {
    return _Writer;
  } //

  public void close() throws IOException {
    synchronized (this) {
      if (_Reader != null) _Reader.close(true);
      _Reader = null;
      if (_Writer != null) _Writer.close(true);
      _Writer = null;
    }

    System.out.println(IpConnectionString + " client closed down");
  } //

  public Client(Socket xSocket) {
    _Socket = xSocket;
    IpAddressHost = xSocket.getInetAddress().getHostAddress() + ":" + _Socket.getPort();
    IpAddressLocal = xSocket.getLocalAddress().getHostAddress() + ":" + _Socket.getLocalPort();
    IpConnectionString = "(Local " + IpAddressLocal + ",  Host " + IpAddressHost + ")";
  } //

  public void connect() throws IOException {
    _Reader = new Reader(this);
    _Writer = new Writer(this);
    _Reader.start();  // start listening
    _Writer.start();  // start write loop        
  } //  

} // class

// -----------------------------------------------------------------------------------------------------------------------

package DemoApp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class Reader extends Thread {

  private final Client _Client;
  private BufferedReader _StreamReader;

  public Reader(Client xClient) throws IOException {
    _Client = xClient;
    Socket lSocket = xClient.getSocket();
    _StreamReader = new BufferedReader(new InputStreamReader(lSocket.getInputStream()));
  } // constructor

  public void close(boolean xExitLoop) throws IOException {
    synchronized (this) {
      if (xExitLoop) interrupt();
      if (_StreamReader == null) return;
      _StreamReader.close();
      _StreamReader = null;
    }

    System.out.println(_Client.IpConnectionString + " closed reader connection");
  } //

  @Override
  public void run() {
    System.out.println(_Client.IpConnectionString + " start new reader connection");

    while (true)
      try {
        String lInput = _StreamReader.readLine();
        if (lInput == null) break;
        if (lInput.isEmpty()) continue;

        System.out.println(_Client.IpConnectionString + " message received: " + lInput);
        if ("EXIT".equals(lInput)) break;

      } catch (IOException | RuntimeException ex) {
        System.out.println(ex);
        break; // the stream is unusable after an exception; without the break the loop would never end
      }
    try {
      close(false);
    } catch (IOException ex) {
    }
  } //

} // class

// -----------------------------------------------------------------------------------------------------------------------

package DemoApp;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.LinkedList;

public class Writer extends Thread {

  private final Client _Client;
  private PrintWriter _PrintWriter;

  private final LinkedList<String> _WriteQueue = new LinkedList<>();

  public Writer(Client xClient) throws IOException {
    setDaemon(true);
    _Client = xClient;
    Socket lSocket = xClient.getSocket();
    _PrintWriter = new PrintWriter(lSocket.getOutputStream(), true);
  } // constructor

  public void close(boolean xExitLoop) throws IOException {
    synchronized (this) {
      if (xExitLoop) interrupt();
      if (_PrintWriter == null) return;
      _PrintWriter.close();
      _PrintWriter = null;
    }
    System.out.println(_Client.IpConnectionString + " closed writer connection");
  } //

  public void send(String xText) {
    if (xText == null) return;

    synchronized (_WriteQueue) {
      _WriteQueue.addLast(xText);
      _WriteQueue.notify();
    }
  } //

  @Override
  public void run() {
    System.out.println(_Client.IpConnectionString + " start new writer connection");

    try {
      while (true) {
        String lText = null;
        synchronized (_WriteQueue) {
          if (_WriteQueue.size() > 0) lText = _WriteQueue.removeFirst();
          else _WriteQueue.wait();
        }
        if (lText == null) continue;
        if (_PrintWriter == null) continue;
        _PrintWriter.println(lText);
      }
    } catch (InterruptedException ex) {
      if (ex.getMessage() != null) System.out.println(ex.getMessage());
    }
    try {
      close(false);
    } catch (IOException ex) {
    }
  } //

} // class

// -----------------------------------------------------------------------------------------------------------------------

package DemoApp;

import java.io.IOException;
import java.net.Socket;

public class Test {

  private static void Sleep() {
    try {
      Thread.sleep(2000);
    } catch (InterruptedException ex) {
      System.out.println(ex.getMessage());
    }
  } //

  public static void main(String[] args) throws IOException {
    Server lServer = new Server("MyServer", 65432);
    lServer.start();
    Sleep();

    Socket lClientSocket1 = new Socket("localhost", 65432);
    Client lClient1 = new Client(lClientSocket1);
    lClient1.connect();
    Sleep();

    Socket lClientSocket2 = new Socket("localhost", 65432);
    Client lClient2 = new Client(lClientSocket2);
    lClient2.connect();
    Sleep();

    lClient1.getWriter().send("client->server: hello server!");
    Sleep();

    lServer.broadcast("server->client: hello client!");
    Sleep();

    lClient1.getWriter().send("EXIT");
    lClient1.close();

    lClient2.getWriter().send("EXIT");
    lClient2.close();

    Sleep();

    lServer.close();
    //System.in.read();
    Sleep();
  }
} // class

example output:
0.0.0.0/0.0.0.0:65432 server is waiting for clients
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) start new reader connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) start new writer connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) start new reader connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) message received: client->server: hello server!
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) message received: EXIT
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) closed reader connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) closed reader connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) closed writer connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) message received: EXIT
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) closed reader connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) closed reader connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) closed writer connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) closed writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) closed writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) client closed down
0.0.0.0/0.0.0.0:65432 server closed down
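
The Writer class above builds its queue from a LinkedList plus wait/notify. As a hedged alternative, and not what the listing does, java.util.concurrent.LinkedBlockingQueue offers the same hand-over between threads, much like the BlockingCollection in the C# code. In the sketch below the class name QueueWriterSketch and the stop marker are assumptions made for this illustration, and a plain list stands in for the PrintWriter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class QueueWriterSketch {
  private static final String STOP = new String("STOP"); // unique marker object, compared by reference
  private final BlockingQueue<String> _WriteQueue = new LinkedBlockingQueue<>();
  private final List<String> _Written = Collections.synchronizedList(new ArrayList<>()); // stands in for the PrintWriter
  private final Thread _Thread = new Thread(this::loop);

  void start() {
    _Thread.setDaemon(true);
    _Thread.start();
  } //

  void send(String xText) {
    if (xText == null) return;
    _WriteQueue.add(xText); // thread-safe, wakes up the write loop
  } //

  // Asks the loop to finish after all queued messages, waits for it and returns what was written.
  List<String> close() {
    _WriteQueue.add(STOP);
    try {
      _Thread.join(5000);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
    return new ArrayList<>(_Written);
  } //

  private void loop() {
    try {
      while (true) {
        String lText = _WriteQueue.take(); // blocks until a message is available
        if (lText == STOP) break;          // reference comparison on purpose
        _Written.add(lText);
      }
    } catch (InterruptedException ex) {
      // interrupted while waiting: leave the loop
    }
  } //

  public static void main(String[] args) {
    QueueWriterSketch lWriter = new QueueWriterSketch();
    lWriter.start();
    lWriter.send("hello");
    lWriter.send("world");
    System.out.println(lWriter.close()); // prints [hello, world]
  } //
} // class
```

The stop marker replaces the thread interruption: the loop ends only after everything queued before it has been written.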

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace Tcp {
   public class Server {

      private readonly List<Client> _Clients = new List<Client>();
      public readonly int Port;
      private TcpListener _TcpListener = null;
      private Thread _Thread;
      private readonly string Name;

      public Server(String xName, int xPort) {
         Name = xName;
         Port = xPort;
      } // constructor

      public bool start() {
         try {
            _TcpListener = new TcpListener(IPAddress.Parse("127.0.0.1"), Port);
            _TcpListener.Start();

            _Thread = new Thread(loop);
            _Thread.IsBackground = true;
            _Thread.Name = Name;
            _Thread.Start();
         }
         catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); return false; }
         return true;
      } //

      public void Close() {
         if (_Thread != null) { _Thread.Interrupt(); _Thread = null; }
         lock (_Clients) {
            foreach (Client lClient in _Clients) lClient.close();
            _Clients.Clear();
         }

         _TcpListener.Stop();
         Console.WriteLine(_TcpListener.LocalEndpoint.ToString() + " server closed down");
         _TcpListener = null;
      } //

      public void broadcast(String xMessage) {
         lock (_Clients) {
            foreach (Client lClient in _Clients) lClient.Writer.Send(xMessage);
         }
      } //

      private void loop() {
         Console.WriteLine(_TcpListener.LocalEndpoint.ToString() + " server is waiting for clients");

         try {
            while (true) {
               int lClientCount;
               lock (_Clients) lClientCount = _Clients.Count;
               TcpClient lTcpClient = _TcpListener.AcceptTcpClient();   // wait for a client to connect
               Client lClient = new Client(lTcpClient);
               lock (_Clients) _Clients.Add(lClient);
               lClient.Connect();
            }
         }
         catch (ThreadInterruptedException) { } // Thread interruption
         catch (SocketException) { } // Thread interruption => SocketException
         catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); }
      } //

   } // class
} // namespace

// -----------------------------------------------------------------------------------------------------------------------

using System;
using System.Net.Sockets;

namespace Tcp {
   public class Client {
      public TcpClient TcpClient { get; private set; }
      public readonly String IpAddressHost;
      public readonly String IpAddressLocal;
      public readonly String IpConnectionString;
      public Reader Reader { get; private set; }
      public Writer Writer { get; private set; }

      public void close() {
         lock (this) {
            if (Reader != null) Reader.Close(true);
            Reader = null;
            if (Writer != null) Writer.Close(true);
            Writer = null;
            if (TcpClient != null) {
               TcpClient.Close();
               Console.WriteLine(IpConnectionString + " TcpClient read/write closed");
            }
            TcpClient = null;
         }

         Console.WriteLine(IpConnectionString + " client closed down");
      } //

      public Client(string xHostname, int xPort) : this(new TcpClient(xHostname, xPort)) { }   // create and connect

      public Client(TcpClient xTcpClient) {
         TcpClient = xTcpClient;
         IpAddressHost = TcpClient.Client.RemoteEndPoint.ToString();
         IpAddressLocal = TcpClient.Client.LocalEndPoint.ToString();
         IpConnectionString = "(Local " + IpAddressLocal + ",  Host " + IpAddressHost + ")";
      } //

      public void Connect() {
         Reader = new Reader(this);
         Writer = new Writer(this);
         Reader.Start();  // start listening
         Writer.Start();  // start write loop        
      } //  

   } // class
} // namespace

// -----------------------------------------------------------------------------------------------------------------------

using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;

namespace Tcp {
   public class Reader {

      private readonly Client _Client;
      private StreamReader _StreamReader = null;
      private Thread _Thread = null;

      public Reader(Client xClient) {
         _Client = xClient;
         NetworkStream lNetworkStream = xClient.TcpClient.GetStream();
         _StreamReader = new StreamReader(lNetworkStream);
      } // constructor

      public void Start() {
         _Thread = new Thread(loop);
         _Thread.IsBackground = true;
         _Thread.Start();
      } //

      public void Close(bool xExitLoop) {
         lock (this) {
            if (xExitLoop && (_Thread != null)) { _Thread.Interrupt(); _Thread = null; }

            // _Client.TcpClient.Close() in Client object
            //if (_StreamReader == null) return;
            //_StreamReader.Close();
            //_StreamReader = null;
         }
         
         //Console.WriteLine(_Client.IpConnectionString + " closed reader connection");
      } //

      private void loop() {
         Console.WriteLine(_Client.IpConnectionString + " start new reader connection");

         while (true) {
            try {
               String lInput = _StreamReader.ReadLine();
               if (lInput == null) break;
               if (lInput == "") continue;

               Console.WriteLine(_Client.IpConnectionString + " message received: " + lInput);
               if ("EXIT".Equals(lInput)) break;

            }
            catch (ThreadInterruptedException) { break; } // Thread interruption
            catch (IOException) { break; } // StreamReader disposed
            catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); }
         }
         Close(false);
      } //

   } // class
} // namespace

// -----------------------------------------------------------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net.Sockets;
using System.Threading;

namespace Tcp {
   public class Writer {

      private readonly Client _Client;
      private StreamWriter _StreamWriter = null;
      private Thread _Thread = null;

      private readonly BlockingCollection<String> _WriteQueue = new BlockingCollection<string>();

      public Writer(Client xClient) {
         _Client = xClient;
         NetworkStream lNetworkStream = xClient.TcpClient.GetStream();
         _StreamWriter = new StreamWriter(lNetworkStream);
      } // constructor

      public void Start() {
         _Thread = new Thread(Loop);
         _Thread.IsBackground = true;
         _Thread.Start();
      } //

      public void Close(bool xExitLoop) {
         lock (this) {
            if (xExitLoop && (_Thread != null)) { _Thread.Interrupt(); _Thread = null; }

            // _Client.TcpClient.Close() in Client object
            //if (_StreamWriter == null) return;
            //_StreamWriter.Close();
            //_StreamWriter = null;
         }
         //Console.WriteLine(_Client.IpConnectionString + " closed writer connection");
      } //

      public void Send(string xText) {
         if (string.IsNullOrEmpty(xText)) return;
         _WriteQueue.Add(xText);
      } //


      private void Loop() {
         Console.WriteLine(_Client.IpConnectionString + " start new writer connection");

         while (true) {
            try {
               string lText = null;
               lText = _WriteQueue.Take();
               if (string.IsNullOrEmpty(lText)) continue;
               if (_StreamWriter == null) continue;
               _StreamWriter.WriteLine(lText);
               _StreamWriter.Flush();
            }
            catch (ObjectDisposedException) { break; } 
            catch (ThreadInterruptedException) { break; } // Thread interruption
            catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); }
         }

         Close(false);
      } //


   } // class
} // namespace

// -----------------------------------------------------------------------------------------------------------------------

using System.Net.Sockets;
using System.Threading;

namespace Tcp {
   class Program {
      static void Main(string[] args) {

         Server lServer = new Server("MyServer", 65432);
         lServer.start();
         Thread.Sleep(2000);

         TcpClient lClientSocket1 = new TcpClient("localhost", 65432);
         Client lClient1 = new Client(lClientSocket1);
         lClient1.Connect();
         Thread.Sleep(2000);

         TcpClient lClientSocket2 = new TcpClient("localhost", 65432);
         Client lClient2 = new Client(lClientSocket2);
         lClient2.Connect();
         Thread.Sleep(2000);

         lClient1.Writer.Send("client->server: hello server!");
         Thread.Sleep(2000);

         lServer.broadcast("server->client: hello client!");
         Thread.Sleep(2000);

         lClient1.Writer.Send("EXIT");
         Thread.Sleep(2000);
         lClient1.close();

         lClient2.Writer.Send("EXIT");
         Thread.Sleep(2000);
         lClient2.close();

         Thread.Sleep(1000);

         lServer.Close();
         //Console.ReadLine();
         Thread.Sleep(5000);

      } // main
   } // class
} // namespace

example output:
127.0.0.1:65432 server is waiting for clients
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) start new reader connection
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) start new reader connection
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) start new writer connection
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) message received: client->server: hello server!
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) message received: EXIT
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) TcpClient read/write closed
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) message received: EXIT
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) TcpClient read/write closed
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) TcpClient read/write closed
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) TcpClient read/write closed
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) client closed down
127.0.0.1:65432 server closed down


migration C#, Java, C++ (day 7), TBB


This is some relaxing stuff today.
Recommended reading for variable parameters on the C# side: __arglist in the Print() method

variable parameters

// C#
double avg(int xCount, params double[] xDoubles) {
  double lSum = 0;
  foreach (double d in xDoubles) lSum += d;
  return lSum / xDoubles.Length;
} //

// Java
private double avg(Double... xDoubles) {
  double lSum = 0;
  for (double d : xDoubles) lSum += d;
  return lSum / xDoubles.length;
} //

// C++
// #include <cstdarg>
double avg(int xCount, ...) {
  va_list args;
  double lSum = 0;

  va_start(args, xCount);
  for (int i = 0; i < xCount; i++) lSum += va_arg(args, double);
  va_end(args);

  return lSum / xCount;
}

Parallel vs. TBB

// C#
public void DoSomething() {
   Console.Write(Thread.CurrentThread.ManagedThreadId + " ");
   Thread.Sleep(100);
} //

Stopwatch _Stopwatch = new Stopwatch();
void startTimer() {
   _Stopwatch.Reset();
   _Stopwatch.Start();
} //

void stopTimer() {
   long h = _Stopwatch.ElapsedMilliseconds;
   Console.WriteLine("timer millisecs: " + h);
   _Stopwatch.Restart();
} //

BlockingCollection<int> _Queue;
void Producer() {
   Console.WriteLine();
   for (int i = 10; i >= 0; i--) {
      DoSomething();
      if (!_Queue.TryAdd(i)) Console.WriteLine("failed to push a new value to the queue");
   }
} //


void TBBs() {
   const int lSize = 50;

   startTimer();
   for (int i = 0; i < lSize; i++) DoSomething();
   stopTimer();

   Parallel.For(0, lSize, x => DoSomething());
   stopTimer();

   List<int> lList = new List<int>();
   for (int i = 0; i < lSize; i++) lList.Add(i);

   startTimer();
   Parallel.ForEach(lList, x => DoSomething());
   stopTimer();

   Parallel.Invoke(DoSomething, DoSomething, DoSomething);
   stopTimer();

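   using (_Queue = new BlockingCollection<int>()) {
      // create the queue before the producer starts, so that Producer() never sees a null _Queue
      Thread lThread = new Thread(Producer);
      lThread.Start();

      while (true) {
         int x = _Queue.Take();
         Console.WriteLine("received value " + x);
         if (x == 0) break;
      }
      lThread.Join(); // do not dispose the queue while the producer may still use it
   }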
} //

// Java
public final void DoSomething() {
  System.out.print(Thread.currentThread().getId() + " ");
  try {
    Thread.sleep(100);
  } catch (InterruptedException ex) {
    System.out.println(ex.getMessage());
  }
}

public class Stopwatch {

  long _Time;

  public void startTimer() {
    _Time = System.currentTimeMillis();
  } //

  public void stopTimer() {
    long h = System.currentTimeMillis();
    System.out.println("timer millisecs: " + (h - _Time));
    _Time = h;
  } //
} // class

private final LinkedBlockingDeque<Integer> _Queue = new LinkedBlockingDeque<>();

private void Producer() {
  System.out.println();
  for (int i = 10; i >= 0; i--) {
    DoSomething();
    if (!_Queue.add(i))
      System.out.println("failed to push a new value to the queue");
  }
}

private void TBBs() {
  final int lSize = 50;
  Stopwatch lStopwatch = new Stopwatch();

  lStopwatch.startTimer();
  for (int i = 0; i < lSize; i++) DoSomething();
  lStopwatch.stopTimer();

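  int lCPUs = Runtime.getRuntime().availableProcessors();
  ExecutorService lExecutor = Executors.newFixedThreadPool(lCPUs);
  ArrayList<Future<?>> lFutures = new ArrayList<>();
  for (int i = 0; i < lSize; i++) lFutures.add(lExecutor.submit(() -> DoSomething()));
  try {
    // wait for all tasks; otherwise the timer would only measure the submission
    for (Future<?> lFuture : lFutures) lFuture.get();
  } catch (InterruptedException | ExecutionException ex) {
    System.out.println(ex.getMessage());
  }
  lStopwatch.stopTimer();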

  java.util.ArrayList<Integer> lList = new java.util.ArrayList<>();
  for (int i = 0; i < lSize; i++) lList.add(i);

  lStopwatch.startTimer();
  lList.parallelStream().forEach(x -> DoSomething());
  lStopwatch.stopTimer();

  Callable<Void> lCallable = () -> {
    DoSomething();
    return null;
  };

  ArrayList<Callable<Void>> lCallables = new ArrayList<>();

  lCallables.add(lCallable);
  lCallables.add(lCallable);
  lCallables.add(lCallable);

  try {
    lExecutor.invokeAll(lCallables);
  } catch (InterruptedException ex) {
    System.out.println("InterruptedException: " + ex.getMessage());
  }
  lStopwatch.stopTimer();
  lExecutor.shutdown(); // otherwise the pool's worker threads keep the JVM alive

  Thread lThread = new Thread(() -> {
    Producer();
  });
  lThread.start();

  while (true) {
    int x = 0;
    try {
      x = _Queue.take();
    } catch (InterruptedException ex) {
      System.out.println("InterruptedException: " + ex.getMessage());
    }
    System.out.println("received value " + x);
    if (x == 0) break;
  }
} //

// C++
// #include <thread>
void DoSomething() {
  cout << this_thread::get_id() << " ";
  this_thread::sleep_for(chrono::milliseconds(100));
} //

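clock_t _timer = 0;
// #include <ctime>
void startTimer() {
  _timer = clock();
}
void stopTimer() {
  clock_t lTimer = clock();
  // clock() counts ticks, not milliseconds, so convert via CLOCKS_PER_SEC.
  // Note: on POSIX systems clock() measures CPU time rather than wall-clock time;
  // std::chrono::steady_clock is the portable choice for elapsed time.
  cout << endl << "timer millisecs: " << (lTimer - _timer) * 1000 / CLOCKS_PER_SEC << endl;
  _timer = lTimer;
}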

tbb::concurrent_bounded_queue<int> *_Queue;
void Producer() {
  cout << endl;
  for (int i = 10; i >= 0; i--)  {
    DoSomething();
    if (!_Queue->try_push(i)) cout << "failed to push a new value to the queue" << endl;
  }
}


// #include <thread>
void TBBs() {
  const int lSize = 50;

  startTimer();
  for (int i = 0; i < lSize; i++) DoSomething();
  stopTimer();

  tbb::parallel_for(0, lSize, 1, [&](int i) { DoSomething(); });
  stopTimer();

  vector<int> lVector;
  for (int i = 0; i < lSize; i++) lVector.push_back(i);

  startTimer();
  tbb::parallel_for_each(lVector.begin(), lVector.end(), [](int i) { DoSomething(); });
  stopTimer();

  // note: newer oneTBB releases no longer ship parallel_do; parallel_for_each covers the same use
  tbb::parallel_do(lVector.begin(), lVector.end(), [](int i) { DoSomething(); });
  stopTimer();

  tbb::parallel_invoke(DoSomething, DoSomething, DoSomething);
  stopTimer();

  _Queue = new tbb::concurrent_bounded_queue<int>();
  thread lThread(Producer);

  int i;
  while (true) {
    _Queue->pop(i);
    cout << endl << "received value " << i << endl;
    if (i == 0) break;
  }

  lThread.join();  // wait for the producer before deleting the queue it still uses
  delete _Queue;
  _Queue = nullptr;
} //

delegates/Action/Func vs. func

// C#
void F1(string s) { Console.WriteLine(s); }
void F2(string s) { Console.WriteLine(">" + s + "<"); }

public void test() {
   // delegates
   Action<string> f1 = F1;  // use Func<> for methods with return values
   f1("hello world");
   F1("hello world");
   Action<string> f3 = (s => f1(s));
   f3("hello echo");

   f1 = F2;
   f1("hello world");

   // variable argument list
   Console.WriteLine("average is " + avg(4, 1.1, 2.2, 3.3, 4.4));  // 2.75

   TBBs();
} //

// Java
private void F1(String s) { System.out.println(s); }
private void F2(String s) { System.out.println(">" + s + "<"); } 

public interface Action<T> { void invoke(T t); }

public void test() {
  // there are neither delegates nor pointers in Java
  Action<String> f1 = s -> F1(s);
  f1.invoke("hello world");
  F1("hello world");
  Action<String> f3 = s -> f1.invoke(s);
  f3.invoke("hello echo");

  //f1 = s -> F2(s);  compiler error, because this would change the "effectively final" status of f1
  Action<String> f4 = f1; // simple assignment
  f4 = s -> F2(s);
  f4.invoke("hello world");

  // variable argument list
  System.out.println("average is " + avg(1.1, 2.2, 3.3, 4.4)); // 2.75

  TBBs();
} //

// C++
void F1(const string &s) { cout << s << endl; }
void F2(const string &s) { cout << ">" << s << "<" << endl; }

void test() {
  // delegates
  function<void(const string &)> f1 = F1;
  f1("hello world");
  F1("hello world");
  function<void(const string &)> f3 = [=](const string &s) { f1(s); };
  f3("hello echo");

  f1 = F2;
  f1("hello world");

  // variable argument list
  cout << "average is " << avg(4, 1.1, 2.2, 3.3, 4.4) << endl;  // 2.75

  TBBs();
} //

today’s requirements

// C#
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// C++
#include "tbb/tbb.h"
#include "tbb/parallel_for.h"
#include "tbb/concurrent_queue.h"
#include "tbb/blocked_range.h"
#include <cstdarg>
#include <thread>
#include <chrono>
#include <functional>
#include <vector>
#include <iostream>
#include <string>
#include <ctime>
using namespace std;

migration C#, Java, C++ (day 5), lambda and TBB


Phew, this was a terrible day. I wanted to do far more, but I realized that the C++ standard library offers no proper high-level support for parallelism. C++11 does provide std::thread, which the examples below use, but there is nothing like a ready-made parallel loop. Back in the 1990s threading was a minor issue. We only had one CPU and one FPU, and we concentrated more on processes and devices at that time.

I had already mentioned that I wanted to avoid Boost and Qt. The best option I found was TBB, which seems to support multi-core processors very well. I will include TBB in my migration project from now on. Today, there is only one TBB example. It took me a lot of time to get the Intel software installed on my PC.

Finally, after all these years … we can solve problems with PCs that we would not have without them 😉

TBB is only a small part of today's post, which is mainly about lambda functions.
In C++ you can find a lot of weird stuff that has been solved in a much better way in C#. To a hardcore C# programmer, C++ can feel medieval at times. Rome was built bit by bit. Hence its streets are chaotic. C# is perhaps more like Chicago. When the city was built, people had time to think about the infrastructure before they started digging arbitrary holes in the ground.
Let’s see what my thoughts will be on day 15.

Please also check my post Lambda expressions (advanced). There I mentioned that the C# compiler takes care of the lifetime of local variables that are used by lambda expressions. Without that, such variables could already be invalid by the time the expressions try to use them. Remember, these local variables are not created inside the lambda expressions themselves.

Today’s prerequisite for C++

#include <iostream>
#include <vector>
#include <ctime>
#include <thread>
#include "tbb/tbb.h"
#include "tbb/parallel_for.h"
using namespace std;

Lambda

// C#
public void DoSomething() {
   Console.Write(Thread.CurrentThread.ManagedThreadId + " "); // Write, not WriteLine: the ids are printed on one line
   Thread.Sleep(200);
} //

public void Lambdas() {
   // functor
   // There are no functors in C#.
   // Action or Func delegates are the closest approaches.
   Action lAction = DoSomething;
   lAction();

   // lambda expression
   Func<int, int> func2 = (int x) => { return ++x; };
   int r2 = func2(4);
   Console.WriteLine(r2); // 5

   // lambda expression capturing a local variable
   // the compiler takes care of the lifetime of variable i
   int i = 5;
   Func<int, int> func3 = (int x) => { return x + i; };
   int r3 = func3(4);
   Console.WriteLine(r3); // 9

   // lambda expression capturing any local variable by reference
   List<int> lList = new List<int>() { 1, 2, 3, 5, 7, 11, 13, 17, 19, 23 };
   int lSum1 = 0;
   lList.ForEach((int q) => { lSum1 += q; });
   Console.WriteLine(lSum1); // 101

   // lambda expression with a specific return type
   Func<int, bool> func6 = (int x) => { x++; return true; };
   bool r6 = func6(4);
   Console.WriteLine(r6); // True
} //

// Java
class Class1 implements Runnable {
  @Override
  public void run() {
    try {
      System.out.print(Thread.currentThread().getId() + " "); // print, not println: the ids are printed on one line
      Thread.sleep(200);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
} // class

public static void main(String[] args) {
    Class1 c1 = new Class1();
    c1.run();

    // lambda expression
    Function<Integer, Integer> func2 = i -> { return ++i; };
    Integer r2 = func2.apply(4);
    System.out.println(r2); // 5

    // lambda expression capturing a local variable
    // the compiler takes care of the lifetime of variable i
    Integer i = 5;
    Function<Integer, Integer> func3 = x -> { return x + i; };
    int r3 = func3.apply(4);
    System.out.println(r3); // 9

    // lambda expression capturing any local variable by reference
    java.util.ArrayList<Integer> lList = new java.util.ArrayList<>(java.util.Arrays.asList(new Integer[]{1, 2, 3, 5, 7, 11, 13, 17, 19, 23}));
    Integer lSum1 = 0;
    lList.forEach((Integer q) -> {
      //lSum1 += q;  compiler complains, because it has no write access to lSum1
    });
    // thus we have to use:
    lSum1 = lList.stream().map((q) -> q).reduce(lSum1, Integer::sum);
    System.out.println(lSum1); // 101 

    // lambda expression with a specific return type
    Function<Integer, Boolean> func6 = x -> {
      x++;
      return true;
    };
    boolean r6 = func6.apply(4);
    System.out.println(r6); // true
  } //

// C++
class func1 {
public:
  int operator ()(int x) { return ++x; }
};

void Lambdas() {

  // functor
  int r1 = func1()(3);
  cout << r1 << endl; // 4

  // lambda expression
  auto func2 = [](int x) { return ++x; };
  int r2 = func2(4);
  cout << r2 << endl; // 5

  // lambda expression capturing a local variable
  int i = 5;
  auto func3 = [i](int x) { return x + i; };
  int r3 = func3(4);
  cout << r3 << endl; // 9

  // lambda expression capturing any local variable
  auto func4 = [=](int x) { return x + i + r2 * r1; };  // 1 + 5 + (5 * 4)
  int r4 = func4(1);
  cout << r4 << endl; // 26

  // lambda expression capturing a local variable by reference
  vector<int> v1{ 1, 2, 3, 5, 7, 11, 13, 17, 19, 23 };
  int lSum1 = 0;
  for (int q : v1) { [&lSum1](int x) {lSum1 += x; } (q); }
  cout << lSum1 << endl; // 101

  // lambda expression capturing any local variable by reference
  vector<int> v2{ 1, 2, 3, 5, 7, 11, 13, 17, 19, 23 };
  int lSum2 = 0;
  int lProduct2 = 1;
  for (int q : v2) { [&](int x) {lSum2 += x; lProduct2 *= x; }(q); }
  cout << lSum2 << " " << lProduct2 << endl; // 101 223092870

  // lambda expression capturing any local variable by value
  // The lambda works on its own copies of the external variables.
  // By default these copies are read-only inside the lambda.
  // The keyword "mutable" allows the lambda to modify its copies;
  // the original variables remain unchanged.
  int lSum3 = 0;
  int lProduct3 = 1;
  auto func5 = [=](int x) mutable {lSum3 += x; lProduct3 *= x; return x; };
  int r5 = func5(1);
  cout << lSum3 << " " << lProduct3 << " " << r5 << endl; // 0 1 1

  // lambda expression with a specific return type
  auto func6 = [](int x) -> bool { x++; return true; };
  bool b6 = func6(6);
  cout << (b6 ? "true" : "false") << endl; // true
} //

TBB

public void TBB() {
  const int lSize = 100;

  Stopwatch lStopwatch = new Stopwatch();
  lStopwatch.Start();
  for (int i = 0; i < lSize; i++) DoSomething();
  lStopwatch.Stop();
  Console.WriteLine();
  Console.WriteLine("time for serial loop was " + lStopwatch.ElapsedMilliseconds / 1000.0 + " seconds");

  lStopwatch.Reset();
  lStopwatch.Start();
  Parallel.For(0, lSize, i => DoSomething());
  lStopwatch.Stop();
  Console.WriteLine();
  Console.WriteLine("time for parallel loop was " + lStopwatch.ElapsedMilliseconds/1000.0 + " seconds");
} //

example output:
9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9
9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9
9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9
time for serial loop was 20.001 seconds
9 10 11 13 14 12 15 17 16 9 10 11 13 14 12 15 17 16 9 10 11 13 14 12 15 17 16 9
10 11 13 14 12 15 17 16 9 10 11 13 14 12 15 17 16 9 10 11 13 14 12 15 17 16 9 10
11 13 14 12 15 17 16 9 10 11 13 14 12 15 16 9 10 11 13 14 12 15 16 9 10 11 13 1
4 12 15 9 10 11 13 14 12 15 9 10 11 13 14 12 15
time for parallel loop was 2.408 seconds

public final void TBB() {
  Class1 lDoSomething = new Class1();
  final int lSize = 100;

  long lStopwatch = System.currentTimeMillis();
  for (int i = 0; i < lSize; i++) {
    lDoSomething.run();
  }
  long lDiff = System.currentTimeMillis() - lStopwatch;
  System.out.println();
  System.out.println("time for serial loop was " + lDiff / 1000 + " seconds");

  lStopwatch = System.currentTimeMillis();    
  ArrayList<Class1> lList = new ArrayList<>();
  for (int i = 0; i < lSize; i++) lList.add(lDoSomething);
  lList.parallelStream().forEach(x -> x.run());

  lDiff = System.currentTimeMillis() - lStopwatch;
  System.out.println();
  System.out.println("time for parallel loop was " + lDiff / 1000.0 + " seconds");
} //

example output:
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
time for serial loop was 20 seconds
12 13 14 1 16 11 15 17 14 13 12 1 16 11 15 17 13 12 14 1 16 17 11 15 13 14 12 16 1 11 15 17 13 14 12 16 1 11 15 17 12 13 14 1 16 11 17 15 14 12 13 16 1 15 17 11 12 13 14 16 1 17 11 15 14 13 12 1 16 11 15 17 13 14 12 1 16 15 17 11 14 12 13 16 1 11 17 15 14 12 13 16 1 17 11 15 12 13 1 15
time for parallel loop was 2.841 seconds

void DoSomething() {
  cout << this_thread::get_id() << "  ";
  this_thread::sleep_for(chrono::milliseconds(200));
} //

void TBBs() {
  const int lSize = 100;

  time_t lTimer;
  time(&lTimer);
  for (int i = 0; i < lSize; i++) DoSomething();
  double lSecondsA = difftime(time(nullptr), lTimer);
  cout << endl << "time for serial loop was " << lSecondsA << " seconds" << endl;

  time(&lTimer);
  tbb::parallel_for(0, lSize, 1, [&](int i) { DoSomething(); });
  double lSecondsB = difftime(time(nullptr), lTimer);
  cout << endl << "time for parallel loop was " << lSecondsB << " seconds" << endl;
} //

example output:
4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 46
16 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616
4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616
4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 46
16 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616
4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616
4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 4616 46
16 4616 4616 4616 4616 4616 4616
time for serial loop was 20 seconds
4616 4068 4916 3788 2880 4168 4752 6796 4616 4916 4068 2880 3788 41
68 6796 4752 4616 4068 4916 2880 3788 4168 6796 4752 4616 4916 4068
2880 3788 4168 6796 4752 4616 4916 4068 3788 2880 4168 6796 4752
4616 4068 4916 2880 3788 4168 6796 4752 4616 4068 4916 3788 2880 41
68 6796 4752 4616 4068 4916 2880 3788 6796 4168 4752 4616 4068 4916
2880 3788 6796 4168 4752 4616 4068 4916 2880 3788 6796 4168 4752
4616 4916 4068 3788 2880 6796 4168 4752 4616 4068 4916 3788 2880 67
96 4168 4752 4616 4916 4068 3788
time for parallel loop was 2 seconds