
Events (part 3, advanced)

Events: Let’s go multithreading! We want the crème de la crème. Well, multitasking is also fine.

The event subscribers do not need to run in a specific sequence, so the required independence is given. Again, there are many ways to use multithreading. An easy approach is to start a task in each method that is called by the event.

C# does support BeginInvoke() for delegates. This method is not supported by the .NET Compact Framework, though. We don’t care, because our hardcore programs are for serious applications, definitely not for mobile phone apps. Let’s see how well BeginInvoke() works. Maybe we don’t have to reinvent the wheel.

BeginInvoke() initiates an asynchronous call. It returns immediately and provides an IAsyncResult, which can be used to monitor the progress of the asynchronous call.
EndInvoke() retrieves the result. It blocks until the asynchronous call has completed.

You have the following options after calling BeginInvoke():
1) Call EndInvoke() to block the current thread until the call completes.
2) Obtain the WaitHandle from IAsyncResult.AsyncWaitHandle, call its WaitOne(), and then call EndInvoke().
3) Poll the IAsyncResult to check the current state; after it has completed, call EndInvoke().
4) Pass a callback to BeginInvoke(). The callback will be executed on a ThreadPool thread; inside the callback you have to call EndInvoke().

public event EventHandler OnChange;

public void DoSomeWork(object sender, object e) {
    Thread.Sleep(2100);
    Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " mission accomplished!");
} //

public void RunMyExample() {
    OnChange += new EventHandler(DoSomeWork);
    //OnChange += new EventHandler(DoSomeWork);
    //OnChange += new EventHandler(DoSomeWork);

    IAsyncResult lAsyncResult;

    Console.WriteLine("Choice 1");
    lAsyncResult = OnChange.BeginInvoke(this, EventArgs.Empty, null, null);
    OnChange.EndInvoke(lAsyncResult);

    Console.WriteLine("Choice 2");
    lAsyncResult = OnChange.BeginInvoke(this, EventArgs.Empty, null, null);
    lAsyncResult.AsyncWaitHandle.WaitOne();
    OnChange.EndInvoke(lAsyncResult);   // fetch the result after the wait

    Console.WriteLine("Choice 3");
    lAsyncResult = OnChange.BeginInvoke(this, EventArgs.Empty, null, null);
    while (!lAsyncResult.IsCompleted) {
        Thread.Sleep(500);
        Console.WriteLine("work still not completed :(");
    }
    OnChange.EndInvoke(lAsyncResult);   // fetch the result after polling

    Console.WriteLine("Choice 4"); 
    OnChange.BeginInvoke(this, EventArgs.Empty, (xAsyncResult) => {
            Console.WriteLine("callback running");
            OnChange.EndInvoke(xAsyncResult);
        }, null);

    Console.WriteLine("press return to exit the program");
    Console.ReadLine();
}//    

example output:
Choice 1
Thread 6 mission accomplished!
Choice 2
Thread 6 mission accomplished!
work still not completed :(
work still not completed :(
work still not completed :(
work still not completed :(
Thread 10 mission accomplished!
work still not completed :(
press return to exit the program
Thread 6 mission accomplished!
callback running

After having lived such a nice life, we have to face a major issue with BeginInvoke(). Uncomment one of the “//OnChange += new EventHandler(DoSomeWork);” lines, and don’t get annoyed now!
You will get an exception saying

“The delegate must have only one target”.

Although the delegate class can deal with multiple targets, asynchronous calls accept just one target.
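If you want to stick with BeginInvoke() nevertheless, one possible workaround (a sketch of mine, not from the original example) is to split the invocation list yourself. Each entry returned by GetInvocationList() is a delegate with exactly one target, so the restriction no longer applies. Keep in mind that delegate BeginInvoke() only works on the classic .NET Framework; .NET Core and later throw a PlatformNotSupportedException.

```csharp
// Sketch: replace the body of RunMyExample() with a manual fan-out.
EventHandler lMulticast = OnChange;
if (lMulticast == null) return;

foreach (Delegate lEntry in lMulticast.GetInvocationList()) {
    EventHandler lSingle = (EventHandler)lEntry;   // one single-target delegate per subscriber
    lSingle.BeginInvoke(this, EventArgs.Empty,
        xAsyncResult => lSingle.EndInvoke(xAsyncResult),   // always pair BeginInvoke() with EndInvoke()
        null);
}
```

Note that "lSingle" is declared inside the loop body, so each callback closure captures its own copy; this matters for reasons we will see in a moment.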

So let’s try something else.

    public event EventHandler OnChange;

    public void DoSomeWork(object sender, object e) {
        Thread.Sleep(2100);
        Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " mission accomplished!");
    } //

    public void RunMyExample() {
        OnChange += new EventHandler(DoSomeWork);
        OnChange += new EventHandler((sender, e) => { throw new Exception("something went wrong"); });
        OnChange += new EventHandler(DoSomeWork);

        EventHandler lOnChange = OnChange;
        if (lOnChange == null) return;   // just to demonstrate the proper way to call events, not needed in this example                
        foreach (EventHandler d in lOnChange.GetInvocationList()) {
            Task lTask = Task.Factory.StartNew(() => d(this, EventArgs.Empty));
            lTask.ContinueWith((i) => { Console.WriteLine("Task canceled"); }, TaskContinuationOptions.OnlyOnCanceled);
            lTask.ContinueWith((i) => { Console.WriteLine("Task faulted"); }, TaskContinuationOptions.OnlyOnFaulted);
            lTask.ContinueWith((i) => { Console.WriteLine("Task completion"); }, TaskContinuationOptions.OnlyOnRanToCompletion);
        }

        Console.WriteLine("press return to exit the program");
        Console.ReadLine();
    }//    

It looks like a serious Microsoft bug at first: this code does not execute properly each time. But the real culprit is how lambda expressions capture variables. The lambda passed to StartNew() captures the loop variable “d” itself, not its value at that moment. In C# 4.0 and earlier, foreach declares a single variable “d” that is reused for every iteration, so by the time a task actually starts running, “d” may already point to a later handler. That is why the program sometimes calls DoSomeWork() three times and never raises the exception.
With a little tweak we can avoid this trap. We simply assign “d” to a new local variable inside the loop body. That way each closure captures its own unique copy of “d”. Weird stuff, but that is the life of a coder sometimes.
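The underlying effect is easy to reproduce without events or tasks. This small self-contained demo (my own, not from the original post) uses a for loop, where the behaviour is the same in all C# versions: a lambda captures the variable, not its value at creation time.

```csharp
using System;
using System.Collections.Generic;

class CaptureDemo {
    public static void Main() {
        var lActions = new List<Action>();

        // All three lambdas share the single loop variable "i".
        // By the time they run, the loop has finished and i == 3.
        for (int i = 0; i < 3; i++)
            lActions.Add(() => Console.Write(i + " "));
        foreach (Action a in lActions) a();   // prints: 3 3 3
        Console.WriteLine();

        // The fix: copy into a fresh local variable per iteration,
        // so every lambda captures its own variable.
        lActions.Clear();
        for (int i = 0; i < 3; i++) {
            int lCopy = i;
            lActions.Add(() => Console.Write(lCopy + " "));
        }
        foreach (Action a in lActions) a();   // prints: 0 1 2
        Console.WriteLine();
    } //
} // class
```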

public event EventHandler OnChange;

public void DoSomeWork(object sender, object e) {
    Thread.Sleep(2100);
    Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " mission accomplished!");
} //

public void RunMyExample() {
    OnChange += new EventHandler(DoSomeWork);
    OnChange += new EventHandler((sender, e) => { throw new Exception("something went wrong"); });
    OnChange += new EventHandler(DoSomeWork);

    EventHandler lOnChange = OnChange;
    if (lOnChange == null) return;   // just to demonstrate the proper way to call events, not needed in this example                
    foreach (EventHandler d in lOnChange.GetInvocationList()) {
        EventHandler e = d;
        Task lTask = Task.Factory.StartNew(() => e(this, EventArgs.Empty));
        lTask.ContinueWith((i) => { Console.WriteLine("Task canceled"); }, TaskContinuationOptions.OnlyOnCanceled);
        lTask.ContinueWith((i) => { Console.WriteLine("Task faulted"); }, TaskContinuationOptions.OnlyOnFaulted);
        lTask.ContinueWith((i) => { Console.WriteLine("Task completion"); }, TaskContinuationOptions.OnlyOnRanToCompletion);
    }

    Console.WriteLine("press return to exit the program");
    Console.ReadLine();
}//    

Example output:
press return to exit the program
Thread 11 mission accomplished!
Thread 10 mission accomplished!
Task completion
Task faulted
Task completion

This tiny tweak worked. You just have to know the capture rules for closures. I hope this little notice saves you at least 3 hours of work.
You probably remember that we faced a similar issue in my post “Exiting Tasks (advanced)” https://csharphardcoreprogramming.wordpress.com/2013/12/11/exiting-tasks/ . So be careful whenever you combine loops with lambda expressions. Note that C# 5.0 changed the semantics of foreach: the loop variable is a fresh instance for each iteration, so the extra copy is no longer needed there (plain for loops still behave the old way).
The following code also appears to run fine, because the variable “d” is not used directly in Task.Factory.StartNew(). But be warned: the Action still captures the shared loop variable, so on C# 4.0 and earlier it is subject to the same race. The per-iteration copy remains the reliable fix.

...
 foreach (EventHandler d in lOnChange.GetInvocationList()) {
            Action a = () => d(this, EventArgs.Empty);
            Task lTask = Task.Factory.StartNew(a);
            lTask.ContinueWith((i) => { Console.WriteLine("Task canceled"); }, TaskContinuationOptions.OnlyOnCanceled);
            lTask.ContinueWith((i) => { Console.WriteLine("Task faulted"); }, TaskContinuationOptions.OnlyOnFaulted);
            lTask.ContinueWith((i) => { Console.WriteLine("Task completion"); }, TaskContinuationOptions.OnlyOnRanToCompletion);
        }
...

Events (part 2, advanced)

We are going to construct our own custom event accessor now. It deals with additions and removals of subscriptions. Accessors look much like property definitions, but instead of get and set you use add and remove.

public class MyActionEvent4 {
    private object _Lock = new object();            // a new object simply to avoid lock conflicts
    private event EventHandler<MyArgs> _OnChange;
    private event EventHandler<MyArgs> OnChange {
        add {
            lock (_Lock) { _OnChange += value; }
        }
        remove {
            lock (_Lock) { _OnChange -= value; }
        }
    } //

    public void RaiseEvent() {
        lock (_Lock) {
            EventHandler<MyArgs> lHandler = _OnChange;
            if (lHandler == null) return;
            lHandler(this, new MyArgs(0));   
        }        
    }//
} // class

Now we have one big problem here: the RaiseEvent() method has to obtain the lock on each call, which can have a serious impact on time-sensitive programs. Luckily, you do not need to worry about subscriptions changing during the invocation. Copying the delegate reference into a local variable is safe, because delegates are immutable like strings; the local variable keeps a stable snapshot of the subscriber list. Let’s simply take the lock out of the RaiseEvent() method, et voilà!

public class MyActionEvent5 {
    private object _Lock = new object();
    private event EventHandler<MyArgs> _OnChange;
    private event EventHandler<MyArgs> OnChange {
        add {
            lock (_Lock) { _OnChange += value; }
        }
        remove {
            lock (_Lock) { _OnChange -= value; }
        }
    } //

    public void RaiseEvent() {
        EventHandler<MyArgs> lHandler = _OnChange;
        if (lHandler == null) return;
        lHandler(this, new MyArgs(0));   
    }//
} // class

It is clear by now that events are not delegates. They restrict access rights from outside of the declaring class: outside code may only subscribe and unsubscribe, it can neither reassign nor invoke. You could best describe events as wrappers around delegates.
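The difference is easy to see from the caller’s side. In this sketch (class names are made up for illustration) the compiler accepts += and -= on the event, while direct assignment or invocation from outside is rejected; a plain delegate field allows everything:

```csharp
using System;

public class Publisher {
    public event EventHandler OnChange;        // event: callers may only use += and -=
    public EventHandler OnChangeDelegate;      // plain delegate field: callers can do anything

    public void RaiseEvent() {
        EventHandler lHandler = OnChange;      // local snapshot, as discussed above
        if (lHandler != null) lHandler(this, EventArgs.Empty);
    } //
} // class

public class Subscriber {
    public static void Demo() {
        Publisher p = new Publisher();
        p.OnChange += (sender, e) => Console.WriteLine("subscribed");  // fine
        //p.OnChange = null;                   // compile error: only += and -= allowed here
        //p.OnChange(p, EventArgs.Empty);      // compile error: cannot invoke from outside
        p.OnChangeDelegate = null;             // the plain delegate field allows both
        p.RaiseEvent();                        // prints: subscribed
    } //
} // class
```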

Whenever an exception is thrown during an event call, none of the remaining subscribers will be executed. And it is tricky to determine which calls were skipped, because the order of event calls is not guaranteed. In practice the invocation list does execute in sequence, but you should not rely on it.

static void EventExceptions1() {
    MyActionEvent3 lEvent = new MyActionEvent3();
    lEvent.OnChange += (sender, e) => Console.WriteLine("Executed subscription 1");
    lEvent.OnChange += (sender, e) => { throw new Exception("OMG!"); };
    lEvent.OnChange += (sender, e) => Console.WriteLine("Executed subscription 3");
    lEvent.RaiseEvent();
} //

So you have to deal with exceptions manually if you want to execute as many event subscriptions as possible. You could add a try/catch block inside each subscriber. Or you could iterate over the invocation list yourself by calling the GetInvocationList() method [System.Delegate] and execute each item in its own try/catch block. Let’s have a look at the following practical solution:

public class MyActionEvent6 {
    public event EventHandler OnChange = delegate { };

    public void RaiseEvent() {
        List<Exception> lExceptions = new List<Exception>();

        foreach (Delegate lHandler in OnChange.GetInvocationList()) {
            try { lHandler.DynamicInvoke(this, EventArgs.Empty); }
            catch (Exception ex) { lExceptions.Add(ex); }
        }

        if (lExceptions.Count > 0) throw new AggregateException(lExceptions);
    }//
} // class

static void EventExceptions6() {
    MyActionEvent6 lEvent = new MyActionEvent6();
    lEvent.OnChange += (sender, e) => Console.WriteLine("Executed subscription 1");
    lEvent.OnChange += (sender, e) => { throw new Exception("OMG!"); };
    lEvent.OnChange += (sender, e) => Console.WriteLine("Executed subscription 3");

    try { lEvent.RaiseEvent(); }
    catch (AggregateException ex) {
        foreach (Exception lException in ex.InnerExceptions) {
            Console.WriteLine(lException.InnerException.Message);   // DynamicInvoke() wraps the handler's exception in a TargetInvocationException
        }
    }
} //

PLINQ (basics)

Language-Integrated Query (LINQ) was added to C# to perform queries over any kind of data source, e.g. in-memory objects or databases. Parallel Language-Integrated Query (PLINQ) is a further approach to query objects. As the name tells us already, it has to do with parallelism. The evolution from sequential queries (LINQ) to parallel queries (PLINQ) was predictable. The extension methods for PLINQ are defined in the class System.Linq.ParallelEnumerable.

public static void PLINQ1() {
    int[] range = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    Console.WriteLine("old school");
    for (int i = 0, n = range.Length; i < n; i++) {
        if (range[i] % 2 == 0) Console.WriteLine(range[i]);
    }

    Console.WriteLine("LINQ");
    var linq = from i in range where (i % 2 == 0) select i;
    foreach (int i in linq) Console.WriteLine(i);

    Console.WriteLine("LINQ2");
    var linq2 = range.Where(i => i % 2 == 0);
    foreach (int i in linq2) Console.WriteLine(i);

    Console.WriteLine("PLINQ1");
    var plinq = from i in range.AsParallel() where (i % 2 == 0) select i;
    foreach (int i in plinq) Console.WriteLine(i);

    Console.WriteLine("PLINQ2");
    var plinq2 = range.AsParallel().Where(i => i % 2 == 0);
    foreach (int i in plinq2) Console.WriteLine(i);

    Console.WriteLine("PLINQ3");
    var plinq3 = range.AsParallel().Where(i => { Thread.Sleep(1000); return (i % 2 == 0); });
    foreach (int i in plinq3) Console.WriteLine(i);

    Console.WriteLine("PLINQ3 sorted");
    var plinq3sorted = range.AsParallel().AsOrdered().Where(i => { Thread.Sleep(1000); return (i % 2 == 0); });
    foreach (int i in plinq3sorted) Console.WriteLine(i);

    Console.ReadLine();
} //

Interestingly, the runtime decides by itself whether it makes sense to execute your query in parallel. Thus parallel execution is not guaranteed unless you specify WithExecutionMode(ParallelExecutionMode.ForceParallelism).
You can get even more specific by using WithDegreeOfParallelism() to limit the number of processors being used.

public static void PLINQ2() {
    var range = Enumerable.Range(0, 25);
    var result = range.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).Where(i => i % 2 == 0);
    //var result = range.AsParallel().WithDegreeOfParallelism(4).Where(i => i % 2 == 0);
    //var result = range.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).WithDegreeOfParallelism(4).Where(i => i % 2 == 0);
    foreach (int i in result) Console.WriteLine(i);

    Console.ReadLine();
} //

And now we are facing something weird. We created a sorted result, but somehow the sorting gets lost! The reason is that, unlike “foreach”, ForAll() skips the final merge step: each worker thread passes its results to the delegate directly as they become available, so the ordering established by AsOrdered() is not preserved.
Hence we see an unexpected result.

public static void PLINQ3() {
    var range = Enumerable.Range(0, 25);

    var plinq2sorted = range.AsParallel().AsOrdered().Where(i => { Thread.Sleep(1000); return (i % 2 == 0); });
    Console.WriteLine("sorted");
    foreach (int i in plinq2sorted) Console.WriteLine(i);
    Console.WriteLine("something is going wrong?");
    plinq2sorted.ForAll(i => Console.WriteLine(i));

    Console.ReadLine();
} //

example output:
sorted
0
2
4
6
8
10
12
14
16
18
20
22
24
something is going wrong?
6
0
4
2
14
10
12
8
22
18
16
20
24

Parallel queries can fail and throw exceptions. A single exception does not necessarily stop the other worker threads at once. The exceptions are collected and rethrown in one single AggregateException, which can be caught by the usual try/catch block.

public static void PLINQ4() {
    var range = Enumerable.Range(0, 25);

    try {
        var plinq2sorted = range.AsParallel().AsOrdered().Where(i => {
            Thread.Sleep(500);
            if (i > 15) throw new ArgumentException("exception for number " + i);
            return (i % 2 == 0);
        });

        foreach (int i in plinq2sorted) Console.WriteLine(i);
    }
    catch (AggregateException e) {
        Console.WriteLine("Exception thrown for " + e.InnerExceptions.Count + " results");
        foreach (var innerException in e.InnerExceptions) Console.WriteLine(innerException.Message);
    }

    Console.ReadLine();
} //

example output:
Exception thrown for 8 results
exception for number 16
exception for number 20
exception for number 17
exception for number 21
exception for number 19
exception for number 23
exception for number 22
exception for number 18

Parallel class (basics)

The Parallel class can be found in the System.Threading.Tasks namespace. It has a couple of static methods designed to execute code concurrently. Using it makes sense when the amount of work justifies the creation of tasks, the pieces of code do not block each other too much (e.g. via lock statements), the processors have free capacity, and the code does not have to run in sequence. Otherwise performance will most likely suffer.
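Besides the loop methods shown below, the class also offers Parallel.Invoke(), which runs a fixed set of independent actions concurrently and blocks until all of them have finished. A minimal sketch of mine, not part of the original examples:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class InvokeDemo {
    public static void Main() {
        // Parallel.Invoke() blocks until all actions have completed.
        // The order in which the actions run is not guaranteed.
        Parallel.Invoke(
            () => Console.WriteLine("action A, thread " + Thread.CurrentThread.ManagedThreadId),
            () => Console.WriteLine("action B, thread " + Thread.CurrentThread.ManagedThreadId),
            () => Console.WriteLine("action C, thread " + Thread.CurrentThread.ManagedThreadId)
        );
        Console.WriteLine("all actions completed");   // always printed last
    } //
} // class
```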

Let’s have a look at some examples:

public static void Parallel_For() {
    Parallel.For(0, 10, i => {
        Console.WriteLine("parallel start " + i);
        Thread.Sleep(0);
        Console.WriteLine("parallel end " + i);
    });
    Console.ReadLine();
} //

example output:
parallel start 0
parallel start 1
parallel end 0
parallel start 2
parallel end 1
parallel start 3
parallel end 3
parallel start 4
parallel end 2
parallel start 5
parallel end 4
parallel start 6
parallel end 5
parallel end 6
parallel start 8
parallel start 7
parallel start 9
parallel end 9
parallel end 8
parallel end 7

public static void Parallel_ForEach() {
    int[] n = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
    Parallel.ForEach(n, i => {
        Console.WriteLine("parallel start " + i);
        Thread.Sleep(0);
        Console.WriteLine("parallel end " + i);
    });
    Console.ReadLine();
} //

example output:
parallel start 0
parallel end 0
parallel start 3
parallel start 4
parallel end 4
parallel start 5
parallel start 1
parallel start 2
parallel end 3
parallel start 6
parallel end 1
parallel start 7
parallel end 2
parallel start 8
parallel end 8
parallel start 9
parallel end 9
parallel end 6
parallel end 5
parallel end 7

public static void Parallel_ForBreak() {
    ParallelLoopResult result = Parallel.For(0, 10, (i, loopstate) => {
        Console.WriteLine("parallel start " + i);
        Thread.Sleep(0);
        if (i >= 5) loopstate.Break();
        //if (i >= 3) loopstate.Stop();
        Console.WriteLine("parallel end " + i);                                
    });
    Console.WriteLine("IsCompleted: " + result.IsCompleted);
    Console.WriteLine("LowestBreakIteration: " + result.LowestBreakIteration);
    Console.ReadLine();
} //

example output:
parallel start 0
parallel end 0
parallel start 2
parallel end 2
parallel start 3
parallel start 1
parallel end 1
parallel end 3
parallel start 6
parallel end 6
parallel start 7
parallel start 4
parallel end 7
parallel end 4
parallel start 5
parallel end 5
IsCompleted: False
LowestBreakIteration: 5

The property IsCompleted returns false when the loop did not run all iterations. LowestBreakIteration represents the lowest iteration number from which Break() was called; all lower iteration numbers are still executed. Note that Break() does not exit the current iteration, the code after the call still runs! Break() can be employed in search-based algorithms where an ordering is present in the data source.

public static void Parallel_ForStop() {
    ParallelLoopResult result = Parallel.For(0, 100, (i, loopstate) => {
        Console.WriteLine("parallel start " + i);
        Thread.Sleep(0);
        //if (i >= 5) loopstate.Break();
        if (!loopstate.IsStopped) {
            if (i >= 25) loopstate.Stop();
            Console.WriteLine("parallel end " + i);
        }
    });
    Console.WriteLine("IsCompleted: " + result.IsCompleted);
    Console.WriteLine("LowestBreakIteration: " + result.LowestBreakIteration);
    Console.ReadLine();
} //

example output:
parallel start 0
parallel start 12
parallel end 0
parallel start 1
parallel end 1
parallel start 2
parallel end 2
parallel start 3
parallel end 3
parallel start 4
parallel end 4
parallel start 5
parallel end 5
parallel start 6
parallel start 24
parallel end 24
parallel start 25
parallel end 25
parallel end 12
IsCompleted: False
LowestBreakIteration:

With Stop(), LowestBreakIteration returns null. And just like with Break(), the code after the Stop() call is still executed!

Break() completes all iterations on all threads that are prior to the current iteration on the current thread, and then exits the loop.

Stop() stops all iterations as soon as convenient.
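One more Parallel.For() overload is worth mentioning (my own example, not part of the original post): the variant with thread-local state. Each thread accumulates into its own subtotal, so you only need synchronization once per thread instead of once per iteration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class LocalStateDemo {
    public static void Main() {
        int lTotal = 0;
        Parallel.For(0, 101,
            () => 0,                                            // initial subtotal for each thread
            (i, loopstate, subtotal) => subtotal + i,           // runs lock-free per iteration
            subtotal => Interlocked.Add(ref lTotal, subtotal)); // merge once per thread
        Console.WriteLine("sum 0..100 = " + lTotal);            // prints: sum 0..100 = 5050
    } //
} // class
```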