Skip to content
Snippets Groups Projects
Commit baf91755 authored by Oskar Lappi's avatar Oskar Lappi
Browse files

Iterators

parent 7e631219
No related branches found
No related tags found
No related merge requests found
#include <thread> #include "streaming_iterator_producer.hpp"
#include "streaming_producer.hpp" #include "streaming_producer.hpp"
#include <thread>
static constexpr size_t n_items = 10; static constexpr size_t n_items = 10;
constexpr size_t produce_duration = 20; constexpr size_t produce_duration = 20;
constexpr size_t consume_duration = 20; constexpr size_t consume_duration = 20;
using ProdStream = Producer_Streaming<produce_duration>; using ProdStream = Producer_Streaming<produce_duration>;
int main() using ProdBuffer = Producer_AsyncBuffer<produce_duration>;
{
ProdStream prod_str{}; int main() {
for (size_t i = 0; i < 100; i++){ /*
printf("\nRUN #%d\n",i); ProdBuffer prod{};
prod_str.reset(); for (size_t i = 0; i < 100; i++) {
std::thread t(&ProdStream::produce, &prod_str,n_items); printf("\nRUN #%d\n", i);
Item item; prod.reset();
do { std::thread t(&ProdBuffer::produce, &prod, n_items);
item = prod_str.consume(); Item item;
printf("Item{%d}\n",item.id); do {
} while (item.id != -1); printf("consuming... ");
t.join(); fflush(stdout);
} item = prod.consume();
printf("Item{%d}\n", item.id);
} while (item.id != -1);
t.join();
}
for (size_t i = 0; i < 100; i++) {
printf("\nRUN #%d\n", i);
auto ch = produce_items<produce_duration>(n_items);
Item item;
do {
printf("consuming... ");
fflush(stdout);
item = ch->consume();
printf("Item{%d}\n", item.id);
} while (item.id != -1);
}
*/
for (int i = 0; i < 10; i++) {
printf("\nRUN #%d\n", i);
auto ch = produce_items<produce_duration>(n_items);
for (auto item : *ch) {
printf("Item{%d}\n", item.id);
}
}
return 0; return 0;
} }
#include <chrono>
#include <iostream> #include <iostream>
#include <stdio.h> #include <stdio.h>
#include <thread>
#include <benchmark/benchmark.h> #include <benchmark/benchmark.h>
#include <fmt/core.h> #include <fmt/core.h>
#include "streaming_iterator_producer.hpp"
#include "streaming_producer.hpp" #include "streaming_producer.hpp"
#include "synchronous_producer.hpp" #include "synchronous_producer.hpp"
#include <chrono>
#include <thread>
static constexpr size_t n_items = 10; static constexpr size_t n_items = 10;
constexpr size_t produce_duration = 20; constexpr size_t produce_duration = 20;
...@@ -15,18 +16,15 @@ constexpr size_t consume_duration = 20; ...@@ -15,18 +16,15 @@ constexpr size_t consume_duration = 20;
using ProdSync = Producer_Sync<produce_duration>; using ProdSync = Producer_Sync<produce_duration>;
using ProdStream = Producer_Streaming<produce_duration>; using ProdStream = Producer_Streaming<produce_duration>;
using ProdChannel = Producer_AsyncQueue<produce_duration>;
using ProdBuffer = Producer_AsyncBuffer<produce_duration>;
static void sync_producer(benchmark::State &state) { static void sync_producer(benchmark::State &state) {
ProdSync prod_sync{}; ProdSync prod_sync{};
for (auto _ : state) { for (auto _ : state) {
std::thread t(&ProdSync::produce, &prod_sync, n_items); prod_sync.produce(n_items);
t.join();
Item item; Item item;
while (true) { while ((item = prod_sync.consume()).id != -1) {
item = prod_sync.consume();
if (item.id == -1) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration}); std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration});
}; };
} }
...@@ -39,11 +37,7 @@ static void stream_producer(benchmark::State &state) { ...@@ -39,11 +37,7 @@ static void stream_producer(benchmark::State &state) {
prod_str.reset(); prod_str.reset();
std::thread t(&ProdStream::produce, &prod_str, n_items); std::thread t(&ProdStream::produce, &prod_str, n_items);
Item item; Item item;
while (true) { while ((item = prod_str.consume()).id != -1) {
item = prod_str.consume();
if (item.id == -1) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration}); std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration});
}; };
t.join(); t.join();
...@@ -51,4 +45,68 @@ static void stream_producer(benchmark::State &state) { ...@@ -51,4 +45,68 @@ static void stream_producer(benchmark::State &state) {
} }
BENCHMARK(stream_producer)->Unit(benchmark::kMillisecond); BENCHMARK(stream_producer)->Unit(benchmark::kMillisecond);
static void channel_producer_block(benchmark::State &state) {
ProdChannel prod_chan{};
for (auto _ : state) {
prod_chan.reset();
std::thread t(&ProdChannel::produce, &prod_chan, n_items);
Item item;
t.join();
while ((item = prod_chan.consume()).id != -1) {
std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration});
};
}
}
BENCHMARK(channel_producer_block)->Unit(benchmark::kMillisecond);
static void channel_producer(benchmark::State &state) {
ProdChannel prod_chan{};
for (auto _ : state) {
prod_chan.reset();
std::thread t(&ProdChannel::produce, &prod_chan, n_items);
Item item;
while ((item = prod_chan.consume()).id != -1) {
std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration});
};
t.join();
}
}
BENCHMARK(channel_producer)->Unit(benchmark::kMillisecond);
static void buffer_producer(benchmark::State &state) {
ProdBuffer prod{};
for (auto _ : state) {
prod.reset();
std::thread t(&ProdBuffer::produce, &prod, n_items);
Item item;
while ((item = prod.consume()).id != -1) {
std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration});
};
t.join();
}
}
BENCHMARK(buffer_producer)->Unit(benchmark::kMillisecond);
static void async_func_producer(benchmark::State &state) {
for (auto _ : state) {
auto ch = produce_items<produce_duration>(n_items);
for (Item item = ch->consume(); item.id != -1; item = ch->consume()) {
std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration});
};
// delete ch;
}
}
BENCHMARK(async_func_producer)->Unit(benchmark::kMillisecond);
static void async_iterator_producer(benchmark::State &state) {
for (auto _ : state) {
auto ch = produce_items<produce_duration>(n_items);
for (auto item : *ch) {
std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration});
};
delete ch;
}
}
BENCHMARK(async_iterator_producer)->Unit(benchmark::kMillisecond);
BENCHMARK_MAIN(); BENCHMARK_MAIN();
#ifndef ASYNC_CONTAINERS_HPP #ifndef ASYNC_CONTAINERS_HPP
#define ASYNC_CONTAINERS_HPP #define ASYNC_CONTAINERS_HPP
#include <condition_variable>
#include <deque> #include <deque>
#include <mutex> #include <mutex>
#include <semaphore>
template <typename T, T sentinel> struct AsyncQueueIterator;
// AsyncQueue will remove items as they are consumed // AsyncQueue will remove items as they are consumed
template <typename T, T sentinel> struct AsyncQueue { template <typename T, T sentinel> struct AsyncQueue {
std::mutex m; std::mutex channel_access;
std::counting_semaphore<1000> semaphore{0};
// TODO: for value semantics on AsyncQueue (q.begin())
// make all members std::shared_ptrs
std::deque<T> container; std::deque<T> container;
bool done = false; bool done = false;
void produce(T m) { void produce(T elem) {
m.lock(); std::lock_guard<std::mutex> g(channel_access);
container.push_back(m); container.push_back(elem);
m.unlock(); semaphore.release();
} }
// do we need to use std::optional? // do we need to use std::optional?
T consume() { T consume() {
T ret; T ret;
// Wait semaphore.acquire();
while (container.empty() && !done) { {
}; std::lock_guard<std::mutex> g(channel_access);
ret = container.front();
if (container.empty()) { container.pop_front();
// done
return sentinel;
} }
m.lock();
ret = container.front();
container.pop_front();
m.unlock();
return ret; return ret;
} }
void finish() { done = true; } void finish() {
produce(sentinel);
done = true;
}
void reset() { done = false; } void reset() { done = false; }
using iterator = AsyncQueueIterator<T, sentinel>;
iterator begin() { return iterator(this); }
iterator end() { return iterator(); }
};
// TODO: add constraint: T has operator ==
template <typename T, T sentinel> struct AsyncQueueIterator {
using value_type = T;
using difference_type = std::ptrdiff_t;
using pointer = T *;
using reference = T &;
using iterator_category = std::forward_iterator_tag;
AsyncQueue<T, sentinel> *queue;
value_type current;
bool end = true;
AsyncQueueIterator() = default;
explicit AsyncQueueIterator(AsyncQueue<T, sentinel> *queue_)
: queue{queue_}, end{false} {
current = queue->consume();
end = queue->done;
}
value_type operator*() const { return current; }
AsyncQueueIterator &operator++() {
current = queue->consume();
end = (current == sentinel);
return *this;
};
bool operator==(const AsyncQueueIterator &other) { return end && other.end; }
bool operator!=(const AsyncQueueIterator &other) {
return !end || !other.end;
}
}; };
// BUFFER
// AsyncBuffer will persist items // AsyncBuffer will persist items
template <typename T, T sentinel> struct AsyncBuffer { template <typename T, T sentinel> struct AsyncBuffer {
std::mutex m; std::mutex buffer_access;
std::deque container; std::condition_variable cond;
std::deque<T> container;
bool done = false; bool done = false;
void produce(T m) { void produce(T m) {
m.lock(); std::lock_guard<std::mutex> g(buffer_access);
container.push_back(m); container.push_back(m);
m.unlock(); cond.notify_all();
} }
//[] operator (?) //[] operator (?)
T get(size_t i) { T get(size_t i) {
T ret; T ret;
// Wait std::unique_lock<std::mutex> g(buffer_access);
while (container.size() <= i && !done) { cond.wait(g, [&] { return container.size() > i; });
};
if (container.size() <= i) {
// done
return sentinel;
}
// Is locking necessary here? hmm
m.lock();
ret = container[i]; ret = container[i];
m.unlock();
return ret; return ret;
} }
void finish() { done = true; } void finish() { produce(sentinel); }
void reset() { void reset() { container.clear(); }
container.clear();
done = false;
}
}; };
#endif #endif
...@@ -3,5 +3,6 @@ ...@@ -3,5 +3,6 @@
struct Item { struct Item {
int id; int id;
bool operator==(Item &rhs) const { return id == rhs.id; }
}; };
#endif #endif
...@@ -7,6 +7,27 @@ using namespace std::literals::chrono_literals; ...@@ -7,6 +7,27 @@ using namespace std::literals::chrono_literals;
#include "async_containers.hpp" #include "async_containers.hpp"
#include "items.hpp" #include "items.hpp"
template <size_t sleep_duration_ms> struct Producer_AsyncBuffer {
size_t i = 0;
Item consume() { return buffer.get(i++); }
void produce(size_t n_times) {
for (size_t i = 0; i < n_times; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds{sleep_duration_ms});
buffer.produce(Item{(int)i});
}
buffer.finish();
}
void reset() {
buffer.reset();
i = 0;
}
private:
AsyncBuffer<Item, Item{-1}> buffer;
};
template <size_t sleep_duration_ms> struct Producer_AsyncQueue { template <size_t sleep_duration_ms> struct Producer_AsyncQueue {
Item consume() { return channel.consume(); } Item consume() { return channel.consume(); }
...@@ -21,7 +42,7 @@ template <size_t sleep_duration_ms> struct Producer_AsyncQueue { ...@@ -21,7 +42,7 @@ template <size_t sleep_duration_ms> struct Producer_AsyncQueue {
void reset() { channel.reset(); } void reset() { channel.reset(); }
private: private:
AsyncQueue<Item> channel; AsyncQueue<Item, Item{-1}> channel;
}; };
template <size_t sleep_duration_ms> struct Producer_Async_Iterator { template <size_t sleep_duration_ms> struct Producer_Async_Iterator {
...@@ -42,5 +63,20 @@ template <size_t sleep_duration_ms> struct Producer_Async_Iterator { ...@@ -42,5 +63,20 @@ template <size_t sleep_duration_ms> struct Producer_Async_Iterator {
void reset() { channel.reset(); } void reset() { channel.reset(); }
private: private:
AsyncQueue<Item> channel; AsyncQueue<Item, Item{-1}> channel;
}; };
using ItemQueue = AsyncQueue<Item, Item{-1}>;
template <size_t sleep_duration_ms> ItemQueue *produce_items(size_t n_items) {
ItemQueue *ch = new ItemQueue;
std::thread t([ch, n_items]() {
for (size_t i = 0; i < n_items; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds{sleep_duration_ms});
ch->produce(Item{(int)i});
}
ch->finish();
});
t.detach();
return ch;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment