Programación concurrente (VII): Queue sincronizado

  Artículos de la serie:


Queue sincronizado


Última actualización del post: 28 de agosto de 2021.

Detalle de 'The Queue at the Fish-shop'
(1944) por Evelyn M. Dunbar
https://en.wikipedia.org/wiki/Evelyn_Dunbar
En este artículo analizaremos la implementación segura bajo hilos (thread-safe) de una estructura de datos tipo cola (queue). Para ello, emplearemos dos primitivas de sincronización disponibles a partir del estándar C++11:
  • std::mutex, que permite proteger a una variable compartida del intento de acceso simultáneo por parte de múltiples hilos (véase el quinto artículo de esta serie para más detalles, así como [1]).
  • std::condition_variable, que permite bloquear uno o más hilos de ejecución hasta que otro hilo modifique una variable compartida y notifique dicha acción. Sus funciones miembro públicas notify_one y notify_all permiten desbloquear, respectivamente, uno solo de los hilos en espera o bien todos los hilos bloqueados [2].

Con el fin de simplificar nuestro estudio, deshabilitaremos las operaciones de copia y movimiento de la cola. Una codificación preliminar en C++20 se reduciría, así, a [3, 4]:

   #include <condition_variable>    #include <mutex>    #include <queue>    #include <type_traits>    #include <utility>    template<typename T>    class Sync_queue {       std::queue<T> data_;       mutable std::mutex mtx_;       std::condition_variable cv_;    public:       using value_type = T;       Sync_queue() = default;       Sync_queue(Sync_queue const&) = delete;       autooperator=(Sync_queue const&) = delete;       auto dequeue(value_type& value) -> bool       {          auto _ = std::lock_guard{mtx_};          if (data_.empty()) return false;          value = std::move(data_.front()); // posible lanzamiento de excepción          data_.pop();          return true;       }       void dequeue_when_available(value_type& value)       {          auto ulck = std::unique_lock{mtx_};          cv_.wait(ulck, [this]{ return !data_.empty(); });          value = std::move(data_.front()); // posible lanzamiento de excepción          data_.pop();       }       template<typename S>          requires std::is_convertible_v<std::decay_t<S>, value_type>       void enqueue(S&& value)       {          auto _ = std::lock_guard{mtx_};          data_.push(std::forward<S>(value));          cv_.notify_one();       }       auto empty() const -> bool       {          auto _ = std::lock_guard{mtx_};          return data_.empty();       }    };

Como podemos comprobar, la clase posee una cola estándar no-sincronizada std::queue, un objeto std::mutex --declarado mutable con el fin de poder adquirirlo/liberarlo en funciones miembro constantes-- y un objeto std::condition_variable como atributos privados. En particular, el mutex protegerá a la estructura std::queue de la posible incidencia de data races

Distinguimos cuatro funciones miembro públicas principales:
  • dequeue, que adquiere el mutex creando un objeto std::lock_guard<std::mutex> con el fin de: (a) Si la cola está vacía, retornar false directamente sin modificar la variable value pasada como argumento. (b) Si la cola no está vacía, copiar/mover el frente en la variable value, eliminar dicho frente y retornar trueObservemos que el mutex será liberado de forma automática al finalizar la función.
  • dequeue_when_available, a través de la cual cualquier hilo que trate de acceder al frente de la cola y encuentre que ésta está vacía aguardará hasta que otro hilo inserte un elemento y así se lo haga saber mediante la variable de condición. Al recibir dicha notificación, el hilo en espera podrá copiar/mover el elemento al frente de la cola en la variable value pasada como argumento, para a continuación eliminar dicho frente. Para ello, se siguen los pasos siguientes: (a) Primeramente, se adquiere un objeto std::unique_lock<std::mutex> sobre el mutex que protege la cola. (b) Se comprueba entonces que la cola no esté vacía, condición que en nuestro caso se encuentra implementada con una lambda. De resultar dicho predicado falso, el hilo liberará el mutex y suspenderá su ejecución de forma atómica, a la espera (wait) de que le sea notificada la inserción de un nuevo elemento en la cola. (c) Cuando la variable condition_variable sea notificada (o cuando acontezca un despertar espúreo [5]), el hilo será despertado, adquiriendo el mutex atómicamente. Se comprobará entonces la condición nuevamente, en caso de que el despertar hubiese sido espúreo. De ser el predicado verdadero (es decir, si la cola no está vacía), y con el mutex adquirido, el hilo copiará/moverá el frente de la cola en la variable value pasada como argumento. De no haberse emitido una excepción, dicho frente será entonces eliminado. Finalmente, el mutex será liberado de forma automática al finalizar la función.
  • enqueue, que adquiere el mutex creando un objeto std::lock_guard<std::mutex> con el fin de realizar la inserción de forma segura de un nuevo elemento en el fondo de la cola. Una vez completada con éxito esta operación, llamamos a la función notify_one de std::condition_variable para desbloquear uno de los hilos que puedan encontrarse en espera. El mutex es finalmente liberado de forma automática al finalizar la función enqueue. Notemos que, de emplear pthreads, el hecho de que la notificación se produzca con el mutex aún adquirido permite la optimización conocida como wait morphing [6].
  • empty, un predicado que adquiere momentánemante el mutex con el fin de comprobar si la cola carece de datos.

