feat: add new option for bulk gets in KV #3628

Draft: wants to merge 2 commits into base: main
Changes from 1 commit
1 change: 1 addition & 0 deletions .vscode/launch.json
@@ -306,6 +306,7 @@
"src/workerd/api/node/path-test.wd-test",
"src/workerd/api/node/streams-test.wd-test",
"src/workerd/api/node/string-decoder-test.wd-test",
"src/workerd/api/kv-test.wd-test",
"src/workerd/api/queue-test.wd-test",
"src/workerd/api/rtti-test.wd-test",
"src/workerd/api/sql-test.wd-test",
6 changes: 6 additions & 0 deletions src/workerd/api/BUILD.bazel
@@ -346,6 +346,12 @@ wd_test(
args = ["--experimental"],
)

wd_test(
src = "kv-test.wd-test",
args = ["--experimental"],
data = ["kv-test.js"],
)

wd_test(
src = "queue-test.wd-test",
args = ["--experimental"],
170 changes: 170 additions & 0 deletions src/workerd/api/kv-test.js
@@ -0,0 +1,170 @@
// Copyright (c) 2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import assert from 'node:assert';
export default {
// Producer receiver (from `env.NAMESPACE`)
async fetch(request, env, ctx) {
const options = {
status: 200,
statusText: "Success!",
headers: new Headers({
'Content-Type': 'application/json'
})
};

var result = "example";
const { pathname } = new URL(request.url);
if (pathname === '/fail-client') {
options.status = "404"
result = ""
} else if (pathname == "/fail-server") {
options.status = "500"
result = ""
} else if (pathname == "/get-json") {
result = JSON.stringify({ example: "values" });
} else if (pathname == "/get-bulk") {
var body = request.body;
const reader = body.getReader();
const decoder = new TextDecoder(); // UTF-8 by default
let r = "";
while (true) {
const { done, value } = await reader.read();
Contributor

Nit: any particular reason for reading this as a stream instead of just calling request.arrayBuffer()?

Author

I tried a few other approaches, but they did not read the stream properly, both here and in the type "stream" test below.

Member

Can you expand on what "not reading the stream properly" means? ;-)

Also... you can do the following to simplify this a bit:

for await (const chunk of request.body) {
  r += decoder.decode(chunk, { stream: true });
}
// But also remember to grab the final buffered chunk if any.
r += decoder.decode();

Author

I did not try this one in particular. Added it, thank you!
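
A minimal sketch of the streaming-decode pattern suggested in this thread, runnable outside workerd. `readAllText` and `splitEmoji` are hypothetical names, and an async generator stands in for `request.body`. The point it illustrates: `{ stream: true }` lets the decoder hold back a partial multi-byte sequence until the next chunk arrives, and the final argument-less `decode()` call flushes anything still buffered.

```javascript
// Hypothetical helper (not part of the PR): decode UTF-8 chunks from any async iterable.
async function readAllText(chunks) {
  const decoder = new TextDecoder(); // UTF-8 by default
  let text = "";
  for await (const chunk of chunks) {
    // stream: true keeps incomplete byte sequences buffered between calls
    text += decoder.decode(chunk, { stream: true });
  }
  // Flush whatever the decoder is still holding
  text += decoder.decode();
  return text;
}

// Stand-in for request.body: one 4-byte emoji split across two chunks.
async function* splitEmoji() {
  const bytes = new TextEncoder().encode("🤓");
  yield bytes.slice(0, 2);
  yield bytes.slice(2);
}

readAllText(splitEmoji()).then((text) => console.log(text));
```

Decoding each chunk without `{ stream: true }` would treat the two halves as separate, malformed sequences.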

if (done) break;
r += decoder.decode(value, { stream: true });
}
const parsedBody = JSON.parse(r);
const keys = parsedBody.keys;
result = {}
if(parsedBody.type == "json") {
for(const key of keys) {
if(key == "key-not-json") {
return new Response(null, {status: 500})
Member

FWIW, we have Response.error() now, which would end up sending a 500 response back to the eyeball.

Author

I tried this one, but the error message was more generic and less specific to our case, so I would rather keep it as it is, if that's OK.
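
For reference, a small sketch of how the two options differ at the Fetch API level, run outside workerd (for example in Node 18+, where `Response` is a global). How the runtime ultimately surfaces an error response to the client is not shown here. The sketch only illustrates that an explicit `new Response(..., { status: 500 })` carries a status and a case-specific body, while `Response.error()` does not. The variable names are illustrative.

```javascript
// Network-error response as defined by the Fetch spec: no status code, no body.
const generic = Response.error();

// Explicit response, as used in the mock handler: status and body are under our control.
const specific = new Response("Requested type is invalid", { status: 500 });

console.log(generic.type, generic.status); // type is "error", status is 0
console.log(specific.status, specific.ok); // 500, false
```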

}
const val = { example: "values"};
if (parsedBody.withMetadata) {
result[key] = {value: val, metadata: "example-metadata"};
} else {
result[key] = val;
}
}
} else if (!parsedBody.type || parsedBody.type == "text") {
for(const key of keys) {
const val = JSON.stringify({ example: "values" });
if (parsedBody.withMetadata) {
result[key] = {value: val, metadata: "example-metadata"};
} else {
result[key] = val;
}
}
} else { // invalid type requested
return new Response("Requested type is invalid",{status: 500});

}
result = JSON.stringify(result);
} else { // generic success for get key
result = "value-"+pathname.slice(1);
}
let response = new Response(result, {status: 200});
response.headers.set("CF-KV-Metadata", '{"someMetadataKey":"someMetadataValue","someUnicodeMeta":"🤓"}');

return response;
},


async test(ctrl, env, ctx) {
// Test .get()
var response = await env.KV.get('success',{});
assert.strictEqual(response, "value-success");

response = await env.KV.get('fail-client');
assert.strictEqual(response, "");
try {
response = await env.KV.get('fail-server');
assert.ok(false);
} catch {
assert.ok(true);
Member

If the expectation is that the env.KV.get promise will reject, use assert.rejects:

await assert.rejects(env.KV.get('fail-server'), {
  message: '... whatever the expected error message is',
});

Author

This is so much cleaner. Thanks a bunch!
And thanks for all the suggestions :)
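
A sketch of why `assert.rejects` is the safer of the two patterns, assuming a hypothetical `getThatWronglyResolves` that stands in for a KV call which should reject but does not. In the try/catch form, the `AssertionError` thrown by `assert.ok(false)` is swallowed by the bare `catch`, so the check passes either way. The dynamic `import()` is only there so the snippet runs as either a script or a module; the test file would use its existing top-level `import assert from 'node:assert'`.

```javascript
// Hypothetical stand-in for a KV get that should reject but resolves instead.
async function getThatWronglyResolves() {
  return "";
}

// try/catch style: assert.ok(false) throws an AssertionError, but the bare
// catch block swallows it, so the check passes even though nothing rejected.
async function tryCatchStyle(get) {
  const { default: assert } = await import("node:assert");
  try {
    await get();
    assert.ok(false);
  } catch {
    assert.ok(true);
  }
  return "passed";
}

// assert.rejects style: the returned promise rejects when `get()` resolves,
// so a missing rejection surfaces as a test failure.
async function rejectsStyle(get) {
  const { default: assert } = await import("node:assert");
  await assert.rejects(get());
  return "passed";
}
```

With the stand-in above, `tryCatchStyle` resolves while `rejectsStyle` rejects with an `AssertionError`.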

}

response = await env.KV.get('get-json');
assert.strictEqual(response, JSON.stringify({ example: "values" }));

response = await env.KV.get('get-json', "json");
assert.deepEqual(response, { example: "values" });


var response = await env.KV.get('success', "stream");
const reader = response.getReader();
const decoder = new TextDecoder(); // UTF-8 by default
let result = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
result += decoder.decode(value, { stream: true });
}
Member

Suggested change
- let result = "";
- while (true) {
- const { done, value } = await reader.read();
- if (done) break;
- result += decoder.decode(value, { stream: true });
- }
+ let result = "";
+ for await (const chunk of response) {
+ result += decoder.decode(chunk, { stream: true });
+ }
+ result += decoder.decode();

assert.strictEqual(result, "value-success");

var response = await env.KV.get('success', "arrayBuffer");
assert.strictEqual(new TextDecoder().decode(response), "value-success");


// Testing .get bulk
var response = await env.KV.get(["key1", "key2"],{});
var expected = { key1: '{\"example\":\"values\"}', key2: '{\"example\":\"values\"}' };
assert.deepEqual(response, expected);


// get bulk text but it is json format
var response = await env.KV.get(["key1", "key2"], "json");
var expected = { key1: { example: 'values' }, key2: { example: 'values' } };
assert.deepEqual(response, expected);

// get bulk json but it is not json - throws error
try{
var response = await env.KV.get(["key-not-json", "key2"], "json");
assert.ok(false); // not reached
} catch ({ name, message }){
assert(message.includes("500"))
assert.ok(true);
}
// requested type is invalid for bulk get
try{
var response = await env.KV.get(["key-not-json", "key2"], "arrayBuffer");
assert.ok(false); // not reached
} catch ({ name, message }){
// assert(message.includes("invalid")) // this message is not processed, should it?
assert.ok(true);
}
try{
var response = await env.KV.get(["key-not-json", "key2"], {type: "banana"});
assert.ok(false); // not reached
} catch ({ name, message }){
// assert(message.includes("invalid")) // this message is not processed, should it?
assert.ok(true);
}

// get with metadata
var response = await env.KV.getWithMetadata('key1',{});
var expected = {
value: 'value-key1',
metadata: { someMetadataKey: 'someMetadataValue', someUnicodeMeta: '🤓' },
cacheStatus: null
};
assert.deepEqual(response, expected);


var response = await env.KV.getWithMetadata(['key1'],{});
var expected = { key1: { metadata: 'example-metadata', value: '{"example":"values"}' } };
assert.deepEqual(response, expected);


var response = await env.KV.getWithMetadata(['key1'], "json");
var expected = { key1: { metadata: 'example-metadata', value: { example: 'values' } } };
assert.deepEqual(response, expected);

var response = await env.KV.getWithMetadata(['key1', 'key2'], "json");
var expected = { key1: { metadata: 'example-metadata', value: { example: 'values' } }, key2: { metadata: 'example-metadata', value: { example: 'values' } } };
assert.deepEqual(response, expected);
},
};
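
As a compact summary of the response shapes these tests expect from a bulk get, here is a sketch that mirrors the mock handler's `/get-bulk` branch as a pure function. `buildBulkResult` is a hypothetical name, and the `key-not-json` failure case is left out for brevity.

```javascript
// Hypothetical helper mirroring the mock /get-bulk branch in kv-test.js.
function buildBulkResult({ keys, type, withMetadata }) {
  if (type && type !== "json" && type !== "text") {
    throw new Error("Requested type is invalid");
  }
  const result = {};
  for (const key of keys) {
    // "json" returns parsed values; a missing type or "text" returns the serialized string.
    const value = type === "json"
      ? { example: "values" }
      : JSON.stringify({ example: "values" });
    result[key] = withMetadata
      ? { value, metadata: "example-metadata" }
      : value;
  }
  return result;
}

console.log(buildBulkResult({ keys: ["key1", "key2"], type: "json" }));
console.log(buildBulkResult({ keys: ["key1"], withMetadata: true }));
```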
16 changes: 16 additions & 0 deletions src/workerd/api/kv-test.wd-test
@@ -0,0 +1,16 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "kv-test",
worker = (
modules = [
( name = "worker", esModule = embed "kv-test.js" )
],
bindings = [ ( name = "KV", kvNamespace = "kv-test" ), ],
compatibilityDate = "2023-07-24",
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"],
)
),
],
);
120 changes: 116 additions & 4 deletions src/workerd/api/kv.c++
@@ -91,6 +91,8 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
return "kv_list"_kjc;
case LimitEnforcer::KvOpType::DELETE:
return "kv_delete"_kjc;
case LimitEnforcer::KvOpType::GET_BULK:
return "kv_get_bulk"_kjc;
}
}
}
@@ -155,7 +157,7 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
return client;
}

-jsg::Promise<KvNamespace::GetResult> KvNamespace::get(
+jsg::Promise<KvNamespace::GetResult> KvNamespace::getSingle(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return js.evalNow([&] {
auto resp =
@@ -165,9 +167,119 @@ jsg::Promise<KvNamespace::GetResult> KvNamespace::get(
});
}

-jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
-jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
-return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH);
jsg::Promise<jsg::JsRef<jsg::JsValue>> KvNamespace::getBulk(jsg::Lock& js,
kj::Array<kj::String> name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
bool withMetadata) {
return js.evalNow([&] {

auto& context = IoContext::current();

kj::Url url;
url.scheme = kj::str("https");
url.host = kj::str("fake-host");
url.path.add(kj::str("get-bulk"));

kj::String type = kj::str("");
KJ_IF_SOME(oneOfOptions, options) {
KJ_SWITCH_ONEOF(oneOfOptions) {
KJ_CASE_ONEOF(t, kj::String) {
type = kj::str(t);
}
KJ_CASE_ONEOF(options, GetOptions) {
KJ_IF_SOME(t, options.type) {
type = kj::str(t);
}
}
}
}

kj::Vector<kj::String> stringVector;
for (auto& str : name) {
stringVector.add(kj::str("\"", str, "\"")); // Wrap each string in quotes for JSON
}

// Join array elements into a JSON array format
kj::String jsonArray = kj::str("[", kj::strArray(stringVector, ", "), "]");
kj::String s = kj::str("{'key': 'value'}");
kj::String keys = kj::str("\"keys\": ", jsonArray);
kj::String typeStr = kj::str("");
kj::String metadataStr = kj::str("");
if(type != kj::str("")) {
typeStr = kj::str(",\"type\": \"", type, "\"");
}
if(withMetadata) {
metadataStr = kj::str(",\"withMetadata\": \"true\"");
}
kj::String body = kj::str("{", keys, typeStr, metadataStr, "}");

kj::Maybe<uint64_t> expectedBodySize = uint64_t(body.size());
auto headers = kj::HttpHeaders(context.getHeaderTable());

auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::GET_BULK, urlStr, kj::mv(options));

auto promise = context.waitForOutputLocks().then(
[client = kj::mv(client), urlStr = kj::mv(urlStr), headers = kj::mv(headers),
expectedBodySize, supportedBody = kj::mv(body)]() mutable {
auto innerReq = client->request(kj::HttpMethod::POST, urlStr, headers, expectedBodySize);
struct RefcountedWrapper: public kj::Refcounted {
explicit RefcountedWrapper(kj::Own<kj::HttpClient> client): client(kj::mv(client)) {}
kj::Own<kj::HttpClient> client;
};
auto rcClient = kj::refcounted<RefcountedWrapper>(kj::mv(client));
auto req = attachToRequest(kj::mv(innerReq), kj::mv(rcClient));

kj::Promise<void> writePromise = nullptr;
writePromise = req.body->write(supportedBody.asBytes()).attach(kj::mv(supportedBody));

return writePromise.attach(kj::mv(req.body)).then([resp = kj::mv(req.response)]() mutable {
return resp.then([](kj::HttpClient::Response&& response) mutable {
checkForErrorStatus("GET_BULK", response);
return response.body->readAllText().attach(kj::mv(response.body));
});
});
});

return context.awaitIo(js, kj::mv(promise), [&](jsg::Lock& js, kj::String text) mutable {
auto result = jsg::JsValue::fromJson(js, text);
return jsg::JsRef(js, result);
});
});
}

kj::OneOf<jsg::Promise<KvNamespace::GetResult>,jsg::Promise<jsg::JsRef<jsg::JsValue>> > KvNamespace::get(
jsg::Lock& js, kj::OneOf<kj::String, kj::Array<kj::String>> name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {

KJ_SWITCH_ONEOF(name) {
KJ_CASE_ONEOF(arr, kj::Array<kj::String>) {
return getBulk(js, kj::mv(arr), kj::mv(options), false);
}
KJ_CASE_ONEOF(str, kj::String) {
return getSingle(js, kj::mv(str), kj::mv(options));
}
}
KJ_UNREACHABLE;
};

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadataSingle(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH);
}
Member

Be sure to check the linting. Looks like the indentation on a few of these is out of whack.

Author

That line in particular was already here, but I agree that it seems generally funky. Tried to adjust it a bit. Is there a linter I can run on top of this?
Thank you!


kj::OneOf<jsg::Promise<KvNamespace::GetWithMetadataResult>, jsg::Promise<jsg::JsRef<jsg::JsValue>>> KvNamespace::getWithMetadata(
jsg::Lock& js, kj::OneOf<kj::Array<kj::String>, kj::String> name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
KJ_SWITCH_ONEOF(name) {
KJ_CASE_ONEOF(arr, kj::Array<kj::String>) {
return getBulk(js, kj::mv(arr), kj::mv(options), true);
}
KJ_CASE_ONEOF(str, kj::String) {
return getWithMetadataSingle(js, kj::mv(str), kj::mv(options));
}
}
KJ_UNREACHABLE;
}
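
To make the request body that `getBulk` assembles easier to reason about, here is a JavaScript sketch of the same string concatenation next to a serializer-based variant. Both function names are hypothetical, and whether keys containing quotes can actually reach this code path is an assumption, not something the PR states. The sketch only shows that wrapping keys in quotes as-is produces invalid JSON for such keys, while a serializer escapes them.

```javascript
// Mirrors the concatenation approach in getBulk: keys are wrapped in quotes unescaped.
function naiveBulkBody(keys, type, withMetadata) {
  const jsonArray = "[" + keys.map((k) => '"' + k + '"').join(", ") + "]";
  const typeStr = type ? ',"type": "' + type + '"' : "";
  const metadataStr = withMetadata ? ',"withMetadata": "true"' : "";
  return '{"keys": ' + jsonArray + typeStr + metadataStr + "}";
}

// Hypothetical alternative: let a JSON serializer handle the escaping.
function escapedBulkBody(keys, type, withMetadata) {
  const body = { keys };
  if (type) body.type = type;
  if (withMetadata) body.withMetadata = "true";
  return JSON.stringify(body);
}

console.log(naiveBulkBody(["key1", "key2"], "json", true));
console.log(escapedBulkBody(['a"b'], "", false));
```

For plain keys both variants parse to the same object; they diverge only when a key needs escaping.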

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadataImpl(jsg::Lock& js,