Timer Finetuning, Each Microsecond Counts
This is a one-day detour from my current chart posts, which will continue with my next post.
Exact timing is not an issue for most of us. Who cares if something gets executed a few milliseconds earlier or later? Well, this blog was started to highlight many issues of “hard-core” programming. Thus I do care!
Back in 1985, when I was hacking on my Commodore 64, there was no such thing as context switching. To wait for a few moments, you could run a loop a number of times. And when you ran the same loop on the same C64 model again, pretty much the same amount of time passed. The emphasis is on the word “same”. Things don’t work like that anymore. You never face the “same” situation again. Since the arrival of multithreading (on the good old Commodore Amiga it was called multitasking), context switching has been part of the game.
(There are also multimedia timers in the Windows API. I am not going to use them. High precision is possible inside the standard .Net framework, so there is no need to leave robustness behind.)
I ran several small tests to get a feeling for .Net timing. The first and simplest approach was setting PriorityBoostEnabled on the process to boost its priority. Unfortunately its use is limited. Your priority only gets boosted when your main window has the focus. And when your process already runs at a high priority, it obviously won’t change a lot.
Therefore the next step was to raise the process priority to ‘ProcessPriorityClass.RealTime’. This is a dangerous setting and should only be used by skilled programmers. Your application could consume too much processor time at the expense of the entire system, which would slow down ALL processes and services, including the process you were originally trying to speed up.
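One important aspect is the time slice (quantum) that Windows grants a thread before it switches to another thread or process. Process.PrivilegedProcessorTime, which the test program prints, is not that slice: it reports the total time the process has spent executing operating-system (kernel-mode) code so far, which is consistent with the identical values for both priority classes in the example output. Let’s simply assume a slice of 80 milliseconds to have a number to work with. You have to share this time among all your threads. If 10 ms of it are consumed by your GUI update, there is less left for the rest. Make sure to execute any time-sensitive code in one piece before the next context switch takes place.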
Since we are talking about timers: being precise on one side does not help you at all if you then lose processor time to another thread or process.
Let me come up with a short story. You take the train at exactly 1 pm after waiting for 8 hours. You only have 10 minutes left to get from Austria to Belgium. That sucks, right? You obviously have to start your journey without waiting 8 hours beforehand.
The hard-core scenario would be to get changed, pack your suitcase and sleep until 12:45 pm … yes, in your business suit! Then suddenly wake up, jump out of bed, don’t kiss your wife, run to the station and use the full 8 hours for your journey. You only make it if you arrive in Brussels before the ticket collector kicks you out, because your ticket was only valid for 8 hours.
Let’s get practical. The example below shows that obtaining a time-stamp takes only a fraction of a tick (0.27 ticks on my machine). When you run a long loop and take the time-stamp many times, it seems to slow down. The reason is context switching: somewhere in the middle, the thread’s time slice expired. Running only a few iterations can therefore show better results.
The system timer (System.Timers.Timer) is terribly slow. Its delays and reliability are as bad as can be. Its Interval property is a double in milliseconds rather than the usual integer. You would expect a finer resolution then, wouldn’t you?
The first run in particular is unpredictable. The reason is the CLR’s just-in-time (JIT) compilation: your code gets compiled shortly before its first execution. You will observe this with any timer. Start your code earlier and skip the first callback; this improves the precision of the first callback you are really waiting for.
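A minimal sketch of the “skip the first callback” advice, assuming your timer hands a Stopwatch time-stamp to its callback. SkipFirst is a hypothetical helper, not part of the test program below; it discards the first invocation (the one that pays for JIT compilation) and forwards every later one.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: wraps a timer callback so that its first invocation,
// which typically pays for JIT compilation, is discarded.
Action<long> SkipFirst(Action<long> handler)
{
    int calls = 0;
    return timestamp =>
    {
        // Interlocked keeps the count correct if callbacks arrive on different threads.
        if (Interlocked.Increment(ref calls) == 1) return; // warm-up call: ignore it
        handler(timestamp);
    };
}

var received = new List<long>();
Action<long> callback = SkipFirst(t => received.Add(t));

// Simulate three timer callbacks; only the last two reach the real handler.
callback(100);
callback(200);
callback(300);
Console.WriteLine(string.Join(", ", received)); // 200, 300
```

In practice you would schedule the warm-up callback one period before the first one that matters.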
The threading timer (System.Threading.Timer) is more precise, but sometimes fires too early. Its error is still well above one millisecond.
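One thing that may contribute to early firing in the test program below: getTimeMsToNextCall_Long truncates the remaining time to whole milliseconds, so 2.9 ms become a request for 2 ms. Here is a sketch of a hypothetical DueTimeMs helper that rounds up instead and never returns a negative value (System.Threading.Timer.Change rejects values below -1 and treats -1 as “never fire”).

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper: milliseconds until targetTicks, rounded UP so that a
// timer is never asked to fire before the target, and clamped to 0 when the
// target has already passed (-1 would mean "never" for Timer.Change).
long DueTimeMs(long targetTicks, long nowTicks, long ticksPerSecond)
{
    long remainingTicks = targetTicks - nowTicks;
    if (remainingTicks <= 0) return 0; // already late: fire as soon as possible
    // ceiling division: (a + b - 1) / b
    return (remainingTicks * 1000 + ticksPerSecond - 1) / ticksPerSecond;
}

long freq = Stopwatch.Frequency;
long now = Stopwatch.GetTimestamp();
Console.WriteLine(DueTimeMs(now + freq * 29 / 10_000, now, freq)); // 2.9 ms ahead -> 3
```

Rounding up moves the error to the “late” side, which a SpinWait tail like the one in the MicroTimer can then absorb.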
The timer with the most potential isn’t really a timer. It is Thread.Sleep in a loop. In my opinion this is the most advanced solution to the timer precision problem. The loop runs on its own thread. Unlike a thread-pool thread used by a task scheduler, this thread is not shared: you own it and nobody else will use it. Obviously you should not run hundreds of such threads; run 5 of them and you are still in the green zone. The big advantage is that you can raise the priority of this thread to ‘Highest’. This gives you the attention that real geeks need. Furthermore, you won’t get overlapping executions: only one event runs at a time. If you miss one because the previous event ran too long, you can still decide to skip the next run. A general system delay then does not queue up events that you don’t want to execute anymore, as it is too late anyway. You can add this feature to any timer, but this approach is the safest and easiest way to have only one event running at a time.
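The “don’t queue up events that are too late anyway” rule fits in a small function, assuming the schedule is a Queue&lt;long&gt; of Stopwatch time-stamps like the one the MicroTimer class below receives. TakeNextDue is a hypothetical name; it reports every entry that is already later than maxDelayTicks and returns the next one still worth waiting for.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: drop schedule entries that are already too late and
// return the next wake-up time that is still worth waiting for.
// Returns null when the schedule is exhausted.
long? TakeNextDue(Queue<long> schedule, long nowTicks, long maxDelayTicks, List<long> skipped)
{
    while (schedule.Count > 0)
    {
        long due = schedule.Dequeue();
        if (nowTicks - due <= maxDelayTicks) return due; // on time, or late but acceptable
        skipped.Add(due); // too late: report it instead of running it
    }
    return null;
}

var schedule = new Queue<long>(new long[] { 100, 200, 300, 400 });
var skipped = new List<long>();

// Pretend the thread was stalled: it is now tick 350 and we tolerate 60 ticks.
long? next = TakeNextDue(schedule, 350, 60, skipped);
Console.WriteLine($"next = {next}, skipped = {string.Join(", ", skipped)}");
```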
Check out my MicroTimer class. It sleeps a little less than required (50 ms less) and then calls SpinWait repeatedly until the target time-stamp is reached. Once the thread has been given processor time again, it most likely won’t be switched out during that short spin. You wait like a disciple for the one and only messiah, running around a rigid pole.
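The core idea of the MicroTimer, reduced to one hypothetical WaitUntil function: sleep until a safety margin before the target (50 ms, as in the MicroTimer class below), then spin on Stopwatch.GetTimestamp(). This is a sketch, not a replacement for the class. The guard skips Thread.Sleep when the remaining time is shorter than the margin, because a negative argument would throw, and -1 would sleep forever.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper: block until Stopwatch.GetTimestamp() >= targetTicks.
// Coarse part: Thread.Sleep. Fine part: busy-wait with Thread.SpinWait.
// Returns how many ticks late we actually woke up.
long WaitUntil(long targetTicks, int spinMarginMs = 50)
{
    long remainingMs = (targetTicks - Stopwatch.GetTimestamp()) * 1000 / Stopwatch.Frequency;
    long sleepMs = remainingMs - spinMarginMs;
    if (sleepMs > 0) Thread.Sleep((int)sleepMs); // never pass a negative value

    while (Stopwatch.GetTimestamp() < targetTicks)
        Thread.SpinWait(10); // busy-wait without yielding the processor

    return Stopwatch.GetTimestamp() - targetTicks;
}

long target = Stopwatch.GetTimestamp() + Stopwatch.Frequency / 10; // 100 ms from now
long lateTicks = WaitUntil(target);
Console.WriteLine($"woke up {lateTicks * 1_000_000.0 / Stopwatch.Frequency:0.0} µs late");
```

The spin costs a full core for up to the margin, which is why a short margin and a dedicated thread go together.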
The MicroTimer class should give you a whopping precision of 1 microsecond …. unless my usual ‘unless’ sentence states something else 😉
The example output shows two delays: the one measured when the timer leaves its inner loop to raise the event, and the one measured when the event handler records the data point. Obviously a few microseconds pass in between. Measure that time and adjust your schedule accordingly.
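A sketch of that adjustment, assuming you have collected the delays recorded inside the event handler: take their median as the typical latency and shift future wake-up times earlier by that amount. The choice of the median (so the slow first run does not dominate) and the helper names MedianTicks and Compensate are assumptions for illustration; the sample values are the recorded MicroTimer delays from the example output below.

```csharp
using System;
using System.Linq;

// Hypothetical helper: typical latency between the timer's wake-up and the
// moment the handler records its data, as the median of measured samples.
long MedianTicks(long[] samples)
{
    long[] sorted = samples.OrderBy(x => x).ToArray();
    int mid = sorted.Length / 2;
    return sorted.Length % 2 == 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
}

// Shift every scheduled wake-up earlier by the measured latency.
long[] Compensate(long[] scheduleTicks, long latencyTicks) =>
    scheduleTicks.Select(t => t - latencyTicks).ToArray();

// Recorded delays in ticks from the MicroTimer run in the example output.
long[] handlerDelays = { 772, 34, 6, 5, 6, 6, 6, 5, 6, 7 };
long latency = MedianTicks(handlerDelays);
long[] adjusted = Compensate(new long[] { 10_000, 20_000 }, latency);
Console.WriteLine($"latency = {latency} ticks, first wake-up moved to {adjusted[0]}");
```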
Computer Specs:
Lenovo Yoga 2 11”
Intel i5-4202Y 1.6 GHz processor
128GB SSD hard disk
4GB memory
Windows 8.1, 64 bit
Admittedly, today’s code is a bit spaghetti-style. Tests are what they are. It was clear that the MicroTimer would win in the end, so my effort for proper style went into that class.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

namespace TimerPrecisionTest
{
    class Program
    {
        private const long _NumDataPoints = 10;
        private static List<long> _DataPoints = new List<long>();
        private static int _CurrentDataPoint = 0;
        private static long[] _Schedule;
        private static long _PeriodInMs = 3 * 1000; // 3 seconds, same period as _PeriodInTicks
        private static long _PeriodInTicks = 3 * Stopwatch.Frequency;
        private static double _FrequencyAsDouble = (double)Stopwatch.Frequency;
        private static AutoResetEvent _AutoResetEvent = new AutoResetEvent(false);

        static void Main(string[] args)
        {
            Process p = Process.GetCurrentProcess();
            p.PriorityBoostEnabled = true; // every little helps
            p.PriorityClass = ProcessPriorityClass.Normal;
            Console.WriteLine("Process with normal priority:");
            Console.WriteLine("Priviledged processor time for process " + p.ProcessName + " is " + p.PrivilegedProcessorTime.TotalMilliseconds.ToString("#,##0.0") + " ms");
            p.PriorityClass = ProcessPriorityClass.RealTime;
            Console.WriteLine("Process with high priority:");
            Console.WriteLine("Priviledged processor time for process " + p.ProcessName + " is " + p.PrivilegedProcessorTime.TotalMilliseconds.ToString("#,##0.0") + " ms");
            Console.WriteLine("IsHighResolution system clock: " + Stopwatch.IsHighResolution);
            Console.WriteLine("Number of ticks per second: " + Stopwatch.Frequency.ToString("#,##0"));

            // average cost of obtaining a time-stamp
            long a = Stopwatch.GetTimestamp();
            for (int i = 0; i < 100000; i++) { long b = Stopwatch.GetTimestamp(); }
            long c = Stopwatch.GetTimestamp();
            Console.WriteLine("Number of ticks to obtain a timestamp: " + ((c - a) / 100000.0).ToString("#,##0.00"));
            Console.WriteLine();

            UseSystemTimer();
            UseThreadingTimer();

            // a simple loop
            Thread lThread = new Thread(new ThreadStart(UseLoop));
            lThread.Priority = ThreadPriority.Highest;
            lThread.IsBackground = true;
            lThread.Start();
            _AutoResetEvent.WaitOne();

            testSpinWaitPrecision();

            // a proper loop
            UseMicroTimerClass();

            Console.ReadLine();
        } //

        #region MicroTimer
        private static void UseMicroTimerClass()
        {
            Console.WriteLine("MICRO TIMER CLASS:");
            Init();
            long lMaxDelay = (3L * Stopwatch.Frequency) / 1000L; // 3 ms
            MicroTimer lMicroTimer = new MicroTimer(new Queue<long>(_Schedule), lMaxDelay);
            lMicroTimer.OnMicroTimer += OnMicroTimer;
            lMicroTimer.OnMicroTimerStop += OnMicroTimerStop;
            lMicroTimer.OnMicroTimerSkipped += OnMicroTimerSkipped;
            lMicroTimer.Start();
        } //

        static void OnMicroTimerSkipped(int xSenderThreadId, long xWakeUpTimeInTicks, long xDelayInTicks)
        {
            Console.WriteLine("MicroTimer for WakeUpTime " + xWakeUpTimeInTicks + " did not run. Delay was: " + xDelayInTicks);
        } //

        static void OnMicroTimerStop(int xSenderThreadId)
        {
            Console.WriteLine("MicroTimer stopped.");
            PrintStats();
        } //

        static void OnMicroTimer(int xSenderThreadId, long xWakeUpTimeInTicks, long xDelayInTicks)
        {
            RecordDatapoint();
            Console.WriteLine("(Delay at wakeup time was " + xDelayInTicks.ToString("#,##0") + " tick)");
        } //
        #endregion

        #region SpinWait precision
        private static void testSpinWaitPrecision()
        {
            Console.WriteLine();
            Console.WriteLine("SpinWait tests (neglecting PrivilegedProcessorTime):");
            Thread.CurrentThread.Priority = ThreadPriority.Highest;
            Thread.Sleep(0); // switch context at a good point
            long a = Stopwatch.GetTimestamp();
            Thread.SpinWait(100000);
            long b = Stopwatch.GetTimestamp();
            Console.WriteLine("Number of ticks for a SpinWait: " + (b - a).ToString("#,##0"));

            a = Stopwatch.GetTimestamp();
            Thread.Sleep(0);
            for (int i = 0; i < 100; i++) { Thread.SpinWait(100000); }
            b = Stopwatch.GetTimestamp();
            double lAverage = (b - a) / 100.0;
            Console.WriteLine("Average ticks for 100x SpinWaits: " + lAverage.ToString("#,##0") + " == " + (lAverage * 1000.0 * 1000.0 / Stopwatch.Frequency).ToString("#,##0.0000") + " microseconds");

            // now we do get extremely precise
            long lEndTime = Stopwatch.GetTimestamp() + Stopwatch.Frequency; // wake up in one second
            Thread.Sleep(900); // imagine a timer raises an event roughly 100ms too early
            while (Stopwatch.GetTimestamp() < lEndTime)
            {
                Thread.SpinWait(10); // busy-wait without yielding the processor
            }
            a = Stopwatch.GetTimestamp();
            Console.WriteLine("SpinWait caused an error of just: " + ((a - lEndTime) * 1000.0 * 1000.0 / _FrequencyAsDouble).ToString("#,##0.0000") + " microseconds");
            Thread.CurrentThread.Priority = ThreadPriority.Normal;
            Console.WriteLine();
        } //
        #endregion

        #region simple loop
        private static void UseLoop()
        {
            Thread.CurrentThread.Priority = ThreadPriority.Highest;
            Console.WriteLine("LOOP AND SLEEP:");
            Init();
            Thread.Sleep((int)getTimeMsToNextCall_Long());
            while (_CurrentDataPoint < _NumDataPoints)
            {
                RecordDatapoint();
                if (_CurrentDataPoint >= _NumDataPoints) break;
                Thread.Sleep((int)getTimeMsToNextCall_Long());
            }
            PrintStats();
            _AutoResetEvent.Set();
            Thread.CurrentThread.Priority = ThreadPriority.Normal;
        } //
        #endregion

        #region SystemTimer
        private static System.Timers.Timer _SystemTimer = null;

        private static void UseSystemTimer()
        {
            Console.WriteLine("SYSTEM TIMER:");
            Init();
            _SystemTimer = new System.Timers.Timer();
            _SystemTimer.AutoReset = false;
            _SystemTimer.Elapsed += SystemTimer_Elapsed;
            _SystemTimer.Interval = getTimeMsToNextCall_Double(); // do not init in constructor!
            _SystemTimer.Start();
            _AutoResetEvent.WaitOne();
            _SystemTimer.Stop();
            _SystemTimer = null;
            PrintStats();
        } //

        private static void SystemTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            RecordDatapoint();
            // calibrate timer (we did not start with the right interval when we launched it)
            System.Timers.Timer lTimer = _SystemTimer;
            if (lTimer == null) return;
            if (_CurrentDataPoint >= _NumDataPoints) return;
            lTimer.Stop();
            lTimer.Interval = getTimeMsToNextCall_Double();
            lTimer.Start();
        } //
        #endregion

        #region ThreadingTimer
        private static System.Threading.Timer _ThreadingTimer = null;

        private static void UseThreadingTimer()
        {
            Console.WriteLine("THREAD TIMER:");
            Init();
            TimerCallback lCallback = new TimerCallback(ThreadingTimer_Elapsed);
            _ThreadingTimer = new System.Threading.Timer(lCallback, null, getTimeMsToNextCall_Long(), (long)(Timeout.Infinite));
            _AutoResetEvent.WaitOne();
            _ThreadingTimer.Dispose();
            _ThreadingTimer = null;
            PrintStats();
        } //

        private static void ThreadingTimer_Elapsed(object xState)
        {
            RecordDatapoint();
            // restart timer
            System.Threading.Timer lTimer = _ThreadingTimer;
            if (lTimer == null) return;
            if (_CurrentDataPoint >= _NumDataPoints) return;
            lTimer.Change(getTimeMsToNextCall_Long(), (long)Timeout.Infinite);
        } //
        #endregion

        #region miscellaneous
        private static void Init()
        {
            _DataPoints.Clear();
            _CurrentDataPoint = 0;
            // init exact time schedule
            long lOffset = Stopwatch.GetTimestamp() + _PeriodInTicks; // we start in the future
            _Schedule = new long[_NumDataPoints];
            for (int i = 0; i < _NumDataPoints; i++)
            {
                _Schedule[i] = lOffset;
                lOffset += _PeriodInTicks;
            }
        } //

        private static void PrintStats()
        {
            if (_DataPoints.Count < 1) return;
            Console.WriteLine("Average " + _DataPoints.Average());
            long lMin = _DataPoints.Min();
            long lMax = _DataPoints.Max();
            Console.WriteLine("Min " + lMin);
            Console.WriteLine("Max " + lMax);
            Console.WriteLine("Range " + (lMax - lMin));
            Console.WriteLine();
        } //

        private static void RecordDatapoint()
        {
            long lDifference = Stopwatch.GetTimestamp() - _Schedule[_CurrentDataPoint]; // positive = late, negative = early
            _DataPoints.Add(lDifference);
            Console.WriteLine("Delay in ticks: " + lDifference.ToString("#,##0") + " == " + ((lDifference * 1000000.0) / _FrequencyAsDouble).ToString("#,##0") + " microseconds");
            _CurrentDataPoint++;
            if (_CurrentDataPoint >= _NumDataPoints) _AutoResetEvent.Set();
        } //

        private static long getTimeMsToNextCall_Long()
        {
            long lTicks = (_Schedule[_CurrentDataPoint] - Stopwatch.GetTimestamp());
            // never negative: Thread.Sleep and Timer.Change treat -1 as "wait forever" and reject smaller values
            return Math.Max(0L, (1000 * lTicks) / Stopwatch.Frequency);
        } //

        private static double getTimeMsToNextCall_Double()
        {
            double lTicks = (double)(_Schedule[_CurrentDataPoint] - Stopwatch.GetTimestamp());
            // System.Timers.Timer.Interval must be greater than zero
            return Math.Max(1.0, (1000.0 * lTicks) / _FrequencyAsDouble);
        } //
        #endregion
    } // class
} // namespace
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace TimerPrecisionTest
{
    public class MicroTimer
    {
        private readonly Queue<long> _TickTimeTable;
        private readonly Thread _Thread;
        private readonly long _MaxDelayInTicks; // do not run if the delay was too long
        private long _NextWakeUpTickTime;

        public delegate void dOnMicroTimer(int xSenderThreadId, long xWakeUpTimeInTicks, long xDelayInTicks);
        public event dOnMicroTimer OnMicroTimer;
        public event dOnMicroTimer OnMicroTimerSkipped;

        public delegate void dQuickNote(int xSenderThreadId);
        public event dQuickNote OnMicroTimerStart;
        public event dQuickNote OnMicroTimerStop;

        public MicroTimer(Queue<long> xTickTimeTable, long xMaxDelayInTicks)
        {
            _TickTimeTable = xTickTimeTable;
            _Thread = new Thread(new ThreadStart(Loop));
            _Thread.Priority = ThreadPriority.Highest;
            _Thread.Name = "TimerLoop";
            _Thread.IsBackground = true;
            _MaxDelayInTicks = xMaxDelayInTicks;
        } //

        public int Start()
        {
            if ((_Thread.ThreadState & System.Threading.ThreadState.Unstarted) == 0) return -1;
            _Thread.Start();
            return _Thread.ManagedThreadId;
        } //

        public void Stop()
        {
            _Thread.Interrupt();
        } //

        private void Loop()
        {
            dQuickNote lOnStart = OnMicroTimerStart;
            if (lOnStart != null) lOnStart(_Thread.ManagedThreadId);

            try
            {
                while (true)
                {
                    if (_TickTimeTable.Count < 1) break;
                    _NextWakeUpTickTime = _TickTimeTable.Dequeue();
                    long lTicksLeft = _NextWakeUpTickTime - Stopwatch.GetTimestamp();
                    if (lTicksLeft < 0L) continue;
                    long lMilliseconds = (lTicksLeft * 1000) / Stopwatch.Frequency;
                    lMilliseconds -= 50; // we want to wake up earlier and spend the last time using SpinWait
                    // a negative value would throw (or, for -1, sleep forever)
                    if (lMilliseconds > 0L) Thread.Sleep((int)lMilliseconds);
                    while (Stopwatch.GetTimestamp() < _NextWakeUpTickTime)
                    {
                        Thread.SpinWait(10);
                    }
                    long lWakeUpTimeInTicks = Stopwatch.GetTimestamp();
                    long lDelay = lWakeUpTimeInTicks - _NextWakeUpTickTime;
                    if (lDelay < _MaxDelayInTicks)
                    {
                        dOnMicroTimer lHandler = OnMicroTimer;
                        if (lHandler == null) continue;
                        lHandler(_Thread.ManagedThreadId, lWakeUpTimeInTicks, lDelay);
                    }
                    else
                    {
                        dOnMicroTimer lHandler = OnMicroTimerSkipped;
                        if (lHandler == null) continue;
                        lHandler(_Thread.ManagedThreadId, lWakeUpTimeInTicks, lDelay);
                    }
                }
            }
            catch (ThreadInterruptedException) { }
            catch (Exception) { Console.WriteLine("Exiting timer thread."); }

            dQuickNote lOnStop = OnMicroTimerStop;
            if (lOnStop != null) lOnStop(_Thread.ManagedThreadId);
        } //
    } // class
} // namespace
Example output:
Process with normal priority:
Priviledged processor time for process TimerPrecisionTest.vshost is 109.4 ms
Process with high priority:
Priviledged processor time for process TimerPrecisionTest.vshost is 109.4 ms
IsHighResolution system clock: True
Number of ticks per second: 1,558,893
Number of ticks to obtain a timestamp: 0.27
SYSTEM TIMER:
Delay in ticks: 65,625 == 42,097 microseconds
Delay in ticks: 1,414 == 907 microseconds
Delay in ticks: 1,663 == 1,067 microseconds
Delay in ticks: 1,437 == 922 microseconds
Delay in ticks: 25,829 == 16,569 microseconds
Delay in ticks: 1,532 == 983 microseconds
Delay in ticks: 14,478 == 9,287 microseconds
Delay in ticks: 14,587 == 9,357 microseconds
Delay in ticks: 14,615 == 9,375 microseconds
Delay in ticks: 14,650 == 9,398 microseconds
Average 15583
Min 1414
Max 65625
Range 64211
THREAD TIMER:
Delay in ticks: 18,890 == 12,118 microseconds
Delay in ticks: 17,493 == 11,221 microseconds
Delay in ticks: 11,750 == 7,537 microseconds
Delay in ticks: 11,824 == 7,585 microseconds
Delay in ticks: 11,914 == 7,643 microseconds
Delay in ticks: 11,858 == 7,607 microseconds
Delay in ticks: 11,935 == 7,656 microseconds
Delay in ticks: 12,049 == 7,729 microseconds
Delay in ticks: 12,108 == 7,767 microseconds
Delay in ticks: 24,953 == 16,007 microseconds
Average 14477.4
Min 11750
Max 24953
Range 13203
LOOP AND SLEEP:
Delay in ticks: 7,346 == 4,712 microseconds
Delay in ticks: 7,367 == 4,726 microseconds
Delay in ticks: 7,423 == 4,762 microseconds
Delay in ticks: 7,494 == 4,807 microseconds
Delay in ticks: 7,542 == 4,838 microseconds
Delay in ticks: 7,408 == 4,752 microseconds
Delay in ticks: 20,249 == 12,989 microseconds
Delay in ticks: 20,275 == 13,006 microseconds
Delay in ticks: 20,351 == 13,055 microseconds
Delay in ticks: 20,383 == 13,075 microseconds
Average 12583.8
Min 7346
Max 20383
Range 13037
SpinWait tests (neglecting PrivilegedProcessorTime):
Number of ticks for a SpinWait: 4,833
Average ticks for 100x SpinWaits: 1,422 == 912.0831 microseconds
SpinWait caused an error of just: 0.0000 microseconds
MICRO TIMER CLASS:
Delay in ticks: 772 == 495 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 34 == 22 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 6 == 4 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 5 == 3 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 6 == 4 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 6 == 4 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 6 == 4 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 5 == 3 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 6 == 4 microseconds
(Delay at wakeup time was 1 tick)
Delay in ticks: 7 == 4 microseconds
(Delay at wakeup time was 2 tick)
MicroTimer stopped.
Average 85.3
Min 5
Max 772
Range 767
Protocol Buffers (part 3, advanced), Tcp Networking
Ok guys, hardcore. Unfortunately in a different way than I thought 😉
Once again it took a lot of time to prepare the next source code example. It extends the code of my last post.
I removed the ProtoType class, because inheritance is not a real strength of protobuf-net. It is possible, but ease of code maintenance is a definite argument against it. As the complexity slowly increases, we cannot afford code that is difficult to change.
In theory we could write a Serialize() method for each class, which would give extreme performance. You could use the BitConverter class to convert values and then merge them without much overhead. And by using assembler, I guess, you could even make it 10 times faster again.
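To make the hand-written alternative concrete, here is a sketch of a Serialize()/Deserialize() pair for a Fable-like value built on BitConverter. The byte layout is made up for this illustration and has nothing to do with the protobuf wire format; BitConverter uses the machine’s byte order, so both ends must agree on it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hand-written serializer for a Fable-like value (title + ratings).
// Hypothetical layout: [int32 title length][UTF-8 title][int32 count][count x double]
byte[] Serialize(string title, double[] ratings)
{
    byte[] titleBytes = Encoding.UTF8.GetBytes(title);
    var buffer = new List<byte>(8 + titleBytes.Length + 8 * ratings.Length);
    buffer.AddRange(BitConverter.GetBytes(titleBytes.Length));
    buffer.AddRange(titleBytes);
    buffer.AddRange(BitConverter.GetBytes(ratings.Length));
    foreach (double r in ratings) buffer.AddRange(BitConverter.GetBytes(r));
    return buffer.ToArray();
}

(string Title, double[] Ratings) Deserialize(byte[] data)
{
    int pos = 0;
    int titleLength = BitConverter.ToInt32(data, pos); pos += 4;
    string title = Encoding.UTF8.GetString(data, pos, titleLength); pos += titleLength;
    int count = BitConverter.ToInt32(data, pos); pos += 4;
    var ratings = new double[count];
    for (int i = 0; i < count; i++, pos += 8) ratings[i] = BitConverter.ToDouble(data, pos);
    return (title, ratings);
}

byte[] bytes = Serialize("The Fox & the Grapes", new[] { 0.7, 0.7, 0.8 });
var fable = Deserialize(bytes);
Console.WriteLine($"{bytes.Length} bytes, title '{fable.Title}', average {fable.Ratings.Average():0.00}");
```

The price is maintenance: every new field means touching both methods by hand, which is exactly what protobuf-net saves you.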
I think that many modern serialization libraries lose time on reflection, which mainly means mapping fields and properties (protobuf-net reduces this cost by caching the mapping per type). And unlike assembler, safe managed code does not let you simply write an integer to a memory address and then read only its first byte; in C# that requires unsafe code or helpers such as BitConverter or MemoryMarshal.
Otherwise you shift and mask bits (or multiply and divide), which costs a few extra instructions.
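For comparison, here are three ways to read the lowest byte of an int in C#. The MemoryMarshal variant really reinterprets the int’s own memory as bytes in safe code (it needs .NET Core 2.1 or later, or the System.Memory package). The BitConverter and MemoryMarshal variants return the lowest-order byte only on little-endian machines, which is why the output also prints BitConverter.IsLittleEndian.

```csharp
using System;
using System.Runtime.InteropServices;

int value = 0x12345678;

// 1) Shift/mask: portable, independent of the machine's byte order.
byte viaMask = (byte)(value & 0xFF);

// 2) BitConverter: copies the int into a new byte[] in machine byte order.
byte viaBitConverter = BitConverter.GetBytes(value)[0];

// 3) Reinterpret the int's memory as bytes (safe code, no extra array).
Span<int> slot = stackalloc int[] { value };
byte viaSpan = MemoryMarshal.AsBytes(slot)[0];

Console.WriteLine($"{viaMask:X2} {viaBitConverter:X2} {viaSpan:X2} (little-endian: {BitConverter.IsLittleEndian})");
```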
In C# you can use the Parallel class to boost performance. Of course the processing time of each piece of work has to be substantial; otherwise creating the tasks takes longer than solving the problems. In slow networks it might even be worth compressing data before it is sent.
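For the compression idea, here is a sketch using GZipStream from System.IO.Compression; the helper names Compress and Decompress are hypothetical. Whether it pays off depends on message size and link speed: gzip adds a header and trailer of its own, so tiny messages such as a single feedback header get larger, not smaller.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;

byte[] Compress(byte[] raw)
{
    using var output = new MemoryStream();
    // leaveOpen: true so that output can still be read after the GZipStream is disposed
    using (var gzip = new GZipStream(output, CompressionLevel.Fastest, leaveOpen: true))
    {
        gzip.Write(raw, 0, raw.Length);
    } // disposing the GZipStream flushes the final compressed block
    return output.ToArray();
}

byte[] Decompress(byte[] packed)
{
    using var input = new GZipStream(new MemoryStream(packed), CompressionMode.Decompress);
    using var output = new MemoryStream();
    input.CopyTo(output);
    return output.ToArray();
}

// Repetitive text compresses well; a single small header would not.
byte[] raw = Encoding.UTF8.GetBytes(string.Concat(Enumerable.Repeat("The Fox & the Grapes. ", 200)));
byte[] packed = Compress(raw);
byte[] unpacked = Decompress(packed);
Console.WriteLine($"{raw.Length} bytes -> {packed.Length} bytes, round trip ok: {unpacked.SequenceEqual(raw)}");
```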
I replaced the ProtoType class with the Header class, which states the type of the following data block and also adds a serial identifier. This identifier is used to match vital feedback to the message it refers to. For instance, the server tells the client whether the data transmission was successful or not. It can also send calculation results or error messages.
The data stream has the following order: header, data, header, data, header, data … except for feedback headers, which do not need a data block. The enum eType (declared in ProtoBufExample next to the Header class) defines all possible data types: eError (an ErrorMessage), eFeedback, eBook and eFable.
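The header, data, header, data order can be illustrated without protobuf-net. In this sketch every block is written with a 4-byte length prefix, similar in spirit to PrefixStyle.Fixed32 in the code below, and a feedback header is simply not followed by a data block. The header layout ([type byte][int serial id]) and the helper names are hypothetical, not protobuf-net’s wire format; a MemoryStream stands in for the NetworkStream.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical type codes, mirroring the article's eType enum.
const byte Error = 0, Feedback = 1, Fable = 3;

// Every block is written as [4-byte length][bytes]; BinaryWriter writes the int little-endian.
void WriteBlock(BinaryWriter w, byte[] block)
{
    w.Write(block.Length);
    w.Write(block);
}

byte[] ReadBlock(BinaryReader r) => r.ReadBytes(r.ReadInt32());

// Header block = [type][serial id]; every header except a feedback is followed by a data block.
void Send(BinaryWriter w, byte type, int serialId, string data)
{
    var header = new byte[5];
    header[0] = type;
    BitConverter.GetBytes(serialId).CopyTo(header, 1);
    WriteBlock(w, header);
    if (type != Feedback) WriteBlock(w, Encoding.UTF8.GetBytes(data));
}

var stream = new MemoryStream(); // stands in for the NetworkStream
var writer = new BinaryWriter(stream);
Send(writer, Fable, 1, "The Mischievous Dog");
Send(writer, Feedback, 1, "");   // no data block follows
Send(writer, Error, 2, "The fable was rejected.");
writer.Flush();

stream.Position = 0;
var reader = new BinaryReader(stream);
var received = new List<string>();
while (stream.Position < stream.Length)
{
    byte[] header = ReadBlock(reader);
    byte type = header[0];
    int id = BitConverter.ToInt32(header, 1);
    string text = type == Feedback ? "(no data)" : Encoding.UTF8.GetString(ReadBlock(reader));
    received.Add($"{type}/{id}: {text}");
}
Console.WriteLine(string.Join(Environment.NewLine, received));
```

The reader has to know from the header alone whether a data block follows; that is why the type travels in the header.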
As said before, I did not use inheritance for the Header class. The code remains legible, and there is no spaghetti code to achieve indirect multiple inheritance.
In my last post, the client was sending and the server was receiving data. Now the communication is bidirectional. The client uses two threads, one to send and one to receive. The sending method is still using the BlockingCollection construction and an endless loop on a separate thread.
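The sending side follows the classic producer/consumer pattern. Here is a reduced sketch with the same building blocks. Instead of the null sentinel that the article’s Disconnect() adds to the queue, it uses CompleteAdding() and GetConsumingEnumerable(), which end the loop once the queue has been drained. The sent list stands in for the network stream.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

var queue = new BlockingCollection<string>();
var sent = new List<string>();

// Consumer: the equivalent of LoopWrite, running on its own background thread.
var writer = new Thread(() =>
{
    // Blocks while the queue is empty; ends after CompleteAdding() once all items are taken.
    foreach (string message in queue.GetConsumingEnumerable())
        sent.Add(message); // stands in for SerializeWithLengthPrefix(...) on the NetworkStream
}) { IsBackground = true, Name = "ClientWrite" };
writer.Start();

// Producer: the equivalent of NetworkClient.Send().
queue.Add("header 1 + book");
queue.Add("header 2 + fable");
queue.CompleteAdding(); // the equivalent of Disconnect(): no more messages
writer.Join();          // wait until the queue has been drained

Console.WriteLine(string.Join(" | ", sent));
```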
The server can be connected to several clients simultaneously. To keep it simple, the server sends data directly on the calling thread instead of handing the work over to another thread. This blocks that thread during the IO operation. I have added a task inside the OnMessageBook event handler to show how to avoid this. Nevertheless, the Send method takes a lock on the client to prevent simultaneous writes to the socket. This is bad practice: you should not lock on objects that might be locked elsewhere as well. That is out of your control; you don’t know whether the .Net framework or any other code locks the same object, which could cause undesired behavior. In the example below it would have been better to wrap the client object in another class and apply the lock to that outer shell object. But I guess this is OK in our short example. The code was long enough already.
public static void Send(TcpClient xClient, ProtoBufExample.Header xHeader)
{
    if (xHeader == null) return;
    if (xClient == null) return;

    lock (xClient)
    {
        NetworkStream lNetworkStream = xClient.GetStream();
        ....
The above lock problem could be avoided with this:
public class ClientData
{
    public TcpClient Client { get; private set; }
    private DateTime _ConnectedSince;
    private string _UserName;
    ....

public static void Send(ClientData xClient, ProtoBufExample.Header xHeader)
{
    if (xHeader == null) return;
    if (xClient == null) return;

    lock (xClient)
    {
        NetworkStream lNetworkStream = xClient.Client.GetStream();
        ....
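The ClientData wrapper is still passed around, so other code could lock it as well. Here is a sketch of the stricter variant: a lock object nobody else can see (in a class it would be a private readonly field). A MemoryStream stands in for the NetworkStream so it runs without a socket; SendFrame and the frame layout are hypothetical.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

var stream = new MemoryStream();  // stands in for TcpClient.GetStream()
object sendLock = new object();   // in a class: private readonly object _SendLock = new object();

// Writes one frame: [4-byte length][payload]. The lock keeps the prefix and
// the payload of one sender from interleaving with another sender's frame.
void SendFrame(byte[] payload)
{
    lock (sendLock)
    {
        stream.Write(BitConverter.GetBytes(payload.Length), 0, 4);
        stream.Write(payload, 0, payload.Length);
    }
}

// 100 concurrent senders; sender i writes ten bytes with value i.
Parallel.For(0, 100, i =>
{
    var payload = new byte[10];
    Array.Fill(payload, (byte)i);
    SendFrame(payload);
});

// Read the frames back and count those that arrived intact.
stream.Position = 0;
int intactFrames = 0;
var lengthBytes = new byte[4];
while (stream.Position < stream.Length)
{
    stream.Read(lengthBytes, 0, 4);
    var payload = new byte[BitConverter.ToInt32(lengthBytes, 0)];
    stream.Read(payload, 0, payload.Length);
    if (payload.Length == 10 && Array.TrueForAll(payload, b => b == payload[0])) intactFrames++;
}
Console.WriteLine($"{intactFrames} intact frames, {stream.Length} bytes");
```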
And here is the entire source code:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ProtoBuf;

namespace DemoApp
{
    public class ProtoBufExample
    {
        public enum eType : byte { eError = 0, eFeedback, eBook, eFable };

        [ProtoContract]
        public class Header
        {
            [ProtoMember(1)] public eType objectType;
            [ProtoMember(2)] public readonly int serialMessageId;

            public object data;
            private static int _HeaderSerialId = 0;

            public Header(object xData, eType xObjectType, int xSerialMessageId = 0)
            {
                data = xData;
                serialMessageId = (xSerialMessageId == 0) ? Interlocked.Increment(ref _HeaderSerialId) : xSerialMessageId;
                objectType = xObjectType; // we could use "if typeof(T) ...", but it would be slower, harder to maintain and less legible
            } // constructor

            // parameterless constructor needed for Protobuf-net
            public Header() { } // constructor
        } // class

        [ProtoContract]
        public class ErrorMessage
        {
            [ProtoMember(1)] public string Text;
        } // class

        [ProtoContract]
        public class Book
        {
            [ProtoMember(1)] public string author;
            [ProtoMember(2, DataFormat = DataFormat.Group)] public List<Fable> stories;
            [ProtoMember(3)] public DateTime edition;
            [ProtoMember(4)] public int pages;
            [ProtoMember(5)] public double price;
            [ProtoMember(6)] public bool isEbook;

            public override string ToString()
            {
                StringBuilder s = new StringBuilder();
                s.Append("by "); s.Append(author);
                s.Append(", edition "); s.Append(edition.ToString("dd MMM yyyy"));
                s.Append(", pages "); s.Append(pages);
                s.Append(", price "); s.Append(price);
                s.Append(", isEbook "); s.Append(isEbook);
                s.AppendLine();
                if (stories != null) foreach (Fable lFable in stories)
                {
                    s.Append("title "); s.Append(lFable.title);
                    s.Append(", rating "); s.Append(lFable.customerRatings.Average()); // Average() is an extension method of "using System.Linq;"
                    s.AppendLine();
                }
                return s.ToString();
            } //
        } // class

        [ProtoContract]
        public class Fable
        {
            [ProtoMember(1)] public string title;
            [ProtoMember(2, DataFormat = DataFormat.Group)] public double[] customerRatings;

            public override string ToString()
            {
                return "title " + title + ", rating " + customerRatings.Average();
            } //
        } // class

        public static Book GetData()
        {
            return new Book
            {
                author = "Aesop",
                price = 1.99,
                isEbook = false,
                edition = new DateTime(1975, 03, 13),
                pages = 203,
                stories = new List<Fable>(new Fable[] {
                    new Fable{ title = "The Fox & the Grapes", customerRatings = new double[]{ 0.7, 0.7, 0.8} },
                    new Fable{ title = "The Goose that Laid the Golden Eggs", customerRatings = new double[]{ 0.6, 0.75, 0.5, 1.0} },
                    new Fable{ title = "The Cat & the Mice", customerRatings = new double[]{ 0.1, 0.0, 0.3} },
                    new Fable{ title = "The Mischievous Dog", customerRatings = new double[]{ 0.45, 0.5, 0.4, 0.0, 0.5} }
                })
            };
        } //
    } // class

    public class PendingFeedbacks
    {
        private readonly ConcurrentDictionary<int, ProtoBufExample.Header> _Messages = new ConcurrentDictionary<int, ProtoBufExample.Header>();

        public int Count { get { return _Messages.Count; } }

        public void Add(ProtoBufExample.Header xHeader)
        {
            if (xHeader == null) throw new Exception("cannot add a null header");
            if (!_Messages.TryAdd(xHeader.serialMessageId, xHeader))
            {
                throw new Exception("there must be a programming error somewhere");
            }
        } //

        public void Remove(ProtoBufExample.Header xHeader)
        {
            ProtoBufExample.Header lHeader;
            if (!_Messages.TryRemove(xHeader.serialMessageId, out lHeader))
            {
                throw new Exception("there must be a programming error somewhere");
            }

            switch (xHeader.objectType)
            {
                case ProtoBufExample.eType.eError:
                    Console.WriteLine("error: " + ((ProtoBufExample.ErrorMessage)xHeader.data).Text);
                    Console.WriteLine("the message that was sent out was: " + lHeader.objectType + " with serial id " + lHeader.serialMessageId);
                    Console.WriteLine("please check the log files" + Environment.NewLine);
                    break;
                case ProtoBufExample.eType.eFeedback:
                    // all ok !
                    break;
                default:
                    Console.WriteLine("warning: This message type was not expected.");
                    break;
            }
        } //
    } // class

    public static class NetworkTest
    {
        public static void Test()
        {
            NetworkListener lServer = new NetworkListener("127.0.0.1", 65432, "Server");
            NetworkClient lClient = new NetworkClient("127.0.0.1", 65432, "Client");

            lServer.Connect();
            lServer.OnMessageBook += new NetworkListener.dOnMessageBook(OnMessageBook);
            lServer.OnMessageFable += new NetworkListener.dOnMessageFable(OnMessageFable);

            lClient.Connect();

            ProtoBufExample.Header lHeader;

            // send a book across the network
            ProtoBufExample.Book lBook = ProtoBufExample.GetData();
            lHeader = new ProtoBufExample.Header(lBook, ProtoBufExample.eType.eBook);
            lClient.Send(lHeader);

            System.Threading.Thread.Sleep(1000); // remove this to see the asynchronous processing (the output will look terrible)

            // send a fable across the network
            lHeader = new ProtoBufExample.Header(lBook.stories[1], ProtoBufExample.eType.eFable);
            lClient.Send(lHeader);

            System.Threading.Thread.Sleep(1000);

            lClient.Disconnect();
            lServer.Disconnect();

            Console.ReadLine();
        }

        // demo: synchronous processing
        static void OnMessageFable(TcpClient xSender, ProtoBufExample.Header xHeader, ProtoBufExample.Fable xFable)
        {
            Console.WriteLine(Environment.NewLine + "received a fable: ");
            Console.WriteLine(xFable.ToString());

            // demo: we tell the client that something went wrong
            ProtoBufExample.ErrorMessage lErrorMessage = new ProtoBufExample.ErrorMessage() { Text = "The fable was rejected. It is far too short." };
            ProtoBufExample.Header lErrorHeader = new ProtoBufExample.Header(lErrorMessage, ProtoBufExample.eType.eError, xHeader.serialMessageId);
            NetworkListener.Send(xSender, lErrorHeader);
        } //

        // demo: asynchronous processing
        static void OnMessageBook(TcpClient xSender, ProtoBufExample.Header xHeader, ProtoBufExample.Book xBook)
        {
            Task.Factory.StartNew(() =>
            {
                Console.WriteLine(Environment.NewLine + "received a book: ");
                Console.WriteLine(xBook.ToString());

                // send a feedback without any body to signal all was ok
                ProtoBufExample.Header lFeedback = new ProtoBufExample.Header(null, ProtoBufExample.eType.eFeedback, xHeader.serialMessageId);
                NetworkListener.Send(xSender, lFeedback);
                return;
            });

            Console.WriteLine("Book event was raised");
        } //
    } // class

    public class NetworkListener
    {
        private bool _ExitLoop = true;
        private TcpListener _Listener;

        public delegate void dOnMessageBook(TcpClient xSender, ProtoBufExample.Header xHeader, ProtoBufExample.Book xBook);
        public event dOnMessageBook OnMessageBook;

        public delegate void dOnMessageFable(TcpClient xSender, ProtoBufExample.Header xHeader, ProtoBufExample.Fable xFable);
        public event dOnMessageFable OnMessageFable;

        private List<TcpClient> _Clients = new List<TcpClient>();

        public int Port { get; private set; }
        public string IpAddress { get; private set; }
        public string ThreadName { get; private set; }

        public NetworkListener(string xIpAddress, int xPort, string xThreadName)
        {
            Port = xPort;
            IpAddress = xIpAddress;
            ThreadName = xThreadName;
        } //

        public bool Connect()
        {
            if (!_ExitLoop)
            {
                Console.WriteLine("Listener running already");
                return false;
            }
            _ExitLoop = false;

            try
            {
                _Listener = new TcpListener(IPAddress.Parse(IpAddress), Port);
                _Listener.Start();

                Thread lThread = new Thread(new ThreadStart(LoopWaitingForClientsToConnect));
                lThread.IsBackground = true;
                lThread.Name = ThreadName + "WaitingForClients";
                lThread.Start();

                return true;
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
            return false;
        } //

        public void Disconnect()
        {
            _ExitLoop = true;
            lock (_Clients)
            {
                foreach (TcpClient lClient in _Clients) lClient.Close();
                _Clients.Clear();
            }
        } //

        private void LoopWaitingForClientsToConnect()
        {
            try
            {
                while (!_ExitLoop)
                {
                    Console.WriteLine("waiting for a client");
                    TcpClient lClient = _Listener.AcceptTcpClient();
                    string lClientIpAddress = lClient.Client.RemoteEndPoint.ToString(); // the client's address, not our own
                    Console.WriteLine("new client connecting: " + lClientIpAddress);
                    if (_ExitLoop) break;
                    lock (_Clients) _Clients.Add(lClient);

                    Thread lThread = new Thread(new ParameterizedThreadStart(LoopRead));
                    lThread.IsBackground = true;
                    lThread.Name = ThreadName + "CommunicatingWithClient";
                    lThread.Start(lClient);
                }
            }
            catch (Exception ex) { Console.WriteLine(ex.Message); }
            finally
            {
                _ExitLoop = true;
                if (_Listener != null) _Listener.Stop();
            }
        } //

        private void LoopRead(object xClient)
        {
            TcpClient lClient = xClient as TcpClient;
            NetworkStream lNetworkStream = lClient.GetStream();

            while (!_ExitLoop)
            {
                try
                {
                    ProtoBufExample.Header lHeader = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.Header>(lNetworkStream, ProtoBuf.PrefixStyle.Fixed32);
                    if (lHeader == null) break; // happens during shutdown process

                    switch (lHeader.objectType)
                    {
                        case ProtoBufExample.eType.eBook:
                            ProtoBufExample.Book lBook = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.Book>(lNetworkStream, ProtoBuf.PrefixStyle.Fixed32);
                            if (lBook == null) break;
                            lHeader.data = lBook; // not necessary, but nicer

                            dOnMessageBook lEventBook = OnMessageBook;
                            if (lEventBook == null) continue;
                            lEventBook(lClient, lHeader, lBook);
                            break;
                        case ProtoBufExample.eType.eFable:
                            ProtoBufExample.Fable lFable = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.Fable>(lNetworkStream, ProtoBuf.PrefixStyle.Fixed32);
                            if (lFable == null) break;
                            lHeader.data = lFable; // not necessary, but nicer

                            dOnMessageFable lEventFable = OnMessageFable;
                            if (lEventFable == null) continue;
                            lEventFable(lClient, lHeader, lFable);
                            break;
                        default:
                            Console.WriteLine("Mayday, mayday, we are in big trouble.");
                            break;
                    }
                }
                catch (System.IO.IOException)
                {
                    if (_ExitLoop) Console.WriteLine("user requested client shutdown");
                    else Console.WriteLine("disconnected");
                    break; // the connection is gone, stop reading from it
                }
                catch (Exception ex) { Console.WriteLine(ex.Message); }
            }
            Console.WriteLine("server: listener is shutting down");
        } //

        public static void Send(TcpClient xClient, ProtoBufExample.Header xHeader)
        {
            if (xHeader == null) return;
            if (xClient == null) return;

            lock (xClient)
            {
                try
                {
                    NetworkStream lNetworkStream = xClient.GetStream();

                    // send header (most likely a simple feedback)
                    ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.Header>(lNetworkStream, xHeader, ProtoBuf.PrefixStyle.Fixed32);

                    // send errors
                    if (xHeader.objectType != ProtoBufExample.eType.eError) return;
                    ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.ErrorMessage>(lNetworkStream, (ProtoBufExample.ErrorMessage)xHeader.data, ProtoBuf.PrefixStyle.Fixed32);
                }
                catch (Exception ex) { Console.WriteLine(ex.Message); }
            }
        } //
    } // class

    public class NetworkClient
    {
        public int Port { get; private set; }
        public string IpAddress { get; private set; }
        public string ThreadName { get; private set; }
        private NetworkStream _NetworkStream = null;
        private TcpClient _Client = null;
        private bool _ExitLoop = true;
        private BlockingCollection<ProtoBufExample.Header> _Queue = new BlockingCollection<ProtoBufExample.Header>();
        private readonly PendingFeedbacks _PendingFeedbacks = new PendingFeedbacks();

        public NetworkClient(string xIpAddress, int xPort, string xThreadName)
        {
            Port = xPort;
            IpAddress = xIpAddress;
            ThreadName = xThreadName;
        } //

        public void Connect()
        {
            if (!_ExitLoop) return; // running already
            _ExitLoop = false;

            _Client = new TcpClient();
            _Client.Connect(IpAddress, Port);
            _NetworkStream = _Client.GetStream();

            Thread lLoopWrite = new Thread(new ThreadStart(LoopWrite));
            lLoopWrite.IsBackground = true;
            lLoopWrite.Name = ThreadName + "Write";
            lLoopWrite.Start();

            Thread lLoopRead = new Thread(new ThreadStart(LoopRead));
            lLoopRead.IsBackground = true;
            lLoopRead.Name = ThreadName + "Read";
            lLoopRead.Start();
        } //

        public void Disconnect()
        {
            _ExitLoop = true;
            _Queue.Add(null);
            if (_Client != null) _Client.Close();
            //if (_NetworkStream != null) _NetworkStream.Close();
        } //

        public void Send(ProtoBufExample.Header xHeader)
        {
            if (xHeader == null) return;
            _PendingFeedbacks.Add(xHeader);
            _Queue.Add(xHeader);
        } //

        private void LoopWrite()
        {
            while (!_ExitLoop)
            {
                try
                {
                    ProtoBufExample.Header lHeader = _Queue.Take();
                    if (lHeader == null) break;

                    // send header
                    ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.Header>(_NetworkStream, lHeader, ProtoBuf.PrefixStyle.Fixed32);

                    // send data
                    switch (lHeader.objectType)
                    {
                        case ProtoBufExample.eType.eBook:
                            ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.Book>(_NetworkStream, (ProtoBufExample.Book)lHeader.data, ProtoBuf.PrefixStyle.Fixed32);
                            break;
                        case ProtoBufExample.eType.eFable:
                            ProtoBuf.Serializer.SerializeWithLengthPrefix<ProtoBufExample.Fable>(_NetworkStream, (ProtoBufExample.Fable)lHeader.data, ProtoBuf.PrefixStyle.Fixed32);
                            break;
                        default:
                            break;
                    }
                }
                catch (System.IO.IOException)
                {
                    if (_ExitLoop) Console.WriteLine("user requested client shutdown.");
                    else Console.WriteLine("disconnected");
                }
                catch (Exception ex) { Console.WriteLine(ex.Message); }
            }
            _ExitLoop = true;
            Console.WriteLine("client: writer is shutting down");
        } //

        private void LoopRead()
        {
            while (!_ExitLoop)
            {
                try
                {
                    ProtoBufExample.Header lHeader = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.Header>(_NetworkStream, ProtoBuf.PrefixStyle.Fixed32);
                    if (lHeader == null) break;
                    if (lHeader.objectType == ProtoBufExample.eType.eError)
                    {
                        ProtoBufExample.ErrorMessage lErrorMessage = ProtoBuf.Serializer.DeserializeWithLengthPrefix<ProtoBufExample.ErrorMessage>(_NetworkStream, ProtoBuf.PrefixStyle.Fixed32);
                        lHeader.data = lErrorMessage;
                    }
                    _PendingFeedbacks.Remove(lHeader);
                }
                catch (System.IO.IOException)
                {
                    if
(_ExitLoop) Console.WriteLine("user requested client shutdown"); else Console.WriteLine("disconnected"); } catch (Exception ex) { Console.WriteLine(ex.Message); } } Console.WriteLine("client: reader is shutting down"); } // } // class } // namespace
example output:
waiting for a client
new client connecting: 127.0.0.1:65432
waiting for a client
Book event was raised
received a book:
by Aesop, edition 13 Mar 1975, pages 203, price 1.99, isEbook False
title The Fox & the Grapes, rating 0.733333333333333
title The Goose that Laid the Golden Eggs, rating 0.7125
title The Cat & the Mice, rating 0.133333333333333
title The Mischievous Dog, rating 0.37
received a fable:
title The Goose that Laid the Golden Eggs, rating 0.7125
error: The fable was rejected. It is far too short.
the message that was sent out was: eFable with serial id 2
please check the log files
client: writer is shutting down
server: listener is shutting down
user requested client shutdown
client: reader is shutting down
Async and await (advanced, .Net 4.5, C# 5)
The devil is in the details. It all looks easy, but follow each step carefully today.
Windows blocks threads that are waiting for I/O operations to complete (e.g. internet or file access). Such a thread cannot be used for other jobs in the meantime, so new threads have to be created. You could use tasks to solve this specific problem: the program starts an asynchronous task to deal with the I/O operation, and once that task has finished, a continuation task (Task.ContinueWith()) triggers the follow-up procedure. It requires some work to cover all code paths, but it can be done.
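A minimal sketch of that pre-C# 5 approach, assuming a console program: the task does the (simulated) slow work, and ContinueWith() attaches the follow-up. ContinuationDemo and LoadAndProcess are hypothetical names, and Thread.Sleep merely stands in for the I/O; a real asynchronous I/O call would not occupy a thread while waiting.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Hypothetical example: start a task, then chain the follow-up with ContinueWith().
    public static Task<string> LoadAndProcess()
    {
        Task<int> load = Task.Factory.StartNew(() =>
        {
            Thread.Sleep(100); // simulates a slow operation (assumption, not real I/O)
            return 42;
        });

        // Runs once 'load' has finished, whether it succeeded, failed or was cancelled.
        return load.ContinueWith(t =>
        {
            if (t.IsFaulted) return "failed: " + t.Exception.InnerException.Message;
            if (t.IsCanceled) return "cancelled";
            return "result " + t.Result;
        });
    }
}
```

Checking IsFaulted and IsCanceled in every continuation is part of the work needed to cover all code paths.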
C# 5 introduced new keywords to make your life easier. You can use async to mark methods for asynchronous operations. Such a method starts synchronously on the caller's thread and only splits up when it arrives at an await on a task that has not finished yet.
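The sketch below makes this split visible without relying on timing. It uses a TaskCompletionSource<int> as a task that is completed by hand; AsyncSplitDemo, InnerAsync and the log texts are invented for illustration. InnerAsync() runs on the caller's thread up to its await and then hands an unfinished Task<int> back to the caller.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncSplitDemo
{
    // Runs synchronously up to the first await of an unfinished task,
    // then returns control (and an incomplete Task<int>) to the caller.
    static async Task<int> InnerAsync(List<string> log, Task<int> gate)
    {
        log.Add("inner: before await");
        // ConfigureAwait(false): do not capture a context, so blocking on .Result below is safe.
        int value = await gate.ConfigureAwait(false);
        log.Add("inner: after await");
        return value;
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var gate = new TaskCompletionSource<int>(); // a task we complete by hand

        log.Add("outer: calling InnerAsync");
        Task<int> pending = InnerAsync(log, gate.Task);
        log.Add("outer: InnerAsync returned, completed = " + pending.IsCompleted);

        gate.SetResult(7); // lets InnerAsync continue after its await
        log.Add("outer: result = " + pending.Result); // waits until InnerAsync has finished
        return log;
    }
}
```

Calling AsyncSplitDemo.Run() should give the entries in this order: the outer call, the inner line before the await, the outer line with "completed = False", the inner line after the await, and finally "outer: result = 7".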
The Print() method below prints the time, the step number and the managed thread ID. This information helps to understand the program flow.
private static void Print(int xStep)
{
    Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + " step " + xStep + " , thread " + Thread.CurrentThread.ManagedThreadId);
} //

static async void AsyncCalls1()
{
    Print(1);
    int i = await Task.Run<int>(() =>
    {
        Print(2);
        Thread.Sleep(5000);
        Print(3);
        return 0;
    });
    Print(4); // same thread as in step 3
    Console.ReadLine();
    // return void
} //
example output:
19:09:36 step 1 , thread 9
19:09:36 step 2 , thread 10
19:09:41 step 3 , thread 10
19:09:41 step 4 , thread 10
The above code is a warm-up for us. Note that AsyncCalls1() returns void. I emphasize this seemingly insignificant fact here. If it returned a Task and the caller wanted to await it, the calling method would have to be marked async as well. Then the method that called the calling method would need async too, and so on. It would be an endless game until you arrive at Main(), and there you would be stuck, because C# 5 does not allow async on Main(). Novices can get quite frustrated by such a minuscule glitch.
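What is the program doing? It starts a new task, which runs on another thread from the thread pool. The original thread returns to the caller; the rest of AsyncCalls1() does not run on it. Now check this out: when the task ends, the program continues (much like with Task.ContinueWith()) on the same thread that ran the task. There seems to be no thread switch. The reason is that a console application has no SynchronizationContext, so the code after await simply continues on a thread-pool thread; in a WinForms or WPF application it would be sent back to the UI thread.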
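If you do return a Task but want the top of the call chain to stay synchronous, a C# 5 console program can block on the task instead of awaiting it. A sketch under that assumption; SyncOverAsync and ComputeAsync are hypothetical names. (C# 7.1 and later also accept async Task Main().)

```csharp
using System.Threading.Tasks;

public static class SyncOverAsync
{
    // Hypothetical async method returning Task<int> instead of void.
    static async Task<int> ComputeAsync()
    {
        await Task.Delay(100).ConfigureAwait(false); // do not capture a SynchronizationContext
        return 42;
    }

    // A non-async method (such as Main in C# 5) ends the async chain by blocking.
    public static int RunFromSynchronousCode()
    {
        Task<int> task = ComputeAsync();
        return task.Result; // blocks the calling thread until the task has finished
    }
}
```

Blocking with .Result is acceptable in a console program. On a UI thread it can deadlock when the awaited code needs that thread to continue; the ConfigureAwait(false) in the sketch avoids capturing the context for that reason.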
static async void AsyncCalls2()
{
    Print(1);
    Task<int> task = AsyncCalls3();
    Print(4);
    int x = await task;
    Print(7); // same thread as in step 6
    Console.ReadLine();
    // return void
} //

static async Task<int> AsyncCalls3()
{
    Print(2);
    int i = await Task.Run<int>(() =>
    {
        Print(3);
        Thread.Sleep(5000);
        Print(5);
        return 0;
    });
    Print(6);
    return i; // same thread as in step 5, returning an INTEGER !!!
} //
example output:
19:10:16 step 1 , thread 9
19:10:16 step 2 , thread 9
19:10:16 step 3 , thread 10
19:10:16 step 4 , thread 9
19:10:21 step 5 , thread 10
19:10:21 step 6 , thread 10
19:10:21 step 7 , thread 10
Method AsyncCalls3() is declared to return a Task<int>, yet its return statement hands back a plain integer. And doesn't Task.Run() return a Task<int> according to its definition? It does, but await unwraps it and yields the integer value (0). The async keyword works the other way round: the compiler wraps the integer returned by AsyncCalls3() into the Task<int> promised by its signature. await has been implemented to shorten code, and this is what it does. The code is more legible.
Method AsyncCalls2() calls AsyncCalls3() and receives a Task<int>, not an integer. The integer only appears at int x = await task;.
AsyncCalls2() itself returns void, for the same reason as AsyncCalls1(). AsyncCalls3(), however, can return a value to AsyncCalls2(), because AsyncCalls2() is itself marked async in its method definition and can therefore await the result.
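Check the program sequence. I marked the steps clearly to make comprehension easy. Then analyse the thread switching: there is a switch between steps 2 and 3, but none between steps 5, 6 and 7. This is the same behavior as in the first example. Also note that step 4 is printed before step 5: AsyncCalls3() returns its unfinished Task<int> to AsyncCalls2() as soon as it reaches its await, so step 4 runs on thread 9 while the task is still sleeping.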
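The two directions (await unwrapping a Task<int> into an int, async wrapping a returned int into a Task<int>) can be condensed into a small sketch. AwaitUnwrapDemo, GetNumberAsync and CallerAsync are hypothetical names; ConfigureAwait(false) is only there so that blocking on .Result in a quick test cannot deadlock.

```csharp
using System.Threading.Tasks;

public static class AwaitUnwrapDemo
{
    // Declared as Task<int>, yet the body returns a plain int:
    // because of 'async', the compiler wraps the int into the Task<int>.
    public static async Task<int> GetNumberAsync()
    {
        Task<int> pending = Task.Run(() => 21);          // Task.Run hands back a Task<int> ...
        int value = await pending.ConfigureAwait(false); // ... and await unwraps it to an int
        return value * 2;
    }

    // The caller receives a Task<int>, not an int; only await turns it into the number.
    public static async Task<int> CallerAsync()
    {
        Task<int> task = GetNumberAsync();
        int x = await task.ConfigureAwait(false);
        return x + 1;
    }
}
```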
public static async void AsyncCalls4()
{
    Print(1);
    string s = await AsyncCalls5();
    Print(4);
    Console.ReadLine();
    // return void
} //

// using System.Net.Http;
public static async Task<string> AsyncCalls5()
{
    using (HttpClient lClient = new HttpClient())
    {
        Print(2);
        string lResult = await lClient.GetStringAsync("http://www.microsoft.com");
        Print(3);
        return lResult;
    }
} //
example output:
19:11:47 step 1 , thread 10
19:11:47 step 2 , thread 10
19:11:48 step 3 , thread 14
19:11:48 step 4 , thread 14
When searching for async and await on the web, you will find the emphasis on I/O. Most example programs concentrate on this and don't explain what is roughly going on inside the async I/O method itself. Basically, .Net async I/O methods deal with tasks and use a construction similar to Task.ContinueWith(). This is why I concentrated on different examples that can be used in any context (even though they are not very meaningful). The internet download example is more or less a classic one. You can use await on many I/O methods. Keep in mind that AsyncCalls4() returns void, and that you cannot await AsyncCalls5() in the Main() method, because that would require marking Main() as async.
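File access works the same way as the download. A sketch, assuming .Net 4.5 or later, where StreamWriter.WriteAsync() and StreamReader.ReadToEndAsync() return tasks you can await; AsyncFileIo and RoundTripAsync are hypothetical names.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class AsyncFileIo
{
    // Writes a text file and reads it back, awaiting each I/O operation.
    public static async Task<string> RoundTripAsync(string path, string text)
    {
        using (StreamWriter writer = new StreamWriter(path)) // overwrites an existing file
        {
            await writer.WriteAsync(text).ConfigureAwait(false);
        } // Dispose() flushes and closes the file

        using (StreamReader reader = new StreamReader(path))
        {
            return await reader.ReadToEndAsync().ConfigureAwait(false);
        }
    }
}
```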
lock and deadlocks (basics)
The main problem of multithreading is sharing data. Two threads must not access the same data at the same time unless both are only reading. Use lock() to deal with sharing issues. It is nearly straightforward; just make sure you don't cause deadlocks.
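A minimal sketch of why shared data needs lock(): many parallel iterations increment one counter. counter++ is a read-modify-write, so without the lock two threads could read the same value and one increment would be lost. LockCounter and CountWithLock are hypothetical names.

```csharp
using System.Threading.Tasks;

public static class LockCounter
{
    // Private lock object rather than "this".
    private static readonly object _Sync = new object();

    // Increments a shared counter from many parallel iterations.
    public static int CountWithLock(int iterations)
    {
        int counter = 0;
        Parallel.For(0, iterations, i =>
        {
            lock (_Sync) counter++; // only one thread at a time may read-modify-write
        });
        return counter;
    }
}
```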
static void DeadLock()
{
    object A = new object();
    object B = new object();

    Task.Factory.StartNew(() =>
    {
        lock (A)
        {
            Thread.Sleep(1000);
            lock (B) Console.WriteLine("locked A, then B");
        }
    });

    Task.Factory.StartNew(() =>
    {
        lock (B)
        {
            lock (A) Console.WriteLine("locked B, then A");
        }
    });

    Console.ReadLine();
} //
The example shows a simplified deadlock situation. Two tasks (=threads) wait for access to resources, in this case the objects A and B. The first task locks object A and goes to sleep. In the meantime the second task locks object B and then asks for A. But A is locked. Next, the first task finishes sleeping and asks for B, which is already locked by the second task. Each task now waits for the other forever: a classic deadlock.
The example uses tasks, and you can expect each task to use its own thread. But this does not have to be the case. Don't count on luck, it won't work in the long run; assume that each task runs on a new thread.
A lock() on an object can be re-entered multiple times by the same thread.
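A small sketch of re-entrance, with hypothetical names: Outer() holds the lock and calls Inner(), which locks the same object again on the same thread without blocking. Monitor.IsEntered() (available since .Net 4.5) confirms that the thread still owns the lock.

```csharp
using System.Threading;

public static class ReentrancyDemo
{
    private static readonly object _Sync = new object();

    public static int Outer()
    {
        lock (_Sync)
        {
            return Inner() + 1; // Inner() takes the same lock again
        }
    }

    private static int Inner()
    {
        lock (_Sync) // re-entered by the owning thread: no deadlock
        {
            return Monitor.IsEntered(_Sync) ? 41 : 0;
        }
    }
}
```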
To avoid deadlocks, make sure that locks are always requested in the same order. And only lock on private objects; avoid “this” as the locking object.
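Applied to the deadlock example above, the rule means both tasks take A first and then B. A sketch with hypothetical names; Task.WaitAll() with a timeout reports whether both tasks finished instead of hanging forever.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LockOrdering
{
    // Private lock objects instead of "this".
    private static readonly object _A = new object();
    private static readonly object _B = new object();

    // Both tasks lock A first, then B, so neither can hold B while waiting for A.
    // Returns true if both tasks finished within the timeout.
    public static bool RunBothTasks(int timeoutMs)
    {
        Task t1 = Task.Factory.StartNew(() =>
        {
            lock (_A)
            {
                Thread.Sleep(100);
                lock (_B) Console.WriteLine("task 1: locked A, then B");
            }
        });
        Task t2 = Task.Factory.StartNew(() =>
        {
            lock (_A)
            {
                lock (_B) Console.WriteLine("task 2: locked A, then B");
            }
        });
        return Task.WaitAll(new[] { t1, t2 }, timeoutMs);
    }
}
```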
static void Lock()
{
    object o = new object();
    bool locked = false;
    try
    {
        Monitor.Enter(o, ref locked);
        Console.WriteLine("bla bla bla");
        Thread.Sleep(1000); // do something
    }
    finally
    {
        if (locked) Monitor.Exit(o);
    }

    // equals

    lock (o)
    {
        Console.WriteLine("bla bla bla");
        Thread.Sleep(1000); // do something
    }

    Console.ReadLine();
} //
The lock statement is a shortcut for the above code; it uses the Monitor class. Generally there is no reason not to use lock(), even though the Monitor class offers a lot of extras.
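One of those Monitor extras is Monitor.TryEnter() with a timeout: a thread can give up instead of waiting forever, which is one way to notice a situation like the deadlock above. A sketch; TryEnterDemo and TryDoWork are hypothetical names.

```csharp
using System.Threading;

public static class TryEnterDemo
{
    // Tries to lock 'sync' for at most timeoutMs milliseconds.
    // Returns true if the lock was obtained (and the work done), false on timeout.
    public static bool TryDoWork(object sync, int timeoutMs)
    {
        bool lockTaken = false;
        try
        {
            Monitor.TryEnter(sync, timeoutMs, ref lockTaken);
            if (!lockTaken) return false; // give up instead of waiting forever

            // ... work on the shared data would go here ...
            return true;
        }
        finally
        {
            if (lockTaken) Monitor.Exit(sync);
        }
    }
}
```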
Parallel class (basics)
The Parallel class can be found in the System.Threading.Tasks namespace. It has a couple of static methods designed to execute code concurrently. Parallelism makes sense when the amount of work justifies the creation of tasks, the iterations do not block each other too much (e.g. lock(this) {}), the processors have free capacity and the code does not have to run in sequence. Otherwise performance will most likely suffer.
Let’s have a look at some examples:
public static void Parallel_For()
{
    Parallel.For(0, 10, i =>
    {
        Console.WriteLine("parallel start " + i);
        Thread.Sleep(0);
        Console.WriteLine("parallel end " + i);
    });
    Console.ReadLine();
} //
example output:
parallel start 0
parallel start 1
parallel end 0
parallel start 2
parallel end 1
parallel start 3
parallel end 3
parallel start 4
parallel end 2
parallel start 5
parallel end 4
parallel start 6
parallel end 5
parallel end 6
parallel start 8
parallel start 7
parallel start 9
parallel end 9
parallel end 8
parallel end 7
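The iterations above run in no particular order. That is fine as long as each iteration works on its own data and needs no lock. A sketch under that assumption, with hypothetical names: every iteration writes only its own array slot.

```csharp
using System.Threading.Tasks;

public static class ParallelSquares
{
    // Each iteration writes only result[i], so no lock is needed
    // and the execution order does not matter.
    public static long[] Squares(int count)
    {
        long[] result = new long[count];
        Parallel.For(0, count, i =>
        {
            result[i] = (long)i * i;
        });
        return result;
    }
}
```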
public static void Parallel_ForEach()
{
    int[] n = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
    Parallel.ForEach(n, i =>
    {
        Console.WriteLine("parallel start " + i);
        Thread.Sleep(0);
        Console.WriteLine("parallel end " + i);
    });
    Console.ReadLine();
} //
example output:
parallel start 0
parallel end 0
parallel start 3
parallel start 4
parallel end 4
parallel start 5
parallel start 1
parallel start 2
parallel end 3
parallel start 6
parallel end 1
parallel start 7
parallel end 2
parallel start 8
parallel end 8
parallel start 9
parallel end 9
parallel end 6
parallel end 5
parallel end 7
public static void Parallel_ForBreak()
{
    ParallelLoopResult result = Parallel.For(0, 10, (i, loopstate) =>
    {
        Console.WriteLine("parallel start " + i);
        Thread.Sleep(0);
        if (i >= 5) loopstate.Break();
        //if (i >= 3) loopstate.Stop();
        Console.WriteLine("parallel end " + i);
    });
    Console.WriteLine("IsCompleted: " + result.IsCompleted);
    Console.WriteLine("LowestBreakIteration: " + result.LowestBreakIteration);
    Console.ReadLine();
} //
example output:
parallel start 0
parallel end 0
parallel start 2
parallel end 2
parallel start 3
parallel start 1
parallel end 1
parallel end 3
parallel start 6
parallel end 6
parallel start 7
parallel start 4
parallel end 7
parallel end 4
parallel start 5
parallel end 5
IsCompleted: False
LowestBreakIteration: 5
The property IsCompleted returns false when the loop did not run to completion. LowestBreakIteration holds the lowest iteration number from which Break() was called; all lower iterations are still executed. Calling Break() does not leave the current iteration either: the code after Break() is still executed! Break() can be employed in search-based algorithms where the ordering of the data source matters.
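A sketch of such a search, with hypothetical names. All iterations below LowestBreakIteration are guaranteed to run, and each of them calls Break() if its element matches, so LowestBreakIteration ends up as the index of the first match. If nothing matches, the loop completes and LowestBreakIteration stays null.

```csharp
using System;
using System.Threading.Tasks;

public static class FirstMatchSearch
{
    // Returns the index of the first element that satisfies the predicate, or -1.
    public static long FindFirstIndex(int[] data, Func<int, bool> predicate)
    {
        ParallelLoopResult result = Parallel.For(0, data.Length, (i, loopState) =>
        {
            // Lower iterations still run after Break(), so a lower match
            // can call Break() as well and lower the break index.
            if (predicate(data[i])) loopState.Break();
        });
        // LowestBreakIteration is a long? that stays null if Break() was never called.
        return result.LowestBreakIteration ?? -1;
    }
}
```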
public static void Parallel_ForStop()
{
    ParallelLoopResult result = Parallel.For(0, 100, (i, loopstate) =>
    {
        Console.WriteLine("parallel start " + i);
        Thread.Sleep(0);
        //if (i >= 5) loopstate.Break();
        if (!loopstate.IsStopped)
        {
            if (i >= 25) loopstate.Stop();
            Console.WriteLine("parallel end " + i);
        }
    });
    Console.WriteLine("IsCompleted: " + result.IsCompleted);
    Console.WriteLine("LowestBreakIteration: " + result.LowestBreakIteration);
    Console.ReadLine();
} //
example output:
parallel start 0
parallel start 12
parallel end 0
parallel start 1
parallel end 1
parallel start 2
parallel end 2
parallel start 3
parallel end 3
parallel start 4
parallel end 4
parallel start 5
parallel end 5
parallel start 6
parallel start 24
parallel end 24
parallel start 25
parallel end 25
parallel end 12
IsCompleted: False
LowestBreakIteration:
After Stop(), LowestBreakIteration is null, which is why nothing is printed after it. The code after Stop() is still executed!
Break() lets all iterations lower than the current one finish, on whichever thread they run, and then exits the loop.
Stop() stops all iterations as soon as convenient.
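Where any match will do, Stop() is the cheaper choice. A sketch with hypothetical names: the first thread to find a match records its index with Interlocked.CompareExchange() and stops the loop; other iterations check IsStopped and return early. Which index is reported may differ from run to run.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AnyMatchSearch
{
    // Returns the index of some element that satisfies the predicate, or -1.
    public static int FindAny(int[] data, Func<int, bool> predicate)
    {
        int found = -1;
        Parallel.For(0, data.Length, (i, loopState) =>
        {
            if (loopState.IsStopped) return; // another iteration already found a match
            if (predicate(data[i]))
            {
                // Only the first finder stores its index (found is still -1 for it).
                Interlocked.CompareExchange(ref found, i, -1);
                loopState.Stop(); // no further iterations are needed
            }
        });
        return found;
    }
}
```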