Skip to content
Snippets Groups Projects
Commit 7e631219 authored by Oskar Lappi's avatar Oskar Lappi
Browse files

Started on iterator design

parent d7a93ae4
No related branches found
No related tags found
No related merge requests found
BasedOnStyle: LLVM
AlignAfterOpenBracket: true
AlignConsecutiveAssignments: AcrossComments
AlignConsecutiveDeclarations: AcrossComments
AlignConsecutiveMacros: AcrossComments
AlignOperands: AlignAfterOperator
AllowShortBlocksOnASingleLine: Always
AllowShortCaseLabelsOnASingleLine: true
AlwaysBreakAfterReturnType: AllDefinitions
BinPackParameters: false
BreakBeforeBinaryOperators: All
BreakBeforeTernaryOperators: false
BreakBeforeBraces: WebKit
ColumnLimit: 120
ConstructorInitializerIndentWidth: 2
ContinuationIndentWidth: 2
Cpp11BracedListStyle: true
EmptyLinesBeforeAccessModifier: Always
IndentPPDirectives: AfterHash
IndentWidth: 4
PenaltyBreakAssignment: 4000
PointerAlignment: Left
SpaceAfterCStyleCase: true
UseTab: Never
#include <stdio.h>
#include <iostream> #include <iostream>
#include <stdio.h>
#include <benchmark/benchmark.h> #include <benchmark/benchmark.h>
#include <fmt/core.h> #include <fmt/core.h>
#include "synchronous_producer.hpp"
#include "streaming_producer.hpp" #include "streaming_producer.hpp"
#include "synchronous_producer.hpp"
#include <chrono> #include <chrono>
#include <thread> #include <thread>
static constexpr size_t n_items = 10; static constexpr size_t n_items = 10;
constexpr size_t produce_duration = 20; constexpr size_t produce_duration = 20;
constexpr size_t consume_duration = 20; constexpr size_t consume_duration = 20;
...@@ -17,32 +16,38 @@ constexpr size_t consume_duration = 20; ...@@ -17,32 +16,38 @@ constexpr size_t consume_duration = 20;
using ProdSync = Producer_Sync<produce_duration>; using ProdSync = Producer_Sync<produce_duration>;
using ProdStream = Producer_Streaming<produce_duration>; using ProdStream = Producer_Streaming<produce_duration>;
static void sync_producer(benchmark::State& state) { static void sync_producer(benchmark::State &state) {
ProdSync prod_sync{}; ProdSync prod_sync{};
for (auto _ : state) { for (auto _ : state) {
std::thread t(&ProdSync::produce, &prod_sync, n_items); std::thread t(&ProdSync::produce, &prod_sync, n_items);
t.join(); t.join();
Item item; Item item;
do { while (true) {
item = prod_sync.consume(); item = prod_sync.consume();
std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration}); if (item.id == -1) {
} while (item.id != -1); break;
} }
std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration});
};
}
} }
BENCHMARK(sync_producer)->Unit(benchmark::kMillisecond); BENCHMARK(sync_producer)->Unit(benchmark::kMillisecond);
static void stream_producer(benchmark::State& state) { static void stream_producer(benchmark::State &state) {
ProdStream prod_str{}; ProdStream prod_str{};
for (auto _ : state) { for (auto _ : state) {
prod_str.reset(); prod_str.reset();
std::thread t(&ProdStream::produce, &prod_str,n_items); std::thread t(&ProdStream::produce, &prod_str, n_items);
Item item; Item item;
do { while (true) {
item = prod_str.consume(); item = prod_str.consume();
std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration}); if (item.id == -1) {
} while (item.id != -1); break;
t.join(); }
} std::this_thread::sleep_for(std::chrono::milliseconds{consume_duration});
};
t.join();
}
} }
BENCHMARK(stream_producer)->Unit(benchmark::kMillisecond); BENCHMARK(stream_producer)->Unit(benchmark::kMillisecond);
......
#ifndef ASYNC_CONTAINERS_HPP
#define ASYNC_CONTAINERS_HPP
#include <deque>
#include <mutex>
// AsyncQueue will remove items as they are consumed
template <typename T, T sentinel> struct AsyncQueue {
std::mutex m;
std::deque<T> container;
bool done = false;
void produce(T m) {
m.lock();
container.push_back(m);
m.unlock();
}
// do we need to use std::optional?
T consume() {
T ret;
// Wait
while (container.empty() && !done) {
};
if (container.empty()) {
// done
return sentinel;
}
m.lock();
ret = container.front();
container.pop_front();
m.unlock();
return ret;
}
void finish() { done = true; }
void reset() { done = false; }
};
// AsyncBuffer will persist items
template <typename T, T sentinel> struct AsyncBuffer {
std::mutex m;
std::deque container;
bool done = false;
void produce(T m) {
m.lock();
container.push_back(m);
m.unlock();
}
//[] operator (?)
T get(size_t i) {
T ret;
// Wait
while (container.size() <= i && !done) {
};
if (container.size() <= i) {
// done
return sentinel;
}
// Is locking necessary here? hmm
m.lock();
ret = container[i];
m.unlock();
return ret;
}
void finish() { done = true; }
void reset() {
container.clear();
done = false;
}
};
#endif
...@@ -2,6 +2,6 @@ ...@@ -2,6 +2,6 @@
#define ITEM_HPP #define ITEM_HPP
struct Item { struct Item {
int id; int id;
}; };
#endif #endif
#include <chrono>
#include <deque>
#include <mutex>
#include <thread>
using namespace std::literals::chrono_literals;
#include "async_containers.hpp"
#include "items.hpp"
template <size_t sleep_duration_ms> struct Producer_AsyncQueue {
Item consume() { return channel.consume(); }
void produce(size_t n_times) {
for (size_t i = 0; i < n_times; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds{sleep_duration_ms});
channel.produce(Item{(int)i});
}
channel.finish();
}
void reset() { channel.reset(); }
private:
AsyncQueue<Item> channel;
};
template <size_t sleep_duration_ms> struct Producer_Async_Iterator {
// TODO: implement forward input iterator interface
// TODO: decide if that's here, or a separate struct
// operator*() {}
// operator++() {}
Item consume() { return channel.consume(); }
void produce(size_t n_times) {
for (size_t i = 0; i < n_times; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds{sleep_duration_ms});
channel.produce(Item{(int)i});
}
channel.finish();
}
void reset() { channel.reset(); }
private:
AsyncQueue<Item> channel;
};
#include <mutex>
#include <thread>
#include <chrono> #include <chrono>
#include <deque> #include <deque>
#include <mutex>
#include <thread>
using namespace std::literals::chrono_literals; using namespace std::literals::chrono_literals;
#include "items.hpp" #include "items.hpp"
template<size_t sleep_duration_ms> template <size_t sleep_duration_ms> struct Producer_Streaming {
struct Producer_Streaming { Item consume() {
Item consume(){ Item ret;
Item ret;
while (!done && channel.empty()) {
while(!done && channel.empty()){ std::this_thread::sleep_for(100us);
std::this_thread::sleep_for(10ms); }
} if (done && channel.empty()) {
if (done && channel.empty()){ return Item{-1};
return Item{-1}; }
} m.lock();
m.lock(); ret = channel.front();
ret = channel.front(); channel.pop_front();
channel.pop_front(); m.unlock();
m.unlock(); return ret;
return ret; }
};
void produce(size_t n_times) {
done = false;
for (size_t i = 0; i < n_times; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds{sleep_duration_ms});
m.lock();
channel.push_back(Item{(int)i});
m.unlock();
}
done = true;
}
void reset() { done = false; }
void produce(size_t n_times)
{
done = false;
for (size_t i = 0; i < n_times; i++){
std::this_thread::sleep_for(std::chrono::milliseconds{sleep_duration_ms});
m.lock();
channel.push_back(Item{(int)i});
m.unlock();
}
done = true;
}
void reset(){
done = false;
}
private: private:
bool done = false; bool done = false;
std::mutex m; std::mutex m;
std::deque<Item> channel; std::deque<Item> channel;
}; };
...@@ -11,7 +11,7 @@ struct Producer_Sync { ...@@ -11,7 +11,7 @@ struct Producer_Sync {
Item consume() Item consume()
{ {
while (!done){ while (!done){
std::this_thread::sleep_for(10ms); std::this_thread::sleep_for(100us);
} }
if (channel.empty()){ if (channel.empty()){
return Item{-1}; return Item{-1};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment