Blog Archives

migration C#, Java, C++ (day 11), future and promise

C# has hardly anything that compares directly. Future and promise are C++ concepts, and their closest C# counterpart is probably the Task family. The comparison was not easy today. Have a look at the outcome.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Program {
    public class Day11 {

        private class Params {
            public int Input;
            public int Result;
        }

        private void DoSomething() {
            Console.WriteLine("DoSomething()");
            Thread.Sleep(4000);
        }  //     

        private void Method1(object xObject) {
            Params lParams = xObject as Params;
            Console.WriteLine("Method1");
            Thread.Sleep(2000);
            lParams.Result = 2 * lParams.Input;
        } //

        private int Method2(int xInt1) {
            Console.WriteLine("Method2");
            Thread.Sleep(2000);
            return 2 * xInt1;
        } //

        private async Task<int> Method3(int xInt1) {
            await Task.Delay(0);
            Console.WriteLine("Method3");
            return 2 * xInt1;
        } //

        private void Method4(Params xParams, AutoResetEvent xEvent) {
            Console.WriteLine("Method4");
            Thread.Sleep(2000);
            xEvent.WaitOne();
            xParams.Result = 2 * xParams.Input;
        } //


        static void Main(string[] args) {
            Day11 lDay11 = new Day11();
            lDay11.test();
        } //

        public async void test() {
            int i;

            // use a thread to get an asynchronous result
            ParameterizedThreadStart lParams = new ParameterizedThreadStart(Method1);
            Thread thread1 = new Thread(lParams);
            Params p = new Params() { Input = 123 };
            thread1.Start(p);
            Console.WriteLine("Method1.join()");
            thread1.Join();
            Console.WriteLine("Method1 result is: " + p.Result);

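            Func<int, int> t1 = new Func<int, int>(x => { return Method2(x); });
            i = t1(123); // synchronous
            // asynchronous; delegate BeginInvoke() is only supported on the .NET Framework
            IAsyncResult lResult = t1.BeginInvoke(123, null, null);
            lResult.AsyncWaitHandle.WaitOne();
            i = t1.EndInvoke(lResult); // fetch the result of the asynchronous call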

            i = await Method3(123);
            Console.WriteLine("Method3 result is: " + i);

            p.Input = 123;
            p.Result = 0;
            AutoResetEvent lEvent = new AutoResetEvent(false);
            Task t2 = new Task(() => Method4(p, lEvent));
            t2.Start();
            lEvent.Set();
            t2.Wait();
            Console.WriteLine("Method4 result is: " + p.Result);

            Console.ReadLine();
        } //

    } // class
} // namespace

example output:
Method1.join()
Method1
Method1 result is: 246
Method2
Method2
Method3
Method3 result is: 246
Method4
Method4 result is: 246
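
Method4 in the C# listing above holds back its result until an AutoResetEvent is set. As a bridge to the C++ listing that follows, here is a minimal sketch of how a promise<void> could play the same one-shot signal role. The function name DoubleWhenSignaled is made up for this illustration, and the sketch assumes a single waiting thread.

```cpp
#include <future>
#include <thread>

// Hypothetical helper for illustration: doubles xInput on a worker thread,
// but only after the gate has been opened (the analog of lEvent.Set()).
int DoubleWhenSignaled(int xInput) {
  std::promise<void> lGate;                        // one-shot "event"
  std::future<void> lReady = lGate.get_future();
  int lResult = 0;

  std::thread lWorker([&lResult, &lReady, xInput] {
    lReady.wait();                                 // like xEvent.WaitOne()
    lResult = 2 * xInput;
  });

  lGate.set_value();                               // like lEvent.Set()
  lWorker.join();                                  // like t2.Wait()
  return lResult;
}
```

Calling DoubleWhenSignaled(123) should give 246, just like Method4. Unlike an AutoResetEvent, the promise cannot be reset and reused.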

#include <chrono>
#include <functional>
#include <future>
#include <iostream>
#include <stdexcept>
#include <thread>

using namespace std;

void DoSomething() {
  cout << "DoSomething()" << endl;
  this_thread::sleep_for(chrono::seconds(4));
}  //

void Thread1(int xInt1, int &xInt2) {
  cout << "Thread1" << endl;
  this_thread::sleep_for(chrono::seconds(2));
  xInt2 = 2 * xInt1;
} //

int Thread2(int xInt1) {
  cout << "Thread2" << endl;
  this_thread::sleep_for(chrono::seconds(2));
  return 2 * xInt1;
} //

int Thread3(future<int> &xInt1) {
  cout << "Thread3" << endl;
  this_thread::sleep_for(chrono::seconds(2));
  return 2 * xInt1.get(); // .get() => waits until the promise is fulfilled
} //

int Thread4(shared_future<int> xInt1) {
  cout << "Thread4" << endl;
  this_thread::sleep_for(chrono::seconds(2));
  return 2 * xInt1.get(); // returns the same value to all waiting futures
} //

void test(){
  int i;

  // use a thread to get an asynchronous result
  thread t1(Thread1, 123, ref(i));
  cout << "Thread1.join()" << endl;
  t1.join();
  cout << "Thread1 result is: " << i << endl;

  // like a task, may be synchronous or asynchronous
  future<int> f1 = async(Thread2, 123);
  cout << "Thread2, f1.get()" << endl;
  i = f1.get(); // waits

  // like a synchronous task, which is running on the same thread
  // will be called when you get the value
  future<int> f2 = async(launch::deferred, Thread2, 123); 
  cout << "Thread2, f2.get()" << endl; 
  i = f2.get(); // waits
  cout << "Thread2 deferred result is: " << i << endl;

  // like an asynchronous task, which is running on a new thread
  future<int> f3 = async(launch::async, Thread2, 123); 
  cout << "Thread2, f3.get()" << endl; 
  i = f3.get(); // waits
  cout << "Thread2 async result is: " << i << endl;

  // same as f1, because this is the default value; the system decides what to do
  future<int> f4 = async(launch::async | launch::deferred, Thread2, 123);
  i = f4.get(); // waits

  promise<int> lPromise5; // promise to provide a value at a later time  
  future<int> f5 = lPromise5.get_future();
  future<int> f5b = async(launch::async, Thread3, ref(f5));
  DoSomething();
  cout << "lPromise5.set_value()" << endl;
  lPromise5.set_value(123); // fulfill our promise now
  cout << "f5b async result is: " << f5b.get() << endl;

  promise<int> lPromise6; // promise to provide a value at a later time
  future<int> f6 = lPromise6.get_future();
  DoSomething();
  // tell the parallel thread to stop waiting for the promise fulfillment
  lPromise6.set_exception(make_exception_ptr(runtime_error("sorry, I cannot fulfill my promise")));  

  //
  // SOME PRACTICAL ISSUES
  //
  promise<int> lPromise7; // promise to provide a value at a later time
  future<int> f7 = lPromise7.get_future();
  promise<int> lPromise8 = move(lPromise7); // you cannot assign a promise, you have to move it
  future<int> f8 = move(f7); // same with the future    

  // you cannot call future.get() more than once
  // to allow multiple consumers use:
  shared_future<int> f9 = f8.share();
  future<int> f10 = async(launch::async, Thread4, f9);
  future<int> f11 = async(launch::async, Thread4, f9);
  future<int> f12 = async(launch::async, Thread4, f9);
  future<int> f13 = async(launch::async, Thread4, f9);
  lPromise8.set_value(123);
  cout << "f10: " << f10.get() << ", f11:" << f11.get() << ", f12:" << f12.get() << ", f13:" << f13.get() << endl;

  // packaged_task
  auto t2 = bind(Thread2, 123);
  t2(); // synchronous

  packaged_task<int()> t3(bind(Thread2, 123)); // wrapper that ties a callable to a future
  t3(); // runs the task right here; it could also be moved to another thread
  future<int> f14 = t3.get_future(); // getting a future is not possible with t2 
  int x = f14.get();
  cout << "f14: " << x << endl;

  cin.get();
} //

int main() {
  test();
  return 0;
} //

example output:
Thread1.join()
Thread1

Thread1 result is: 246
Thread2, f1.get()
Thread2

Thread2, f2.get()
Thread2
Thread2 deferred result is: 246
Thread2, f3.get()
Thread2

Thread2 async result is: 246
Thread2
DoSomething()
Thread3

lPromise5.set_value()
f5b async result is: 246
DoSomething()
Thread4
Thread4

Thread4
Thread4
f10: 246, f11:246, f12:246, f13:246
Thread2
Thread2
f14: 246
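
The lPromise6 part of the listing stores an exception, but nobody ever calls f6.get(), so the effect stays invisible in the output. The following is a small sketch of what a consumer would see. The helper name ObserveBrokenPromise is an assumption for this example; the error message is the one from the listing.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical helper: breaks a promise on purpose and reports what the
// consumer observes when it calls get() on the matching future.
std::string ObserveBrokenPromise() {
  std::promise<int> lPromise;
  std::future<int> lFuture = lPromise.get_future();

  lPromise.set_exception(std::make_exception_ptr(
      std::runtime_error("sorry, I cannot fulfill my promise")));

  try {
    lFuture.get();                 // rethrows the stored exception
    return "no exception";
  } catch (const std::runtime_error &ex) {
    return ex.what();              // the message set by the producer
  }
}
```

The exception travels through the shared state and is rethrown on the consumer side, so a waiting thread stops waiting and learns why the value never arrived.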

package Program;

import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Day11 {

  private static void Sleep(int xMilliseconds) {
    try {
      Thread.sleep(xMilliseconds);
    } catch (InterruptedException ex) {
      System.err.println(ex.getMessage());
    }
  } //

  private class Thread1 extends Thread {

    public int Input;
    public int Result;

    @Override
    public void run() {
      System.out.println("Thread1");
      Sleep(2000);
      Result = 2 * Input;
    } //
  } // class

  private class Thread2 {

    private final ExecutorService pool = Executors.newFixedThreadPool(5);

    public Future<Integer> getPromise(final int xInt1) {
      return pool.submit(() -> {
        System.out.println("Thread2");
        Sleep(2000);
        return 2 * xInt1;
      });
    } //
  } // class

  private class Thread3 implements Callable<Integer> {

    private final ExecutorService _Pool = Executors.newFixedThreadPool(5);
    private int _Int1;
    private boolean _Started = false; // set by start(), so an early notify() is not lost

    public Future<Integer> getPromise(final int xInt1) {
      _Int1 = xInt1;
      return _Pool.submit(this);
    } //

    public synchronized void start() {
      _Started = true;
      notify(); // wake up call() if it is already waiting
    } //

    @Override
    public Integer call() throws Exception {
      synchronized (this) {
        while (!_Started) {
          this.wait(); // the loop also guards against spurious wakeups
        }
      }
      System.out.println("Thread3");
      Sleep(2000);
      return 2 * _Int1;
    } //
  } // class

  public final void test() throws InterruptedException, ExecutionException {

    // use a thread to get an asynchronous result
    Thread1 t1 = new Thread1();
    t1.Input = 123;
    t1.start();
    System.out.println("Thread1.join()");
    t1.join();
    System.out.println("Thread1 result is: " + t1.Result);

    // like a task; submit() runs it asynchronously on a pool thread
    Thread2 t2 = new Thread2();
    Future<Integer> f1 = t2.getPromise(123);
    System.out.println("Thread2, f1.getPromise(123)");
    int i = f1.get(); // waits
    System.out.println("f1 result is: " + i);

    // deferred
    Thread3 t3 = new Thread3();
    Future<Integer> f2 = t3.getPromise(123);
    t3.start(); // start calculation manually
    System.out.println("Thread3 result is: " + f2.get()); // wait

    new Scanner(System.in).nextLine();
  } //

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    Day11 lDay11 = new Day11();
    lDay11.test();
  } //

} // class

example output:
Thread1
Thread1.join()
Thread1 result is: 246
Thread2
Thread2, f1.getPromise(123)
f1 result is: 246
Thread3
Thread3 result is: 246
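
The Java listing imitates a deferred start by hand, while the C++ listing leaves the choice to the system when no launch policy is given. As a side note in C++, a future can be asked which way it went. This is only a sketch; the names Twice and IsDeferred are invented for the example.

```cpp
#include <chrono>
#include <future>

// Stand-in for Thread2 from the C++ listing, without the sleep.
int Twice(int xInt1) { return 2 * xInt1; }

// Returns true if the future holds a deferred function that has not run yet.
// A zero timeout means wait_for() only reports the state and does not block.
bool IsDeferred(const std::future<int> &xFuture) {
  return xFuture.wait_for(std::chrono::seconds(0)) ==
         std::future_status::deferred;
}
```

A future created with launch::deferred reports true until get() is called. One created with launch::async reports false, because its work already runs (or has finished) on another thread.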


Async and await (advanced, .Net 4.5, C# 5)

The devil is in the details. It all looks easy, but follow each step carefully today.

Windows pauses threads that are waiting for I/O operations to complete (e.g. internet or file access). Such threads cannot be used for other jobs in the meantime, so new threads need to be created. You could use tasks to solve this specific problem: the program starts an asynchronous task to deal with an I/O operation, and when that task finishes it triggers a follow-up procedure via a continuation task. It requires some work to cover all code paths, but it can be done.
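
To stay with the C++ side of this blog series: standard C++ futures have no ContinueWith(), but the idea of a follow-up step can be sketched with a small helper. Then is a hypothetical name, not a standard library function, and the sketch needs C++14 for the deduced return type. Unlike real asynchronous I/O, this version parks a thread while it waits, so it only illustrates the chaining, not the thread savings.

```cpp
#include <future>
#include <utility>

// Hypothetical helper (not part of the standard library): runs xNext on the
// result of xFuture once it is available and returns a future for the outcome.
template <typename T, typename F>
auto Then(std::future<T> xFuture, F xNext) {
  return std::async(std::launch::async,
                    [](std::future<T> lFuture, F lNext) {
                      return lNext(lFuture.get());  // wait, then continue
                    },
                    std::move(xFuture), std::move(xNext));
}
```

A possible use: start a task with std::async, pass its future to Then() together with a lambda, and call get() on the returned future to receive the result of the follow-up step.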

C# 5 introduced new keywords to make your life easier. You can use async to mark methods for asynchronous operations. Such a method starts synchronously and splits up as soon as the program arrives at an await on an operation that has not completed yet.

The Print() method below prints the time, the step number and the thread ID. This information is useful to understand the program flow.

private static void Print(int xStep) {
    Console.WriteLine(DateTime.Now.ToString("HH:mm:ss") + " step " + xStep + " , thread " + Thread.CurrentThread.ManagedThreadId);
} //

static async void AsyncCalls1() {
    Print(1);
    int i = await Task.Run<int>(() => {
        Print(2); Thread.Sleep(5000);
        Print(3); return 0;
    });
    Print(4);  // same thread as in step 3

    Console.ReadLine();
    // return void
} //

example output:
19:09:36 step 1 , thread 9
19:09:36 step 2 , thread 10
19:09:41 step 3 , thread 10
19:09:41 step 4 , thread 10

The above code is a warm-up for us. The method AsyncCalls1() returns void. I emphasize this seemingly insignificant fact here. If it returned a Task instead, the caller would have to await it, and the compiler only accepts await inside a method that is marked async. But if you add async there, then the method that called the calling method faces the same problem. It would be an endless game until you arrive at Main(). And there you would not know what to do, because in C# 5 you cannot use async on Main() (this was only allowed later, from C# 7.1 on). Novices can get quite frustrated with such a minuscule glitch.

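What is the program doing? It starts a new task, which uses another thread from the thread pool. The original thread returns to the caller and plays no further part in the method. Now check this out: when the created task ends, the program continues (similar to Task.ContinueWith()) on the same thread that the task was using. It seems there is no context switching. Keep in mind that this is the behavior of a console application, which has no synchronization context that would bring the continuation back to the original thread.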
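
A rough C++ analogy of this observation: the code after the await behaves as if it were appended to the task body, so it runs on the worker thread and not on the thread that started the task. The struct ThreadTrace and the function TraceContinuation are names made up for this sketch, which records thread IDs instead of printing them.

```cpp
#include <future>
#include <thread>

struct ThreadTrace {
  std::thread::id Caller;        // thread that started the task (step 1)
  std::thread::id Task;          // thread that ran the task body (steps 2 and 3)
  std::thread::id Continuation;  // thread that ran the follow-up (step 4)
};

ThreadTrace TraceContinuation() {
  ThreadTrace lTrace;
  lTrace.Caller = std::this_thread::get_id();

  std::future<void> lDone = std::async(std::launch::async, [&lTrace] {
    lTrace.Task = std::this_thread::get_id();
    // the follow-up work sits directly behind the task body, on the same thread
    lTrace.Continuation = std::this_thread::get_id();
  });
  lDone.get();  // wait so that lTrace is completely filled in
  return lTrace;
}
```

In the returned trace, Task and Continuation hold the same ID, while Caller differs from both. This mirrors the pattern of the thread numbers in the example output above.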

static async void AsyncCalls2() {
    Print(1); Task<int> task = AsyncCalls3();
    Print(4); int x = await task;
    Print(7); // same thread as in step 6

    Console.ReadLine();
    // return void
} //

static async Task<int> AsyncCalls3() {
    Print(2);
    int i = await Task.Run<int>(() => {
        Print(3); Thread.Sleep(5000);
        Print(5); return 0;
    });
    Print(6); return i;  // same thread as in step 5, returning an INTEGER !!!
} //

example output:
19:10:16 step 1 , thread 9
19:10:16 step 2 , thread 9
19:10:16 step 3 , thread 10
19:10:16 step 4 , thread 9
19:10:21 step 5 , thread 10
19:10:21 step 6 , thread 10
19:10:21 step 7 , thread 10

Method AsyncCalls3() has a return value, which is a Task<int>. The task that is started inside this method returns an integer. But doesn’t Task.Run() return Task<int> according to its definition? It is the await that changes the behavior: it waits for the task and hands back its integer result (0). In the same way, the async keyword lets AsyncCalls3() write return i with a plain integer, and the compiler wraps it into the Task<int>. await has been implemented to shorten code, and this is what it does. The code is more legible.

Method AsyncCalls2() calls AsyncCalls3() and receives a Task<int>, not an integer. The integer only appears at await task, which unwraps the result. AsyncCalls2() itself returns void. This is the same issue as with AsyncCalls1(). AsyncCalls2() can await the value from AsyncCalls3() because it carries the async keyword in its own method definition.
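
For readers coming from the C++ part of this series, the relation between the wrapper and the value can be sketched with std::future. ComputeAsync and Consume are hypothetical names chosen for this example. Note that get() blocks the calling thread, whereas await gives the thread back, so the analogy only covers the types.

```cpp
#include <future>

// Analog of AsyncCalls3(): hands back a future, not the integer itself.
std::future<int> ComputeAsync(int xInt1) {
  return std::async(std::launch::async, [xInt1] { return 2 * xInt1; });
}

// Analog of AsyncCalls2(): receives the future first and unwraps it later.
int Consume(int xInt1) {
  std::future<int> lTask = ComputeAsync(xInt1);  // like: Task<int> task = AsyncCalls3();
  // other work could run here, as in step 4
  int lValue = lTask.get();                      // like: int x = await task; (but blocking)
  return lValue;
}
```

Consume(123) yields 246. The wrapper type appears at the call, and the plain integer only appears once the result is requested.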

Check the program sequence. I marked the steps clearly to make comprehension easy. Then analyse the thread switching: there is a thread switch between steps 2 and 3, but none between steps 5, 6 and 7. This is the same behavior as in the first example code.

public static async void AsyncCalls4() {
    Print(1); string s = await AsyncCalls5();
    Print(4);

    Console.ReadLine();
    // return void
} //

// using System.Net.Http;
public static async Task<string> AsyncCalls5() {
    using (HttpClient lClient = new HttpClient()) {
        Print(2); string lResult = await lClient.GetStringAsync("http://www.microsoft.com");
        Print(3); return lResult; 
    }
} //

example output:
19:11:47 step 1 , thread 10
19:11:47 step 2 , thread 10
19:11:48 step 3 , thread 14
19:11:48 step 4 , thread 14

When searching for async and await on the web you will find the emphasis on I/O. Most example programs concentrate on this and don’t explain what is roughly going on inside the async I/O method itself. Basically, .Net async I/O methods deal with tasks and use a construction similar to Task.ContinueWith(). This is why I concentrated on different examples that can be used in any context (even though they are not very meaningful). The internet download example is more or less a classical one. You can use await on many I/O methods. Keep in mind that AsyncCalls4() returns void and that you cannot await AsyncCalls5() directly in the Main() method, because you would have to add async to it.