Programación concurrente (I)

 Artículos de la serie:


Última actualización de la serie: 14 de agosto de 2021

Introducción básica: clase std::thread y función std::async


El estándar ISO C++ 2011 introdujo la clase std::thread [1] en la biblioteca <thread> con el fin de permitir la creación de nuevos hilos de ejecución, de forma que los distintos hilos de un proceso puedan llevarse a efecto de forma concurrente. En particular, dicha clase proporciona un constructor variádico de signatura

   template<typename Func, typename ...Args>    explicit thread(Func&& f, Args&&... args);

que iniciará un nuevo hilo de ejecución, desde el cual se realizará una llamada a la función f con los argumentos args. Es importante tener presente que tanto dicha función como sus argumentos serán copiados o movidos por valor por el constructor de std::thread, por lo que de desear trabajar con referencias a los mismos sería necesario emplear wrappers tales como std::ref o std::cref (se proporcionará un código de ejemplo sobre este punto más adelante). En efecto, la llamada a f realizada por el nuevo hilo resulta análoga a:

   std::invoke(decay_copy(std::forward<Func>(f)), decay_copy(std::forward<Args>(args))...);

donde

   template<typename T>     auto decay_copy(T&& v) noexcept(std::is_nothrow_convertible_v<T, std::decay_t<T>>)        -> std::decay_t<T>    {       return std::forward<T>(v);    }

Se entiene aquí que las llamadas a decay_copy son evaluadas en el contexto del caller, de forma que cualquier excepción resultado de la copia/movimiento de la función f o sus argumentos args será recibida por el hilo que llame al constructor de std::thread (que identificaremos desde ahora como hilo padre) sin que llegue a iniciarse el nuevo hilo. Tal y como hacíamos notar, la función f es pues invocada con argumentos prvalues.

Distinguimos varias funciones miembro públicas de la clase std::thread de interés:
  • joinable(), un predicado que retorna true si el objeto std::thread se encuentra asociado a un hilo de ejecución activo (false en caso contrario).
  • join(), que bloquea el hilo padre hasta que el hilo identificado por *this finalice su ejecución.
  • detach(), que desliga el hilo de ejecución del objeto std::thread, lo que permite que su ejecución continúe de forma independiente.
Tras una llamada a join() o detach(), joinable() retornará false.

De forma notable, la clase std::thread no implementa la técnica RAII, por lo que el programador es responsable de garantizar que un objeto de dicha clase que poseea un hilo activo asociado llame explícitamente a join() o detach() antes de su destrucción. De hecho, si en el momento de la destrucción de un objeto std::thread éste tuviese aún un hilo asociado (joinable()==true), se invocaría std::terminate() automáticamente. Cabe señalar aquí que el estándar C++20 proporciona un nuevo tipo de hilo std::jthread cuyo destructor realiza una llamada automática a join() de ser éste unible. Puede consultarse más detalles en artículo 6 de esta serie.

Asimismo, debemos tener en cuenta que cualquier resultado retornado por la ejecución de f es ignorado por el nuevo hilo, y que la posible emisión de una excepción por parte de aquélla provocaría una llamada a std::terminate().

A modo de ejemplo de empleo de std::thread, supongamos que deseamos realizar una tarea cuyo resultado no sea inmediatamente necesario en el momento de iniciar la operación, sino que podamos posponer su uso hasta un punto posterior del código y realizar, mientras tanto, otras muchas operaciones independientes. El siguiente código en C++20 crea un vector de enteros de grandes dimensiones, para a continuación realizar la suma de todas sus entradas en un hilo de ejecución independiente del principal:

   #include <concepts>    #include <functional>    #include <iterator>    #include <thread>    #include <vector>    #include <utility>    #include <fmt/format.h>    template<std::input_iterator I, std::sentinel_for<I> S, std::movable T>    [[nodiscard]] auto sum(I first, S last, T init) -> T    {       for (; first != last; ++first)          init = std::move(init) + *first;       return init;    }    auto main() -> int    {       using Vect = std::vector<int>;       auto const nums = Vect(1'000'0001); // un millón de enteros iguales a 1              auto total = 0; // entero para acumular la suma de elementos del vector       auto f = [](Vect const& v, int& t) noexcept { t = sum(v.begin(), v.end(), 0); };       // creamos un hilo para que registre en 'total' la suma de todos los enteros del vector:       auto thr = std::thread{f, std::ref(nums), std::ref(total)};              // mientras el hilo trabaja, realizamos otras operaciones, con la precaución de no usar       // 'total' y provocar un data race (véase el artículo 5 de esta serie)...       // cuando necesitemos la suma, aguardamos a que el hilo finalice su ejecución:       thr.join();       // imprimimos el resultado en la terminal:       fmt::print("Suma total: {}\n", total); // output> Suma total: 1000000    }

A continuación, analizaremos la función estándar std::async como mecanismo especialmente conveniente a través del cual un nuevo hilo puede comunicar el resultado de su ejecución (ya sea éste un valor o una excepción) al hilo padre.



En efecto, el estándar C++11 incluyó la plantilla de función std::async para la ejecución asíncrona de funciones, potencialmente a través de nuevos hilos de ejecución. De forma notable, std::async devuelve, en su punto de invocación, un objeto de tipo std::future<> que contendrá, en el futuro, el resultado de la llamada a la función (ya sea un valor del tipo esperado o una excepción).

El estándar C++20 proporciona dos sobrecargas de std::async definidas como [2]:

   namespace std {       template<typename Func, typename ...Args>       [[nodiscard]] auto async(Func&& f, Args&&... args)          -> std::future<std::invoke_result_t<std::decay_t<Func>, std::decay_t<Args>...>>;       template<typename Func, typename ...Args>       [[nodiscard]] auto async(std::launch policy, Func&& f, Args&&... args)          -> std::future<std::invoke_result_t<std::decay_t<Func>, std::decay_t<Args>...>>;    }

Su modo de uso puede clarificarse mediante una adecuada recodificación del ejemplo anteriormente descrito para std::thread:

#include <concepts> #include <future> // contiene std::async y std::future #include <iterator> #include <vector>    #include <utility> #include <fmt/format.h> template<std::input_iterator I, std::sentinel_for<I> S, std::movable T>    [[nodiscard]] auto sum(I first, S last, T init) -> T {     for (; first != last; ++first)       init = std::move(init) + *first;       return init; } auto main() -> int {      using Vect = std::vector<int>;      using Iter = Vect::iterator;      using Type = Vect::value_type;     auto const v = Vect(1'000'0001); // (A) un millón de enteros iguales a 1      auto ftr = std::async(/* política de lanzamiento por establecer */, sum<Iter, Iter, Type>, v.begin(), v.end(), 0); // (B)
     // realizamos otras operaciones... (C)      // cuando realmente necesitemos la suma, requerimos el resultado:      fmt::print("Suma total: {}\n", ftr.get()); // (D) output> Suma total: 1000000 }

En el código anterior, creamos nuevamente un vector de enteros de grandes dimensiones en (A) y calculamos la suma de todos sus elementos de forma asíncrona en (B). Mientras no sea necesario utilizar dicho resultado, procedemos a realizar otras muchas operaciones independientes (C).

Analicemos la sintaxis empleada para std::async en (B):
  • El primer argumento, de carácter opcional, especifica la política de lanzamiento para la tarea a ejecutar (ver más detalles a continuación).
  • En segundo argumento contiene el nombre de la función a ejecutar: en el ejemplo, una instancia apropiada de la plantilla sum.
  • Posteriormente, se proporciona la lista de argumentos para la tarea a ejecutar: los iteradores que limitan el rango [first,last) con el que operar y el valor inicial init de la suma.
En este caso, std::async devuelve un objeto de tipo std::future<int> que contendrá la suma numérica de los enteros en el vector. Notemos, en particular, que el parámetro del objeto std::future<> es coincidente con el tipo retornado por la función de acumulación. El resultado de la operación se obtiene mediante la llamada a la función miembro pública get(). Es importante remarcar que, de producirse una excepción en la función ejecutada asíncronamente, ésta sería capturada y relanzada en la invocación de get().

Cabe distinguir dos políticas de lanzamiento fundamentales para std::async:
  • std::launch::deferred: La función se ejecuta cuando su resultado es requerido por primera vez (lazy evaluation). En el ejemplo anterior, la ejecución se iniciaría en el punto (D) cuando el objeto ftr invoca a la función get().
  • std::launch::async: Se genera un nuevo hilo para ejecutar la función de forma asíncrona, como si inicializáramos un objeto std::thread (ver figura inferior), si bien el resultado de la operación es almacenado en el estado compartido accesible mediante el objeto std::future. La función get() invocada en el punto (D) aguardaría si fuera necesario a que el resultado terminase de calcularse antes de retornarlo finalmente.
    En la imagen, se muestra la creación de un nuevo hilo de ejecución para la operación f(args...) desde la función principal. La línea discontinua representa el posible bloqueo del hilo principal desde la llamada a get() hasta la obtención de un resultado válido (ya sea un valor o una excepción).

    En ambos casos, los argumentos de la función std::async serán copiados/movidos por valor tal y como sucedía en el constructor variádico de std::thread.

    Si no especificamos una política de lanzamiento, std::async utilizará por defecto la combinación

    std::launch::async | std::launch::deferred 

    de modo que dependerá del sistema el que la función se ejecute o no en un nuevo hilo. Si deseamos explicitar una política de lanzamiento diferente, ésta debe indicarse como primer argumento de std::async. Por ejemplo, en el código anterior, podríamos exigir la generación de un nuevo hilo en la forma:

      auto ftr = std::async(std::launch::async, sum<Iter, Iter, Type>, v.begin(), v.end(), 0);

    La sintaxis de la invocación anterior puede simplificarse ligeramente mediante el uso de una expresión lambda:

      auto ftr = std::async(std::launch::async, [&]{ return sum(v.begin(), v.end(), 0); });

    Debemos resaltar, por último, que a diferencia de objetos std::future obtenidos en otros contextos, todos aquéllos retornados por llamadas a std::async aguardarán al ser destruidos (por ejemplo como consecuencia del lanzamiento de una excepción) a que las operaciones asíncronas asociadas hayan finalizado.


    Referencias bibliográficas
    1. cppreference - std::thread - https://en.cppreference.com/w/cpp/thread/thread
    2. cppreference - std::async - https://en.cppreference.com/w/cpp/thread/async

    2 comentarios: