Programación concurrente (VIII): Ejercicio resuelto de paralelización

 Artículos de la serie:


Ejercicio resuelto de paralelización


Última actualización del artículo: 6 de septiembre de 2021.

Con el fin de poner en práctica algunas de las técnicas analizadas en los artículos anteriores de esta serie, nos plantearemos el problema de, dada la ruta (path) de un directorio/carpeta de archivos (directory), mostrar en consola:
  1. El número y tamaño total de los archivos contenidos en el directorio --incluyendo posibles subdirectorios--, desglosados por orden alfabético de extensión.
  2. El número total de subdirectorios.
Por supuesto, trataremos de aprovechar la concurrencia soportada por la implementación para acumular la información de forma rápida y eficiente.

Un ejemplo del tipo de informe que deseamos obtener sería el siguiente, correspondiente a un directorio con 120 archivos y seis subdirectorios (obsérvese que cronometraremos el tiempo de ejecución del programa):

     .docx:        6 files          7563084 bytes
      .enb:        2 files         13137301 bytes
      .gif:        1 files           754805 bytes
      .jpg:        5 files          1886559 bytes
     .lynx:        1 files           244525 bytes
      .mp4:       33 files       4168469883 bytes
      .pdf:       35 files         54420137 bytes
      .png:        4 files           834334 bytes
     .pptx:       11 files          5761514 bytes
      .tmp:        2 files           283993 bytes
      .txt:        2 files             1855 bytes
      .xls:       11 files           144416 bytes
     .xlsx:        3 files            49853 bytes
      .zip:        4 files         76865037 bytes

     Total:      120 files       4330417296 bytes | 6 folders [30ms]

Iniciaremos la codificación de este programa simple incluyendo las cabeceras pertinentes tanto de C++20 como de las bibliotecas auxiliares {fmt}lib y Range-v3 (el lector puede obtener el código completo del ejercicio sin más que concatenar los bloques de código de este artículo):

   #include <atomic>    #include <chrono>    #include <cstdlib>    #include <filesystem>    #include <future>    #include <map>    #include <span>    #include <thread>    #include <utility>    #include <vector>    #include <fmt/chrono.h>    #include <range/v3/core.hpp>    #include <range/v3/view/chunk.hpp>    #include <range/v3/view/subrange.hpp>    #include <range/v3/view/zip.hpp>

Definamos a continuación un agregado de datos capaz de almacenar la información relevante para cada extensión (número total de archivos y tamaño acumulado en bytes):

   struct Extension_info {       std::uintmax_t num_files, // número total de archivos con una determinada extensión                      total_size; // tamaño total en bytes acumulado por dichos archivos    };

El siguiente bloque de código almacenará en un vector estándar de nombre paths las rutas de tipo std::filesystem::path [1] asociadas a todos los archivos y subdirectorios del directorio que el usuario haya indicado a modo de segundo argumento en la línea de comandos/órdenes. Observemos, en particular, el empleo de un iterador recursivo estándar recursive_directory_iterator [2] para realizar la búsqueda de direcciones, así como el inicio de un cronómetro en el hilo principal, que detendremos al finalizar el programa:

   auto main(int argc, char* argv[]) -> int    {       if (argc != 2) {          fmt::print(stderr"invalid number of command line arguments");          return EXIT_FAILURE;       }       namespace fs = std::filesystem;       auto const root = fs::path{argv[1]};       if (!fs::is_directory(root)) {          fmt::print(stderr"second command line argument must be an actual directory");          return EXIT_FAILURE;       }       // empezamos a cronometrar la ejecución del programa:       using clock = std::chrono::steady_clock;       auto const start = clock::now();       // vector de rutas contenidas en el directorio:       auto const paths = fs::recursive_directory_iterator{root}                        | ranges::to<std::vector<fs::path>>;

Nos serviremos de un mapa estándar de nombre processed_data para asociar a cada nombre de extensión (std::string) su información asociada (Extension_info). Una variable atómica de tipo entero nos permitirá acumular el número total de subdirectorios encontrados en el directorio facilitado por el usuario, garantizando que no se produzcan data races al realizar dicho conteo de forma concurrente (en efecto, el modelo de memoria introducido por C++11 permite un proceso de escritura sobre un objeto atómico a la par que otros hilos proceden a su lectura [3]):

      using map_type = std::map<std::string, Extension_info>;       auto processed_data = map_type{};       auto num_subdirectories = std::atomic<std::uintmax_t>{};

Procediendo de forma similar a la explicada en el sexto artículo de esta serie, dividiremos el vector paths en bloques (chunks) que puedan ser analizados en paralelo por diversos hilos. Cada hilo procesará las rutas fs::path de un bloque, clasificándolas según sean archivos o subdirectorios, de forma que podamos:
  1. De tratarse de un archivo, actualizar/registrar la pareja extensión-(número de archivos, tamaño total) correspondiente en un mapa parcial auxiliar asociado a dicho bloque.
  2. De tratarse de un subdirectorio, incrementar la variable atómica compartida num_subdirectories
Dichas acciones quedan recogidas en la función lambda generate_map. El hilo principal se encargará de recolectar los distintos mapas parciales así producidos y fusionarlos en el mapa principal processed_data (véase la lambda process_map). Observemos, en particular, que no es necesario proteger el mapa principal con un mutex:

      // generación de un mapa parcial a partir de un bloque 'chunk' del vector 'paths':       auto generate_map = [&num_subdirectories](std::span<fs::path const> chunk) -> map_type {          auto res = map_type{};          for (auto& pth : chunk) {             if (fs::is_regular_file(pth)) {                auto const ext = pth.extension().string();                ++res[ext].num_files;                res[ext].total_size += fs::file_size(pth);             }             else if (fs::is_directory(pth))                ++num_subdirectories;          }          return res;       };       // fusión de un mapa parcial 'map' en el mapa principal 'processed_data':       auto process_map = [&processed_data](map_type const& map) {          for (auto const& [ext, info] : map) {             auto const [nm, sz] = info;             processed_data[ext].num_files += nm;             processed_data[ext].total_size += sz;          }       };       // número de objetos futuro a generar:       auto const hardw_threads = std::jthread::hardware_concurrency();       auto const num_futures = (hardw_threads > 2u)? hardw_threads - 1 : 1u;       // dividiremos el vector siempre que el tamaño típico de los bloques sea mayor que 50:       if (auto const max_chunk_sz = paths.size()/(num_futures + 1); max_chunk_sz > 50u) {          auto futures = std::vector<std::future<map_type>>(num_futures);          auto threads = std::vector<std::jthread>(num_futures);          // dividimos el vector y lanzamos 'num_futures' hilos adicionales para analizar los          // 'num_futures' primeros bloques:          auto chunks = paths | ranges::views::chunk(max_chunk_sz);          auto chunk_it = ranges::begin(chunks);          for (auto [f, t] : ranges::views::zip(futures, threads)) {             auto pt = std::packaged_task<map_type(decltype(*chunk_it))>{generate_map};             f = pt.get_future();             t = std::jthread{std::move(pt), *chunk_it++};          }          // los elementos restantes son analizados por el hilo principal:          do process_map(generate_map(*chunk_it++)); while (chunk_it != ranges::end(chunks));          // recopilamos la información procesada por el resto de hilos:          for (auto& f : futures)              process_map(f.get());       }       else {          // el hilo principal analiza todo el vector de paths:          processed_data = generate_map(paths);       }

Llegados a este punto, tan sólo resta imprimir en consola los resultados obtenidos siguiendo un esquema análogo al mostrado en el ejemplo facilitado al inicio de este post:

      // variables para acumular la información total del directorio:       auto dir_files = std::uintmax_t{};       auto dir_size = std::uintmax_t{};       // imprimimos la información relevante para cada extensión:       for (auto const& [ext, info] : processed_data) {          auto const [nm, sz] = info;          fmt::print("{:>10}: {:>8} files {:>16} bytes\n", ext, nm, sz);          dir_files += nm;          dir_size += sz;       }       // imprimimos la información total del directorio y el tiempo de ejecución:       using ms = std::chrono::milliseconds;       auto const duration = std::chrono::duration_cast<ms>(clock::now() - start);       fmt::print("\n{:>10}: {:>8} files {:>16} bytes | {} folders [{}]",                  "Total", dir_files, dir_size, num_subdirectories, duration);    }


Referencias bibliográficas
  1. cppreference - Filesystem path - https://en.cppreference.com/w/cpp/filesystem/path
  2. cppreference - Filesystem recursive_directory_iterator - https://en.cppreference.com/w/cpp/filesystem/recursive_directory_iterator
  3. cppreference - std::atomic - https://en.cppreference.com/w/cpp/atomic/atomic

No hay comentarios:

Publicar un comentario