Asynchronous Programming with C++

Asynchronous programming is a common pattern that vastly simplifies multithreaded programming. The idea is that a thread calls a function to be executed on a background thread. The call to the function returns immediately on the calling thread, and the function continues executing on the background thread. When the function is complete, the calling thread can get the results. A large percentage of multithreaded programming can be done using the asynchronous programming pattern. If the asynchronous programming pattern can be used in a situation, it is good practice to go ahead and use the pattern instead of creating a custom multithreaded implementation. A custom multithreaded implementation will be harder for other programmers to understand, and potentially more error prone. This article will provide a sample program illustrating asynchronous programming in modern C++.

In the sample program, there is a class called BackgroundWorker. BackgroundWorker has just two methods: DoWork() and DoWorkAsync(). DoWork() is a function that simulates a CPU intensive long running task. It generates millions of random numbers and sums them. DoWorkAsync() calls DoWork() on a background thread. Multiple DoWorkAsync() calls are made one after the other so they are executed in parallel on separate threads. The results of each DoWork() (i.e. the sum of the random numbers) is then retrieved and reported. The sample program is completely useless, but it illustrates running multiple execution threads of a function in parallel and then getting the results when they complete.

With C++ 11, the standard added the ability to do cross platform multithreaded programming with just C++. You no longer need to make operating system specific threading API calls to get the job done. The C++ standard added std::async() in the header for the purpose of calling a function asynchronously. The function can be any callable object in C++. That means a pointer to a function, a pointer to a member function, a functor, a lambda expression, or a std::function template instance. std::async() will return a std::future of the callable object which represents the execution return value when the background thread is finished. This return value is retrieved by calling get() on the future. If the background thread is still executing the function when get() is called, then get() will block until the function is done or throws an exception.

Here is the source code for the C++ command prompt program.

BackgroundWorker.h


#pragma once
#include 
#include 
#include 
#include 


/**	@class BackgroundWorker
*	Use to simulate a long running function through use of summing
*	random number generation. DoWork does this work synchronously.
*	DoWorkAsyn will call DoWork asynchronously. */
class BackgroundWorker {

public:
    BackgroundWorker();

    /**	Simulate doing long running calculation by summing randomly
    *	generated integers.
    *	@param[in] iRepeats  Number of random numbers to sum.
    *	@param[in,out] fAverage  As output, the average of all the random numbers.
    *	@param[in] callBack  Function callback called when processing is done.
    *		The long long parameter is the return value of this method.
    *	@returns sum of random numbers.
    *	@throws runtime_error if a certain random number is chosen a certain
    *		number of times. This is to simulate exception handling. */
    long long DoWork(long iRepeats, float& fAverage, std::function callBack);

    /**	Essentially calls DoWork() asynchronously.
    *	@param[in] iRepeats  Number of random numbers to sum.
    *	@param[in,out] fAverage  As output, the average of all the random numbers.
    *	@param[in] callBack  Function callback called when processing is done.
    *		The long long parameter is the return value of this method.
    *	@returns future of DoWork().
    *	@throws runtime_error if a certain random number is chosen a certain
    *		number of times. This is to simulate exception handling. */
    std::future DoWorkAsync(long iRepeats, float& fAverage, std::function callBack);

}; // end class BackgroundWorker

BackgroundWorker.cpp


#include "BackgroundWorker.h"
#include 
#include 
#include 

using namespace std;


BackgroundWorker::BackgroundWorker() {
}


long long BackgroundWorker::DoWork(long iRepeats, float& fAverage, std::function callBack) {

    cout << "Entering DoWork with repeats " << iRepeats << " on thread id " << std::this_thread::get_id() << endl;

    long long llSum = 0; // This is the return value of the sum of random numbers.
    fAverage = 0.0f;

    if (iRepeats < 0) {
        callBack(llSum);
        return llSum;
    }

    std::shared_ptr spRndEngine; /**< Mersenne twister random number engine. */
    std::shared_ptr spRndDistribution; /**< Int distribution. */

    // This is used to generate a seed for the random number generator. Time is no
    // longer recommended to use as a seed.
    random_device theSeed;

    // Use the seed in instantiating the random number engine.
    // Note the use of the () operator on the theSeed.
    spRndEngine = make_shared(theSeed());

    // Set up a uniform distribution of ints from 0 to 99.
    spRndDistribution = make_shared(0, 99);

    int iBadNumberHitCountLimit = iRepeats / 100 + static_cast(iRepeats*0.00003f);
    int iBadNumberHitCount = 0;

    for (long i = 0; i != iRepeats; ++i) {
        // This call uses the random engine in the random distribution to
        // create a random number from 0 to 99.
        int iRandomNumber = (*spRndDistribution)(*spRndEngine);

        // Simulate something going wrong by throwing an exception if a certain number
        // is hit a certain number of times. 66 is the bad number.
        if (iRandomNumber == 66) {
            iBadNumberHitCount++;
            if (iBadNumberHitCount >= iBadNumberHitCountLimit) {
                // Simulate some type of exception occuring.
                stringstream ss;
                ss << "Bad value processed. Number of repeats is " << iRepeats << endl;
                throw runtime_error(ss.str());
            }
        }

        llSum += iRandomNumber;
    }

    cout << "Exiting DoWork with repeats " << iRepeats << " on thread id " << std::this_thread::get_id() << endl;

    fAverage = static_cast(llSum) / iRepeats;
    callBack(llSum);
    return llSum;
}


std::future BackgroundWorker::DoWorkAsync(long iRepeats, float& fAverage, std::function callBack) {

    // Note the return value of this method is a future with return type of the function DoWork.
    // Note the use of std::ref() to pass fAverage by reference.
    // Note how address of member function is passed.
    // Note how the pointer to instance of class is passed after member function to call (e.g. this)
    // Pasing this is only required when passing a member function.
    // launch::async assures it is launched on another thread asynchronously.
    return async(launch::async, &BackgroundWorker::DoWork, this, iRepeats, std::ref(fAverage), callBack);
}

CPlusPlusAsyncCPApp.cpp


// CPlusPlusAsyncCPApp.cpp : This file contains the 'main' function. Program execution begins and ends there.
#include 
#include 
#include 
#include "BackgroundWorker.h"

using namespace std;


int main() {

    try {
        // Get the start time.
        using std::chrono::system_clock;
        auto tStart = system_clock::now();

        cout << "Starting program on thread id " << this_thread::get_id() << endl;

        unsigned int uiNumberOfCalls = 4; // This is the number of parallel, asynchronous calls
        // to DoWork to make.
        int iBaseRepeats = 10000000; // Base number of random numbers to sum.

        BackgroundWorker bw;

        // Set up vector of futures.
        vector vFutures;

        // Set up vector of averages of random number. This holds the fAverage output
        // parameter to DoWorkAsync.
        vector vfAverages;
        vfAverages.resize(uiNumberOfCalls);

        /**	Define the callback function for the asynchronous call. Note how the
        *	lambda signature matches the callback signature.
        *
        *	Note, this callback is actually running on the thread that calls it.
        *	To invoke something on the main thread, would need to post a message
        *	via OS call (e.g. PostMessage).
        *	@param[in] llSumm This will be the sum of random numbers. */
        function TheCallBack = [](long long llSumm) -> void {
            cout << "Entering callback on thread id " << std::this_thread::get_id() << endl;
            cout << "Callback is passed sum of " << llSumm << endl;
        };

        // Call DoWorkAsync uiNumberOfCalls in parallel.
        for (int i = 0; i != uiNumberOfCalls; ++i) {
            // Note how the number of repeats is incremented for each call. This is so the calls can be
            // distinguished.
            vFutures.push_back(
                bw.DoWorkAsync(iBaseRepeats + i, vfAverages[i], TheCallBack)
            );
        }

        // Note, instead of using a callback to know when the DoWork is complete,
        // you could also poll the future with the Is_ready() method. If true,
        // then the execution thread has finished. Is_ready() does not block.
        // Is_ready() is experimental. You can also use wait_for(0) to check if 
        // the future is ready. wait_for() with a non-zero duration will block
        // for up to that duration waiting for the future to be ready. get() will
        // return the return value of DoWork. If an exception is thrown
        // by DoWork, it will be transferred to this main thread at the time
        // of calling get(). get() does block until the executing thread is done
        // or an exception is thrown.
        for (int i = 0; i != uiNumberOfCalls; ++i) {
            long long llSum = vFutures[i].get();
            cout << "Returned with sum " << llSum << " and average " << vfAverages[i] << " for number of repeats " << iBaseRepeats + i << " on thread id " << this_thread::get_id() << endl;
        }

        // Get the end time and output the duration.
        auto tEnd = system_clock::now();
        auto durationRun(tEnd - tStart);
        using std::chrono::duration;
        cout << "Run duration: " << duration(durationRun).count() << " milliseconds" << endl;

        cout << "Ending program on thread id " << this_thread::get_id() << endl;
    }
    catch (exception & e) {
        cout << "Exception caught! Message: " << e.what() << endl;
    }
} // end main

The C++ code above is nicely commented. Reading those comments should explain most of the functioning. The C++ program illustrates features you would probably want to use in a real world program. For example, it shows how to pass a parameter by reference so you can use an output parameter in an asynchronous function (see fAverage parameter in DoWork() and DoWorkAsync() methods).

Exception handling is also illustrated. The DoWork() method can throw an exception if a specific random number is called a specific number of times. The exception does not happen on most runs. You have to run the program multiple times to see the difference between when DoWork() completes successfully on all threads, or when one of the threads throws and exception. If one of the threads throws an exception, the exception is percolated back to the main calling thread. It is important to understand that if an exception is thrown, it does not happen at the call to DoWorkAsync(). The call to DoWorkAsync() simply returns immediately a future to the method. The call to get() on the returned future is when the exception will percolated back to the main calling thread. Putting a try-catch around the future::get() is how you would catch exceptions thrown by the asynchronous function.

