Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion GNUmakefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ stamp-h: config.h.in config.status
echo > stamp-h

clean:
rm -f mtd mtclient mttest test_string test_atomics *.o libjson.a
rm -f mtd mtclient mttest scantest test_string test_atomics *.o libjson.a
rm -rf .deps

DEPFILES := $(wildcard $(DEPSDIR)/*.d)
Expand Down
34 changes: 34 additions & 0 deletions kvrow.hh
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class query {
void run_scan(T& table, Json& request, threadinfo& ti);
template <typename T>
void run_rscan(T& table, Json& request, threadinfo& ti);
template <typename T>
void run_iscan(T& table, Json& request, threadinfo& ti);

const loginfo::query_times& query_times() const {
return qtimes_;
Expand Down Expand Up @@ -318,4 +320,36 @@ void query<R>::run_rscan(T& table, Json& request, threadinfo& ti) {
table.rscan(scanf.firstkey(), true, scanf, ti);
}

template <typename R> template <typename T>
void query<R>::run_iscan(T& table, Json& request, threadinfo& ti) {
assert(request[3].as_i() > 0);
f_.clear();
for (int i = 4; i != request.size(); ++i)
f_.push_back(request[i].as_i());
int nleft = request[3].as_i();
lcdf::String firstkey;
std::swap(request[2].value().as_s(), firstkey);
request.resize(2);
scankeypos_ = 0;
typename T::iterator it = table.iterate_from(firstkey, ti);
for (; nleft != 0 && it != table.end(ti); nleft--) {
Str key = it->first;
R* value = it->second;
if (row_is_marker(value))
break;
// NB the `key` is not stable! We must save space for it.
while (scankeypos_ + key.length() > scankey_.length()) {
scankey_ = lcdf::String::make_uninitialized(scankey_.length() ? scankey_.length() * 2 : 1024);
scankeypos_ = 0;
}
memcpy(const_cast<char*>(scankey_.data() + scankeypos_),
key.data(), key.length());
request.push_back(scankey_.substr(scankeypos_, key.length()));
scankeypos_ += key.length();
request.push_back(lcdf::Json());
emit_fields1(value, request.back(), ti);
it++;
}
}

#endif
208 changes: 208 additions & 0 deletions kvtest.hh
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,214 @@ void kvtest_rscan1(C &client, double writer_quiet)
client.report(result);
}

template <typename C>
void kvtest_iscan1(C &client, double writer_quiet)
{
int n, wq65536 = int(writer_quiet * 65536);
if (client.limit() == ~(uint64_t) 0)
n = 10000;
else
n = std::min(client.limit(), (uint64_t) 97655);
Json result;

if (client.id() % 24 == 0) {
for (int i = 0; i < n; ++i)
client.put_key8(i * 1024, i);
client.wait_all();

int pos = 0, mypos = 0, scansteps = 0;
quick_istr key;
std::vector<Str> keys, values;
Json errj;
while (!client.timeout(0) && errj.size() < 1000) {
key.set(pos, 8);
client.iscan_sync(key.string(), 100, keys, values);
if (keys.size() == 0) {
if (mypos < n * 1024)
errj.push_back("missing " + String(mypos) + " through " + String((n - 1) * 1024));
pos = mypos = 0;
} else {
for (size_t i = 0; i < keys.size(); ++i) {
int val = keys[i].to_i();
if (val < 0) {
errj.push_back("unexpected key " + String(keys[i].s, keys[i].len));
continue;
}
if (val < pos)
errj.push_back("got " + String(keys[i].s, keys[i].len) + ", expected " + String(pos) + " or later");
pos = val + 1;
while (val > mypos) {
errj.push_back("got " + String(keys[i].s, keys[i].len) + ", missing " + String(mypos) + " @" + String(scansteps) + "+" + String(i));
mypos += 1024;
}
if (val == mypos) {
mypos = val + 1024;
++scansteps;
}
}
}
client.rcu_quiesce();
}
if (errj.size() >= 1000)
errj.push_back("too many errors, giving up");
result.set("ok", errj.empty()).set("scansteps", scansteps);
if (errj)
result.set("errors", errj);

} else {
int delta = 1 + (client.id() % 30) * 32, rounds = 0;
while (!client.timeout(0)) {
int first = (client.rand.next() % n) * 1024 + delta;
int rand = client.rand.next() % 65536;
if (rand < wq65536) {
for (int d = 0; d < 31; ++d)
relax_fence();
} else if (rounds > 100 && (rand % 2) == 1) {
for (int d = 0; d < 31; ++d)
client.remove_key8(d + first);
} else {
for (int d = 0; d < 31; ++d)
client.put_key8(d + first, d + first);
}
++rounds;
client.rcu_quiesce();
}
}

client.report(result);
}

// Test scans where nodes are repeatedly removed and inserted
template <typename C>
void kvtest_iscan2(C &client)
{
unsigned n;
Json result;

if (client.limit() == ~(uint64_t) 0)
n = 10000;
else
n = std::min(client.limit(), (uint64_t) 97655);

if (client.id() == 0) {
Json errj;
unsigned rounds = 0, nfound = 0;

for (unsigned i = 0; i < n; ++i) {
client.put_key16(i * 1024, i * 1024);
}

client.rcu_quiesce();
client.wait_all();
client.puts_done();

while (!client.timeout(0) && errj.size() < 1000) {
int pos = 0;
std::vector<Str> keys, values;

client.iscan_sync("", INT_MAX, keys, values);
for (size_t i = 0; i < keys.size(); ++i) {
int val = keys[i].to_i();
while (val > pos) {
errj.push_back("got " + String(keys[i].s, keys[i].len) + ", missing " + String(pos) + " @" + String(rounds));
pos += 1024;
}
if (val == pos) {
nfound++;
pos += 1024;
}
}
client.rcu_quiesce();
++rounds;
}

if (errj.size() >= 1000)
errj.push_back("too many errors, giving up");

if (errj)
result.set("errors", errj);

result.set("nfound", nfound);
result.set("rounds", rounds);
result.set("success", (float) nfound / rounds);
} else {
while (!client.timeout(0)) {
unsigned i;
do {
i = client.rand.next() % (n * 1024);
} while (i % 1024 == 0);
client.remove_key16(i);
client.put_key16(i, i);
client.rcu_quiesce();
}
}

client.report(result);
}


// Test scans where nodes in a deeper layer are removed
template <typename C>
void kvtest_iscan3(C &client)
{
Json report;

if (client.id() == 0) {
unsigned rounds = 0;

client.put_key8(1, 1);
client.put_key16(300000002, 1);

while (1) {
client.remove_key8(2);
client.put_key16(300000001, 1);

std::vector<Str> keys, values;
client.iscan_sync("", INT_MAX, keys, values);

// Valid key sequences:
// - 00000001, 00000002, 0000000300000002
// - 00000001, 00000002, 0000000300000001, 0000000300000002
// - 00000001, 0000000300000001, 0000000300000002
//
// Invalid key sequences:
// - 00000001, 0000000300000002

if (keys.size() <= 2) {
String seq;
for (size_t i = 0; i < keys.size(); ++i) {
if (i != 0)
seq += ", ";

seq += String(keys[i]);
}
client.fail(("invalid key sequence detected: " + seq).c_str());
}

while (client.get_sync_key16(300000001) && !client.timeout(0))
;
if (client.timeout(0))
break;

rounds++;
}

report.set("rounds", rounds);
} else if (client.id() == 1) {
while (1) {
while (!client.get_sync_key16(300000001) && !client.timeout(0))
client.rcu_quiesce();
if (client.timeout(0))
break;

client.put_key8(2, 1);
client.remove_key16(300000001);
}
}

client.report(report);
}

// test concurrent splits with removes in lower layers
template <typename C>
void kvtest_splitremove1(C &client)
Expand Down
6 changes: 6 additions & 0 deletions masstree.hh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class basic_table {
typedef typename P::threadinfo_type threadinfo;
typedef unlocked_tcursor<P> unlocked_cursor_type;
typedef tcursor<P> cursor_type;
typedef std::pair<Str, value_type> itvalue_type;

inline basic_table();

Expand All @@ -76,6 +77,11 @@ class basic_table {
template <typename F>
int rscan(Str firstkey, bool matchfirst, F& scanner, threadinfo& ti) const;

class iterator;
iterator begin(threadinfo& ti);
iterator iterate_from(Str firstkey, threadinfo& ti);
iterator end(threadinfo& ti);

template <typename F>
inline int modify(Str key, F& f, threadinfo& ti);
template <typename F>
Expand Down
Loading