Programación concurrente (III)

 Artículos de la serie:


Comprendiendo std::async en más detalle


En post anteriores empezamos a acostumbrarnos al uso de objetos std::future<> en el marco de la programación concurrente. Éstos proporcionan un mecanismo de acceso a los resultados producidos por operaciones asíncronas. Su función miembro pública get(), en particular, aguarda a que el objeto std::future contenga un resultado válido (valor o excepción) para retornarlo al agente invocador.

Ignoremos por el momento la existencia de la función estándar std::async con política de lanzamiento std::launch::async --muy ligada al uso de std::future y de enorme facilidad de uso-- y planteémonos la tarea (nada trivial) de replicar su comportamiento, es decir: ejecutar una operación de forma asíncrona en un nuevo hilo independiente y remitir el valor o excepción producida durante su ejecución al hilo padre. El objetivo no es otro, por supuesto, que el de aprender más acerca de la propia implementación de std::async.

Para ello, empecemos considerando la plantilla de clase estándar std::packaged_task<> [1], un envoltorio (wrapper) para la función, expresión lambda u objeto función a ejecutar asíncronamente. El parámetro de std::packaged_task no es otro que la signatura de la función a ejecutar, cualesquiera que sean sus tipos de argumentos y de retorno. Algunos ejemplos serían std::packaged_task<int()> para una función sin argumentos que devuelva un entero, o std::packaged_task<void(std::string const&, int)> para una función que tome dos argumentos (una referencia a un string constante y un entero) y no retorne ningún valor.

std::packaged_task<> ejecutará la función que le asociemos mediante una llamada a su operador miembro público void operator(). Dicho operador deberá recibir los argumentos apropiados, a saber, los mismos que proporcionaríamos para la función asociada. Observemos sin embargo que este operador no devuelve ningún resultado: el valor o excepción generado por la llamada a la función asociada será almacenado en un estado compartido, de forma que cualquier hilo que lo esté esperando sea desbloqueado y pueda capturarlo.

Si deseamos que dicha tarea se realice asíncronamente en un hilo de ejecución independiente, deberemos mover el objeto std::packaged_task<> a un nuevo hilo creado a tal efecto (std::packaged_task<> es movible pero no copiable). La pregunta pertinente es, aquí, qué canal de comunicación deberá existir entre la operación asíncrona y el agente que creó el objeto std::packaged_task en primer lugar y que aguarda el resultado. La respuesta es, por supuesto, un objeto std::future que el propio objeto std::packaged_task proporcionará, antes de ser movido, al agente en espera. Dicho objeto std::future se obtiene invocando a la función miembro get_future() de la clase std::packaged_task. A diferencia de un objeto std::future creado por std::async, sin embargo, el obtenido mediante std::packaged_task no aguardará a que finalice la operación asíncrona al ser destruido, hecho éste que debemos tener muy presente en el resto de este artículo.

En resumen, si deseamos ejecutar una función asíncronamente en un nuevo hilo de ejecución, los pasos a seguir serían:
  1. Envolver la función en un objeto std::packaged_task con la signatura apropiada como parámetro de la plantilla.
  2. Obtener un objeto std::future que contendrá en el futuro el resultado deseado; para ello, invocamos la función get_future() a través del objeto std::packaged_task recién creado.
  3. Ejecutar la función asociada a std::packaged_task con los argumentos pertinentes en un hilo independiente, utilizando por ejemplo la clase std::jthread de C++20 [2] (véase el artículo 6 de esta serie para más detalles).
  4. Aguardar a que el hilo termine de ejecutarse --idealmente, realizando en el ínterin otras muchas operaciones independientes-- de forma segura ante excepciones (de producirse una excepción inesperada en una de dichas acciones intermedias, el destructor de std::jthread aguardará a que el hilo del punto 3 finalice su ejecución).
  5. Invocar la función get() a través del objeto std::future generado en el punto 2 para obtener finalmente el resultado (valor o excepción) de la operación asíncrona.
Un posible ejemplo de uso sería (los puntos de la lista anterior se encuentran indicados en el código):

#include <future> #include <thread> #include <fmt/format.h> auto sum(int a, int b) -> int { return a + b; } auto main() -> int {     auto task = std::packaged_task<int(int,int)>{sum}; // (1)     auto ftr = task.get_future(); // (2) canal de comunicación de tipo std::future<int>     {        auto _ = std::jthread{std::move(task), 16}; // (3) sumemos 1 y 6 en un nuevo hilo        // realizamos otras otras operaciones mientras aguardamos        // el resultado de forma segura ante excepciones...     }  // (4) el destructor de jthread invoca join() automáticamente    fmt::print("{}\n", ftr.get()); // (5) oputput: 7 }

De acuerdo con lo dicho, podríamos imitar el comportamiento de la función std::async con política de lanzamiento std::launch::async empleando las clases:
  • std::packaged_task<>,
  • std::future<>,
  • std::thread.
A tal fin, consideremos la siguiente función variádica (adaptación del Listing 4.14 de la referencia [3]), que permite la ejecución asíncrona de cualquier objeto función f independientemente de su número de argumentos args:

template<typename Func, typename... Args>      requires std::invocable<std::decay_t<Func>, std::decay_t<Args>...>   [[nodiscard]] auto async_executor(Func&& f, Args&&... args)    {       using namespace std;       using Res = invoke_result_t<decay_t<Func>, decay_t<Args>...>;       auto pt = packaged_task<Res(decay_t<Args>...)>{forward<Func>(f)};       auto ftr = pt.get_future();       thread{move(pt), forward<Args>(args)...}.detach(); // (*)       return Waiting_future{std::move(ftr)};   }

Aquí, la función detach() permite que el nuevo hilo (*) siga ejecutándose de forma independiente tras abandonar async_executor, quedando desligado por completo del objeto local std::thread que lo inicia y que, de este modo, puede ser destruido de forma segura. Recordemos que es crucial proceder así, pues el destructor de un objeto std::thread ligado a un hilo activo que no haya invocado previamente a join() o detach() llamará a std::terminate() automáticamente (véase el artículo 1 de esta serie para más detalles).

El objeto Waiting_future retornado por async_executor permitirá acceder al valor o excepción obtenida al ejecutar f(args...) en el nuevo hilo. Se trata de un mero envoltorio para std::future cuyo destructor aguarda por seguridad a que el resultado de la operación asíncrona esté disponible:

   template<typename Ret>    class Waiting_future : public std::future<Ret> {    public:       Waiting_future() noexcept = default;       Waiting_future(std::future<Ret>&& handle) noexcept          : std::future<Ret>{std::move(handle)} { }       Waiting_future(Waiting_future&&) noexcept = default;       Waiting_future& operator=(Waiting_future&&) noexcept = default;       ~Waiting_future() { if (this->valid()) this->wait(); }    };

El ejemplo anterior se reduciría entonces simplemente a:

auto sum(int a, int b) -> int { return a + b; } auto main() -> int {     auto ftr = async_executor(sum, 16);     // ...     fmt::print("{}\n", ftr.get()); // oputput: 7 }

Como hemos visto, std::packaged_task<> constituye una herramienta de bajo nivel que permite construir soluciones de más alto nivel como async_executor. El usuario ocasional encontraría sin duda más sencillo el empleo de esta última función, pero para un programador experimentado, std::packaged_task<> puede resultar una herramienta muy útil para paralelizar algoritmos de forma eficiente y segura ante excepciones, implementar piscinas de hilos, etcétera.

En efecto, tal y como se analiza en múltiples ejemplos en [3], consideremos un algoritmo que ejecute una acción específica e independiente sobre cada uno de los elementos contenidos en un rango [first,last). Varios algoritmos estándar responden a un esquema de este tipo, tales como std::accumulate(), std::fill() o std::for_each(). Es posible paralelizar estas funciones sin más que permitir que varios hilos se encarguen de forma independiente (y potencialmente en paralelo) de bloques distintos del rango (chunks). Será necesario crear, primeramente, un vector de objetos std::future<> para obtener los potenciales resultados y, en segundo lugar, un vector de objetos std::jthread para registrar los hilos iniciados. Sea num_threads el número de hilos concurrentes soportado por la implementación, obtenido mediante una llamada a la función miembro estática std::jthread::hardware_concurrency() (asumiremos por sencillez que el valor devuelto es mayor que cero). Será éste el número de bloques de trabajo. El esquema general de nuestro código tomaría entonces la forma:

   // vector para almacenar los resultados de trabajo en    // los (numThreads - 1) primeros bloques:    auto futures = vector<future< /* tipo retornado */ >>(numThreads - 1);    // vector de hilos (inicialmente vacío):    auto threads = vector<jthread>{};    // enviamos a un nuevo hilo cada uno de los (numThreads - 1) primeros    // bloques de trabajo:    for (auto& ftr : futures) {       // operaciones que dependerán del algoritmo...       auto task = packaged_task< /* signatura de la función */ >{/* nombre de la función */};       ftr = task.get_future();       threads.emplace_back(move(task), /* argumentos de la función */ );       // ...    }    // ejecutamos la función sobre el último bloque en el hilo padre,    // posiblemente adquiriendo un valor retornado    // ...adquirimos los resultados obtenidos en los restantes bloques:    for (auto& ftr : futures) {       // utilizamos de forma apropiada los valores retornados por ftr.get()       // ...    }

El uso de la estructura std::vector<std::jthread> definida al inicio del código garantiza que, de emitirse una excepción, los hilos almacenados en el vector finalicen su ejecución de forma segura.


Referencias bibliográficas
  1. cppreference - std::packaged_task - https://en.cppreference.com/w/cpp/thread/packaged_task
  2. cppreference - std::jthread - https://en.cppreference.com/w/cpp/thread/jthread
  3. Williams A., C++ Concurrency in Action. Manning Publications, 2nd edition (2019)

No hay comentarios:

Publicar un comentario