Programando con C++20 (parte IV): std::jthread

Artículos de la serie 'Programando con C++20': Artículos de la serie 'Programación concurrente':
  1. Concepts
  2. Lambdas 'templatizadas'
  3. Funciones std::erase y std::erase_if
  4. std::jthread
  5. CPOs (customization point objects)
  1. Introducción básica: clase std::thread y función std::async
  2. Paralelización de algoritmos
  3. Comprendiendo std::async en más detalle
  4. ¿Cómo funciona std::packaged_task?
  5. Mecanismo de exclusión mutua (mutex)
  6. std::jthread: hilo cooperativamente interrumpible
  7. Queue sincronizado
  8. Ejercicio resuelto de paralelización

Última actualización del artículo: 31 de agosto de 2021.

El estándar C++20 proporciona un nuevo hilo cooperativamente interrumpible llamado std::jthread [1]. Éste dispone de la misma funcionalidad que la clase std::thread (véase el primer artículo de esta serie), pero añade las habilidades adicionales de:

  • Realizar su unión (join) automáticamente durante su destrucción.
  • Permitir que se solicite su cancelación.

Un objeto jthread posee una variable interna privada de tipo std::stop_source [2] que mantiene un estado compartido de parada. Se trata del mecanismo a partir del cual poder realizar una solicitud de interrupción. Consideremos el constructor explícito de la clase std::jthread, de signatura:

   template<typename Func, typename... Args>    explicit jthread(Func&& f, Args&&... args);

donde f es el objeto invocable a ejecutar en el nuevo hilo y args... sus argumentos, a excepción del primero si éste fuese de clase std::stop_token [3]. En el caso concreto en que la función f tome un objeto std::stop_token [3] como primer argumento (típicamente por valor), éste será remitido por el propio constructor de jthread. En efecto, en tal caso el nuevo hilo de ejecución empezará invocando:

   std::invoke(decay_copy(std::forward<Func>(f)),                get_stop_token(),                decay_copy(std::forward<Args>(args))...);

con decay_copy definido como en el primer artículo artículo de esta serie. Aquí, la llamada por defecto a la función get_stop_token() proporcionará a la función f un token de parada stop_token asociado al estado interno stop_source del objeto jthread. El objeto stop_token será el agente a emplear por la función f para comprobar si la cancelación del hilo es requerida durante su ejecución y poder proceder así a retornar (se proporciona un ejemplo específico más abajo). Para tal fin, la clase std::jthread posee una función miembro pública request_stop() cuya invocación emitirá una solicitud de interrupción al estado interno (si ello no hubiese ocurrido ya previamente). El predicado stop_requested() de la clase std::stop_token, por su parte, será el encargado de comprobar si se ha realizado tal solicitud.

De forma notable, al final de su tiempo de vida, un hilo jthread para el que joinable() sea cierto ejecutará automáticamente una solicitud request_stop() para detener su función f e invocará después a join() con el fin de aguardar a que ésta finalice su ejecución. La llamada a request_stop() no tendrá efecto si ya se hubiese requerido la interrupción del hilo previamente. El modo de funcionamiento del destructor de std::jthread contrasta, pues, con el de la clase std::thread (C++11), que no invoca a join().

Conviene resaltar que si el objeto jthread es iniciado con una función f que no toma un token de parada como primer argumento, el hilo empezará invocando simplemente (obsérvese la ausencia de llamada a get_stop_token()):

   std::invoke(decay_copy(std::forward<Func>(f)),                decay_copy(std::forward<Args>(args))...);

En cualquier caso, el destructor invocará automáticamente a join() cuando el hilo sea destruido.

El siguiente código ejemplifica el modo básico de funcionamiento del token de parada. En él, un hilo std::jthread imprime mensajes en la terminal de forma regular hasta que el objeto sale fuera de ámbito y la tarea recibe una señal de finalización por parte del destructor:

   {       using namespace std::chrono_literals;       // lambda para evitar que las salidas de texto en diferentes hilos se intercalen:       auto println = [mtx = std::mutex{}](char const* txt) mutable {           auto _ = std::lock_guard{mtx};          std::cout << txt << '\n';       };       // hilo que imprime mensajes regularmente en consola hasta que es detenido:       auto worker = std::jthread{[&println](std::stop_token stp){          while (!stp.stop_requested()) {             println("tarea en curso...");             std::this_thread::sleep_for(200ms);          }          println("tarea recibe la solicitud de parada y finaliza");       }};       // dejamos que el hilo 'worker' trabaje por un tiempo antes de hacerlo salir de ámbito:       std::this_thread::sleep_for(600ms);       println("destructor de jthread solicita parar y unirse...");    } // llamada automática a worker.request_stop(), seguida de worker.join()

Así, un posible resultado del código anterior vendría dado por la siguiente secuencia de mensajes:

tarea en curso...   

tarea en curso...

tarea en curso...

destructor de jthread solicita parar y unirse...

tarea en curso...

tarea recibe la solicitud de parada y finaliza


La gestión del estado interno std::stop_source merece sin duda un análisis más detallado, que pospondremos hasta un próximo artículo. En este post, trataremos de mostrar las ventajas de programación que se derivan de la invocación automática de join() durante la destrucción de un hilo std::jthread. A este fin, nos propondremos codificar una versión paralelizada del algoritmo for_each, el cual aplicará una función unaria a cada elemento de un rango dado sirviéndose de distintos hilos concurrentes.

En primer lugar, incluiremos las cabeceras necesarias para la implementación del algoritmo y estableceremos su signatura (el lector puede concatenar los siguientes bloques de código con el fin de disponer de su desarrollo completo):

   #include <algorithm>    #include <concepts>    #include <future>    #include <iterator>    #include <ranges>    #include <thread>    #include <type_traits>    #include <utility>    #include <range/v3/core.hpp>    #include <range/v3/view/chunk.hpp>    #include <range/v3/view/zip.hpp>    namespace parallel {       template<          std::forward_iterator I, // algoritmo multi-pase unidireccional          std::sentinel_for<I> S,          typename Proj = std::identity,          std::indirectly_unary_invocable<std::projected<I, Proj>> F       >       requires std::same_as<          std::invoke_result_t<F&, typename std::projected<I, Proj>::value_type&>,          void       >       void for_each(I first, S last, F f, Proj proj = {}, int chunk_sz = 1'000)       {          namespace rn = ranges;          namespace rv = rn::views;

El algoritmo recibe un iterador first que marca el inicio del rango sobre el que actuar, así como un centinela last para interrumpir la iteración. Como tercer y cuarto argumentos del algoritmo figuran la función f a aplicar a cada elemento contenido en el rango, así como, opcionalmente, una proyección proj que aplicar a dichos elementos. Notemos aquí la importancia de garantizar que no se produzcan data races con la función f. La cláusula requires comprueba en tiempo de compilación que la función unaria f posea void como tipo de retorno.

Observemos que, en su versión paralelizada, for_each será implementado como una función multipase unidireccional, en contraste con su versión no-paralelizada, codificada de forma natural como una función unidireccional de un solo pase [4]. En efecto, la versión paralelizada subdividirá el rango de elementos [first,last) en bloques (chunks) y remitirá copias de los iteradores delimitadores de cada uno de ellos a hilos de ejecución independientes, que serán los encargados de aplicar la función f a los elementos correspondientes (véase [5] para más detalles). Todo ello justifica que el iterador first deba cumplir el concepto std::forward_iterator [6]

El quinto argumento del algoritmo (entero chunk_sz) establece el número típico de elementos que cada bloque debería contener como mínimo para que la paralelización del algoritmo se haga efectiva, punto éste que clarificaremos a continuación.

En el próximo segmento de código, averiguaremos el número de hilos concurrentes soportado por la implementación, en base al cual estableceremos el número de hilos totales num_threads a emplear por parte del algoritmo (incluido aquél en que se ejecute parallel::for_each). La longitud máxima max_chunk_sz de un bloque vendrá establecida por el cociente de enteros distance/num_threads. Si ésta fuese menor que el argumento chunk_sz (1'000 por defecto), optaríamos por aplicar la versión no-paralelizada de for_each a todo el rango:

         auto const hardw_threads = std::jthread::hardware_concurrency();          auto const num_threads = (hardw_threads > 2u)? hardw_threads : 2u;          auto const distance = rn::distance(first, last);          auto const max_chunk_sz = distance / num_threads;          if (max_chunk_sz < chunk_sz) {             std::ranges::for_each(first, last, f, proj);             return;          }

El uso de la biblioteca Range-v3 nos permitirá codificar fácilmente la división del rango [first,last) mediante la vista ranges::views::chunk [7], la cual, dado el número max_chunk_sz, producirá un nuevo rango de rangos contiguos donde cada rango interno poseerá max_chunk_sz elementos contiguos (el rango final podrá tener un número menor de elementos). Sea m el número de bloques en que queda dividido el rango [first,last). Se cumplirá entonces que la diferencia m-num_threads es igual a cero si la longitud del rango es múltiplo entero del numero de hilos, o uno en caso contrario. De existir dicho bloque extra, éste tendrá entre uno y num_threads-1 elementos. Permitiremos que num_threads-1 hilos std::jthread se encarguen de aplicar la función f a los elementos de los primeros num_threads-1 bloques, mientras que el hilo padre --aquél que se encuentra ejecutando la llamada a parallel::for_each-- se hará cargo de los restantes elementos (uno o dos bloques). 

Crearemos dos vectores, uno de ellos conteniendo los objetos std::jthread y el otro almacenando objetos std::future<void> como mecanismos de acceso a los resultados de dichas operaciones asíncronas. El hecho de que los elementos de estos dos vectores se encuentren en correspondencia uno-a-uno invita a visitarlos por parejas con la vista ranges::views::zip [8]:

         auto futures = std::vector<std::future<void>>(num_threads - 1);          auto threads = std::vector<std::jthread>(num_threads - 1);          auto chunks = rn::subrange{first, last} | rv::chunk(max_chunk_sz);          auto chunk_it = rn::begin(chunks);          for (auto [ftr, thr] : rv::zip(futures, threads)) {             auto pt = std::packaged_task<void()>{                [&, it = chunk_it++]{ std::ranges::for_each(*it, f, proj); }             };              ftr = pt.get_future();             thr = std::jthread{std::move(pt)};          }          do { std::ranges::for_each(*chunk_it++, f, proj); } while (chunk_it != rn::end(chunks));

Aguardaremos, por último, a que todos los hilos concurrentes hayan finalizado de forma exitosa, o bien a que alguno de ellos emita una excepción que relanzar desde nuestro algoritmo:

         for (auto& ftr : futures)             ftr.get();       } // parallel::for_each FUNCTION

La invocación atomática de join() por parte del destructor de std::jthread garantiza la seguridad ante excepciones de nuestro algoritmo. Por ejemplo, de producirse la emisión de una excepción por parte de alguna de las llamadas get() del bloque de código anterior, el proceso de desenredo de la pila provocará la destrucción del vector de hilos, lo que a su vez conducirá a la destrucción/unión de los hilos concurrentes.

Por último, resulta inmediato codificar una sobrecarga del algoritmo que permita operar directamente sobre rangos, en sustitución de las parejas de argumentos first y last:

      template<          std::ranges::forward_range R,          typename Proj = std::identity,          std::indirectly_unary_invocable<std::projected<std::ranges::iterator_t<R>, Proj>> F       >       requires std::same_as<          std::invoke_result_t<F&, typename std::projected<std::ranges::iterator_t<R>,                                                            Proj>::value_type&>,          void       >       void for_each(R&& r, F f, Proj proj = {}, int chunk_sz = 1'000)       {          for_each(std::ranges::begin(r), std::ranges::end(r),                   std::move(f), std::move(proj), chunk_sz);       }    } // parallel NAMESPACE

Con el fin de comprobar la correcta implementación del algoritmo y, en particular, el adecuado manejo de proyecciones, proporcionamos el siguiente código de ejemplo, que inicializa un vector con un millón de parejas idénticas de enteros {1,2}. Los primeros elementos de dichas parejas son entonces multiplicados por dos de forma paralela y posteriormente acumulados. El resultado de la suma (dos millones) se muestra finalmente en la terminal junto al tiempo de ejecución del bucle for_each encargado de la multiplicación:

   // introduce aquí los bloques de código anteriores    #include <chrono>    #include <fmt/chrono.h>    #include <numeric>    #include <vector>    #include <range/v3/numeric/accumulate.hpp>    struct S { int a, b; };    auto main() -> int    {       using clock = std::chrono::steady_clock;       using microsec = std::chrono::microseconds;       auto v = std::vector<S>(1'000'000, {12});       auto const start = clock::now();       parallel::for_each(v, [](int& i){ i *= 2; }, &S::a);       auto const duration = std::chrono::duration_cast<microsec>(clock::now() - start);       fmt::print("{} in {}\n", ranges::accumulate(v, 0, ranges::plus{}, &S::a), duration);  // output: 2000000 in ----µs    }

Referencias bibliográficas
  1. cppreference - std::jthread - https://en.cppreference.com/w/cpp/thread/jthread
  2. cppreference - std::stop_source - https://en.cppreference.com/w/cpp/thread/stop_source
  3. cppreference - std::stop_token - https://en.cppreference.com/w/cpp/thread/stop_token
  4. cppreference - std::for_each - https://en.cppreference.com/w/cpp/algorithm/for_each
  5. Williams A., C++ Concurrency in Action. Manning Publications, 2nd edition (2019)
  6. cppreference - std::forward_iterator - https://en.cppreference.com/w/cpp/iterator/forward_iterator
  7. Range-v3 - chunk view - https://github.com/ericniebler/range-v3/blob/master/include/range/v3/view/chunk.hpp
  8. Range-v3 - zip view - https://github.com/ericniebler/range-v3/blob/master/include/range/v3/view/zip.hpp

No hay comentarios:

Publicar un comentario