Category Archives: Threading

Timer Finetuning, Each Microsecond Counts

This is a one-day digression from my current series of chart posts, which will continue with my next post.

Exact timing is not an issue for most of us. Who cares if something gets executed a few milliseconds earlier or later? Well, this blog was started to highlight many issues of “hard-core” programming. Thus I do care!

Back in 1985, when I was hacking on my Commodore 64, there was no such thing as context switching. To wait for a few moments you could run a loop a number of times. And when you ran the same loop on the same C64 model again, pretty much the same amount of time passed. The emphasis is on the word “same”. But things don’t work like that these days. You never face the “same” situation again. Since the arrival of multithreading (in fact it was called multitasking on the good old Commodore Amiga), context switching has been part of the picture.

(There are Multimedia Timers in the Windows core environment. I am not going to use these. High precision inside the standard .Net framework is possible. There is no need to leave robustness behind.)

I ran several small tests to get a feeling for .Net timing. The first and simplest approach was to set PriorityBoostEnabled on the process to boost the process priority. Unfortunately its use is limited. Your priority only gets boosted when your main window has the focus. And when your process is on a high priority anyway, it obviously won’t change a lot.
Therefore the next step was to increase the process priority to ‘ProcessPriorityClass.RealTime’. This is a dangerous setting and should only be used by skilled programmers. Your application could consume too much processor time at the expense of the entire system, which would slow down ALL processes and services, including the very process that you were originally trying to speed up.

One important aspect is the time slice that the scheduler grants your process. (Process.PrivilegedProcessorTime, which the test program prints, is a different thing: it is the total time the process has spent running in kernel mode so far, not the length of a time slice.) We’ll assume 80 milliseconds, just to have a number to work with. This would imply that the Windows system gives your process 80 ms to execute code before it switches to another process. You have to share this order of magnitude amongst all your threads. Say 10 ms of these are consumed by your GUI update. Make sure to execute any time-sensitive code in one piece before the next context switch takes place.

As we talk about timers, it does not help you at all to be precise on one side, but then lose processor execution time to another thread or process.
Let me come up with a short story. You take the train at exactly 1 pm after waiting for 8 hours. You only have 10 minutes left to get from Austria to Belgium. That sucks, right? You obviously have to start your journey without waiting 8 hours beforehand.
The hard-core scenario would be to get changed, pack your suitcase and sleep until 12:45pm … yes, in your business suit! Then suddenly wake up, jump out of your bed, don’t kiss your wife, run to the station and use the full 8 hours for your journey. You only make it if you arrive in Brussels before the ticket collector kicks you out, because your ticket was only valid for 8 hours.

 

Let’s delve into practice. In the example below you can see that getting the time-stamp takes only a fraction of a tick (0.27 ticks on average in the example output). When you run a long loop and read the time-stamp many times, it seems to slow down. The reason for this is context switching: somewhere in the middle the thread’s time slice expired and another thread got the processor. Therefore running only a few iterations can show better results.
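
The same measurement can be sketched in Java, the language of some later examples in this archive. This is only an illustration under assumptions: System.nanoTime() stands in for Stopwatch.GetTimestamp(), and the class and method names are made up. It compares the average cost of a time-stamp in a short and in a long loop; the numbers will differ from machine to machine.

```java
final class TimestampCost {

    /** Average cost of one System.nanoTime() call in nanoseconds, measured over 'calls' calls. */
    static double averageNanosPerCall(int calls) {
        long sink = 0;                        // consume the results so the loop is not optimized away
        long start = System.nanoTime();
        for (int i = 0; i < calls; i++) {
            sink += System.nanoTime();
        }
        long end = System.nanoTime();
        if (sink == 0) System.out.println();  // practically never true, only keeps 'sink' alive
        return (end - start) / (double) calls;
    }

    public static void main(String[] args) {
        averageNanosPerCall(100_000);         // warm-up run, so that the JIT compiler has done its work
        System.out.println("short loop: " + averageNanosPerCall(100) + " ns per call");
        System.out.println("long loop:  " + averageNanosPerCall(10_000_000) + " ns per call");
    }
}
```

A long loop is more likely to include a context switch, which then shows up as a higher average.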

The system timer is terribly slow. Its delays and reliability are as bad as bad can be. It accepts a double for the interval in milliseconds rather than the usual long. You would expect a higher resolution then, wouldn’t you?

Especially the first run is unpredictable. The reason for this is the CLR’s just-in-time compilation: your code gets compiled shortly before its first execution. You will observe this with any timer. Start your code earlier and skip the first callback. This improves the precision for the first callback you are really waiting for.

The thread timer is more precise, but sometimes fires too early. Its error is still well above one millisecond.

The timer with the most potential isn’t really a timer. It is a thread sleep method in a loop. In my opinion that is the most advanced solution to solve the timer precision problem. The loop is running on a thread. This thread is not shared like on a task scheduler. You own that thread and nobody else will use it. You obviously should not run hundreds of threads. Run 5 of them, and you are still in the green zone. The big advantage is that you can change the priority of this thread to ‘Highest’. This gives you the attention that real geeks need. Furthermore, you won’t have any multiple code executions. There is one event running at a time. If you miss one, because the previous event was running too long, then you can still decide to not execute the next run. A general system delay would then not queue up events that you don’t want to execute anymore … as it is too late anyway. You obviously can add this feature for any timer, but this one is the safest way to easily have only one event running at a time.
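
The idea of a dedicated thread that sleeps in a loop and drops runs that are already too late can be sketched as follows. This is a Java illustration under assumptions, not the code used for the measurements: the names DedicatedLoopTimer, isOnTime and run are hypothetical, and System.nanoTime() replaces the Stopwatch ticks.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class DedicatedLoopTimer {

    /** True when we are less than maxDelayNanos behind the due time, so the run is still worth it. */
    static boolean isOnTime(long nowNanos, long dueNanos, long maxDelayNanos) {
        return nowNanos - dueNanos < maxDelayNanos;
    }

    /** Sleeps until each due time and runs the callback once; returns how many runs were skipped. */
    static int run(Queue<Long> dueTimesNanos, long maxDelayNanos, Runnable callback) {
        int skipped = 0;
        while (!dueTimesNanos.isEmpty()) {
            long due = dueTimesNanos.remove();
            long waitNanos = due - System.nanoTime();
            if (waitNanos > 0) {
                try {
                    Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // keep the flag and stop the loop
                    break;
                }
            }
            if (isOnTime(System.nanoTime(), due, maxDelayNanos)) {
                callback.run();   // only one callback runs at a time, on this thread
            } else {
                skipped++;        // too late anyway, do not queue it up
            }
        }
        return skipped;
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<Long> schedule = new ArrayDeque<>();
        long first = System.nanoTime() + 100_000_000L;               // start 100 ms in the future
        for (int i = 0; i < 5; i++) schedule.add(first + i * 100_000_000L);

        Thread worker = new Thread(
            () -> System.out.println("skipped: " + run(schedule, 3_000_000L, () -> System.out.println("tick"))),
            "TimerLoop");
        worker.setPriority(Thread.MAX_PRIORITY);                     // we own this thread
        worker.start();
        worker.join();
    }
}
```

Because the callback runs on the timer thread itself, a slow callback simply makes the next due time late, and the check decides whether that run is still executed.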

Check out my MicroTimer class. It waits a little bit less than required and then calls SpinWait multiple times. Once the process thread gets processor time assigned, you most likely won’t face context switching. You wait like a disciple for the one and only messiah, running around a rigid pole.
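
The core trick, sleeping a bit less than required and spinning for the rest, looks roughly like this in Java. It is a minimal sketch under assumptions: the name SleepThenSpin and the parameter spinWindowNanos are hypothetical, System.nanoTime() takes the role of Stopwatch.GetTimestamp(), and Thread.onSpinWait() (Java 9 or later) takes the role of Thread.SpinWait.

```java
final class SleepThenSpin {

    /**
     * Waits until System.nanoTime() reaches deadlineNanos.
     * Sleeps coarsely until spinWindowNanos before the deadline, then busy-waits.
     * Returns how many nanoseconds after the deadline the wait ended.
     */
    static long waitUntil(long deadlineNanos, long spinWindowNanos) {
        long coarseNanos = deadlineNanos - spinWindowNanos - System.nanoTime();
        if (coarseNanos > 0) {
            try {
                Thread.sleep(coarseNanos / 1_000_000L);  // imprecise, so it deliberately ends early
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();      // keep the flag, finish the wait by spinning
            }
        }
        while (System.nanoTime() - deadlineNanos < 0) {
            Thread.onSpinWait();                         // busy-wait without giving up the processor
        }
        return System.nanoTime() - deadlineNanos;
    }

    public static void main(String[] args) {
        long deadline = System.nanoTime() + 1_000_000_000L;    // wake up in one second
        long lateNanos = waitUntil(deadline, 50_000_000L);     // spin during the last 50 ms
        System.out.println("woke up " + lateNanos + " ns after the deadline");
    }
}
```

The spin window trades processor time for precision: a larger window tolerates a sloppier sleep, but burns a core for longer.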

The MicroTimer class should give you a whopping precision of 1 microsecond …. unless my usual ‘unless’ sentence states something else 😉
The example output shows the delay at the point when the timer was leaving the inner loop to raise an event. And it shows the delay at the time it recorded the data inside the event. Obviously there are some microseconds in between. Measure that time and adjust your schedule accordingly.
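
The output reports every delay in Stopwatch ticks and in microseconds. The conversion depends on Stopwatch.Frequency, which was 1,558,893 ticks per second on the test machine. A small Java sketch of that arithmetic, with the hypothetical names TickMath and ticksToMicros:

```java
final class TickMath {

    /** Converts a tick count into microseconds, rounded to the nearest microsecond. */
    static long ticksToMicros(long ticks, long ticksPerSecond) {
        return Math.round(ticks * 1_000_000.0 / ticksPerSecond);
    }

    public static void main(String[] args) {
        long frequency = 1_558_893L;  // ticks per second, taken from the example output
        System.out.println(ticksToMicros(65_625L, frequency));  // first system timer delay: 42097
        System.out.println(ticksToMicros(772L, frequency));     // first MicroTimer delay: 495
    }
}
```

One tick is therefore roughly 0.64 microseconds on that machine, so a delay of 1 tick at wake-up time is already below one microsecond.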

 

Computer Specs:
Lenovo Yoga 2 11”
Intel i5-4202Y 1.6 GHz processor
128GB SSD hard disk
4GB memory
Windows 8.1, 64 bit

 

Admittedly, the code is a bit of Spaghetti style today. Tests are really what they are. It was clear that the MicroTimer would win in the end. So my effort for a proper style went into that direction.

 

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

namespace TimerPrecisionTest {
	class Program {
		private const long _NumDataPoints = 10;
		private static List<long> _DataPoints = new List<long>();
		private static int _CurrentDataPoint = 0;

		private static long[] _Schedule;

		private static long _PeriodInMs = 3 * 1000;  // 3 seconds in milliseconds (not used by the tests below)
		private static long _PeriodInTicks = 3 * Stopwatch.Frequency;
		private static double _FrequencyAsDouble = (double)Stopwatch.Frequency;

		private static AutoResetEvent _AutoResetEvent = new AutoResetEvent(false);

		static void Main(string[] args) {
			Process p = Process.GetCurrentProcess();
			p.PriorityBoostEnabled = true;  // every little helps
			p.PriorityClass = ProcessPriorityClass.Normal;
			Console.WriteLine("Process with normal priority:");
			Console.WriteLine("Privileged processor time for process " + p.ProcessName + " is " + p.PrivilegedProcessorTime.TotalMilliseconds.ToString("#,##0.0") + " ms");
			p.PriorityClass = ProcessPriorityClass.RealTime;  // without sufficient privileges Windows falls back to High
			Console.WriteLine("Process with high priority:");
			Console.WriteLine("Privileged processor time for process " + p.ProcessName + " is " + p.PrivilegedProcessorTime.TotalMilliseconds.ToString("#,##0.0") + " ms");

			Console.WriteLine("IsHighResolution system clock: " + Stopwatch.IsHighResolution);
			Console.WriteLine("Number of ticks per second: " + Stopwatch.Frequency.ToString("#,##0"));

			long a = Stopwatch.GetTimestamp();
			for (int i = 0; i < 100000; i++) {
				long b = Stopwatch.GetTimestamp();
			}
			long c = Stopwatch.GetTimestamp();
			Console.WriteLine("Number of ticks to obtain a timestamp: " + ((c - a) / 100000.0).ToString("#,##0.00"));
			Console.WriteLine();

			UseSystemTimer();

			UseThreadingTimer();

			// a simple loop
			Thread lThread = new Thread(new ThreadStart(UseLoop));
			lThread.Priority = ThreadPriority.Highest;
			lThread.IsBackground = true;
			lThread.Start();

			_AutoResetEvent.WaitOne();

			testSpinWaitPrecision();

			// a proper loop
			UseMicroTimerClass();

			Console.ReadLine();
		} //

	#region MicroTimer
		private static void UseMicroTimerClass() {
			Console.WriteLine("MICRO TIMER CLASS:");
			Init();
			long lMaxDelay = (3L * Stopwatch.Frequency) / 1000L; // 3 ms
			MicroTimer lMicroTimer = new MicroTimer(new Queue<long>(_Schedule), lMaxDelay);
			lMicroTimer.OnMicroTimer += OnMicroTimer;
			lMicroTimer.OnMicroTimerStop += OnMicroTimerStop;
			lMicroTimer.OnMicroTimerSkipped += OnMicroTimerSkipped;
			lMicroTimer.Start();
		} //

		static void OnMicroTimerSkipped(int xSenderThreadId, long xWakeUpTimeInTicks, long xDelayInTicks) {
			Console.WriteLine("MicroTimer for WakeUpTime " + xWakeUpTimeInTicks + " did not run. Delay was: " + xDelayInTicks);
		} //

		static void OnMicroTimerStop(int xSenderThreadId) {
			Console.WriteLine("MicroTimer stopped.");
			PrintStats();
		} //

		static void OnMicroTimer(int xSenderThreadId, long xWakeUpTimeInTicks, long xDelayInTicks) {
			RecordDatapoint();
			Console.WriteLine("(Delay at wakeup time was " + xDelayInTicks.ToString("#,##0") + " tick)");
		} //
	#endregion

		#region SpinWait precision
		private static void testSpinWaitPrecision() {
			Console.WriteLine();
			Console.WriteLine("SpinWait tests (neglecting PrivilegedProcessorTime):");
			Thread.CurrentThread.Priority = ThreadPriority.Highest;
			Thread.Sleep(0);  // switch context at a good point
			long a = Stopwatch.GetTimestamp();
			Thread.SpinWait(100000);
			long b = Stopwatch.GetTimestamp();
			Console.WriteLine("Number of ticks for a SpinWait: " + (b - a).ToString("#,##0"));

			a = Stopwatch.GetTimestamp();
			Thread.Sleep(0);
			for (int i = 0; i < 100; i++) {
				Thread.SpinWait(100000);
			}
			b = Stopwatch.GetTimestamp();
			double lAverage = (b - a) / 100.0;
			Console.WriteLine("Average ticks for 100x SpinWaits: " + lAverage.ToString("#,##0") + " == " + (lAverage * 1000.0 * 1000.0/ Stopwatch.Frequency).ToString("#,##0.0000") + " microseconds");

			// now we do get extremely precise
			long lEndTime = Stopwatch.GetTimestamp() + Stopwatch.Frequency; // wake up in one second
			Thread.Sleep(900); // imagine a timer raises an event roughly 100ms too early
			while (Stopwatch.GetTimestamp() < lEndTime) {
				Thread.SpinWait(10);  // no context switching
			}
			a = Stopwatch.GetTimestamp();
			Console.WriteLine("SpinWait caused an error of just: " + ((a - lEndTime) * 1000.0 * 1000.0 / _FrequencyAsDouble).ToString("#,##0.0000") + " microseconds");
			Thread.CurrentThread.Priority = ThreadPriority.Normal;
			Console.WriteLine();
		} //
		#endregion

		#region simple loop
		private static void UseLoop() {
			Thread.CurrentThread.Priority = ThreadPriority.Highest;
			Console.WriteLine("LOOP AND SLEEP:");
			Init();
			Thread.Sleep((int)getTimeMsToNextCall_Long());

			while (_CurrentDataPoint < _NumDataPoints) {
				RecordDatapoint();
				if (_CurrentDataPoint >= _NumDataPoints) break;
				Thread.Sleep((int)getTimeMsToNextCall_Long());
			}

			PrintStats();
			_AutoResetEvent.Set();
			Thread.CurrentThread.Priority = ThreadPriority.Normal;
		} //
		#endregion

		#region SystemTimer
		private static System.Timers.Timer _SystemTimer = null;
		private static void UseSystemTimer() {
			Console.WriteLine("SYSTEM TIMER:");
			Init();
			_SystemTimer = new System.Timers.Timer();
			_SystemTimer.AutoReset = false;
			_SystemTimer.Elapsed += SystemTimer_Elapsed;
			_SystemTimer.Interval = getTimeMsToNextCall_Double();  // do not init in constructor!
			_SystemTimer.Start();

			_AutoResetEvent.WaitOne();
			_SystemTimer.Stop();
			_SystemTimer = null;

			PrintStats();
		} //

		private static void SystemTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) {
			RecordDatapoint();

			// calibrate timer (we did not start with the right interval when we launched it)
			System.Timers.Timer lTimer = _SystemTimer;
			if (lTimer == null) return;
			if (_CurrentDataPoint >= _NumDataPoints) return;
			lTimer.Stop();
			lTimer.Interval = getTimeMsToNextCall_Double();
			lTimer.Start();
		} //
		#endregion

		#region ThreadingTimer
		private static System.Threading.Timer _ThreadingTimer = null;

		private static void UseThreadingTimer() {
			Console.WriteLine("THREAD TIMER:");
			Init();

			TimerCallback lCallback = new TimerCallback(ThreadingTimer_Elapsed);
			_ThreadingTimer = new System.Threading.Timer(lCallback, null, getTimeMsToNextCall_Long(), (long)(Timeout.Infinite));

			_AutoResetEvent.WaitOne();
			_ThreadingTimer = null;

			PrintStats();
		} //

		private static void ThreadingTimer_Elapsed(object xState) {
			RecordDatapoint();

			// restart timer
			System.Threading.Timer lTimer = _ThreadingTimer;
			if (lTimer == null) return;
			if (_CurrentDataPoint >= _NumDataPoints) return;
			lTimer.Change(getTimeMsToNextCall_Long(), (long)Timeout.Infinite);
		} //
		#endregion

		#region miscellaneous
		private static void Init() {
			_DataPoints.Clear();
			_CurrentDataPoint = 0;

			// init exact time schedule
			long lOffset = Stopwatch.GetTimestamp() + _PeriodInTicks;  // we start in the future
			_Schedule = new long[_NumDataPoints];
			for (int i = 0; i < _NumDataPoints; i++) {
				_Schedule[i] = lOffset;
				lOffset += _PeriodInTicks;
			}
		} //

		private static void PrintStats() {
			if (_DataPoints.Count < 1) return;

			Console.WriteLine("Average " + _DataPoints.Average());
			long lMin = _DataPoints.Min();
			long lMax = _DataPoints.Max();
			Console.WriteLine("Min     " + lMin);
			Console.WriteLine("Max     " + lMax);
			Console.WriteLine("Range   " + (lMax - lMin));
			Console.WriteLine();
		} //

		private static void RecordDatapoint() {
			long lDifference = Stopwatch.GetTimestamp() - _Schedule[_CurrentDataPoint];  // positive = late, negative = early
			_DataPoints.Add(lDifference);
			Console.WriteLine("Delay in ticks: " + lDifference.ToString("#,##0") + " == " + ((lDifference * 1000000.0) / _FrequencyAsDouble).ToString("#,##0") + " microseconds");
			_CurrentDataPoint++;
			if (_CurrentDataPoint >= _NumDataPoints) _AutoResetEvent.Set();
		} //

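		private static long getTimeMsToNextCall_Long() {
			long lTicks = (_Schedule[_CurrentDataPoint] - Stopwatch.GetTimestamp());
			// never return a negative value: Thread.Sleep and Timer.Change reject it (and -1 means "infinite")
			return Math.Max(0L, (1000 * lTicks) / Stopwatch.Frequency);
		} //

		private static double getTimeMsToNextCall_Double() {
			double lTicks = (double)(_Schedule[_CurrentDataPoint] - Stopwatch.GetTimestamp());
			// System.Timers.Timer.Interval must be greater than zero
			return Math.Max(1.0, (1000.0 * lTicks) / _FrequencyAsDouble);
		} //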
		#endregion

	} // class
} // namespace

 

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace TimerPrecisionTest {
	public class MicroTimer {

		private readonly Queue<long> _TickTimeTable;
		private readonly Thread _Thread;
		private readonly long _MaxDelayInTicks;  // do not run if the delay was too long
		private long _NextWakeUpTickTime;

		public delegate void dOnMicroTimer(int xSenderThreadId, long xWakeUpTimeInTicks, long xDelayInTicks);
		public event dOnMicroTimer OnMicroTimer;
		public event dOnMicroTimer OnMicroTimerSkipped;

		public delegate void dQuickNote(int xSenderThreadId);
		public event dQuickNote OnMicroTimerStart;
		public event dQuickNote OnMicroTimerStop;

		public MicroTimer(Queue<long> xTickTimeTable, long xMaxDelayInTicks) {
			_TickTimeTable = xTickTimeTable;
			_Thread = new Thread(new ThreadStart(Loop));
			_Thread.Priority = ThreadPriority.Highest;
			_Thread.Name = "TimerLoop";
			_Thread.IsBackground = true;
			_MaxDelayInTicks = xMaxDelayInTicks;
		} //

		public int Start() {
			if ((_Thread.ThreadState & System.Threading.ThreadState.Unstarted) == 0) return -1;
			_Thread.Start();
			return _Thread.ManagedThreadId;
		} //

		public void Stop() {
			_Thread.Interrupt();
		} //

		private void Loop() {
			dQuickNote lOnStart = OnMicroTimerStart;
			if (lOnStart != null) lOnStart(_Thread.ManagedThreadId);

			try {
				while (true) {
					if (_TickTimeTable.Count < 1) break;
					_NextWakeUpTickTime = _TickTimeTable.Dequeue();
					long lTicksToWait = _NextWakeUpTickTime - Stopwatch.GetTimestamp();
					if (lTicksToWait < 0L) continue;  // already in the past, drop this entry
					long lMilliseconds = (lTicksToWait * 1000) / Stopwatch.Frequency;
					lMilliseconds -= 50;  // we want to wake up earlier and spend the last time using SpinWait
					if (lMilliseconds > 0L) Thread.Sleep((int)lMilliseconds);  // a negative argument would throw, and -1 would sleep forever

					while (Stopwatch.GetTimestamp() < _NextWakeUpTickTime) {
						Thread.SpinWait(10);
					}
					long lWakeUpTimeInTicks = Stopwatch.GetTimestamp();
					long lDelay = lWakeUpTimeInTicks - _NextWakeUpTickTime;
					if (lDelay < _MaxDelayInTicks) {
						dOnMicroTimer lHandler = OnMicroTimer;
						if (lHandler == null) continue;
						lHandler(_Thread.ManagedThreadId, lWakeUpTimeInTicks, lDelay);
					}
					else {
						dOnMicroTimer lHandler = OnMicroTimerSkipped;
						if (lHandler == null) continue;
						lHandler(_Thread.ManagedThreadId, lWakeUpTimeInTicks, lDelay);
					}
				}
			}
			catch (ThreadInterruptedException) { }
			catch (Exception) { Console.WriteLine("Exiting timer thread."); }

			dQuickNote lOnStop = OnMicroTimerStop;
			if (lOnStop != null) lOnStop(_Thread.ManagedThreadId);
		} //

	} // class
} // namespace

Example output:

Process with normal priority:
Privileged processor time for process TimerPrecisionTest.vshost is 109.4 ms
Process with high priority:
Privileged processor time for process TimerPrecisionTest.vshost is 109.4 ms
IsHighResolution system clock: True
Number of ticks per second: 1,558,893
Number of ticks to obtain a timestamp: 0.27

SYSTEM TIMER:
Delay in ticks: 65,625 == 42,097 microseconds
Delay in ticks: 1,414 == 907 microseconds
Delay in ticks: 1,663 == 1,067 microseconds
Delay in ticks: 1,437 == 922 microseconds
Delay in ticks: 25,829 == 16,569 microseconds
Delay in ticks: 1,532 == 983 microseconds
Delay in ticks: 14,478 == 9,287 microseconds
Delay in ticks: 14,587 == 9,357 microseconds
Delay in ticks: 14,615 == 9,375 microseconds
Delay in ticks: 14,650 == 9,398 microseconds
Average 15583
Min 1414
Max 65625
Range 64211

THREAD TIMER:
Delay in ticks: 18,890 == 12,118 microseconds
Delay in ticks: 17,493 == 11,221 microseconds
Delay in ticks: 11,750 == 7,537 microseconds
Delay in ticks: 11,824 == 7,585 microseconds
Delay in ticks: 11,914 == 7,643 microseconds
Delay in ticks: 11,858 == 7,607 microseconds
Delay in ticks: 11,935 == 7,656 microseconds
Delay in ticks: 12,049 == 7,729 microseconds
Delay in ticks: 12,108 == 7,767 microseconds
Delay in ticks: 24,953 == 16,007 microseconds
Average 14477.4
Min 11750
Max 24953
Range 13203

LOOP AND SLEEP:
Delay in ticks: 7,346 == 4,712 microseconds
Delay in ticks: 7,367 == 4,726 microseconds
Delay in ticks: 7,423 == 4,762 microseconds
Delay in ticks: 7,494 == 4,807 microseconds
Delay in ticks: 7,542 == 4,838 microseconds
Delay in ticks: 7,408 == 4,752 microseconds
Delay in ticks: 20,249 == 12,989 microseconds
Delay in ticks: 20,275 == 13,006 microseconds
Delay in ticks: 20,351 == 13,055 microseconds
Delay in ticks: 20,383 == 13,075 microseconds
Average 12583.8
Min 7346
Max 20383
Range 13037

SpinWait tests (neglecting PrivilegedProcessorTime):

Number of ticks for a SpinWait: 4,833
Average ticks for 100x SpinWaits: 1,422 == 912.0831 microseconds
SpinWait caused an error of just: 0.0000 microseconds

MICRO TIMER CLASS:
Delay in ticks: 772 == 495 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 34 == 22 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 6 == 4 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 5 == 3 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 6 == 4 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 6 == 4 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 6 == 4 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 5 == 3 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 6 == 4 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 7 == 4 microseconds
(Delay at wakeup time was 2 tick)
MicroTimer stopped.
Average 85.3
Min 5
Max 772
Range 767


migration C#, Java, C++ (day 11), future and promise

There is hardly anything directly to compare with in C#. Future and promise are C++ concepts, which are probably best compared to C# task concepts. The comparison was not easy today. Have a look at the outcome.

Wiki

future and promise

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Program {
    public class Day11 {

        private class Params {
            public int Input;
            public int Result;
        }

        private void DoSomething() {
            Console.WriteLine("DoSomething()");
            Thread.Sleep(4000);
        }  //     

        private void Method1(object xObject) {
            Params lParams = xObject as Params;
            Console.WriteLine("Method1");
            Thread.Sleep(2000);
            lParams.Result = 2 * lParams.Input;
        } //

        private int Method2(int xInt1) {
            Console.WriteLine("Method2");
            Thread.Sleep(2000);
            return 2 * xInt1;
        } //

        private async Task<int> Method3(int xInt1) {
            await Task.Delay(0);
            Console.WriteLine("Method3");
            return 2 * xInt1;
        } //

        private void Method4(Params xParams, AutoResetEvent xEvent) {
            Console.WriteLine("Method4");
            Thread.Sleep(2000);
            xEvent.WaitOne();
            xParams.Result = 2 * xParams.Input;
        } //


        static void Main(string[] args) {
            Day11 lDay11 = new Day11();
            lDay11.test();
        } //

        public async void test() {
            int i;

            // use a thread to get an asynchronous result
            ParameterizedThreadStart lParams = new ParameterizedThreadStart(Method1);
            Thread thread1 = new Thread(lParams);
            Params p = new Params() { Input = 123 };
            thread1.Start(p);
            Console.WriteLine("Method1.join()");
            thread1.Join();
            Console.WriteLine("Method1 result is: " + p.Result);

            Func<int, int> t1 = new Func<int, int>(x => { return Method2(x); });
            i = t1(123); // synchronous
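            IAsyncResult lResult = t1.BeginInvoke(123, null, null); // asynchronous (delegate BeginInvoke works on the .NET Framework only)
            lResult.AsyncWaitHandle.WaitOne();
            i = t1.EndInvoke(lResult); // always pair BeginInvoke with EndInvoke to collect the result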

            i = await Method3(123);
            Console.WriteLine("Method3 result is: " + i);

            p.Input = 123;
            p.Result = 0;
            AutoResetEvent lEvent = new AutoResetEvent(false);
            Task t2 = new Task(() => Method4(p, lEvent));
            t2.Start();
            lEvent.Set();
            t2.Wait();
            Console.WriteLine("Method4 result is: " + p.Result);

            Console.ReadLine();
        } //

    } // class
} // namespace

example output:
Method1.join()
Method1
Method1 result is: 246
Method2
Method2
Method3
Method3 result is: 246
Method4
Method4 result is: 246

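#include <chrono>      // chrono::seconds
#include <exception>   // make_exception_ptr
#include <functional>  // bind, ref
#include <future>
#include <iostream>
#include <stdexcept>   // runtime_error
#include <thread>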

using namespace std;

void DoSomething() {
  cout << "DoSomething()" << endl;
  this_thread::sleep_for(chrono::seconds(4));
}  //

void Thread1(int xInt1, int &xInt2) {
  cout << "Thread1" << endl;
  this_thread::sleep_for(chrono::seconds(2));
  xInt2 = 2 * xInt1;
} //

int Thread2(int xInt1) {
  cout << "Thread2" << endl;
  this_thread::sleep_for(chrono::seconds(2));
  return 2 * xInt1;
} //

int Thread3(future<int> &xInt1) {
  cout << "Thread3" << endl;
  this_thread::sleep_for(chrono::seconds(2));
  return 2 * xInt1.get(); // .get() => waits until the promise is fulfilled
} //

int Thread4(shared_future<int> xInt1) {
  cout << "Thread4" << endl;
  this_thread::sleep_for(chrono::seconds(2));
  return 2 * xInt1.get(); // returns the same value to all waiting futures
} //

void test(){
  int i;

  // use a thread to get an asynchronous result
  thread t1(Thread1, 123, ref(i));
  cout << "Thread1.join()" << endl;
  t1.join();
  cout << "Thread1 result is: " << i << endl;

  // like a task, may be synchronous or asynchronous
  future<int> f1 = async(Thread2, 123);
  cout << "Thread2, f1.get()" << endl;
  i = f1.get(); // waits

  // like a synchronous task, which is running on the same thread
  // will be called when you get the value
  future<int> f2 = async(launch::deferred, Thread2, 123); 
  cout << "Thread2, f2.get()" << endl; 
  i = f2.get(); // waits
  cout << "Thread2 deferred result is: " << i << endl;

  // like an asynchronous task, which is running on a new thread
  future<int> f3 = async(launch::async, Thread2, 123); 
  cout << "Thread2, f3.get()" << endl; 
  i = f3.get(); // waits
  cout << "Thread2 async result is: " << i << endl;

  // same as f1, because this is the default value; the system decides what to do
  future<int> f4 = async(launch::async | launch::deferred, Thread2, 123);
  i = f4.get(); // waits

  promise<int> lPromise5; // promise to provide a value at a later time  
  future<int> f5 = lPromise5.get_future();
  future<int> f5b = async(launch::async, Thread3, ref(f5));
  DoSomething();
  cout << "lPromise5.set_value()" << endl;
  lPromise5.set_value(123); // fulfill our promise now
  cout << "f5b async result is: " << f5b.get() << endl;

  promise<int> lPromise6; // promise to provide a value at a later time
  future<int> f6 = lPromise6.get_future();
  DoSomething();
  // tell the parallel thread to stop waiting for the promise fulfillment
  lPromise6.set_exception(make_exception_ptr(runtime_error("sorry, I cannot fulfill my promise")));  

  //
  // SOME PRACTICAL ISSUES
  //
  promise<int> lPromise7; // promise to provide a value at a later time
  future<int> f7 = lPromise7.get_future();
  promise<int> lPromise8 = move(lPromise7); // you cannot assign a promise, you have to move it
  future<int> f8 = move(f7); // same with the future    

  // you cannot call future.get() more than once
  // to allow multiple consumers use:
  shared_future<int> f9 = f8.share();
  future<int> f10 = async(launch::async, Thread4, f9);
  future<int> f11 = async(launch::async, Thread4, f9);
  future<int> f12 = async(launch::async, Thread4, f9);
  future<int> f13 = async(launch::async, Thread4, f9);
  lPromise8.set_value(123);
  cout << "f10: " << f10.get() << ", f11:" << f11.get() << ", f12:" << f12.get() << ", f13:" << f13.get() << endl;

  // packaged_task
  auto t2 = bind(Thread2, 123);
  t2(); // synchronous

  packaged_task<int()> t3(bind(Thread2, 123)); // wrapper to make async calls
  t3(); // call the task  in a different context
  future<int> f14 = t3.get_future(); // getting a future is not possible with t2 
  int x = f14.get();
  cout << "f14: " << x << endl;

  cin.get();
} //

int main() {
  test();
  return 0;
} //

example output:
Thread1.join()
Thread1

Thread1 result is: 246
Thread2, f1.get()
Thread2

Thread2, f2.get()
Thread2
Thread2 deferred result is: 246
Thread2, f3.get()
Thread2

Thread2 async result is: 246
Thread2
DoSomething()
Thread3

lPromise5.set_value()
f5b async result is: 246
DoSomething()
Thread4
Thread4

Thread4
Thread4
f10: 246, f11:246, f12:246, f13:246
Thread2
Thread2
f14: 246

package Program;

import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Day11 {

  private static void Sleep(int xMilliseconds) {
    try {
      Thread.sleep(xMilliseconds);
    } catch (InterruptedException ex) {
      System.err.println(ex.getMessage());
    }
  } //

  private class Thread1 extends Thread {

    public int Input;
    public int Result;

    @Override
    public void run() {
      System.out.println("Thread1");
      Sleep(2000);
      Result = 2 * Input;
    } //
  } // class

  private class Thread2 {

    private final ExecutorService pool = Executors.newFixedThreadPool(5);

    public Future<Integer> getPromise(final int xInt1) {
      return pool.submit(() -> {
        System.out.println("Thread2");
        Sleep(2000);
        return 2 * xInt1;
      });
    } //
  } // class

  private class Thread3 implements Callable<Integer> {

    private final ExecutorService _Pool = Executors.newFixedThreadPool(5);
    private int _Int1;

    public Future<Integer> getPromise(final int xInt1) {
      _Int1 = xInt1;
      return _Pool.submit(this);
    } //

    @Override
    public Integer call() throws Exception {
      synchronized (this) {
        this.wait();
      }
      System.out.println("Thread3");
      Sleep(2000);
      return 2 * _Int1;
    } //
  } // class

  public final void test() throws InterruptedException, ExecutionException {

    // use a thread to get an asynchronous result    
    Thread1 t1 = new Thread1();
    t1.Input = 123;
    t1.start();
    System.out.println("Thread1.join()");
    t1.join();
    System.out.println("Thread1 result is: " + t1.Result);

    // like a task, may be synchronous or asynchronous
    Thread2 t2 = new Thread2();
    Future<Integer> f1 = t2.getPromise(123);
    System.out.println("Thread2, f1.getPromise(123)");
    int i = f1.get(); // waits   
    System.out.println("f1 result is: " + i);

    // deferred
    Thread3 t3 = new Thread3();
    Future<Integer> f2 = t3.getPromise(123);
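    // caution: if notify() runs before call() has reached wait(), the wake-up is lost and f2.get() blocks forever
    synchronized (t3) {
      t3.notify(); // start calculation manually
    }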
    System.out.println("Thread3 result is: " + f2.get()); // wait

    new Scanner(System.in).nextLine();
  } //

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    Day11 lDay11 = new Day11();
    lDay11.test();
  } //

} // class

example output:
Thread1
Thread1.join()
Thread1 result is: 246
Thread2
Thread2, f1.getPromise(123)
f1 result is: 246
Thread3
Thread3 result is: 246
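
The Java code above emulates a promise with wait() and notify(). Since Java 8 the standard library also offers CompletableFuture, which is probably the closest counterpart to the C++ promise and future pair. A minimal sketch, with the hypothetical names PromiseSketch and doubleWhenReady:

```java
import java.util.concurrent.CompletableFuture;

final class PromiseSketch {

    /** Blocks until the input has been completed, then doubles it (like Thread3 in the C++ code). */
    static int doubleWhenReady(CompletableFuture<Integer> input) {
        return 2 * input.join();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> promise = new CompletableFuture<>();  // value comes later

        // the consumer starts on another thread and waits for the value
        CompletableFuture<Integer> result =
            CompletableFuture.supplyAsync(() -> doubleWhenReady(promise));

        promise.complete(123);                                  // fulfill the promise now
        System.out.println("result is: " + result.join());      // prints "result is: 246"

        // counterpart of set_exception(): the consumer gets the failure instead of a value
        CompletableFuture<Integer> broken = new CompletableFuture<>();
        broken.completeExceptionally(new RuntimeException("sorry, I cannot fulfill my promise"));
        System.out.println("failed: " + broken.isCompletedExceptionally());  // prints "failed: true"
    }
}
```

Unlike the wait() and notify() version, the order of events does not matter here: the value is stored in the future, so a consumer that arrives late still gets it.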

migration C#, Java, C++ (day 8), TCP, Qt

The last two days were characterized by incompatibilities. There is no TCP support in the C++ standard libraries. I knew why I tried to avoid Boost and Qt. Visual Studio 2013 has its problems with Boost. You get really weird error messages that cannot be fixed at all. Why should Visual Studio actively support Boost? They have their own libraries, which are pretty cool.
I was fighting with several compilers and tools that generated roughly 50 GB of library data in total. Finally I gave up as it is really as mad as it is described on some web pages. I ended up installing the Qt Creator and had to rewrite the entire C++ code. I have to say that Qt is structured in a much better way than Boost. Overall the C++ coding took roughly 20 times longer than the C# coding, which was completed within 20 minutes. Most of that time was due to the incompatibility fight.
C++ executes faster, but let’s take a company’s point of view. If someone invests 250,000 USD in extra production time, wouldn’t it be smarter to buy 10x faster computers instead?
In the stock market, there are many traders telling you that it is all about speed. Not really! Data is coming in a sequence. I do know many situations where you have to slow down the system to solve specific sequence problems. Most market participants aren’t even aware of this.

Today I compare two TCP chat programs. C# is straightforward. The C++ side looks a bit weird, because the Qt Q_OBJECT macro requires header files. I did not want to split up the code into many pieces, so I kept the code short and I also kept it in the header sections.

ADDENDUM 21 May 2014

The Java code has been added today. The code structure is better than the C# example. The short Java test program deals with 1 server and 2 clients.

Below the Java code you can find a second C# example that uses the improved object approach similar to the Java code.

TCP

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace DemoApp.ToCpp {
   public class Day8 {

      public class NetworkListener {

         private volatile bool _ExitLoop = true;
         private TcpListener _Listener;

         public int Port { get; private set; }
         public string IpAddress { get; private set; }
         public string ThreadName { get; private set; }
         private List<Client> _AllClients = new List<Client>();

         public NetworkListener(string xIpAddress, int xPort, string xThreadName) {
            Port = xPort;
            IpAddress = xIpAddress;
            ThreadName = xThreadName;
         } //

         private class Client {
            private TcpClient _TcpClient;
            private NetworkStream _NetworkStream;
            public delegate void dOnShutDown(Client xClient);
            public event dOnShutDown OnShutDown;

            private volatile bool _ExitLoop = false;
            public readonly string IpAddress;

            public Client(TcpClient xTcpClient) {
               _TcpClient = xTcpClient;
               IpAddress = xTcpClient.Client.LocalEndPoint.ToString();
               Console.WriteLine("SERVER MESSAGE -> " + IpAddress + " new client connected.");
            } //

            public void Disconnect() {
               _ExitLoop = true;
               dOnShutDown lEvent = OnShutDown;
               if (lEvent != null) lEvent(this);
               if (_NetworkStream != null) _NetworkStream.ReadTimeout = 5; // the listener thread blocks in ReadLine(), so the read timeout is the relevant one
            } //

            public void ListenAsync() {
               Thread lThread = new Thread(new ThreadStart(Listen));
               lThread.IsBackground = true;
               lThread.Name = "Client_" + IpAddress;
               lThread.Start();
            } //

            private void Listen() {
               using (_NetworkStream = _TcpClient.GetStream()) {
                  using (StreamReader lStreamReader = new StreamReader(_NetworkStream)) {
                     while (!_ExitLoop) {
                        try {
                           string s = lStreamReader.ReadLine();
                           if (s == null) break;
                           Console.WriteLine("SERVER MESSAGE -> " + IpAddress + " Client said: " + s);
                        }
                        catch (IOException) {
                           if (_ExitLoop) Console.WriteLine("SERVER MESSAGE -> user requested TcpClient shutdown");
                           else Console.WriteLine("SERVER MESSAGE -> disconnected");
                           break; // the stream is unusable after an I/O error, do not spin in the loop
                        }
                        catch (Exception ex) { Console.WriteLine(ex.Message); }
                     }
                  }
                  Console.WriteLine("SERVER MESSAGE -> " + IpAddress + " Listener is shutting down");
               }
               _NetworkStream = null;
            } //
         } // class

         public bool WaitForClientsAsync() {
            if (!_ExitLoop) {
               Console.WriteLine("SERVER MESSAGE -> Listener running already");
               return false;
            }
            _ExitLoop = false;

            try {
               _Listener = new TcpListener(IPAddress.Parse(IpAddress), Port);
               _Listener.Start();

               Thread lThread = new Thread(new ThreadStart(WaitForAClient));
               lThread.IsBackground = true;
               lThread.Name = ThreadName;
               lThread.Start();

               return true;
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
            return false;
         } //

         public void Disconnect() {
            _ExitLoop = true;
            List<Client> lClone; // we need a clone, because the _AllClients collection will be manipulated during the foreach loop
            lock (_AllClients) { lClone = new List<Client>(_AllClients); }
            foreach (Client lClient in lClone) lClient.Disconnect();
            _Listener.Stop();
         } //

         private void WaitForAClient() {
            try {
               while (!_ExitLoop) {
                  int lClientCount;
                  lock (_AllClients) lClientCount = _AllClients.Count;
                  Console.WriteLine("SERVER MESSAGE -> Waiting for client number " + lClientCount);
                  TcpClient lTcpClient = _Listener.AcceptTcpClient();
                  Client lClient = new Client(lTcpClient);
                  lClient.OnShutDown += OnClientShutDown;
                  lock (_AllClients) _AllClients.Add(lClient);
                  lClient.ListenAsync(); // starts a background thread
               }
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
         }

         void OnClientShutDown(NetworkListener.Client xClient) {
            lock (_AllClients) { _AllClients.Remove(xClient); }
         } //         

      } // class

      public class NetworkClient {
         public int Port { get; private set; }
         public string IpAddress { get; private set; }
         public string ThreadName { get; private set; }

         private NetworkStream _NetworkStream = null;
         private volatile bool _ExitLoop = true; // written by the caller thread, read by the writer thread
         private BlockingCollection<string> _Queue = new BlockingCollection<string>();

         public NetworkClient(string xIpAddress, int xPort, string xThreadName) {
            Port = xPort;
            IpAddress = xIpAddress;
            ThreadName = xThreadName;
         } //

         public void ConnectAsync() {
            if (!_ExitLoop) return; // running already
            _ExitLoop = false;

            Thread lThread = new Thread(new ThreadStart(Loop));
            lThread.IsBackground = true;
            lThread.Name = ThreadName;
            lThread.Start();
         } //

         public void Disconnect() {
            _ExitLoop = true;
            _Queue.Add(null);
            if (_NetworkStream != null) _NetworkStream.ReadTimeout = 100;
         } //

         public void Send(string xText) {
            if (string.IsNullOrEmpty(xText)) return;
            _Queue.Add(xText); // BlockingCollection is thread-safe; the writer thread dequeues and sends the text
         } //

         private void Loop() {
            try {
               using (TcpClient lClient = new TcpClient()) {
                  lClient.Connect(IpAddress, Port);
                  using (_NetworkStream = lClient.GetStream()) {
                     using (StreamWriter lStreamWriter = new StreamWriter(_NetworkStream)) {
                        while (!_ExitLoop) {
                           try {
                              string s = _Queue.Take();
                              if (string.IsNullOrEmpty(s)) break;
                              lStreamWriter.WriteLine(s);
                           }
                           catch (System.IO.IOException) {
                              if (_ExitLoop) Console.WriteLine("CLIENT -> User requested shutdown.");
                              else Console.WriteLine("CLIENT -> disconnected");
                              break; // the stream is unusable after an I/O error, do not spin in the loop
                           }
                           catch (Exception ex) { Console.WriteLine(ex.Message); }
                        }
                     }
                     _ExitLoop = true;
                     Console.WriteLine(Environment.NewLine + "CLIENT -> shutting down");
                  }
               }
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
         } //

      } // class


      public static void Test() {
         NetworkListener lServer = new NetworkListener("127.0.0.1", 65432, "Server");
         NetworkClient lClient = new NetworkClient("127.0.0.1", 65432, "Client");

         lServer.WaitForClientsAsync();

         lClient.ConnectAsync();
         lClient.Send("Hello World!"); // send a text message across the network

         System.Threading.Thread.Sleep(1000);

         lClient.Disconnect();
         lServer.Disconnect();

         Console.ReadLine();
      } //

   } // class
} // namespace

example output:
SERVER MESSAGE -> Waiting for client number 0
SERVER MESSAGE -> 127.0.0.1:65432 new client connected.
SERVER MESSAGE -> Waiting for client number 1
SERVER MESSAGE -> 127.0.0.1:65432 Client said: Hello World!

CLIENT -> shutting down
SERVER MESSAGE -> 127.0.0.1:65432 Listener is shutting down
A blocking operation was interrupted by a call to WSACancelBlockingCall
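The last line of the output looks like an error, but it appears to be the expected side effect of the shutdown: _Listener.Stop() is called while the server thread is still blocked in AcceptTcpClient(), and the resulting exception message is printed by the catch block in WaitForAClient(). Java behaves the same way. The following sketch is a minimal illustration with a hypothetical class name (AcceptShutdown) and a throwaway loopback port; it blocks a thread in accept(), closes the listener and reports how accept() ended.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class AcceptShutdown {

  /** Blocks a thread in accept(), closes the listener and reports how accept() ended. */
  static String closeWhileAccepting() {
    AtomicReference<String> outcome = new AtomicReference<>("accept never returned");
    CountDownLatch done = new CountDownLatch(1);
    try {
      ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
      Thread acceptor = new Thread(() -> {
        try {
          server.accept();                  // nobody connects, so this blocks
          outcome.set("unexpected client");
        } catch (SocketException ex) {
          outcome.set("SocketException");   // thrown because the listener was closed
        } catch (IOException ex) {
          outcome.set("IOException");
        } finally {
          done.countDown();
        }
      });
      acceptor.setDaemon(true);
      acceptor.start();

      Thread.sleep(200);                    // give the thread time to block in accept()
      server.close();                       // this is what unblocks accept()
      done.await(5, TimeUnit.SECONDS);
    } catch (IOException | InterruptedException ex) {
      outcome.set("setup failed: " + ex);
    }
    return outcome.get();
  }

  public static void main(String[] args) {
    System.out.println(closeWhileAccepting()); // prints "SocketException"
  }
}
```

Closing the listener is the usual way to get a thread out of a blocking accept call, so the exception is a signal to leave the loop rather than a failure.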

#pragma once

#include <iostream>
#include <QtNetwork>
#include <QObject>
#include <QString>
#include <QTcpSocket>
#include <string>

using namespace std;

class Client : public QObject {
    Q_OBJECT

private:
  int _Port = 0;
  QString _IpAddress;
  QTcpSocket _TcpSocket;

private slots:
    void Connected() { cout << "CLIENT MESSAGE -> connection established " << _IpAddress.toStdString() << ":" << _Port << endl; }

public:
   inline int getPort() { return _Port; }
   inline QString getIpAddress() { return _IpAddress; }

   Client(QString xIpAddress, int xPort) : QObject() {
    _IpAddress = xIpAddress;
    _Port = xPort;
   }
   ~Client() { cout << "deconstructing Client class " << _IpAddress.toStdString() << endl; }
   Client(QObject* lParent): QObject(lParent) { cout << "Client :)" << endl; }

  void Connect() {
      connect(&_TcpSocket, SIGNAL(connected()), this, SLOT(Connected()));
      QHostAddress lHostAddress(_IpAddress);
      _TcpSocket.connectToHost(lHostAddress, _Port);
      while (!_TcpSocket.waitForConnected( 2000 )) {
        cout << "CLIENT MESSAGE -> waiting for a feedback" << endl;
      }
  }

  void Disconnect() {
    _TcpSocket.close();
  } //

  void Send(const string &xText) {
    if (xText == "") return;
    const char *lText = xText.c_str();
    _TcpSocket.write(lText, xText.length() + 1);
  } //

};

//----------------------------------------------------------------------------------------------------------------------

#pragma once

#include <QTcpSocket>
#include <QHostAddress>
#include <thread>
#include <set>
#include <iostream>
#include <QtNetwork>

using namespace std;

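class Connection : public QObject {
    Q_OBJECT

  private:
    QTcpSocket *_TcpSocket;
    string _IpAddress;

  private slots:
    // must be declared as a slot, because the constructor connects to it via SLOT(Read())
    void Read() {
      try {
        char lBuffer[1024] = {0};
        // never read more than the buffer can hold; the last byte stays 0 as terminator
        qint64 lSize = qMin<qint64>(_TcpSocket->bytesAvailable(), sizeof(lBuffer) - 1);
        _TcpSocket->read(lBuffer, lSize);
        cout << "SERVER MESSAGE -> " << _IpAddress << " Client said: " << lBuffer << endl;
      }
        catch (exception &ex) { cout << ex.what() << endl; }
    } //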


  public:
    const string &IpAddress() const { return _IpAddress; }

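    void Disconnect() {
      if (_TcpSocket == nullptr) return; // already disconnected; the destructor calls this method again
      _TcpSocket->close();
      delete _TcpSocket;
      _TcpSocket = nullptr;              // avoid a double delete
    } //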

    Connection(QTcpSocket *xTcpSocket) : _TcpSocket(xTcpSocket) {
      QHostAddress lAddress = xTcpSocket->localAddress();
      QString s = lAddress.toString();
      _IpAddress = s.toLocal8Bit().constData();
      connect(xTcpSocket, SIGNAL(readyRead()), this, SLOT(Read()));  // setup event
      cout << "SERVER MESSAGE -> " + _IpAddress << " new client connected." << endl;
    } //
    Connection(QObject* lParent): QObject(lParent), _TcpSocket(nullptr) { cout << "Connection :)"; }
    ~Connection() { Disconnect(); cout << "deconstructing Connection class " << _IpAddress << endl; }

 }; // class

//----------------------------------------------------------------------------------------------------------------------

#pragma once

#include <iostream>
#include <QtNetwork>
#include <QObject>
#include <QString>
#include <QTcpSocket>
#include <QTcpServer>
#include <string>
#include <list>
#include "Connection.h"
#include <mutex>


using namespace std;

class Server : public QObject {
    Q_OBJECT

private:
    list<Connection*> _AllClients;
    mutex mutex_AllClients;
    int _Port;
    string _IpAddress;
    QTcpServer _TcpServer;

    void OnClientShutDown(Connection *xClient) {
        cout << "SERVER MESSAGE -> Connection shutting down" << endl;
        mutex_AllClients.lock();
        _AllClients.remove(xClient);
        mutex_AllClients.unlock();
        xClient->Disconnect();
    } //

private slots:
    void NewConnection() {
        cout << "SERVER MESSAGE -> new connection" << endl;
        QTcpSocket *lSocket = _TcpServer.nextPendingConnection();
        Connection *lClient = new Connection(lSocket);
        mutex_AllClients.lock();
        _AllClients.push_back(lClient);
        mutex_AllClients.unlock();
    } //

public:    
    Server(QObject* lParent): QObject(lParent) { cout << "Server :)"; }
    ~Server() { cout << "deconstructing Server class " << endl; }
    inline int getPort() { return _Port; }
    inline string getIpAddress() { return _IpAddress; }    

    Server(string xIpAddress, int xPort) : QObject() {
      _Port = xPort;
      _IpAddress = xIpAddress;
      connect(&_TcpServer, SIGNAL(newConnection()), this, SLOT(NewConnection())); // setup event
      QString lIpAddress(xIpAddress.c_str());
      //QHostAddress lHostAddress(lIpAddress);
      if (_TcpServer.listen(QHostAddress::Any, xPort)) {
      //if(_TcpServer.listen(lHostAddress, xPort)) {
        cout << "SERVER MESSAGE -> listener up and running" << endl;
      }
    } //

    void Disconnect() {
      mutex_AllClients.lock();
      list<Connection*> lClone;
      list<Connection*>::const_iterator lIterator;
      for (lIterator = _AllClients.begin(); lIterator != _AllClients.end(); ++lIterator) {
        lClone.push_back(*lIterator);  // we need a clone, because the _AllClients collection will be manipulated during the foreach loop
      }
      mutex_AllClients.unlock();

      for (Connection *lClient : lClone) lClient->Disconnect();
      _TcpServer.close();
    } //

}; // class

//----------------------------------------------------------------------------------------------------------------------

#include <QApplication>
#include <iostream>
#include <chrono>
#include <thread>
#include "Server.h"
#include "Client.h"

int main(int argc, char** argv)
{
  QApplication app(argc, argv);
  try {

    Server lServer("127.0.0.1", 65432);
    Client lClient("127.0.0.1", 65432);

    lClient.Connect();    

    this_thread::sleep_for(chrono::milliseconds(1000)) ;
    lClient.Send("Hello World!"); // send a text message across the network
    this_thread::sleep_for(chrono::milliseconds(1000)) ;

    lClient.Disconnect();
    lServer.Disconnect();

    cout << "press enter to exit" << endl;
    cin.get();
  }
  catch (exception &ex) { cerr << ex.what() << endl; }
  return app.exec();
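} //

//----------------------------------------------------------------------------------------------------------------------
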
package DemoApp;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;

public final class Server extends Thread {

  private final ArrayList<Client> _Clients = new ArrayList<>();
  public final int port;
  ServerSocket _ServerSocket;

  public Server(String xName, int xPort) throws IOException {
    super(xName);
    port = xPort;
    setDaemon(true);
    _ServerSocket = new ServerSocket(xPort);
  } // constructor

  public void close() throws IOException {
    this.interrupt(); // exit loop
    synchronized (_Clients) {
      for (Client lClient : _Clients) lClient.close();
      _Clients.clear();
    }

    _ServerSocket.close();
    System.out.println(_ServerSocket.getLocalSocketAddress().toString() + " server closed down");
  } //

  public void broadcast(String xMessage) {
    synchronized (_Clients) {
      for (Client lClient : _Clients)
        lClient.getWriter().send(xMessage);
    }
  } //

  @Override
  public void run() {
    System.out.println(_ServerSocket.getLocalSocketAddress().toString() + " server is waiting for clients");

    try {
      while (true) {
        Socket lSocket = _ServerSocket.accept();  // wait for a client to connect
        Client lClient = new Client(lSocket);
        synchronized (_Clients) {
          _Clients.add(lClient);
        }
        lClient.connect();
      }
    } catch (IOException ex) {
      //System.out.println(ex.getMessage());
    }
  } //

} // class

// -----------------------------------------------------------------------------------------------------------------------

package DemoApp;

import java.io.IOException;
import java.net.Socket;

public class Client {

  private Socket _Socket = null;
  public final String IpAddressHost;
  public final String IpAddressLocal;
  public final String IpConnectionString;
  private Reader _Reader = null;
  private Writer _Writer = null;

  public Socket getSocket() {
    return _Socket;
  } //

  public Reader getReader() {
    return _Reader;
  } //

  public Writer getWriter() {
    return _Writer;
  } //

  public void close() throws IOException {
    synchronized (this) {
      if (_Reader != null) _Reader.close(true);
      _Reader = null;
      if (_Writer != null) _Writer.close(true);
      _Writer = null;
    }

    System.out.println(IpConnectionString + " client closed down");
  } //

  public Client(Socket xSocket) {
    _Socket = xSocket;
    IpAddressHost = xSocket.getInetAddress().getHostAddress() + ":" + _Socket.getPort();
    IpAddressLocal = xSocket.getLocalAddress().getHostAddress() + ":" + _Socket.getLocalPort();
    IpConnectionString = "(Local " + IpAddressLocal + ",  Host " + IpAddressHost + ")";
  } //

  public void connect() throws IOException {
    _Reader = new Reader(this);
    _Writer = new Writer(this);
    _Reader.start();  // start listening
    _Writer.start();  // start write loop        
  } //  

} // class

// -----------------------------------------------------------------------------------------------------------------------

package DemoApp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

public class Reader extends Thread {

  private final Client _Client;
  private BufferedReader _StreamReader;

  public Reader(Client xClient) throws IOException {
    _Client = xClient;
    Socket lSocket = xClient.getSocket();
    _StreamReader = new BufferedReader(new InputStreamReader(lSocket.getInputStream()));
  } // constructor

  public void close(boolean xExitLoop) throws IOException {
    synchronized (this) {
      if (xExitLoop) interrupt();
      if (_StreamReader == null) return;
      _StreamReader.close();
      _StreamReader = null;
    }

    System.out.println(_Client.IpConnectionString + " closed reader connection");
  } //

  @Override
  public void run() {
    System.out.println(_Client.IpConnectionString + " start new reader connection");

    while (true)
      try {
        String lInput = _StreamReader.readLine();
        if (lInput == null) break;
        if (lInput.isEmpty()) continue;

        System.out.println(_Client.IpConnectionString + " message received: " + lInput);
        if ("EXIT".equals(lInput)) break;

      } catch (IOException | RuntimeException ex) {
        System.out.println(ex);
        break; // the stream is unusable after an error, do not spin in the loop
      }
    try {
      close(false);
    } catch (IOException ex) {
    }
  } //

} // class

// -----------------------------------------------------------------------------------------------------------------------

package DemoApp;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.LinkedList;

public class Writer extends Thread {

  private final Client _Client;
  private PrintWriter _PrintWriter;

  private final LinkedList<String> _WriteQueue = new LinkedList<>();

  public Writer(Client xClient) throws IOException {
    setDaemon(true);
    _Client = xClient;
    Socket lSocket = xClient.getSocket();
    _PrintWriter = new PrintWriter(lSocket.getOutputStream(), true);
  } // constructor

  public void close(boolean xExitLoop) throws IOException {
    synchronized (this) {
      if (xExitLoop) interrupt();
      if (_PrintWriter == null) return;
      _PrintWriter.close();
      _PrintWriter = null;
    }
    System.out.println(_Client.IpConnectionString + " closed writer connection");
  } //

  public void send(String xText) {
    if (xText == null) return;

    synchronized (_WriteQueue) {
      _WriteQueue.addLast(xText);
      _WriteQueue.notify();
    }
  } //

  @Override
  public void run() {
    System.out.println(_Client.IpConnectionString + " start new writer connection");

    try {
      while (true) {
        String lText = null;
        synchronized (_WriteQueue) {
          if (_WriteQueue.size() > 0) lText = _WriteQueue.removeFirst();
          else _WriteQueue.wait();
        }
        if (lText == null) continue;
        if (_PrintWriter == null) continue;
        _PrintWriter.println(lText);
      }
    } catch (InterruptedException ex) {
      if (ex.getMessage() != null) System.out.println(ex.getMessage());
    }
    try {
      close(false);
    } catch (IOException ex) {
    }
  } //

} // class

// -----------------------------------------------------------------------------------------------------------------------

package DemoApp;

import java.io.IOException;
import java.net.Socket;

public class Test {

  private static void Sleep() {
    try {
      Thread.sleep(2000);
    } catch (InterruptedException ex) {
      System.out.println(ex.getMessage());
    }
  } //

  public static void main(String[] args) throws IOException {
    Server lServer = new Server("MyServer", 65432);
    lServer.start();
    Sleep();

    Socket lClientSocket1 = new Socket("localhost", 65432);
    Client lClient1 = new Client(lClientSocket1);
    lClient1.connect();
    Sleep();

    Socket lClientSocket2 = new Socket("localhost", 65432);
    Client lClient2 = new Client(lClientSocket2);
    lClient2.connect();
    Sleep();

    lClient1.getWriter().send("client->server: hello server!");
    Sleep();

    lServer.broadcast("server->client: hello client!");
    Sleep();

    lClient1.getWriter().send("EXIT");
    lClient1.close();

    lClient2.getWriter().send("EXIT");
    lClient2.close();

    Sleep();

    lServer.close();
    //System.in.read();
    Sleep();
  }
} // class

example output:
0.0.0.0/0.0.0.0:65432 server is waiting for clients
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) start new reader connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) start new writer connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) start new reader connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) message received: client->server: hello server!
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) message received: EXIT
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) closed reader connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) closed reader connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) closed writer connection
(Local 127.0.0.1:54059, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) message received: EXIT
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) closed reader connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) closed reader connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) closed writer connection
(Local 127.0.0.1:54060, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) closed writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54059) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) closed writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:54060) client closed down
0.0.0.0/0.0.0.0:65432 server closed down
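The Java Writer above builds its queue by hand with a LinkedList plus wait()/notify(), whereas the C# code uses a BlockingCollection. Java has a close counterpart in java.util.concurrent. The following sketch shows how the write loop could look with a LinkedBlockingQueue. The class name QueueWriter, the stop marker and the list that stands in for the PrintWriter are assumptions for this illustration, not part of the program above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class QueueWriter extends Thread {
  // hypothetical stop marker; compared by identity, so no real message can collide with it
  private static final String STOP = new String("<stop>");

  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  private final List<String> sink; // stands in for the PrintWriter of the real Writer

  QueueWriter(List<String> sink) {
    this.sink = sink;
    setDaemon(true);
  }

  void send(String text) {
    if (text != null) queue.add(text); // thread-safe, no explicit lock or notify needed
  }

  void shutdown() {
    queue.add(STOP); // everything queued before the marker is still written
  }

  @Override
  public void run() {
    try {
      while (true) {
        String text = queue.take(); // blocks until a message is available
        if (text == STOP) break;
        sink.add(text);
      }
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // keep the interrupt flag and leave the loop
    }
  }

  /** Sends two messages, stops the writer and returns what was written. */
  static List<String> demo() {
    List<String> written = new ArrayList<>();
    QueueWriter writer = new QueueWriter(written);
    writer.start();
    writer.send("one");
    writer.send("two");
    writer.shutdown();
    try {
      writer.join(5000); // join() also makes the writer's list updates visible to this thread
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
    return written;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [one, two]
  }
}
```

The stop marker replaces the interrupt() call of the original: the writer drains the queue first and then ends, so no pending message is lost during shutdown.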

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace Tcp {
   public class Server {

      private readonly List<Client> _Clients = new List<Client>();
      public readonly int Port;
      private TcpListener _TcpListener = null;
      private Thread _Thread;
      private readonly string Name;

      public Server(String xName, int xPort) {
         Name = xName;
         Port = xPort;
      } // constructor

      public bool start() {
         try {
            _TcpListener = new TcpListener(IPAddress.Parse("127.0.0.1"), Port);
            _TcpListener.Start();

            _Thread = new Thread(loop);
            _Thread.IsBackground = true;
            _Thread.Name = Name;
            _Thread.Start();
         }
         catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); return false; }
         return true;
      } //

      public void Close() {
         if (_Thread != null) { _Thread.Interrupt(); _Thread = null; }
         lock (_Clients) {
            foreach (Client lClient in _Clients) lClient.close();
            _Clients.Clear();
         }

         _TcpListener.Stop();
         Console.WriteLine(_TcpListener.LocalEndpoint.ToString() + " server closed down");
         _TcpListener = null;
      } //

      public void broadcast(String xMessage) {
         lock (_Clients) {
            foreach (Client lClient in _Clients) lClient.Writer.Send(xMessage);
         }
      } //

      private void loop() {
         Console.WriteLine(_TcpListener.LocalEndpoint.ToString() + " server is waiting for clients");

         try {
            while (true) {
               TcpClient lTcpClient = _TcpListener.AcceptTcpClient();   // wait for a client to connect
               Client lClient = new Client(lTcpClient);
               lock (_Clients) _Clients.Add(lClient);
               lClient.Connect();
            }
         }
         catch (ThreadInterruptedException) { } // Thread interruption
         catch (SocketException) { } // Thread interruption => SocketException
         catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); }
      } //

   } // class
} // namespace

// -----------------------------------------------------------------------------------------------------------------------

using System;
using System.Net.Sockets;

namespace Tcp {
   public class Client {
      public TcpClient TcpClient { get; private set; }
      public readonly String IpAddressHost;
      public readonly String IpAddressLocal;
      public readonly String IpConnectionString;
      public Reader Reader { get; private set; }
      public Writer Writer { get; private set; }

      public void close() {
         lock (this) {
            if (Reader != null) Reader.Close(true);
            Reader = null;
            if (Writer != null) Writer.Close(true);
            Writer = null;
            if (TcpClient != null) {
               TcpClient.Close();
               Console.WriteLine(IpConnectionString + " TcpClient read/write closed");
            }
            TcpClient = null;
         }

         Console.WriteLine(IpConnectionString + " client closed down");
      } //

      public Client(string xHostname, int xPort) : this(new TcpClient(xHostname, xPort)) { }   // create and connect

      public Client(TcpClient xTcpClient) {
         TcpClient = xTcpClient;
         IpAddressHost = TcpClient.Client.RemoteEndPoint.ToString();
         IpAddressLocal = TcpClient.Client.LocalEndPoint.ToString();
         IpConnectionString = "(Local " + IpAddressLocal + ",  Host " + IpAddressHost + ")";
      } //

      public void Connect() {
         Reader = new Reader(this);
         Writer = new Writer(this);
         Reader.Start();  // start listening
         Writer.Start();  // start write loop        
      } //  

   } // class
} // namespace

// -----------------------------------------------------------------------------------------------------------------------

using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;

namespace Tcp {
   public class Reader {

      private readonly Client _Client;
      private StreamReader _StreamReader = null;
      private Thread _Thread = null;

      public Reader(Client xClient) {
         _Client = xClient;
         NetworkStream lNetworkStream = xClient.TcpClient.GetStream();
         _StreamReader = new StreamReader(lNetworkStream);
      } // constructor

      public void Start() {
         _Thread = new Thread(loop);
         _Thread.IsBackground = true;
         _Thread.Start();
      } //

      public void Close(bool xExitLoop) {
         lock (this) {
            if (xExitLoop && (_Thread != null)) { _Thread.Interrupt(); _Thread = null; }

            // _Client.TcpClient.Close() in Client object
            //if (_StreamReader == null) return;
            //_StreamReader.Close();
            //_StreamReader = null;
         }
         
         //Console.WriteLine(_Client.IpConnectionString + " closed reader connection");
      } //

      private void loop() {
         Console.WriteLine(_Client.IpConnectionString + " start new reader connection");

         while (true) {
            try {
               String lInput = _StreamReader.ReadLine();
               if (lInput == null) break;
               if (lInput == "") continue;

               Console.WriteLine(_Client.IpConnectionString + " message received: " + lInput);
               if ("EXIT".Equals(lInput)) break;

            }
            catch (ThreadInterruptedException) { break; } // Thread interruption
            catch (IOException) { break; } // StreamReader disposed
            catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); }
         }
         Close(false);
      } //

   } // class
} // namespace

// -----------------------------------------------------------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net.Sockets;
using System.Threading;

namespace Tcp {
   public class Writer {

      private readonly Client _Client;
      private StreamWriter _StreamWriter = null;
      private Thread _Thread = null;

      private readonly BlockingCollection<String> _WriteQueue = new BlockingCollection<string>();

      public Writer(Client xClient) {
         _Client = xClient;
         NetworkStream lNetworkStream = xClient.TcpClient.GetStream();
         _StreamWriter = new StreamWriter(lNetworkStream);
      } // constructor

      public void Start() {
         _Thread = new Thread(Loop);
         _Thread.IsBackground = true;
         _Thread.Start();
      } //

      public void Close(bool xExitLoop) {
         lock (this) {
            if (xExitLoop && (_Thread != null)) { _Thread.Interrupt(); _Thread = null; }

            // _Client.TcpClient.Close() in Client object
            //if (_StreamWriter == null) return;
            //_StreamWriter.Close();
            //_StreamWriter = null;
         }
         //Console.WriteLine(_Client.IpConnectionString + " closed writer connection");
      } //

      public void Send(string xText) {
         if (string.IsNullOrEmpty(xText)) return;
         _WriteQueue.Add(xText);
      } //


      private void Loop() {
         Console.WriteLine(_Client.IpConnectionString + " start new writer connection");

         while (true) {
            try {
               string lText = null;
               lText = _WriteQueue.Take();
               if (string.IsNullOrEmpty(lText)) continue;
               if (_StreamWriter == null) continue;
               _StreamWriter.WriteLine(lText);
               _StreamWriter.Flush();
            }
            catch (ObjectDisposedException) { break; } 
            catch (ThreadInterruptedException) { break; } // Thread interruption
            catch (Exception ex) { Console.WriteLine(ex.StackTrace + Environment.NewLine + ex.Message); }
         }

         Close(false);
      } //


   } // class
} // namespace

// -----------------------------------------------------------------------------------------------------------------------

using System.Net.Sockets;
using System.Threading;

namespace Tcp {
   class Program {
      static void Main(string[] args) {

         Server lServer = new Server("MyServer", 65432);
         lServer.start();
         Thread.Sleep(2000);

         TcpClient lClientSocket1 = new TcpClient("localhost", 65432);
         Client lClient1 = new Client(lClientSocket1);
         lClient1.Connect();
         Thread.Sleep(2000);

         TcpClient lClientSocket2 = new TcpClient("localhost", 65432);
         Client lClient2 = new Client(lClientSocket2);
         lClient2.Connect();
         Thread.Sleep(2000);

         lClient1.Writer.Send("client->server: hello server!");
         Thread.Sleep(2000);

         lServer.broadcast("server->client: hello client!");
         Thread.Sleep(2000);

         lClient1.Writer.Send("EXIT");
         Thread.Sleep(2000);
         lClient1.close();

         lClient2.Writer.Send("EXIT");
         Thread.Sleep(2000);
         lClient2.close();

         Thread.Sleep(1000);

         lServer.Close();
         //Console.ReadLine();
         Thread.Sleep(5000);

      } // main
   } // class
} // namespace

example output:
127.0.0.1:65432 server is waiting for clients
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) start new reader connection
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) start new reader connection
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) start new reader connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) start new writer connection
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) start new writer connection
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) message received: client->server: hello server!
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) message received: server->client: hello client!
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) message received: EXIT
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) TcpClient read/write closed
(Local 127.0.0.1:58161, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) message received: EXIT
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) TcpClient read/write closed
(Local 127.0.0.1:58163, Host 127.0.0.1:65432) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) TcpClient read/write closed
(Local 127.0.0.1:65432, Host 127.0.0.1:58161) client closed down
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) TcpClient read/write closed
(Local 127.0.0.1:65432, Host 127.0.0.1:58163) client closed down
127.0.0.1:65432 server closed down

migration C#, Java, C++ (day 7), TBB

This is some relaxing stuff today.
Recommended reading for variable parameters on the C# side: __arglist in the Print() method

variable parameters

// C#
double avg(int xCount, params double[] xDoubles) {
  double lSum = 0;
  foreach (double d in xDoubles) lSum += d;
  return lSum / xDoubles.Length;
} //

// Java
private double avg(Double... xDoubles) {
  double lSum = 0;
  for (double d : xDoubles) lSum += d;
  return lSum / xDoubles.length;
} //

// C++
// #include <cstdarg>
double avg(int xCount, ...) {
  va_list args;
  double lSum = 0;

  va_start(args, xCount);
  for (int i = 0; i < xCount; i++) lSum += va_arg(args, double);
  va_end(args);

  return lSum / xCount;
}

Parallel vs. TBB

// C#
public void DoSomething() {
   Console.Write(Thread.CurrentThread.ManagedThreadId + " ");
   Thread.Sleep(100);
} //

Stopwatch _Stopwatch = new Stopwatch();
void startTimer() {
   _Stopwatch.Reset();
   _Stopwatch.Start();
} //

void stopTimer() {
   long h = _Stopwatch.ElapsedMilliseconds;
   Console.WriteLine("timer millisecs: " + h);
   _Stopwatch.Restart();
} //

BlockingCollection<int> _Queue;
void Producer() {
   Console.WriteLine();
   for (int i = 10; i >= 0; i--) {
      DoSomething();
      if (!_Queue.TryAdd(i)) Console.WriteLine("failed to push a new value to the queue");
   }
} //


void TBBs() {
   const int lSize = 50;

   startTimer();
   for (int i = 0; i < lSize; i++) DoSomething();
   stopTimer();

   Parallel.For(0, lSize, x => DoSomething());
   stopTimer();

   List<int> lList = new List<int>();
   for (int i = 0; i < lSize; i++) lList.Add(i);

   startTimer();
   Parallel.ForEach(lList, x => DoSomething());
   stopTimer();

   Parallel.Invoke(DoSomething, DoSomething, DoSomething);
   stopTimer();

   // create the queue before the producer thread starts; otherwise Producer() could see a null _Queue
   using (_Queue = new BlockingCollection<int>()) {
      Thread lThread = new Thread(Producer);
      lThread.Start();

      while (true) {
         int x = _Queue.Take();
         Console.WriteLine("received value " + x);
         if (x == 0) break;
      }
   }
} //

// Java
public final void DoSomething() {
  System.out.print(Thread.currentThread().getId() + " ");
  try {
    Thread.sleep(100);
  } catch (InterruptedException ex) {
    System.out.println(ex.getMessage());
  }
}

public class Stopwatch {

  long _Time;

  public void startTimer() {
    _Time = System.currentTimeMillis();
  } //

  public void stopTimer() {
    long h = System.currentTimeMillis();
    System.out.println("timer millisecs: " + (h - _Time));
    _Time = h;
  } //
} // class

private final LinkedBlockingDeque<Integer> _Queue = new LinkedBlockingDeque<>();

private void Producer() {
  System.out.println();
  for (int i = 10; i >= 0; i--) {
    DoSomething();
    if (!_Queue.add(i))
      System.out.println("failed to push a new value to the queue");
  }
}

private void TBBs() {
  final int lSize = 50;
  Stopwatch lStopwatch = new Stopwatch();

  lStopwatch.startTimer();
  for (int i = 0; i < lSize; i++) DoSomething();
  lStopwatch.stopTimer();

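  int lCPUs = Runtime.getRuntime().availableProcessors();
  ExecutorService lExecutor = Executors.newFixedThreadPool(lCPUs);
  // needs java.util.List, java.util.concurrent.Future and java.util.concurrent.ExecutionException
  List<Future<?>> lFutures = new ArrayList<>();
  for (int i = 0; i < lSize; i++) lFutures.add(lExecutor.submit(() -> DoSomething()));
  for (Future<?> lFuture : lFutures) {
    try { lFuture.get(); }  // wait for completion; otherwise only the submission time would be measured
    catch (InterruptedException | ExecutionException ex) { System.out.println(ex.getMessage()); }
  }
  lStopwatch.stopTimer();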

  java.util.ArrayList<Integer> lList = new java.util.ArrayList<>();
  for (int i = 0; i < lSize; i++) lList.add(i);

  lStopwatch.startTimer();
  lList.parallelStream().forEach(x -> DoSomething());
  lStopwatch.stopTimer();

  Callable<Void> lCallable = () -> {
    DoSomething();
    return null;
  };

  ArrayList<Callable<Void>> lCallables = new ArrayList<>();

  lCallables.add(lCallable);
  lCallables.add(lCallable);
  lCallables.add(lCallable);

  try {
    lExecutor.invokeAll(lCallables);
  } catch (InterruptedException ex) {
    System.out.println("InterruptedException: " + ex.getMessage());
  }
  lStopwatch.stopTimer();
  lExecutor.shutdown(); // otherwise the idle non-daemon pool threads keep the JVM alive

  Thread lThread = new Thread(() -> {
    Producer();
  });
  lThread.start();

  while (true) {
    int x = 0;
    try {
      x = _Queue.take();
    } catch (InterruptedException ex) {
      System.out.println("InterruptedException: " + ex.getMessage());
    }
    System.out.println("received value " + x);
    if (x == 0) break;
  }
} //

// C++
// #include <thread>
void DoSomething() {
  cout << this_thread::get_id() << " ";
  this_thread::sleep_for(chrono::milliseconds(100));
} //

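// #include <chrono>
// steady_clock measures elapsed wall-clock time; clock() counts processor ticks, which
// are not milliseconds on every platform and may ignore the time spent in sleep_for()
chrono::steady_clock::time_point _timer;
void startTimer() {
  _timer = chrono::steady_clock::now();
}
void stopTimer() {
  chrono::steady_clock::time_point lTimer = chrono::steady_clock::now();
  cout << endl << "timer millisecs: " << chrono::duration_cast<chrono::milliseconds>(lTimer - _timer).count() << endl;
  _timer = lTimer;
}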

tbb::concurrent_bounded_queue<int> *_Queue;
void Producer() {
  cout << endl;
  for (int i = 10; i >= 0; i--)  {
    DoSomething();
    if (!_Queue->try_push(i)) cout << "failed to push a new value to the queue" << endl;
  }
}


// #include <thread>
void TBBs() {
  const int lSize = 50;

  startTimer();
  for (int i = 0; i < lSize; i++) DoSomething();
  stopTimer();

  tbb::parallel_for(0, lSize, 1, [&](int i) { DoSomething(); });
  stopTimer();

  vector<int> lVector;
  for (int i = 0; i < lSize; i++) lVector.push_back(i);

  startTimer();
  tbb::parallel_for_each(lVector.begin(), lVector.end(), [](int i) { DoSomething(); });
  stopTimer();

  tbb::parallel_do(lVector.begin(), lVector.end(), [](int i) { DoSomething(); });
  stopTimer();

  tbb::parallel_invoke(DoSomething, DoSomething, DoSomething);
  stopTimer();

  _Queue = new tbb::concurrent_bounded_queue<int>();
  thread lThread(Producer);
  lThread.detach();

  int i;
  while (true) {
    _Queue->pop(i);
    cout << endl << "received value " << i << endl;
    if (i == 0) break;
  }

  delete _Queue;
  _Queue = nullptr;
} //

delegates/Action/Func vs. func

// C#
void F1(string s) { Console.WriteLine(s); }
void F2(string s) { Console.WriteLine(">" + s + "<"); }

public void test() {
   // delegates
   Action<string> f1 = F1;  // use Func<> for methods with return values
   f1("hello world");
   F1("hello world");
   Action<string> f3 = (s => f1(s));
   f3("hello echo");

   f1 = F2;
   f1("hello world");

   // variable argument list
   Console.WriteLine("average is " + avg(4, 1.1, 2.2, 3.3, 4.4));  // 2.75

   TBBs();
} //

// Java
private void F1(String s) { System.out.println(s); }
private void F2(String s) { System.out.println(">" + s + "<"); } 

public interface Action<T> { void invoke(T t); }

public void test() {
  // there are neither delegates nor pointers in Java
  Action<String> f1 = s -> F1(s);
  f1.invoke("hello world");
  F1("hello world");
  Action<String> f3 = s -> f1.invoke(s);
  f3.invoke("hello echo");

  //f1 = s -> F2(s);  compiler error, because this would change the "effectively final" status of f1
  Action<String> f4 = f1; // simple assignment
  f4 = s -> F2(s);
  f4.invoke("hello world");

  // variable argument list
  System.out.println("average is " + avg(1.1, 2.2, 3.3, 4.4)); // 2.75

  TBBs();
} //

// C++
void F1(const string &s) { cout << s << endl; }
void F2(const string &s) { cout << ">" << s << "<" << endl; }

void test() {
  // delegates
  function<void(const string &)> f1 = F1;
  f1("hello world");
  F1("hello world");
  function<void(const string &)> f3 = [=](const string &s) { f1(s); };
  f3("hello echo");

  f1 = F2;
  f1("hello world");

  // variable argument list
  cout << "average is " << avg(4, 1.1, 2.2, 3.3, 4.4) << endl;  // 2.75

  TBBs();
} //

today’s requirements

// C#
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// C++
#include "tbb/tbb.h"
#include "tbb/parallel_for.h"
#include "tbb/concurrent_queue.h"
#include "tbb/blocked_range.h"
#include <cstdarg>
#include <thread>
#include <chrono>
#include <functional>
#include <iostream>
#include <string>
#include <vector>
#include <ctime>

C# to C++ (advanced), review of day 4, locks and operators

As promised, today I am going to explain the C# side of the post C# to C++ (day 4).
Let’s start with a dummy method. We will need it in the examples to fill in arbitrary code at the right places.

public void DoSomething() {
   Console.WriteLine("Good night my dear thread " + Thread.CurrentThread.ManagedThreadId);
   Thread.Sleep(2000);
} //

Lock

This is the standard. The lock keyword is used frequently. The syntax is simple and locks are easy to deal with. My speed tests also indicate that the performance impact of locks is negligible. Thus you can use locks without hesitation when needed.

object lAnyObject2 = new object();
public void LockUsage() {
   lock (lAnyObject2) {
      DoSomething();
   }
} //

Monitor

Locks are like macros that use the more complex Monitor class, which has more specific methods. If you don’t need the complexity of the Monitor class, then you are better off simply using locks, which provide much better code legibility. Microsoft describes it like this:
“The functionality provided by the Enter and Exit methods is identical to that provided by the C# lock statement, except that lock wraps the Enter(Object, Boolean) method overload and the Exit method in a try…finally block to ensure that the monitor is released.”

object lAnyObject1 = new object();
public void MonitorUsage() {
   bool lLocked = false;
   try {
      Monitor.Enter(lAnyObject1, ref lLocked);  // using System.Threading;
      DoSomething();
   }
   finally {
      if (lLocked) Monitor.Exit(lAnyObject1);
   }
} //
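
One of the "more specific methods" of the Monitor class is Monitor.TryEnter, which gives up after a timeout instead of waiting forever. Here is a minimal sketch of that idea. The names MonitorTimeoutDemo, TryDoWork and TryWhileLockIsHeld as well as the timeout values are made up for this illustration; they are not part of the day 4 code.

```csharp
using System;
using System.Threading;

public static class MonitorTimeoutDemo {
   private static readonly object _Lock = new object();

   // returns true if the lock was acquired within xTimeoutMs and the work was executed
   public static bool TryDoWork(int xTimeoutMs, Action xWork) {
      bool lLocked = false;
      try {
         Monitor.TryEnter(_Lock, xTimeoutMs, ref lLocked);
         if (!lLocked) return false;  // another thread holds the lock; give up
         xWork();
         return true;
      }
      finally {
         if (lLocked) Monitor.Exit(_Lock);
      }
   } //

   // a second thread holds the lock while the calling thread tries to enter with a short timeout
   public static bool TryWhileLockIsHeld() {
      using (ManualResetEventSlim lHolding = new ManualResetEventSlim(false))
      using (ManualResetEventSlim lRelease = new ManualResetEventSlim(false)) {
         Thread lThread = new Thread(() => TryDoWork(1000, () => { lHolding.Set(); lRelease.Wait(); }));
         lThread.Start();
         lHolding.Wait();                          // the other thread owns the lock now
         bool lResult = TryDoWork(50, () => { });  // expected to time out
         lRelease.Set();
         lThread.Join();
         return lResult;
      }
   } //
} // class
```

A plain lock statement cannot express the "give up after 50 milliseconds" part; that is the kind of situation in which the additional Monitor methods pay off.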

Semaphore

Semaphores are used to limit the number of threads that can access resources. Let’s say you want to limit the output of a loudspeaker, because you cannot listen to five different sounds simultaneously. Nevertheless you want to hear urgent incoming chat message sounds from your friends while listening to nice background sounds of a game. In that case you could limit the number of threads using a resource to two. Or you are writing to a hard disk, which can deal with some simultaneous write operations. Still you want to limit the number of threads writing at the same time.
There is a slim class of Semaphores as well. SemaphoreSlim provides a lightweight semaphore class that doesn’t use Windows kernel semaphores.

private static Semaphore _Semaphore = new Semaphore(3, 3); // at most three threads are allowed to enter concurrently
public void SemaphoreUsage() {
   bool lLocked = false;
   try {
      lLocked = _Semaphore.WaitOne();
      DoSomething();
   }
   finally {
      if (lLocked) _Semaphore.Release();
   }
} //
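
The lightweight SemaphoreSlim mentioned above can be used in much the same way. The following sketch starts a number of tasks, lets only a limited number of them into the guarded section at once and reports the highest concurrency it observed. SemaphoreSlimDemo, Run and the 20 millisecond sleep are assumptions made for this example only.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreSlimDemo {
   // runs xTasks workers, but lets at most xLimit of them into the guarded section at once;
   // returns the highest number of workers that were inside simultaneously
   public static int Run(int xTasks, int xLimit) {
      int lInside = 0;
      int lMaxInside = 0;
      object lStatsLock = new object();

      using (SemaphoreSlim lSemaphore = new SemaphoreSlim(xLimit, xLimit)) {
         Task[] lWorkers = new Task[xTasks];
         for (int i = 0; i < xTasks; i++) {
            lWorkers[i] = Task.Run(() => {
               lSemaphore.Wait();
               try {
                  lock (lStatsLock) { lInside++; if (lInside > lMaxInside) lMaxInside = lInside; }
                  Thread.Sleep(20);  // simulated work
                  lock (lStatsLock) { lInside--; }
               }
               finally {
                  lSemaphore.Release();
               }
            });
         }
         Task.WaitAll(lWorkers);
      }
      return lMaxInside;
   } //
} // class
```

Calling SemaphoreSlimDemo.Run(8, 2) should never report more than two workers inside the guarded section, no matter how many tasks were started.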

Mutex

Mutexes work across the whole system, i.e. across process boundaries. They are identified by strings. Make sure you use unique strings to avoid conflicts. Let’s say you are running an application, which sets a Mutex. By mistake the user tries to start the same application again. You can now check the Mutex and close, or not even start, the second application.

private const string cMutexName = "MyMutex";
private static Mutex _Mutex = new Mutex(false, cMutexName);  // known by every process
public void MutexUsage() {
   bool lLocked = false;

   try {
      // optional complexity: access rights
      //bool lNewCreation;
      //MutexSecurity lSecurity = new MutexSecurity();
      //SecurityIdentifier lId = new SecurityIdentifier(WellKnownSidType.WorldSid, null);
      //MutexRights lRights = MutexRights.Synchronize | MutexRights.Modify;
      //lSecurity.AddAccessRule(new MutexAccessRule(lId, lRights, AccessControlType.Allow));
      //_Mutex = new Mutex(false, cMutexName, out lNewCreation, lSecurity);
      //MutexSecurity lReverse = _Mutex.GetAccessControl();

      lLocked = _Mutex.WaitOne(2000); // You have the option to set a time limit. Here 2000 milliseconds.
      if (!lLocked) {
         Console.WriteLine("Try again later. Mutex is used by another process or thread.");
         return;  // do not enter the protected section without owning the mutex
      }

      DoSomething();
   }
   finally {
      if (lLocked) _Mutex.ReleaseMutex();
   }
} //
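
The "do not start the application twice" scenario described above could look roughly like the following sketch. It relies on the Mutex constructor overload that reports whether the named mutex was newly created. SingleInstanceDemo and TryAcquire are hypothetical names chosen for this example.

```csharp
using System;
using System.Threading;

public static class SingleInstanceDemo {
   // tries to become the first owner of the named mutex;
   // returns null if the mutex exists already, i.e. another instance is running
   public static Mutex TryAcquire(string xMutexName) {
      bool lCreatedNew;
      Mutex lMutex = new Mutex(true, xMutexName, out lCreatedNew);
      if (lCreatedNew) return lMutex;  // we are the first instance and own the mutex

      lMutex.Dispose();  // somebody else created it first
      return null;
   } //
} // class
```

An application would call TryAcquire() at the very beginning of Main(), exit immediately when the result is null, and otherwise keep the returned Mutex alive until the process ends.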

Operator overloading

It does look easy, but can cause complex situations. All you have to do is to declare a method as public static and replace the method name by the word operator followed by the operator itself. The return value can be of any type. At least one of the parameters must be of the type of the class that declares the operator. The order of the parameters is important.

Let’s have a look at the example source code of post C# to C++ (day 4) again:

public class myClass {
   // this class is not thread safe
 
   int[] Values = { 1, 2, 3 };
 
   public static myClass operator +(myClass a, myClass b) {
      int n = a.Values.Length;
      myClass lNewClass = new myClass();
      for (int i = 0; i < n; i++) lNewClass.Values[i] = a.Values[i] + b.Values[i];
      return lNewClass;
   } //      
 
   public static double operator *(myClass a, myClass b) {
      int n = a.Values.Length;
      int lSum = 0;
      for (int i = 0; i < n; i++) lSum += a.Values[i] * b.Values[i];
      return lSum;
   } //
 
   public static string operator +(string a, myClass b) {
      //return ">> " + a + b + "<<";  // WRONG! causes recursion
      return ">> " + a + b.ToString() + "<<";
   } //
 
   // I will explain this in my post on Tuesday 4 February 2014
   // uncomment this and play with it (=>hardcore C#)
   //public static string operator +(myClass a, string b) {
   //   //return ">> " + a + b + "<<";  // WRONG! causes recursion
   //   return ">> " + a + b.ToString() + "<<";
   //} //
   // becomes even more hardcore when you have two conflicting overloads
 
   public override string ToString() {
      return "Values: " + Values[0] + " " + Values[1] + " " + Values[2] + " ";
   } //
 
} // class
 
public static void test() {
   myClass a = new myClass();
   myClass b = new myClass();
   myClass c = a + b;
   double d = a * b;
   Console.WriteLine("(Sum) " + c);  // ">> (Sum) Values: 2 4 6 <<"
   Console.WriteLine("(Sum) " + c.ToString());  // "(Sum) Values: 2 4 6"
   Console.WriteLine(c + " (Sum)");  // "Values: 2 4 6  (Sum)"
   Console.WriteLine(d);  // 14
   Console.ReadLine();
} //
source code line | operator called | output
myClass c = a + b; | public static myClass operator +(myClass a, myClass b) {…} |
double d = a * b; | public static double operator *(myClass a, myClass b) {…} |
Console.WriteLine("(Sum) " + c); | public static string operator +(string a, myClass b) {…} | >> (Sum) Values: 2 4 6 <<
Console.WriteLine("(Sum) " + c.ToString()); | [not part of myClass] operator overload "+" of the string class | (Sum) Values: 2 4 6
Console.WriteLine(c + " (Sum)"); | c.ToString() is called by the string class operator overload "+" for objects | Values: 2 4 6  (Sum)

You are using operator overloading quite often without noticing it. A quick example is:

DateTime x = DateTime.Now;
DateTime y = x.AddMinutes(5);
TimeSpan t = y - x;  // operator "-" is overloaded and returns a TimeSpan.

Let’s make it more difficult now.
Uncomment public static string operator +(myClass a, string b) {…}. The statement Console.WriteLine(c + " (Sum)") now prints ">> >> Values: 2 4 6 << (Sum)<<".
First the operator public static string operator +(myClass a, string b) {…} is called. Its body return ">> " + a + b.ToString() + "<<"; is evaluated from left to right and starts with a string plus a class (">> " + a). Therefore it indirectly calls the other operator public static string operator +(string a, myClass b) {…}, which adds the inner pair of brackets.

Implicit

The implicit keyword is used to avoid explicit cast operations; the conversion then happens implicitly. More precisely, we are talking about user-defined conversions.

public class ClassX {
   private readonly double _Value;
   public ClassX(double xInitValue) { _Value = xInitValue; } // constructor
   public override string ToString() { return "CLASS"; }

   // cast operation: (double)ClassX;
   public static implicit operator double(ClassX xClassX) { return xClassX._Value; }
   // cast operation: (ClassX)double;
   public static implicit operator ClassX(double xDouble) { return new ClassX(xDouble); }
}

public static void test() {
   int i = 10;
   double d1 = i;          // implicit cast operation
   double d2 = (double)i;  // explicit cast operation

   ClassX c = new ClassX(99.0);
   double d3 = c;          // implicit cast operation
   double d4 = (double)c;  // explicit cast operation
   c = 1.2;

   Console.WriteLine("class or double? " + c);  // "class or double? CLASS"
   Console.ReadLine();
}
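
The counterpart of implicit is the explicit keyword, which forces the caller to write the cast. A common convention is to use implicit only when no information can get lost. The sketch below illustrates the difference; the classes Celsius and ConversionDemo are invented for this example and do not appear in the code above.

```csharp
using System;

public class Celsius {
   public readonly double Degrees;
   public Celsius(double xDegrees) { Degrees = xDegrees; } // constructor

   // no information is lost: allowed without a cast
   public static implicit operator double(Celsius xCelsius) { return xCelsius.Degrees; }

   // the fraction is lost: the caller has to write the cast
   public static explicit operator int(Celsius xCelsius) { return (int)Math.Round(xCelsius.Degrees); }
} // class

public static class ConversionDemo {
   public static double AsDouble(Celsius xCelsius) { return xCelsius; }  // implicit conversion
   public static int AsInt(Celsius xCelsius) { return (int)xCelsius; }   // explicit conversion; "int i = xCelsius;" would not compile
} // class
```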

Conflicts

So what happens when you have two classes with opposing declarations? Luckily the compiler is smart enough to detect these issues. The source code below does not compile at all. The statement Console.WriteLine(a + b) causes an error message like: “The call is ambiguous between the following methods or properties: ‘…A.operator +(…A, …B)’ and ‘B.operator +(…A, …B)'”.
The next statement Console.WriteLine(b + a) is not even a real conflict. We simply did not define any operator overload for the parameter order (B, A). Thus the compiler complains “Operator ‘+’ cannot be applied to operands of type ‘…B’ and ‘…A'”.

public class A {
   public static string operator +(A a, B b) { return "class A"; }
} // class
public class B {
   public static string operator +(A a, B b) { return "class B"; }
} // class

public static void test() {
   A a = new A();
   B b = new B();
   Console.WriteLine(a + b);
   Console.WriteLine(b + a);
} //

Protocol Buffers (part 3, advanced), Tcp Networking

Ok guys, hardcore. Unfortunately in a different way than I thought 😉
Once again it took a lot of time to prepare the next source code example. It extends the code of my last post.

I removed the ProtoType class, because inheritance is not a real strength of Protobuf-Net. It is possible, but the ease of code maintenance is a definite argument against it. The complexity increases slowly, and we cannot afford code that is difficult to change.

In theory we could write a Serialize() method for each class; the result would be extreme performance. You could use the BitConverter class to convert and then merge values without causing too much overhead. And by using Assembler I guess you could even make it 10 times faster again.
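
To give an impression of what such a hand-written Serialize() method could look like, here is a minimal sketch that merges two values with the BitConverter class. The class ManualSerializer and the byte layout (4 bytes for the pages followed by 8 bytes for the price) are assumptions made for this illustration; Protobuf-Net does not work this way.

```csharp
using System;

public static class ManualSerializer {
   // layout: 4 bytes pages (int) followed by 8 bytes price (double), in the byte order of the machine
   public static byte[] Serialize(int xPages, double xPrice) {
      byte[] lBuffer = new byte[sizeof(int) + sizeof(double)];
      Buffer.BlockCopy(BitConverter.GetBytes(xPages), 0, lBuffer, 0, sizeof(int));
      Buffer.BlockCopy(BitConverter.GetBytes(xPrice), 0, lBuffer, sizeof(int), sizeof(double));
      return lBuffer;
   } //

   public static void Deserialize(byte[] xBuffer, out int xPages, out double xPrice) {
      xPages = BitConverter.ToInt32(xBuffer, 0);
      xPrice = BitConverter.ToDouble(xBuffer, sizeof(int));
   } //
} // class
```

There is no reflection and no field tagging involved, which is where the speed would come from. The price to pay is that sender and receiver must agree on the exact layout and byte order.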

I think that all modern protocols lose a lot of time due to the required reflection, which is mainly the mapping of methods and properties. And unlike Assembler the modern languages do not allow you to e.g. simply write an integer to a memory address and only read the first byte of it.
You can rotate bits or multiply and divide, which is much less efficient.

In C# you can use the Parallel class to boost your performance. Of course the processing time of each code piece has to be substantial, otherwise it takes longer to create tasks than to solve the problems. In slow networks it might even be worth considering packing data before it is sent.
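
Packing the data before sending could be done with the GZipStream class of the .Net framework. This is only a sketch under the assumption that the serialized message is available as a byte array; PackDemo, Pack and Unpack are hypothetical names and not part of the source code further down.

```csharp
using System;
using System.IO;
using System.IO.Compression;

public static class PackDemo {
   public static byte[] Pack(byte[] xData) {
      using (MemoryStream lOutput = new MemoryStream()) {
         using (GZipStream lZip = new GZipStream(lOutput, CompressionMode.Compress)) {
            lZip.Write(xData, 0, xData.Length);
         }  // disposing the GZipStream writes the remaining compressed bytes
         return lOutput.ToArray();
      }
   } //

   public static byte[] Unpack(byte[] xPacked) {
      using (MemoryStream lInput = new MemoryStream(xPacked))
      using (GZipStream lZip = new GZipStream(lInput, CompressionMode.Decompress))
      using (MemoryStream lOutput = new MemoryStream()) {
         lZip.CopyTo(lOutput);
         return lOutput.ToArray();
      }
   } //
} // class
```

Whether this pays off depends on the data and the network. Small messages can even grow, because the compressed format adds its own header and trailer.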

I replaced the ProtoType class by the Header class, which tells the type of the succeeding data block and also adds a serial identifier. This identifier can be used to receive vital feedback. For instance the server tells the client whether the data transmission was successful or not. It can also send calculation results or error messages.

The data stream has the following order: header, data, header, data, header, data … except for feedback headers; they do not need data blocks. The enum eType, which the Header class refers to, defines all possible data types: eError, eFeedback, eBook or eFable.
As said before I did not use inheritance for the Header class. The code remains legible and there is no spaghetti code to achieve indirect multiple inheritance.
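
The "header, data, header, data" order can be pictured with a simplified, hand-rolled frame format. Please note that this sketch does not use Protobuf-Net at all; the class FrameDemo and the layout (1 byte type, 4 bytes serial id, 4 bytes payload length, then the payload) are assumptions made purely for illustration.

```csharp
using System;
using System.IO;

public static class FrameDemo {
   public enum eType : byte { eError = 0, eFeedback, eBook, eFable }

   // header = type (1 byte) + serial id (4 bytes) + payload length (4 bytes);
   // feedback frames never carry a payload
   public static void WriteFrame(BinaryWriter xWriter, eType xType, int xSerialId, byte[] xPayload) {
      xWriter.Write((byte)xType);
      xWriter.Write(xSerialId);
      int lLength = (xType == eType.eFeedback || xPayload == null) ? 0 : xPayload.Length;
      xWriter.Write(lLength);
      if (lLength > 0) xWriter.Write(xPayload, 0, lLength);
   } //

   // reads one header and returns the data block that follows it (empty for feedback frames)
   public static byte[] ReadFrame(BinaryReader xReader, out eType xType, out int xSerialId) {
      xType = (eType)xReader.ReadByte();
      xSerialId = xReader.ReadInt32();
      int lLength = xReader.ReadInt32();
      return xReader.ReadBytes(lLength);
   } //
} // class
```

The reader always knows what comes next: the header tells the type and the size of the data block, and the serial id links a feedback frame to the message it answers.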

In my last post, the client was sending and the server was receiving data. Now the communication is bidirectional. The client uses two threads, one to send and one to receive. The sending method is still using the BlockingCollection construction and an endless loop on a separate thread.
The server can be connected to several clients simultaneously. To keep it simple I decided to send data without any context switching. This usually blocks threads during the IO operation. I have added tasks inside the event OnMessageBook to give an example of how to avoid this. Nevertheless the send method uses a lock on the client, preventing simultaneous write actions on the socket. This is bad practice; you should not apply locks on objects that might be locked in other areas as well. Such locks are out of your control: you don’t know if the .Net framework or any other code uses a lock on the same object, which could cause undesired behavior. In the example below it would have been better to wrap the client object into another class and apply the lock on that outer shell object. But I guess this is ok in our shorter example. The code was long enough already.

public static void Send(TcpClient xClient, ProtoBufExample.Header xHeader) {
  if (xHeader == null) return;
  if (xClient == null) return;

  lock (xClient) {
     NetworkStream lNetworkStream = xClient.GetStream();
  ....

The above lock problem could be avoided with this:

public class ClientData {
   public TcpClient Client { get; private set; }
   private DateTime _ConnectedSince;
   private string _UserName;
   ....

public static void Send(ClientData xClient, ProtoBufExample.Header xHeader) {
  if (xHeader == null) return;
  if (xClient == null) return;

  lock (xClient) {
     NetworkStream lNetworkStream = xClient.Client.GetStream();
  ....
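
You can go one step further and lock on a private object that is never handed out to anybody. The following sketch shows the idea with a plain Stream instead of a TcpClient, so that it runs without a network connection. The class GuardedStream is a made-up name; it is not part of the source code below.

```csharp
using System;
using System.IO;

public class GuardedStream {
   private readonly object _WriteLock = new object();  // never handed out, so no other code can lock on it
   private readonly Stream _Stream;

   public GuardedStream(Stream xStream) { _Stream = xStream; } // constructor

   // only one thread at a time can write a complete block
   public void Write(byte[] xData) {
      lock (_WriteLock) {
         _Stream.Write(xData, 0, xData.Length);
         _Stream.Flush();
      }
   } //
} // class
```

As long as all writers go through the same GuardedStream instance, their blocks cannot interleave, and no code outside of the class is able to interfere with the lock.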

And here is the entire source code:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ProtoBuf;

namespace DemoApp {

   public class ProtoBufExample {

      public enum eType : byte { eError = 0, eFeedback, eBook, eFable };

      [ProtoContract]
      public class Header {
         [ProtoMember(1)] public eType objectType;
         [ProtoMember(2)] public readonly int serialMessageId;

         public object data;
         private static int _HeaderSerialId = 0;

         public Header(object xData, eType xObjectType, int xSerialMessageId = 0) {
            data = xData;
            serialMessageId = (xSerialMessageId == 0) ? Interlocked.Increment(ref _HeaderSerialId) : xSerialMessageId;
            objectType = xObjectType; // we could use "if typeof(T) ...", but it would be slower, harder to maintain and less legible
         } // constructor

         // parameterless constructor needed for Protobuf-net
         public Header() {
         } // constructor

      } // class

      [ProtoContract]
      public class ErrorMessage {
         [ProtoMember(1)]
         public string Text;
      } // class

      [ProtoContract]
      public class Book {
         [ProtoMember(1)] public string author;
         [ProtoMember(2, DataFormat = DataFormat.Group)] public List<Fable> stories;
         [ProtoMember(3)] public DateTime edition;
         [ProtoMember(4)] public int pages;
         [ProtoMember(5)] public double price;
         [ProtoMember(6)] public bool isEbook;

         public override string ToString() {
            StringBuilder s = new StringBuilder();
            s.Append("by "); s.Append(author);
            s.Append(", edition "); s.Append(edition.ToString("dd MMM yyyy"));
            s.Append(", pages "); s.Append(pages);
            s.Append(", price "); s.Append(price);
            s.Append(", isEbook "); s.Append(isEbook);
            s.AppendLine();
            if (stories != null) foreach (Fable lFable in stories) {
                  s.Append("title "); s.Append(lFable.title);
                  s.Append(", rating "); s.Append(lFable.customerRatings.Average()); // Average() is an extension method of "using System.Linq;"
                  s.AppendLine();
               }

            return s.ToString();
         } //
      } // class

      [ProtoContract]
      public class Fable {
         [ProtoMember(1)] public string title;
         [ProtoMember(2, DataFormat = DataFormat.Group)]
         public double[] customerRatings;

         public override string ToString() {
            return "title " + title + ", rating " + customerRatings.Average();
         } //
      } // class

      public static Book GetData() {
         return new Book {
            author = "Aesop",
            price = 1.99,
            isEbook = false,
            edition = new DateTime(1975, 03, 13),
            pages = 203,
            stories = new List<Fable>(new Fable[] {
                new Fable{ title = "The Fox & the Grapes", customerRatings = new double[]{ 0.7, 0.7, 0.8} },
                new Fable{ title = "The Goose that Laid the Golden Eggs", customerRatings = new double[]{ 0.6, 0.75, 0.5, 1.0} },
                new Fable{ title = "The Cat & the Mice", customerRatings = new double[]{ 0.1, 0.0, 0.3} },
                new Fable{ title = "The Mischievous Dog", customerRatings = new double[]{ 0.45, 0.5, 0.4, 0.0, 0.5} }
            })
         };
      } //
   } // class

   public class PendingFeedbacks {
      private readonly ConcurrentDictionary<int, ProtoBufExample.Header> _Messages = new ConcurrentDictionary<int, ProtoBufExample.Header>();

      public int Count { get { return _Messages.Count; } }

      public void Add(ProtoBufExample.Header xHeader) {
         if (xHeader == null) throw new Exception("cannot add a null header");

         if (!_Messages.TryAdd(xHeader.serialMessageId, xHeader)) {
            throw new Exception("there must be a programming error somewhere");
         }
      } //

      public void Remove(ProtoBufExample.Header xHeader) {
         ProtoBufExample.Header lHeader;
         if (!_Messages.TryRemove(xHeader.serialMessageId, out lHeader)) {
            throw new Exception("there must be a programming error somewhere");
         }

         switch (xHeader.objectType) {
            case ProtoBufExample.eType.eError:
               Console.WriteLine("error: " + ((ProtoBufExample.ErrorMessage)xHeader.data).Text);
               Console.WriteLine("the message that was sent out was: " + lHeader.objectType + " with serial id " + lHeader.serialMessageId);
               Console.WriteLine("please check the log files" + Environment.NewLine);
               break;
            case ProtoBufExample.eType.eFeedback:
               // all ok !
               break;
            default:
               Console.WriteLine("warning: This message type was not expected.");
               break;
         }
      } //
   } // class

   public static class NetworkTest {
      public static void Test() {
         NetworkListener lServer = new NetworkListener("127.0.0.1", 65432, "Server");
         NetworkClient lClient = new NetworkClient("127.0.0.1", 65432, "Client");

         lServer.Connect();
         lServer.OnMessageBook += new NetworkListener.dOnMessageBook(OnMessageBook);
         lServer.OnMessageFable += new NetworkListener.dOnMessageFable(OnMessageFable);

         lClient.Connect();

         ProtoBufExample.Header lHeader;

         // send a book across the network
         ProtoBufExample.Book lBook = ProtoBufExample.GetData();
         lHeader = new ProtoBufExample.Header(lBook, ProtoBufExample.eType.eBook);
         lClient.Send(lHeader);

         System.Threading.Thread.Sleep(1000);  // remove this to see the asynchronous processing (the output will look terrible)

         // send a fable across the network
         lHeader = new ProtoBufExample.Header(lBook.stories[1], ProtoBufExample.eType.eFable);
         lClient.Send(lHeader);

         System.Threading.Thread.Sleep(1000);

         lClient.Disconnect();
         lServer.Disconnect();

         Console.ReadLine();
      }

      // demo: synchronous processing
      static void OnMessageFable(TcpClient xSender, ProtoBufExample.Header xHeader, ProtoBufExample.Fable xFable) {
         Console.WriteLine(Environment.NewLine + "received a fable: ");
         Console.WriteLine(xFable.ToString());

         // demo: we tell the client that something went wrong
         ProtoBufExample.ErrorMessage lErrorMessage = new ProtoBufExample.ErrorMessage() { Text = "The fable was rejected. It is far too short." };
         ProtoBufExample.Header lErrorHeader = new ProtoBufExample.Header(lErrorMessage, ProtoBufExample.eType.eError, xHeader.serialMessageId);
         NetworkListener.Send(xSender, lErrorHeader);
      } //

      // demo: asynchronous processing
      static void OnMessageBook(TcpClient xSender, ProtoBufExample.Header xHeader, ProtoBufExample.Book xBook) {
         Task.Factory.StartNew(() => {
            Console.WriteLine(Environment.NewLine + "received a book: ");
            Console.WriteLine(xBook.ToString());

            // send a feedback without any body to signal all was ok
            ProtoBufExample.Header lFeedback = new ProtoBufExample.Header(null, ProtoBufExample.eType.eFeedback, xHeader.serialMessageId);
            NetworkListener.Send(xSender, lFeedback);
            return;
         });

         Console.WriteLine("Book event was raised");
      } //

   } // class

   public class NetworkListener {

      private bool _ExitLoop = true;
      private TcpListener _Listener;

      public delegate void dOnMessageBook(TcpClient xSender, ProtoBufExample.Header xHeader, ProtoBufExample.Book xBook);
      public event dOnMessageBook OnMessageBook;

      public delegate void dOnMessageFable(TcpClient xSender, ProtoBufExample.Header xHeader, ProtoBufExample.Fable xFable);
      public event dOnMessageFable OnMessageFable;

      private List<TcpClient> _Clients = new List<TcpClient>();

      public int Port { get; private set; }
      public string IpAddress { get; private set; }
      public string ThreadName { get; private set; }

      public NetworkListener(string xIpAddress, int xPort, string xThreadName) {
         Port = xPort;
         IpAddress = xIpAddress;
         ThreadName = xThreadName;
      } //

      public bool Connect() {
         if (!_ExitLoop) {
            Console.WriteLine("Listener running already");
            return false;
         }
         _ExitLoop = false;

         try {
            _Listener = new TcpListener(IPAddress.Parse(IpAddress), Port);
            _Listener.Start();

            Thread lThread = new Thread(new ThreadStart(LoopWaitingForClientsToConnect));
            lThread.IsBackground = true;
            lThread.Name = ThreadName + "WaitingForClients";
            lThread.Start();

            return true;
         }
         catch (Exception ex) { Console.WriteLine(ex.Message); }
         return false;
      } //

      public void Disconnect() {
         _ExitLoop = true;
         lock (_Clients) {
            foreach (TcpClient lClient in _Clients) lClient.Close();
            _Clients.Clear();
         }
      } //        

      private void LoopWaitingForClientsToConnect() {
         try {
            while (!_ExitLoop) {
               Console.WriteLine("waiting for a client");
               TcpClient lClient = _Listener.AcceptTcpClient();
               string lClientIpAddress = lClient.Client.RemoteEndPoint.ToString(); // RemoteEndPoint is the client side of the connection
               Console.WriteLine("new client connecting: " + lClientIpAddress);
               if (_ExitLoop) break;
               lock (_Clients) _Clients.Add(lClient);

               Thread lThread = new Thread(new ParameterizedThreadStart(LoopRead));
               lThread.IsBackground = true;
               lThread.Name = ThreadName + "CommunicatingWithClient";
               lThread.Start(lClient);
            }
         }
         catch (Exception ex) { Console.WriteLine(ex.Message); }
         finally {
            _ExitLoop = true;
            if (_Listener != null) _Listener.Stop();
         }
      } // 

      private void LoopRead(object xClient) {
         TcpClient lClient = xClient as TcpClient;
         NetworkStream lNetworkStream = lClient.GetStream();

         while (!_ExitLoop) {
            try {
               ProtoBufExample.Header lHeader = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.Header>(lNetworkStream, ProtoBuf.PrefixStyle.Fixed32);
               if (lHeader == null) break; // happens during shutdown process
               switch (lHeader.objectType) {

                  case ProtoBufExample.eType.eBook:
                     ProtoBufExample.Book lBook = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.Book>(lNetworkStream, ProtoBuf.PrefixStyle.Fixed32);
                     if (lBook == null) break;
                     lHeader.data = lBook; // not necessary, but nicer                            

                     dOnMessageBook lEventBook = OnMessageBook;
                     if (lEventBook == null) continue;
                     lEventBook(lClient, lHeader, lBook);
                     break;

                  case ProtoBufExample.eType.eFable:
                     ProtoBufExample.Fable lFable = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.Fable>(lNetworkStream, ProtoBuf.PrefixStyle.Fixed32);
                     if (lFable == null) break;
                     lHeader.data = lFable; // not necessary, but nicer                            

                     dOnMessageFable lEventFable = OnMessageFable;
                     if (lEventFable == null) continue;
                     lEventFable(lClient, lHeader, lFable);
                     break;

                  default:
                     Console.WriteLine("Mayday, mayday, we are in big trouble.");
                     break;
               }
            }
            catch (System.IO.IOException) {
               if (_ExitLoop) Console.WriteLine("user requested client shutdown");
               else Console.WriteLine("disconnected");
               break; // a broken connection does not recover, stop reading instead of looping
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
         }
         Console.WriteLine("server: listener is shutting down");
      } //

      public static void Send(TcpClient xClient, ProtoBufExample.Header xHeader) {
         if (xHeader == null) return;
         if (xClient == null) return;

         lock (xClient) {
            try {
               NetworkStream lNetworkStream = xClient.GetStream();

               // send header (most likely a simple feedback)
               ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.Header>(lNetworkStream, xHeader, ProtoBuf.PrefixStyle.Fixed32);

               // send errors
               if (xHeader.objectType != ProtoBufExample.eType.eError) return;
               ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.ErrorMessage>(lNetworkStream, (ProtoBufExample.ErrorMessage)xHeader.data, ProtoBuf.PrefixStyle.Fixed32);
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
         }

      } //

   } // class

   public class NetworkClient {
      public int Port { get; private set; }
      public string IpAddress { get; private set; }
      public string ThreadName { get; private set; }
      private NetworkStream _NetworkStream = null;
      private TcpClient _Client = null;
      private bool _ExitLoop = true;
      private BlockingCollection<ProtoBufExample.Header> _Queue = new BlockingCollection<ProtoBufExample.Header>();
      private readonly PendingFeedbacks _PendingFeedbacks = new PendingFeedbacks();

      public NetworkClient(string xIpAddress, int xPort, string xThreadName) {
         Port = xPort;
         IpAddress = xIpAddress;
         ThreadName = xThreadName;
      } //

      public void Connect() {
         if (!_ExitLoop) return; // running already
         _ExitLoop = false;

         _Client = new TcpClient();
         _Client.Connect(IpAddress, Port);
         _NetworkStream = _Client.GetStream();

         Thread lLoopWrite = new Thread(new ThreadStart(LoopWrite));
         lLoopWrite.IsBackground = true;
         lLoopWrite.Name = ThreadName + "Write";
         lLoopWrite.Start();

         Thread lLoopRead = new Thread(new ThreadStart(LoopRead));
         lLoopRead.IsBackground = true;
         lLoopRead.Name = ThreadName + "Read";
         lLoopRead.Start();
      } //

      public void Disconnect() {
         _ExitLoop = true;
         _Queue.Add(null);
         if (_Client != null) _Client.Close();
         //if (_NetworkStream != null) _NetworkStream.Close();
      } //

      public void Send(ProtoBufExample.Header xHeader) {
         if (xHeader == null) return;
         _PendingFeedbacks.Add(xHeader);
         _Queue.Add(xHeader);
      } //


      private void LoopWrite() {
         while (!_ExitLoop) {
            try {
               ProtoBufExample.Header lHeader = _Queue.Take();
               if (lHeader == null) break;

               // send header
               ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.Header>(_NetworkStream, lHeader, ProtoBuf.PrefixStyle.Fixed32);

               // send data
               switch (lHeader.objectType) {
                  case ProtoBufExample.eType.eBook:
                     ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.Book>(_NetworkStream, (ProtoBufExample.Book)lHeader.data, ProtoBuf.PrefixStyle.Fixed32);
                     break;
                  case ProtoBufExample.eType.eFable:
                     ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.Fable>(_NetworkStream, (ProtoBufExample.Fable)lHeader.data, ProtoBuf.PrefixStyle.Fixed32);
                     break;
                  default:
                     break;
               }
            }
            catch (System.IO.IOException) {
               if (_ExitLoop) Console.WriteLine("user requested client shutdown.");
               else Console.WriteLine("disconnected");
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
         }
         _ExitLoop = true;
         Console.WriteLine("client: writer is shutting down");
      } //


      private void LoopRead() {
         while (!_ExitLoop) {
            try {
               ProtoBufExample.Header lHeader = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.Header>(_NetworkStream, ProtoBuf.PrefixStyle.Fixed32);
               if (lHeader == null) break;
               if (lHeader.objectType == ProtoBufExample.eType.eError) {
                  ProtoBufExample.ErrorMessage lErrorMessage = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.ErrorMessage>(_NetworkStream, ProtoBuf.PrefixStyle.Fixed32);
                  lHeader.data = lErrorMessage;
               }
               _PendingFeedbacks.Remove(lHeader);
            }
            catch (System.IO.IOException) {
               if (_ExitLoop) Console.WriteLine("user requested client shutdown");
               else Console.WriteLine("disconnected");
               break; // a broken connection does not recover, stop reading instead of looping
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
         }
         Console.WriteLine("client: reader is shutting down");
      } //

   } // class

} // namespace

example output:
waiting for a client
new client connecting: 127.0.0.1:65432
waiting for a client
Book event was raised

received a book:
by Aesop, edition 13 Mar 1975, pages 203, price 1.99, isEbook False
title The Fox & the Grapes, rating 0.733333333333333
title The Goose that Laid the Golden Eggs, rating 0.7125
title The Cat & the Mice, rating 0.133333333333333
title The Mischievous Dog, rating 0.37

received a fable:
title The Goose that Laid the Golden Eggs, rating 0.7125
error: The fable was rejected. It is far too short.
the message that was sent out was: eFable with serial id 2
please check the log files

client: writer is shutting down
server: listener is shutting down
user requested client shutdown
client: reader is shutting down

Async and await (advanced, .Net 4.5, C# 5)

The devil is in the details. It all looks easy, but follow each step carefully today.

Windows suspends threads that are waiting for I/O operations to complete (e.g. internet or file access). Such a thread cannot do other work in the meantime, so new threads have to be created to keep the program busy. You could use tasks to solve this specific problem: the program starts an asynchronous task to deal with the I/O operation, and once that task has finished, a continuation task triggers the follow-up procedure. It requires some work to cover all code paths, but it can be done.

C# 5 introduced new keywords to make your life easier. You can use async to mark methods for asynchronous operations. Such a method starts synchronously and splits off as soon as the program arrives at an await whose operation has not completed yet.

The Print() method below prints the time, the step number and the thread id. This information is useful to understand the program flow.

private static void Print(int xStep) {
    Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + " step " + xStep + " , thread " + Thread.CurrentThread.ManagedThreadId);
} //

static async void AsyncCalls1() {
    Print(1);
    int i = await Task.Run<int>(() => {
        Print(2); Thread.Sleep(5000);
        Print(3); return 0;
    });
    Print(4);  // same thread as in step 3

    Console.ReadLine();
    // return void
} //

example output:
19:09:36 step 1 , thread 9
19:09:36 step 2 , thread 10
19:09:41 step 3 , thread 10
19:09:41 step 4 , thread 10

The above code is a warm-up for us. The method AsyncCalls1() returns void. I emphasize this seemingly insignificant fact here. If the method returned a Task and the caller wanted to await it, the compiler would insist that the calling method is marked async as well. But if you do so, then the method that called the calling method faces the same demand. It would be an endless game until you arrive at Main(). And there you would not know what to do, because you cannot use async in Main() in C# 5 (this only became possible with C# 7.1). Novices can get quite frustrated with such a minuscule glitch.
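
One way to end that chain without async void is sketched below, assuming a console program: the async method returns a Task<int>, and a synchronous caller such as Main() blocks on the result. The names AsyncEntryDemo, ComputeAsync() and RunFromSyncCode() are made up for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEntryDemo {

    // hypothetical async method; it returns Task<int> instead of void
    public static async Task<int> ComputeAsync() {
        int lResult = await Task.Run<int>(() => {
            Thread.Sleep(100);   // simulate some work
            return 21;
        });
        return lResult * 2;
    } //

    // a synchronous caller, e.g. Main(); no async keyword is needed here
    public static int RunFromSyncCode() {
        Task<int> lTask = ComputeAsync();
        // blocks the current thread until the task has completed
        return lTask.GetAwaiter().GetResult();
    } //

} // class
```

Blocking like this is acceptable in a console program, but on a Windows Forms or WPF UI thread it can deadlock. AsyncCalls1() above sticks to the simpler async void form.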

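What is AsyncCalls1() doing? It starts a new task, which runs on another thread from the thread pool. The original thread does not wait; it returns to the caller right at the await. Now check this out: when the created task ends, the program continues (similar to Task.ContinueWith()) on the same thread that was running the task. It seems there is no context switching. Keep in mind that this is a console program without a synchronization context; in Windows Forms or WPF the code after await would be posted back to the UI thread.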
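
To get a feeling for what the compiler arranges behind the scenes, here is a rough hand-written counterpart with Task.ContinueWith(). It is only an approximation (the real compiler output is a state machine), and the names ContinuationDemo and RunWithContinuation() are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo {

    // work task plus continuation task; the continuation plays the role
    // of the code that follows the await keyword
    public static Task<string> RunWithContinuation() {
        Task<int> lWork = Task.Run<int>(() => {
            Thread.Sleep(100);   // simulate the work of steps 2 and 3
            return 0;
        });

        // runs after lWork has finished and receives the finished task
        return lWork.ContinueWith(xTask => "step 4 received " + xTask.Result);
    } //

} // class
```

The caller gets the continuation task back immediately and can decide later whether to wait for it.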

static async void AsyncCalls2() {
    Print(1); Task<int> task = AsyncCalls3();
    Print(4); int x = await task;
    Print(7); // same thread as in step 6

    Console.ReadLine();
    // return void
} //

static async Task<int> AsyncCalls3() {
    Print(2);
    int i = await Task.Run<int>(() => {
        Print(3); Thread.Sleep(5000);
        Print(5); return 0;
    });
    Print(6); return i;  // same thread as in step 5, returning an INTEGER !!!
} //

example output:
19:10:16 step 1 , thread 9
19:10:16 step 2 , thread 9
19:10:16 step 3 , thread 10
19:10:16 step 4 , thread 9
19:10:21 step 5 , thread 10
19:10:21 step 6 , thread 10
19:10:21 step 7 , thread 10

Method AsyncCalls3() has a return value, which is a Task<int>. Yet the return statement inside the method hands back a plain integer. And doesn’t Task.Run() return a Task<int> according to its definition? It is the await that changes the behavior: it waits for the task and yields its result, the integer value (0). The async keyword then wraps the returned integer into a Task<int> for the caller. await has been implemented to shorten code, and this is what it does. The code is more legible.
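
The type juggling can be shown in isolation. This is a minimal sketch with hypothetical names (AwaitUnwrapDemo, StartWork() and AddOneAsync()):

```csharp
using System;
using System.Threading.Tasks;

public static class AwaitUnwrapDemo {

    // a plain method that hands out a running task
    public static Task<int> StartWork() {
        return Task.Run<int>(() => 7);
    } //

    public static async Task<int> AddOneAsync() {
        Task<int> lTask = StartWork();   // a Task<int>, the work may still be running
        int lValue = await lTask;        // await yields the result of the task: an int
        return lValue + 1;               // an int is returned, the caller sees a Task<int>
    } //

} // class
```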

Method AsyncCalls2() calls AsyncCalls3() and receives a Task<int>, not an integer. Only the await in step 4 turns that task into the integer.
AsyncCalls2() itself returns void. This is the same issue as with AsyncCalls1(). However AsyncCalls2() can await the value of AsyncCalls3(), because AsyncCalls2() itself uses the async keyword in the method definition.

Check the program sequence. I marked the steps clearly to make comprehension easy. And then analyse the thread switching. There is a switch to another thread between steps 2 and 3, but none between steps 5, 6 and 7. This is the same behavior as in the first example code.

public static async void AsyncCalls4() {
    Print(1); string s = await AsyncCalls5();
    Print(4);

    Console.ReadLine();
    // return void
} //

// using System.Net.Http;
public static async Task<string> AsyncCalls5() {
    using (HttpClient lClient = new HttpClient()) {
        Print(2); string lResult = await lClient.GetStringAsync("http://www.microsoft.com");
        Print(3); return lResult; 
    }
} //

example output:
19:11:47 step 1 , thread 10
19:11:47 step 2 , thread 10
19:11:48 step 3 , thread 14
19:11:48 step 4 , thread 14

When searching for async and await on the web you will find the emphasis on I/O. Most example programs concentrate on this and don’t explain what is roughly going on inside the async-I/O method itself. Basically .Net async-I/O methods hand out a task and complete it once the operation has finished, a construction similar to Task.ContinueWith(). This is why I concentrated on different examples that can be used in any context (even though they are not very meaningful examples). The internet download example is more or less a classical one. You can use await on many I/O methods. Keep in mind that AsyncCalls4() returns void and that you cannot await AsyncCalls5() directly in the Main() method, because you would have to add async to Main().
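
To make the inside of such an async-I/O method a bit less mysterious, here is a sketch of one common pattern: a callback-based operation wrapped into a task with TaskCompletionSource<T>. This is not the actual framework code of HttpClient. FakeIoOperation() is a made-up stand-in for a real I/O call, and all other names are hypothetical as well.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncIoSketch {

    // made-up stand-in for an I/O operation that reports back via callback;
    // real I/O would not block a pool thread while waiting, this one does
    private static void FakeIoOperation(Action<string> xOnCompleted) {
        ThreadPool.QueueUserWorkItem(xState => {
            Thread.Sleep(100);          // pretend the device needs some time
            xOnCompleted("payload");
        });
    } //

    // wraps the callback style into a Task<string>, so callers can use await
    public static Task<string> ReadAsync() {
        TaskCompletionSource<string> lSource = new TaskCompletionSource<string>();
        try {
            // the callback completes the task, which releases everybody awaiting it
            FakeIoOperation(xData => lSource.TrySetResult(xData));
        }
        catch (Exception ex) { lSource.TrySetException(ex); }
        return lSource.Task;            // returned immediately, not yet completed
    } //

    public static async Task<int> ReadLengthAsync() {
        string lData = await ReadAsync();   // continues once the callback has fired
        return lData.Length;
    } //

} // class
```

The caller of ReadLengthAsync() cannot tell whether the task is backed by a thread, a timer or a device driver. That is the reason why await works the same way on all of them.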

Exceptions (part 3, advanced)

For Windows Forms and WPF we have specific exceptions to deal with. Actually these are not really exceptions. They are events, which are raised whenever an unhandled exception occurs. “Unhandled” means that nothing on the entire call stack caught the exception. Nothing stopped it from falling down to the lowest level of the thread, and even that level does not know what to do with it. You can e.g. throw an exception in a button click handler to cause this behavior.
These events allow applications to log information about unhandled exceptions.

In Windows Forms the delegate type for this event is defined in System.Threading and is called ThreadExceptionEventHandler. If you do not subscribe to the event, Windows Forms falls back to its default handling of unhandled exceptions, and your application may be terminated.

[STAThread]
static void Main() {
    Application.EnableVisualStyles();
    Application.SetCompatibleTextRenderingDefault(false);
    Application.ThreadException += Application_ThreadException;
    Application.Run(new Form1());
} //

static void Application_ThreadException(object sender, System.Threading.ThreadExceptionEventArgs e) {
    Exception lException = (Exception)e.Exception;
    MessageBox.Show(lException.Message);
} //

The approach in WPF is similar. Here we think in application domains. Therefore we subscribe to AppDomain.CurrentDomain.UnhandledException.

public MainWindow() {
    InitializeComponent();
    AppDomain.CurrentDomain.UnhandledException += CurrentDomain_UnhandledException;            
} //

void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) {
    Exception lException = (Exception)e.ExceptionObject;
    MessageBox.Show(lException.Message + "\nIsTerminating: " + e.IsTerminating);
} //

private void button1_Click(object sender, RoutedEventArgs e) {
    throw new Exception("user exception thrown");
} //

Events (part 3, advanced)

Events: Let’s go multithreading! We want the crème de la crème. Well, multitasking is also fine.

The code does not need to run in a specific sequence. The required independence is given. Again, there are many ways to use multithreading. An easy approach is to start a task in each method that is called by the event.

C# does support BeginInvoke() for delegates. This method is not supported by the .NET Compact Framework though (and neither by .NET Core and later, where it throws a PlatformNotSupportedException). We don’t care, because our hardcore programs are for serious applications, definitely not for mobile phone apps. Let’s see how well BeginInvoke() works. Maybe we don’t have to reinvent the wheel.

BeginInvoke() initiates an asynchronous call. It returns immediately and provides an IAsyncResult, which can be used to monitor the progress of the asynchronous call.
EndInvoke() retrieves the results. It blocks until the asynchronous call has completed.

You have the following options after calling BeginInvoke():
1) Call EndInvoke() to block the current thread until the call completes.
2) Obtain the WaitHandle from IAsyncResult.AsyncWaitHandle, use its WaitOne() and then call EndInvoke().
3) Poll the IAsyncResult to check the current state, after it has completed call EndInvoke().
4) Pass a callback to BeginInvoke(). The callback will use the ThreadPool to notify you. In the callback you have to call EndInvoke().

public event EventHandler OnChange;

public void DoSomeWork(object sender, object e) {
    Thread.Sleep(2100);
    Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " mission accomplished!");
} //

public void RunMyExample() {
    OnChange += new EventHandler(DoSomeWork);
    //OnChange += new EventHandler(DoSomeWork);
    //OnChange += new EventHandler(DoSomeWork);

    IAsyncResult lAsyncResult;

    Console.WriteLine("Choice 1");
    lAsyncResult = OnChange.BeginInvoke(this, EventArgs.Empty, null, null);
    OnChange.EndInvoke(lAsyncResult);

    Console.WriteLine("Choice 2");
    lAsyncResult = OnChange.BeginInvoke(this, EventArgs.Empty, null, null);
    lAsyncResult.AsyncWaitHandle.WaitOne();
    OnChange.EndInvoke(lAsyncResult);   // option 2 ends with EndInvoke() as well

    Console.WriteLine("Choice 3");
    lAsyncResult = OnChange.BeginInvoke(this, EventArgs.Empty, null, null);
    while (!lAsyncResult.IsCompleted) {
        Thread.Sleep(500);
        Console.WriteLine("work still not completed :(");
    }
    OnChange.EndInvoke(lAsyncResult);   // returns immediately, the call has completed

    Console.WriteLine("Choice 4"); 
    OnChange.BeginInvoke(this, EventArgs.Empty, (xAsyncResult) => {
            Console.WriteLine("callback running");
            OnChange.EndInvoke(xAsyncResult);
        }, null);

    Console.WriteLine("press return to exit the program");
    Console.ReadLine();
}//    

example output:
Choice 1
Thread 6 mission accomplished!
Choice 2
Thread 6 mission accomplished!
work still not completed :(
work still not completed :(
work still not completed :(
work still not completed :(
Thread 10 mission accomplished!
work still not completed :(
press return to exit the program
Thread 6 mission accomplished!
callback running

After having lived such a nice life, we have to face a major issue with BeginInvoke(). Uncomment the two lines “//OnChange += new EventHandler(DoSomeWork);” and don’t get annoyed now!
You will get an error message saying

“The delegate must have only one target”.

Although the delegate class can deal with multiple targets, asynchronous calls accept just one target.

So let’s try something else.

    public event EventHandler OnChange;

    public void DoSomeWork(object sender, object e) {
        Thread.Sleep(2100);
        Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " mission accomplished!");
    } //

    public void RunMyExample() {
        OnChange += new EventHandler(DoSomeWork);
        OnChange += new EventHandler((sender, e) => { throw new Exception("something went wrong"); });
        OnChange += new EventHandler(DoSomeWork);

        EventHandler lOnChange = OnChange;
        if (lOnChange == null) return;   // just to demonstrate the proper way to call events, not needed in this example                
        foreach (EventHandler d in lOnChange.GetInvocationList()) {
            Task lTask = Task.Factory.StartNew(() => d(this, EventArgs.Empty));
            lTask.ContinueWith((i) => { Console.WriteLine("Task canceled"); }, TaskContinuationOptions.OnlyOnCanceled);
            lTask.ContinueWith((i) => { Console.WriteLine("Task faulted"); }, TaskContinuationOptions.OnlyOnFaulted);
            lTask.ContinueWith((i) => { Console.WriteLine("Task completion"); }, TaskContinuationOptions.OnlyOnRanToCompletion);
        }

        Console.WriteLine("press return to exit the program");
        Console.ReadLine();
    }//    

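At first sight it looks like Microsoft has a serious bug here. This code does not execute properly each time. It has to do with the asynchronous behaviour of StartNew(). It sometimes calls the method DoSomeWork() three times and does not raise the exception.
The cause is the way lambda expressions capture variables: they capture the variable “d” itself, not the value it has at that moment. With compilers older than C# 5, foreach uses one single variable “d” for all iterations, so the loop can overwrite “d” before the task created by StartNew() gets to run. (Since C# 5 every iteration has its own “d” and the problem no longer shows up.) With a little tweak we can avoid the problem. We simply assign “d” to a new local variable. That way each lambda gets its own copy of “d”. Weird stuff, but that is the life of a coder sometimes.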

public event EventHandler OnChange;

public void DoSomeWork(object sender, object e) {
    Thread.Sleep(2100);
    Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " mission accomplished!");
} //

public void RunMyExample() {
    OnChange += new EventHandler(DoSomeWork);
    OnChange += new EventHandler((sender, e) => { throw new Exception("something went wrong"); });
    OnChange += new EventHandler(DoSomeWork);

    EventHandler lOnChange = OnChange;
    if (lOnChange == null) return;   // just to demonstrate the proper way to call events, not needed in this example                
    foreach (EventHandler d in lOnChange.GetInvocationList()) {
        EventHandler e = d;
        Task lTask = Task.Factory.StartNew(() => e(this, EventArgs.Empty));
        lTask.ContinueWith((i) => { Console.WriteLine("Task canceled"); }, TaskContinuationOptions.OnlyOnCanceled);
        lTask.ContinueWith((i) => { Console.WriteLine("Task faulted"); }, TaskContinuationOptions.OnlyOnFaulted);
        lTask.ContinueWith((i) => { Console.WriteLine("Task completion"); }, TaskContinuationOptions.OnlyOnRanToCompletion);
    }

    Console.WriteLine("press return to exit the program");
    Console.ReadLine();
}//    

Example output:
press return to exit the program
Thread 11 mission accomplished!
Thread 10 mission accomplished!
Task completion
Task faulted
Task completion

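This tiny tweak worked. You just have to know how lambda expressions capture loop variables. I hope this little notice saves you at least 3 hours of work.
You probably remember that we faced a similar issue in my post “Exiting Tasks (advanced)” https://csharphardcoreprogramming.wordpress.com/2013/12/11/exiting-tasks/ . I would suggest to use Task.Factory.StartNew() with caution whenever a lambda expression refers to a loop variable. The effect only shows up in conjunction with lambda expressions (or anonymous methods) that run after the loop has moved on.
The following code was also running well in my tests. The variable “d” is not used directly in Task.Factory.StartNew(). Be aware that the lambda still captures “d”, so with a pre-C# 5 compiler the safe choice remains the local copy shown above.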

...
 foreach (EventHandler d in lOnChange.GetInvocationList()) {
            Action a = () => d(this, EventArgs.Empty);
            Task lTask = Task.Factory.StartNew(a);
            lTask.ContinueWith((i) => { Console.WriteLine("Task canceled"); }, TaskContinuationOptions.OnlyOnCanceled);
            lTask.ContinueWith((i) => { Console.WriteLine("Task faulted"); }, TaskContinuationOptions.OnlyOnFaulted);
            lTask.ContinueWith((i) => { Console.WriteLine("Task completion"); }, TaskContinuationOptions.OnlyOnRanToCompletion);
        }
...
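
The capture effect can be reproduced without any tasks or timing luck. The sketch below uses a plain for loop, whose loop variable is shared by all iterations in every C# version (the foreach variable behaves this way only with compilers older than C# 5). The class and method names are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;

public static class ClosureCaptureDemo {

    // every lambda captures the one shared loop variable i
    public static List<int> CaptureShared() {
        List<Func<int>> lFuncs = new List<Func<int>>();
        for (int i = 0; i < 3; i++) lFuncs.Add(() => i);

        // the lambdas run after the loop has ended, when i is already 3
        List<int> lResults = new List<int>();
        foreach (Func<int> lFunc in lFuncs) lResults.Add(lFunc());
        return lResults;   // 3, 3, 3
    } //

    // every lambda captures its own local copy
    public static List<int> CaptureCopy() {
        List<Func<int>> lFuncs = new List<Func<int>>();
        for (int i = 0; i < 3; i++) {
            int lCopy = i;   // a fresh variable in each iteration
            lFuncs.Add(() => lCopy);
        }

        List<int> lResults = new List<int>();
        foreach (Func<int> lFunc in lFuncs) lResults.Add(lFunc());
        return lResults;   // 0, 1, 2
    } //

} // class
```

A task started with StartNew() is in the same position as the delayed calls here: it reads the captured variable when it runs, not when it was created.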

Events (part 2, advanced)

We are going to construct our custom event accessor now. It deals with additions and removals of subscriptions. Accessors look pretty much like property definitions. But instead of set and get you have to use add and remove.

public class MyActionEvent4 {
    private object _Lock = new object();            // a new object simply to avoid lock conflicts
    private event EventHandler<MyArgs> _OnChange;
    public event EventHandler<MyArgs> OnChange {   // public, so that other classes can subscribe
        add {
            lock (_Lock) { _OnChange += value; }
        }
        remove {
            lock (_Lock) { _OnChange -= value; }
        }
    } //

    public void RaiseEvent() {
        lock (_Lock) {
            EventHandler<MyArgs> lHandler = _OnChange;
            if (lHandler == null) return;
            lHandler(this, new MyArgs(0));   
        }        
    }//
} // class

Now we have one big problem here. The RaiseEvent() method has to obtain a lock each time, which has a serious impact on time-sensitive programs. Luckily you do not need to care about changing subscriptions during the invocation. Delegate instances are immutable like strings: adding or removing a subscription creates a new delegate and leaves the one you copied into the local variable untouched. Let’s simply take the lock out of the RaiseEvent() method, et voilà!

public class MyActionEvent5 {
    private object _Lock = new object();
    private event EventHandler<MyArgs> _OnChange;
    public event EventHandler<MyArgs> OnChange {   // public, so that other classes can subscribe
        add {
            lock (_Lock) { _OnChange += value; }
        }
        remove {
            lock (_Lock) { _OnChange -= value; }
        }
    } //

    public void RaiseEvent() {
        EventHandler<MyArgs> lHandler = _OnChange;
        if (lHandler == null) return;
        lHandler(this, new MyArgs(0));   
    }//
} // class
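
Why is the local copy in RaiseEvent() safe? A small experiment with a plain Action delegate shows it. This is only an illustration with hypothetical names, not part of the event classes above.

```csharp
using System;

public static class DelegateSnapshotDemo {

    public static int Run() {
        int lCalls = 0;
        Action lIncrement = () => lCalls++;

        Action lHandlers = null;
        lHandlers += lIncrement;
        lHandlers += lIncrement;        // two subscriptions

        Action lSnapshot = lHandlers;   // copy of the reference, like in RaiseEvent()
        lHandlers -= lIncrement;        // creates a NEW delegate, lSnapshot stays untouched
        lHandlers -= lIncrement;        // lHandlers is null now

        lSnapshot();                    // still invokes both targets
        return lCalls;                  // 2
    } //

} // class
```

The flip side: a handler can still be called shortly after it has unsubscribed, so subscribers have to tolerate one late call.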

It is clear that events are not delegates. They restrict access rights from outside of the event class. To describe events you could most likely say that they are wrappers around delegates.

Whenever an exception is thrown during an event call, none of the following calls are executed. And it is tricky to determine which calls were not executed, because you should not rely on the order of event calls. In fact they do execute in sequence, but a subscriber should not build on that.

static void EventExceptions1() {
    MyActionEvent3 lEvent = new MyActionEvent3();
    lEvent.OnChange += (sender, e) => Console.WriteLine("Executed subscription 1");
    lEvent.OnChange += (sender, e) => { throw new Exception("OMG!"); };
    lEvent.OnChange += (sender, e) => Console.WriteLine("Executed subscription 3");
    lEvent.RaiseEvent();
} //

So you have to deal with exceptions manually if you want to satisfy/execute as many event subscriptions as possible. You could add a try/catch block for each subscription. Or you could invoke the InvocationList yourself by calling the GetInvocationList() method [System.Delegate] and execute each item manually in a try/catch block. Let’s have a look at the following practical solution:

public class MyActionEvent6 {
    public event EventHandler OnChange = delegate { };

    public void RaiseEvent() {
        List<Exception> lExceptions = new List<Exception>();

        foreach (Delegate lHandler in OnChange.GetInvocationList()) {
            try { lHandler.DynamicInvoke(this, EventArgs.Empty); }
            catch (Exception ex) { lExceptions.Add(ex); }
        }

        if (lExceptions.Count > 0) throw new AggregateException(lExceptions);
    }//
} // class

static void EventExceptions6() {
    MyActionEvent6 lEvent = new MyActionEvent6();
    lEvent.OnChange += (sender, e) => Console.WriteLine("Executed subscription 1");
    lEvent.OnChange += (sender, e) => { throw new Exception("OMG!"); };
    lEvent.OnChange += (sender, e) => Console.WriteLine("Executed subscription 3");

    try { lEvent.RaiseEvent(); }
    catch (AggregateException ex) {
        foreach (Exception lException in ex.InnerExceptions) {
            Console.WriteLine(lException.InnerException.Message);
        }
    }
} //
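
DynamicInvoke() uses late binding and wraps every exception in a TargetInvocationException, which is why the code above has to read lException.InnerException.Message. A possible variation, sketched under the assumption that all subscribers are plain EventHandler delegates, casts each entry and calls it directly. The names TypedInvokeEvent and TypedInvokeDemo are made up for this sketch.

```csharp
using System;
using System.Collections.Generic;

public class TypedInvokeEvent {
    public event EventHandler OnChange = delegate { };

    public void RaiseEvent() {
        List<Exception> lExceptions = new List<Exception>();

        // foreach casts each Delegate of the invocation list to EventHandler
        foreach (EventHandler lHandler in OnChange.GetInvocationList()) {
            try { lHandler(this, EventArgs.Empty); }   // direct call, no reflection
            catch (Exception ex) { lExceptions.Add(ex); }
        }

        if (lExceptions.Count > 0) throw new AggregateException(lExceptions);
    } //
} // class

public static class TypedInvokeDemo {

    public static List<string> Run() {
        List<string> lLog = new List<string>();
        TypedInvokeEvent lEvent = new TypedInvokeEvent();
        lEvent.OnChange += (sender, e) => lLog.Add("subscription 1");
        lEvent.OnChange += (sender, e) => { throw new Exception("OMG!"); };
        lEvent.OnChange += (sender, e) => lLog.Add("subscription 3");

        try { lEvent.RaiseEvent(); }
        catch (AggregateException ex) {
            // the original exceptions arrive unwrapped
            foreach (Exception lException in ex.InnerExceptions) lLog.Add(lException.Message);
        }
        return lLog;   // subscription 1, subscription 3, OMG!
    } //

} // class
```

Both the first and the third subscription are executed, and the exception of the second one is reported at the end.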