It is important to understand that when get() is called on the future, if the asynchronous method is not done executing, it will block the calling thread until it is done. When it is done, you will either get the return value of the asynchronous method, or an exception will be thrown if the asynchronous method throws an exception. get() blocking may be unacceptable in some situations. If this was running on the main UI thread for example, blocking could be unacceptable. You could call wait_for(0) to see if the future is ready. The 0 passed will cause wait_for() to return immediately. Unfortunately, this amounts to polling, and polling can be awkward to orchestrate while also keeping the UI responsive.

This example also illustrates passing a callback std::function to the asynchronous method. When DoWork() is done, the callback function is called by DoWork(), passing the return value to the callback function. You do not need this callback function if you are fine with just using future::get(). I wanted to also show the option of the asynchronous method calling a callback when done. Though not shown in this example, you could post a message on the message queue in the callback function. This will trigger an event when processing is done. This will allow your UI to know right away when processing is done without polling or blocking. Since the callback is called by DoWork(), the callback runs on the background thread that DoWork() is running on. This means you will not be able to access UI controls from the callback; thus the reason the callback will need to post a message to cause something to happen on the UI.

Here is an example of the program’s output when all the threads succeed:

Starting program on thread id 13304
Entering DoWork with repeats 10000000 on thread id 10868
Entering DoWork with repeats 10000001 on thread id 14528
Entering DoWork with repeats 10000002 on thread id 9752
Entering DoWork with repeats 10000003 on thread id 1036
Exiting DoWork with repeats 10000001 on thread id 14528
Entering callback on thread id 14528
Callback is passed sum of 495023730
Exiting DoWork with repeats 10000000 on thread id 10868
Entering callback on thread id 10868
Callback is passed sum of 494943204
Returned with sum 494943204 and average 49.4943 for number of repeats 10000000 on thread id 13304
Returned with sum 495023730 and average 49.5024 for number of repeats 10000001 on thread id 13304
Exiting DoWork with repeats 10000002 on thread id 9752
Entering callback on thread id 9752
Callback is passed sum of 494986882
Returned with sum 494986882 and average 49.4987 for number of repeats 10000002 on thread id 13304
Exiting DoWork with repeats 10000003 on thread id 1036
Entering callback on thread id 1036
Callback is passed sum of 495006297
Returned with sum 495006297 and average 49.5006 for number of repeats 10000003 on thread id 13304
Run duration: 281.249 milliseconds
Ending program on thread id 13304

As can be seen from this output, four DoWork() invocations occur one after the other, all on different threads. This gets all four threads working in parallel. As the DoWork(s) complete, you see the trace of exiting DoWork and entering the callback. Since the callback is called by DoWork, the callback and DoWork run on the same thread. This is evident by the thread id reported. The statements beginning with “Returned with sum …” come from the main function on the main program thread. The finishing of the threads does not necessarily happen in the order the threads were launched. It is random the order the threads complete. But the reporting back on the main thread following the future::get() call will always be in the order the threads were launched. This is because future::get() blocks until the thread either finishes or throws an exception. This is not as bad as it might seem because, while it is blocking on the main thread waiting for the first thread to finish, all the other threads are running in the background. The time it takes to call future::get() on all four threads is roughly the time it takes for the longest running thread.

Here is an example when one of the threads throws an exception:

Starting program on thread id 10668
Entering DoWork with repeats 10000000 on thread id 2044
Entering DoWork with repeats 10000001 on thread id 11016
Entering DoWork with repeats 10000002 on thread id 5932
Entering DoWork with repeats 10000003 on thread id 13068
Exiting DoWork with repeats 10000001 on thread id 11016
Entering callback on thread id 11016
Callback is passed sum of 494945635
Exiting DoWork with repeats 10000003 on thread id 13068
Entering callback on thread id 13068
Callback is passed sum of 494999070
Exiting DoWork with repeats 10000000 on thread id 2044
Entering callback on thread id 2044
Callback is passed sum of 495124183
Returned with sum 495124183 and average 49.5124 for number of repeats 10000000 on thread id 10668
Returned with sum 494945635 and average 49.4946 for number of repeats 10000001 on thread id 10668
Exception caught! Message: Bad value processed. Number of repeats is 10000002

As can be seen, the third thread throws an exception. The exception is percolated to the main thread when future::get() is called on the third thread. The main()’s try..catch catches this exception, and thus you see the “Exception caught!” message. Once the exception is caught, the programs terminates, and you never get the results back from the fourth thread.

Well that concludes this tour of asynchronous programming in C++. A lot of your multi-threaded programming can fit into the asynchronous pattern nicely. When it can, using the asynchronous pattern saves coding time, saves time for other programmers to understand your code, and potentially saves time to debug errors. Happy coding!

Copyright © 2020 by Jeffery Lewis. All rights reserved.
Published by WalletCard.org.

Your input is valuable. Please comment.

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s