Es importante resaltar el hecho de que, en su versión concurrente, la cola deba fusionar los métodos front() y pop() de la clase estándar std::queue en una misma sección crítica. En efecto, consideremos dos hilos, cada uno de los cuales realiza una lectura estándar front() seguida de una expulsión estándar pop(). Asumamos que dichas llamadas individuales se encuentran sincronizadas con un mutex. Dado el carácter concurrente de este escenario, podría sin duda producirse una secuencia front-front-pop-pop en la que los dos hilos terminasen leyendo el mismo frente de cola y un hipotético elemento posterior se perdiera sin llegar a ser utilizado (véase [4] para más detalles). Esta posibilidad queda descartada gracias a la función dequeue y su versión de bloqueo dequeue_when_available.

El siguiente código de ejemplo inicializa un hilo productor que inserta progresivamente los números naturales {1,2,3,...} en una cola compartida. Un segundo hilo consumidor (aquél que ejecuta la función principal main) imprimirá los diez primeros naturales en la terminal, realizando operaciones dequeue_when_available de forma segura. El hilo productor, de la clase std::jthread, se verá interrumpido una vez que finalice la ejecución de main() (véase el sexto artículo de esta serie para más detalles):

   // insertar aquí el bloque de código anterior    #include <chrono>    #include <fmt/format.h>    #include <range/v3/view/indices.hpp>    #include <string>    #include <thread>    auto main() -> int    {       auto queue = Sync_queue<int>{};       // hilo productor:       auto worker_thread = std::jthread{[&queue](std::stop_token stk){        using namespace std::chrono_literals;          auto i = 0;          while (!stk.stop_requested()) {             queue.enqueue(++i);             std::this_thread::sleep_for(100ms);          }       }};       // hilo consumidor:       for (auto i = 0[[maybe_unused]] auto const _ : ranges::views::indices(10)) {          queue.dequeue_when_available(i);          fmt::print("{}\n", i);       }    }

La implementación de la cola concurrente analizada en este post ha constituido sin duda un ejercicio de fácil codificación. Por supuesto, sería posible seguir enriqueciendo su interfaz mediante nuevas funciones miembro, por ejemplo para aguardar a la extracción de elementos durante intervalos de tiempo configurables por el usuario:

   template<typename Rep, typename Period>    auto try_dequeue_for(value_type& value,   std::chrono::duration<Rep, Period> const& rel_time) -> bool    {       auto ulck = std::unique_lock{mtx_};       if (!cv_.wait_for(ulck, rel_time, [this]{ return !data_.empty(); }))          return false; // el tiempo rel_time ha expirado y la cola sigue vacía       value = std::move(data_.front());       data_.pop();       return true;    }

El lector puede consultar el diseño de estructuras más elaboradas, de clase fine-grained locking o lock-free, en [3], [4] y [7]. 
 

Referencias bibliográficas
  1. cppreference - std::mutex - https://en.cppreference.com/w/cpp/thread/mutex
  2. cppreference - std::condition_variable - https://en.cppreference.com/w/cpp/thread/condition_variable
  3. Herlihy M. y Shavit N., The Art of Multiprocessor Programming. Morgan Kaufmann, Revised edición (2012)
  4. Williams A., C++ Concurrency in Action. Manning Publications, 2nd edition (2019)
  5. Wikipedia - Spurious wakeup - https://en.wikipedia.org/wiki/Spurious_wakeup
  6. Butenhof D. R., Programming with POSIX Threads. Addison-Wesley (1997)
  7. Boost.Lockfree - https://www.boost.org/doc/libs/1_77_0/doc/html/lockfree.html

No hay comentarios:

Publicar un comentario