Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new option for bulk gets in KV #3628

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@
"src/workerd/api/node/path-test.wd-test",
"src/workerd/api/node/streams-test.wd-test",
"src/workerd/api/node/string-decoder-test.wd-test",
"src/workerd/api/kv-test.wd-test",
"src/workerd/api/queue-test.wd-test",
"src/workerd/api/rtti-test.wd-test",
"src/workerd/api/sql-test.wd-test",
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,12 @@ wd_test(
args = ["--experimental"],
)

wd_test(
src = "kv-test.wd-test",
args = ["--experimental"],
data = ["kv-test.js"],
)

wd_test(
src = "queue-test.wd-test",
args = ["--experimental"],
Expand Down
172 changes: 172 additions & 0 deletions src/workerd/api/kv-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright (c) 2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import assert from 'node:assert';
export default {
// Producer receiver (from `env.NAMESPACE`)
async fetch(request, env, ctx) {
let result = "example";
const { pathname } = new URL(request.url);
if (pathname === '/fail-client') {
return new Response(null, {status: 404})
} else if (pathname == "/fail-server") {
return new Response(null, {status: 500})
} else if (pathname == "/get-json") {
result = JSON.stringify({ example: "values" });
} else if (pathname == "/bulk/get") {
let r = "";
const decoder = new TextDecoder();
for await (const chunk of request.body) {
r += decoder.decode(chunk, { stream: true });
}
r += decoder.decode();
const parsedBody = JSON.parse(r);
const keys = parsedBody.keys;
if (keys.length < 1 || keys.length > 100) {
return new Response(null, {status: 400})
}
result = {}
if(parsedBody.type == "json") {
for(const key of keys) {
if(key == "key-not-json") {
return new Response(null, {status: 500})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw, We have Response.error() now that would end up sending a 500 response back to the eyeball.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried this one but the error message would be more generic and less specific to our particular case, so I would rather keep it like it is, if that's ok.

}
const val = { example: `values-${key}`};
if (parsedBody.withMetadata) {
result[key] = {value: val, metadata: "example-metadata"};
} else {
result[key] = val;
}
}
} else if (!parsedBody.type || parsedBody.type == "text") {
for(const key of keys) {
const val = JSON.stringify({ example: `values-${key}` });;
if(key == "not-found") {
result[key] = null;
} else if (parsedBody.withMetadata) {
result[key] = {value: val, metadata: "example-metadata"};
} else {
result[key] = val;
}
}
} else { // invalid type requested
return new Response(null,{status: 500});

}
result = JSON.stringify(result);
} else { // generic success for get key
result = "value-"+pathname.slice(1);
}
let response = new Response(result, {status: 200});
response.headers.set("CF-KV-Metadata", '{"someMetadataKey":"someMetadataValue","someUnicodeMeta":"🤓"}');
return response;
},


async test(ctrl, env, ctx) {
// Test .get()
let response = await env.KV.get('success',{});
assert.strictEqual(response, "value-success");

response = await env.KV.get('fail-client');
assert.strictEqual(response, null);
await assert.rejects(env.KV.get('fail-server'), {
message: 'KV GET failed: 500 Internal Server Error',
});


response = await env.KV.get('get-json');
assert.strictEqual(response, JSON.stringify({ example: "values" }));

response = await env.KV.get('get-json', "json");
assert.deepStrictEqual(response, { example: "values" });


response = await env.KV.get('success', "stream");
let result = "";
const decoder = new TextDecoder();
for await (const chunk of response) {
result += decoder.decode(chunk, { stream: true });
}
result += decoder.decode();
assert.strictEqual(result, "value-success");

response = await env.KV.get('success', "arrayBuffer");
assert.strictEqual(new TextDecoder().decode(response), "value-success");


// Testing .get bulk
response = await env.KV.get(["key1", "key2"]);
let expected = { key1: '{\"example\":\"values-key1\"}', key2: '{\"example\":\"values-key2\"}' };
assert.deepStrictEqual(response, expected);

response = await env.KV.get(["key1", "key2"],{});
expected = { key1: '{\"example\":\"values-key1\"}', key2: '{\"example\":\"values-key2\"}' };
assert.deepStrictEqual(response, expected);

let fullKeysArray = [];
let fullResponse = {};
for(let i = 0; i< 100; i++) {
fullKeysArray.push(`key`+i);
fullResponse[`key`+i] = `{\"example\":\"values-key${i}\"}`;
}

response = await env.KV.get(fullKeysArray,{});
assert.deepStrictEqual(response, fullResponse);

//sending over 100 keys
fullKeysArray.push("key100");
await assert.rejects(env.KV.get(fullKeysArray), {
message: 'KV GET_BULK failed: 400 Bad Request'
});

response = await env.KV.get(["key1", "not-found"],{cacheTtl: 100});
expected = { key1: '{\"example\":\"values-key1\"}', "not-found": null };
assert.deepStrictEqual(response, expected);

await assert.rejects(env.KV.get([]), {
message: 'KV GET_BULK failed: 400 Bad Request'
});

// get bulk json
response = await env.KV.get(["key1", "key2"], "json");
expected = { key1: { example: 'values-key1' }, key2: { example: 'values-key2' } };
assert.deepStrictEqual(response, expected);

// get bulk json but it is not json - throws error
await assert.rejects(env.KV.get(["key-not-json", "key2"], "json"), {
message: 'KV GET_BULK failed: 500 Internal Server Error',
});

// requested type is invalid for bulk get
await assert.rejects(env.KV.get(["key-not-json", "key2"], "arrayBuffer"), {
message: 'KV GET_BULK failed: 500 Internal Server Error',
});

await assert.rejects(env.KV.get(["key-not-json", "key2"], {type: "banana"}), {
message: 'KV GET_BULK failed: 500 Internal Server Error',
});


// get with metadata
response = await env.KV.getWithMetadata('key1');
expected = {
value: 'value-key1',
metadata: { someMetadataKey: 'someMetadataValue', someUnicodeMeta: '🤓' },
cacheStatus: null
};
assert.deepStrictEqual(response, expected);

response = await env.KV.getWithMetadata(['key1'],{});
expected = { key1: { metadata: 'example-metadata', value: '{"example":"values-key1"}' } };
assert.deepStrictEqual(response, expected);

response = await env.KV.getWithMetadata(['key1'], "json");
expected = { key1: { metadata: 'example-metadata', value: { example: 'values-key1' } } };
assert.deepStrictEqual(response, expected);
response = await env.KV.getWithMetadata(['key1', 'key2'], "json");
expected = { key1: { metadata: 'example-metadata', value: { example: 'values-key1' } }, key2: { metadata: 'example-metadata', value: { example: 'values-key2' } } };
assert.deepStrictEqual(response, expected);
},
};
16 changes: 16 additions & 0 deletions src/workerd/api/kv-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "kv-test",
worker = (
modules = [
( name = "worker", esModule = embed "kv-test.js" )
],
bindings = [ ( name = "KV", kvNamespace = "kv-test" ), ],
compatibilityDate = "2023-07-24",
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"],
)
),
],
);
131 changes: 127 additions & 4 deletions src/workerd/api/kv.c++
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
return "kv_list"_kjc;
case LimitEnforcer::KvOpType::DELETE:
return "kv_delete"_kjc;
case LimitEnforcer::KvOpType::GET_BULK:
return "kv_get_bulk"_kjc;
}
}
}
Expand Down Expand Up @@ -155,7 +157,7 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
return client;
}

jsg::Promise<KvNamespace::GetResult> KvNamespace::get(
jsg::Promise<KvNamespace::GetResult> KvNamespace::getSingle(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return js.evalNow([&] {
auto resp =
Expand All @@ -165,9 +167,130 @@ jsg::Promise<KvNamespace::GetResult> KvNamespace::get(
});
}

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH);
jsg::Promise<jsg::JsRef<jsg::JsValue>> KvNamespace::getBulk(jsg::Lock& js,
kj::Array<kj::String> name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
bool withMetadata) {
return js.evalNow([&] {

auto& context = IoContext::current();

kj::Url url;
url.scheme = kj::str("https");
url.host = kj::str("fake-host");
url.path.add(kj::str("bulk"));
url.path.add(kj::str("get"));

kj::String body = formBulkBodyString(name, withMetadata, options);

kj::Maybe<uint64_t> expectedBodySize = uint64_t(body.size());
auto headers = kj::HttpHeaders(context.getHeaderTable());

auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::GET_BULK, urlStr, kj::mv(options));

auto promise = context.waitForOutputLocks().then(
[client = kj::mv(client), urlStr = kj::mv(urlStr), headers = kj::mv(headers),
expectedBodySize, supportedBody = kj::mv(body)]() mutable {
auto innerReq = client->request(kj::HttpMethod::POST, urlStr, headers, expectedBodySize);
struct RefcountedWrapper: public kj::Refcounted {
explicit RefcountedWrapper(kj::Own<kj::HttpClient> client): client(kj::mv(client)) {}
kj::Own<kj::HttpClient> client;
};
auto rcClient = kj::refcounted<RefcountedWrapper>(kj::mv(client));
auto req = attachToRequest(kj::mv(innerReq), kj::mv(rcClient));

kj::Promise<void> writePromise = nullptr;
writePromise = req.body->write(supportedBody.asBytes()).attach(kj::mv(supportedBody));

return writePromise.attach(kj::mv(req.body)).then([resp = kj::mv(req.response)]() mutable {
return resp.then([](kj::HttpClient::Response&& response) mutable {
checkForErrorStatus("GET_BULK", response);
return response.body->readAllText().attach(kj::mv(response.body));
});
});
});

return context.awaitIo(js, kj::mv(promise), [&](jsg::Lock& js, kj::String text) mutable {
auto result = jsg::JsValue::fromJson(js, text);
return jsg::JsRef(js, result);
});
});
}

kj::String KvNamespace::formBulkBodyString(kj::Array<kj::String>& names, bool withMetadata, jsg::Optional<kj::OneOf<kj::String, GetOptions>>& options) {
kj::Vector<kj::String> stringVector;

kj::String type = kj::str("");
kj::String cacheTtlStr = kj::str("");
KJ_IF_SOME(oneOfOptions, options) {
KJ_SWITCH_ONEOF(oneOfOptions) {
KJ_CASE_ONEOF(t, kj::String) {
type = kj::str(t);
}
KJ_CASE_ONEOF(options, GetOptions) {
KJ_IF_SOME(t, options.type) {
type = kj::str(t);
}
KJ_IF_SOME(cacheTtl, options.cacheTtl) {
cacheTtlStr = kj::str(cacheTtl);
}
}
}
}
for (auto& str : names) {
stringVector.add(kj::str("\"", str, "\"")); // Wrap each string in quotes for JSON
}

// Join array elements into a JSON array format
kj::String jsonArray = kj::str("[", kj::strArray(stringVector, ", "), "]");
kj::String s = kj::str("{'key': 'value'}");
kj::String keys = kj::str("\"keys\": ", jsonArray);
kj::String typeStr = kj::str("");
kj::String metadataStr = kj::str("");
if(type != kj::str("")) {
typeStr = kj::str(",\"type\": \"", type, "\"");
}
if(withMetadata) {
metadataStr = kj::str(",\"withMetadata\": \"true\"");
}
if(cacheTtlStr != kj::str("")) {
cacheTtlStr = kj::str(",\"cacheTtl\": \"",cacheTtlStr,"\"");
}
return kj::str("{", keys, typeStr, metadataStr, cacheTtlStr, "}");
}

kj::OneOf<jsg::Promise<KvNamespace::GetResult>,jsg::Promise<jsg::JsRef<jsg::JsValue>> > KvNamespace::get(
jsg::Lock& js, kj::OneOf<kj::String, kj::Array<kj::String>> name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
KJ_SWITCH_ONEOF(name) {
KJ_CASE_ONEOF(arr, kj::Array<kj::String>) {
return getBulk(js, kj::mv(arr), kj::mv(options), false);
}
KJ_CASE_ONEOF(str, kj::String) {
return getSingle(js, kj::mv(str), kj::mv(options));
}
}
KJ_UNREACHABLE;
};

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadataSingle(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be sure to check the linting. Looks like the indentation on a few of these is out of whack.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That line in particular was already here, but I agree that it seems generally funky. Tried to adjust it a bit. Is there a linter I can run on top of this?
Thank you!


kj::OneOf<jsg::Promise<KvNamespace::GetWithMetadataResult>, jsg::Promise<jsg::JsRef<jsg::JsValue>>> KvNamespace::getWithMetadata(
jsg::Lock& js, kj::OneOf<kj::Array<kj::String>, kj::String> name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
KJ_SWITCH_ONEOF(name) {
KJ_CASE_ONEOF(arr, kj::Array<kj::String>) {
return getBulk(js, kj::mv(arr), kj::mv(options), true);
}
KJ_CASE_ONEOF(str, kj::String) {
return getWithMetadataSingle(js, kj::mv(str), kj::mv(options));
}
}
KJ_UNREACHABLE;
}

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadataImpl(jsg::Lock& js,
Expand Down
Loading