Skip to content
Snippets Groups Projects
streaming_producer.hpp 880 B
Newer Older
#include <chrono>
#include <deque>
Oskar Lappi's avatar
Oskar Lappi committed
#include <mutex>
#include <thread>
using namespace std::literals::chrono_literals;

#include "items.hpp"

Oskar Lappi's avatar
Oskar Lappi committed
template <size_t sleep_duration_ms> struct Producer_Streaming {
  /// Waits until an item is available, then removes and returns the oldest one.
  ///
  /// The queue is polled every 100 microseconds. The emptiness check, the read
  /// of `done`, and the pop all happen during a single hold of `m`, so the
  /// deque is never inspected while another thread is modifying it, and a
  /// second consumer cannot empty the queue between the check and `front()`.
  ///
  /// @return The item at the front of `channel`, or `Item{-1}` when the queue
  ///         is empty and `done` is set.
  Item consume() {
    for (;;) {
      {
        std::lock_guard<std::mutex> guard(m);
        if (!channel.empty()) {
          Item ret = channel.front();
          channel.pop_front();
          return ret;
        }
        // Queue is empty: hand back the sentinel once production has finished.
        if (done) {
          return Item{-1};
        }
      }
      // Sleep with the lock released so a producer can push in the meantime.
      std::this_thread::sleep_for(std::chrono::microseconds{100});
    }
  }

  void produce(size_t n_times) {
    done = false;
    for (size_t i = 0; i < n_times; i++) {
      std::this_thread::sleep_for(std::chrono::milliseconds{sleep_duration_ms});
      m.lock();
      channel.push_back(Item{(int)i});
      m.unlock();
    }
    done = true;
  }

  void reset() { done = false; }

private:
Oskar Lappi's avatar
Oskar Lappi committed
  // End-of-stream flag: cleared by produce() on entry and by reset(), set by
  // produce() after its final push. consume() returns Item{-1} when this is
  // set and `channel` is empty.
  // NOTE(review): plain bool rather than std::atomic<bool>; any access made
  // without holding `m` is unsynchronized.
  bool done = false;
  // Held around every push_back/pop_front performed on `channel`.
  std::mutex m;
  // FIFO queue of items: produce() appends at the back, consume() removes
  // from the front.
  std::deque<Item> channel;