Skip to content
Snippets Groups Projects
streaming_iterator_producer.hpp 1.13 KiB
Newer Older
Oskar Lappi's avatar
Oskar Lappi committed
#include <chrono>
#include <deque>
#include <mutex>
#include <thread>
using namespace std::literals::chrono_literals;

#include "async_containers.hpp"
#include "items.hpp"

template <size_t sleep_duration_ms> struct Producer_AsyncQueue {
  Item consume() { return channel.consume(); }

  void produce(size_t n_times) {
    for (size_t i = 0; i < n_times; i++) {
      std::this_thread::sleep_for(std::chrono::milliseconds{sleep_duration_ms});
      channel.produce(Item{(int)i});
    }
    channel.finish();
  }

  void reset() { channel.reset(); }

private:
  AsyncQueue<Item> channel;
};

template <size_t sleep_duration_ms> struct Producer_Async_Iterator {
  // TODO: implement forward input iterator interface
  // TODO: decide if that's here, or a separate struct
  // operator*() {}
  // operator++() {}
  Item consume() { return channel.consume(); }

  void produce(size_t n_times) {
    for (size_t i = 0; i < n_times; i++) {
      std::this_thread::sleep_for(std::chrono::milliseconds{sleep_duration_ms});
      channel.produce(Item{(int)i});
    }
    channel.finish();
  }

  void reset() { channel.reset(); }

private:
  AsyncQueue<Item> channel;
};