Artículos de la serie:
- Introducción básica: clase std::thread y función std::async
- Paralelización de algoritmos
- Comprendiendo std::async en más detalle
- ¿Cómo funciona std::packaged_task?
- Mecanismo de exclusión mutua (mutex)
- std::jthread: hilo cooperativamente interrumpible
- Queue sincronizado
- Ejercicio resuelto de paralelización
Paralelización de algoritmos
Basándonos en la explicación proporcionada en el primer post de esta serie, es claro que la función std::async nos permitirá paralelizar de forma directa algoritmos tales como for_each, accumulate o quick_sort.
Un aspecto fundamental a tener en cuenta a la hora de paralelizar algoritmos es el de evitar la incidencia de data races, es decir, el acceso simultáneo a la misma localización en memoria por parte de diferentes hilos, siendo al menos uno de dichos accesos de escritura. No es éste un problema con el que debamos lidiar en este post, por lo que pospondremos su análisis por el momento (véase el artículo 5 de esta serie para más detalles).
Consideremos la paralelización del bien conocido algoritmo std::accumulate, que en su sobrecarga más sencilla suma todos los elementos contenidos en un rango [first,last) a partir de un valor inicial dado [1]. Con el fin de simplificar la discusión, emplearemos la técnica de la recursividad [2]. Aunque ello no conduzca ciertamente a la implementación más eficiente (véase la discusión al final del artículo), sí proporcionará un interesante ejercicio de codificación. Asimismo, renombraremos el algoritmo como sum con el fin de clarificar su objetivo y distinguirlo de otras posibles operaciones binarias distintas a la suma [3]:
En el código anterior, mid es un iterador que referencia al elemento situado en mitad del rango. En (A) procedemos al cálculo de la suma de la primera mitad del rango --es decir, [first,mid)-- de forma recursiva y asíncrona partiendo del valor nulo T{}. Recordemos que la llamada a la función std::async devolverá, en este caso, un objeto de tipo std::future<T>, cuyo valor requeriremos más adelante. En (B), por contra, calculamos la suma de la segunda mitad del rango --es decir, [mid,last)-- de forma recursiva y síncrona partiendo del valor inicial init proporcionado como tercer argumento a la función parallel_sum. El valor almacenado en el objeto std::future<T> obtenido en (A) es requerido en (C), donde los valores acumulados para ambas mitades del rango son sumados finalmente. El caso base para la recursión se encuentra en (*): cuando la longitud del rango [first,last) es menor que un entero parallel_sz escogido por el usuario (por defecto, igual a 5'000), la suma de valores se ejecuta síncronamente a través de una llamada al algoritmo no-paralelizado sum (en caso de que la longitud sea nula, optamos por devolver directamente el valor init).
#include <concepts>
#include <future>
#include <iterator>
#include <utility>
template<std::input_iterator I, std::sentinel_for<I> S, std::movable T>
[[nodiscard]] auto sum(I first, S last, T init) -> T
{
for (; first != last; ++first)
init = std::move(init) + *first;
return init;
}
template<std::forward_iterator I, std::sentinel_for<I> S, std::movable T>
[[nodiscard]] auto parallel_sum(I first, S last, T init,
std::iter_difference_t<I> parallel_sz = 5'000) -> T
{
auto const length = std::ranges::distance(first, last);
if (length < parallel_sz)
return length? sum(first, last, std::move(init)) : init; // (*)
auto mid = first;
std::ranges::advance(mid, length/2);
auto s_1 = std::async(parallel_sum<I, S, T>, first, mid, T{}, parallel_sz); // (A)
auto const s_2 = parallel_sum(mid, last, std::move(init), parallel_sz); // (B)
return s_1.get() + s_2; // (C)
}
En el código anterior, mid es un iterador que referencia al elemento situado en mitad del rango. En (A) procedemos al cálculo de la suma de la primera mitad del rango --es decir, [first,mid)-- de forma recursiva y asíncrona partiendo del valor nulo T{}. Recordemos que la llamada a la función std::async devolverá, en este caso, un objeto de tipo std::future<T>, cuyo valor requeriremos más adelante. En (B), por contra, calculamos la suma de la segunda mitad del rango --es decir, [mid,last)-- de forma recursiva y síncrona partiendo del valor inicial init proporcionado como tercer argumento a la función parallel_sum. El valor almacenado en el objeto std::future<T> obtenido en (A) es requerido en (C), donde los valores acumulados para ambas mitades del rango son sumados finalmente. El caso base para la recursión se encuentra en (*): cuando la longitud del rango [first,last) es menor que un entero parallel_sz escogido por el usuario (por defecto, igual a 5'000), la suma de valores se ejecuta síncronamente a través de una llamada al algoritmo no-paralelizado sum (en caso de que la longitud sea nula, optamos por devolver directamente el valor init).
El código resulta seguro ante la posible emisión de excepciones. Recordemos, en efecto, que los objetos de tipo std::future retornados por std::async aguardarán al ser destruidos a que finalicen las operaciones asíncronas a ellos asociadas. Así sucederá con el objeto std::future creado en (A) de lanzarse una excepción en (B). Asimismo, si la función ejecutada asíncronamente en (A) fuese la responsable de la excepción, ésta sería capturada y relanzada en (C) a través de la llamada a la función get().
Finalmente, resaltemos el hecho de que la política de lanzamiento empleada en std::async ha sido aquélla proporcionada por defecto: std::launch::async|std::launch::deferred. De este modo, y siempre que la implementación así lo permita, std::async podrá escoger realizar algunas de las acumulaciones de forma síncrona si el número de hilos generado resultase excesivo.
Un ejemplo de uso del algoritmo parallel_sum sería el siguiente:
// situar aquí el código anterior de implementación de parallel_sum()
#include <fmt/format.h>
#include <vector>
auto main() -> int
{
auto const nums = std::vector<double>(10'000'000, 1.e-6);
fmt::print("{:.1f}", parallel_sum(nums.begin(), nums.end(), 0.0)); // output: 10.0
}
Aunque se trate de una interesante aplicación combinada del algoritmo std::async y la técnica de recursividad, no debería esperarse resultados óptimos derivados de la paralelización proporcionada en este post. En efecto, su ejecución podría adolecer fácilmente de un problema de sobresuscripción (oversubscription) si generase un número de hilos activos muy superior al número de núcleos disponible en la computadora: el programa desperdiciaría entonces un valioso tiempo de procesamiento intercalando hilos (puede consultarse la referencia [2] para una discusión más detallada). El sexto artículo de esta serie presenta una paralelización no-recursiva que puede mitigar este defecto.
Por supuesto, resulta recomendable emplear bibliotecas profesionales que proporcionen paralelizaciones altamente eficientes. C++17 y C++20 disponen de versiones paralelizadas para un elevado número de algoritmos en cabeceras como <algorithm> y <numeric> [4]. La cabecera <execution> permite establecer la política de ejecución a seguir. A modo de ejemplo, la suma del código anterior podría realizarse mediante el algoritmo estándar std::reduce [5] empleando una política std::execute::par (ejecución paralelizada) o std::execute::par_unseq (ejecución paralelizada, con cada hilo potencialmente vectorizado) [6, 7]. El siguiente programa compara el rendimiento de los algoritmos paralelizados frente a su versión secuencial std::accumulate:
#include <chrono>
#include <execution>
#include <numeric>
#include <vector>
#include <fmt/chrono.h>
auto main() -> int
{
using namespace std;
auto const nums = vector<double>(10'000'000, 1.e-6);
auto time = []<typename F>(char const* mssg, F f){
using clock = chrono::steady_clock;
using ms = chrono::milliseconds;
auto const start = clock::now();
auto const sum = f();
auto const duration = chrono::duration_cast<ms>(clock::now() - start);
fmt::print("{:>45} | sum = {:.1f} in {}\n", mssg, sum, duration);
};
time("std::accumulate",
[&nums]{ return accumulate(nums.begin(), nums.end(), 0.0); });
time("std::reduce [std::execution::par]",
[&nums]{ return reduce(execution::par, nums.begin(), nums.end()); });
time("std::reduce [std::execution::par_unseq]",
[&nums]{ return reduce(execution::par_unseq, nums.begin(), nums.end()); });
}
Así, en una computadora con procesador Intel(R) Core(TM) i3-7020U CPU 2.30GHz con 8,00 GB de memoria RAM instalada, pudieron observarse tiempos de ejecución similares a los siguientes empleando el compilador GCC 10.3 bajo el sistema MS Windows 10 (de construir el proyecto con CMake, inclúyase tbb en la lista target_link_libraries):
std::accumulate | sum = 10.0 in 19ms
std::reduce [std::execution::par] | sum = 10.0 in 12ms
std::reduce [std::execution::par_unseq] | sum = 10.0 in 7ms
Referencias bibliográficas
- cppreference - std::accumulate - https://en.cppreference.com/w/cpp/algorithm/accumulate
- Williams A., C++ Concurrency in Action. Manning Publications, 2nd edition (2019)
- Revzin B., Hoekstra C., and Song T., A Plan for C++23 Ranges - http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2020/p2214r0.html
- cppreference - Extensions for parallelism - https://en.cppreference.com/w/cpp/experimental/parallelism
- cppreference - std::reduce - https://en.cppreference.com/w/cpp/algorithm/reduce
- cppreference - Execution policies - https://en.cppreference.com/w/cpp/algorithm/execution_policy_tag_t
- Nvidia - Accelerating Standard C++ with GPUs using stdpar - https://developer.nvidia.com/blog/accelerating-standard-c-with-gpus-using-stdpar/
No hay comentarios:
Publicar un comentario