Skip to content

Commit

Permalink
feat: add new option for bulk gets in kv
Browse files Browse the repository at this point in the history
  • Loading branch information
talves committed Feb 28, 2025
1 parent b59626b commit 6e63cdc
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 8 deletions.
1 change: 1 addition & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@
"src/workerd/api/node/path-test.wd-test",
"src/workerd/api/node/streams-test.wd-test",
"src/workerd/api/node/string-decoder-test.wd-test",
"src/workerd/api/kv-test.wd-test",
"src/workerd/api/queue-test.wd-test",
"src/workerd/api/rtti-test.wd-test",
"src/workerd/api/sql-test.wd-test",
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,12 @@ wd_test(
args = ["--experimental"],
)

wd_test(
src = "kv-test.wd-test",
args = ["--experimental"],
data = ["kv-test.js"],
)

wd_test(
src = "queue-test.wd-test",
args = ["--experimental"],
Expand Down
170 changes: 170 additions & 0 deletions src/workerd/api/kv-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright (c) 2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import assert from 'node:assert';
export default {
// Producer receiver (from `env.NAMESPACE`)
async fetch(request, env, ctx) {
const options = {
status: 200,
statusText: "Success!",
headers: new Headers({
'Content-Type': 'application/json'
})
};

var result = "example";
const { pathname } = new URL(request.url);
if (pathname === '/fail-client') {
options.status = "404"
result = ""
} else if (pathname == "/fail-server") {
options.status = "500"
result = ""
} else if (pathname == "/get-json") {
result = JSON.stringify({ example: "values" });
} else if (pathname == "/get-bulk") {
var body = request.body;
const reader = body.getReader();
const decoder = new TextDecoder(); // UTF-8 by default
let r = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
r += decoder.decode(value, { stream: true });
}
const parsedBody = JSON.parse(r);
const keys = parsedBody.keys;
result = {}
if(parsedBody.type == "json") {
for(const key of keys) {
if(key == "key-not-json") {
return new Response(null, {status: 500})
}
const val = { example: "values"};
if (parsedBody.withMetadata) {
result[key] = {value: val, metadata: "example-metadata"};
} else {
result[key] = val;
}
}
} else if (!parsedBody.type || parsedBody.type == "text") {
for(const key of keys) {
const val = JSON.stringify({ example: "values" });;
if (parsedBody.withMetadata) {
result[key] = {value: val, metadata: "example-metadata"};
} else {
result[key] = val;
}
}
} else { // invalid type requested
return new Response("Requested type is invalid",{status: 500});

}
result = JSON.stringify(result);
} else { // generic success for get key
result = "value-"+pathname.slice(1);
}
let response = new Response(result, {status: 200});
response.headers.set("CF-KV-Metadata", '{"someMetadataKey":"someMetadataValue","someUnicodeMeta":"🤓"}');

return response;
},


async test(ctrl, env, ctx) {
// Test .get()
var response = await env.KV.get('success',{});
assert.strictEqual(response, "value-success");

response = await env.KV.get('fail-client');
assert.strictEqual(response, "");
try {
response = await env.KV.get('fail-server');
assert.ok(false);
} catch {
assert.ok(true);
}

response = await env.KV.get('get-json');
assert.strictEqual(response, JSON.stringify({ example: "values" }));

response = await env.KV.get('get-json', "json");
assert.deepEqual(response, { example: "values" });


var response = await env.KV.get('success', "stream");
const reader = response.getReader();
const decoder = new TextDecoder(); // UTF-8 by default
let result = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
result += decoder.decode(value, { stream: true });
}
assert.strictEqual(result, "value-success");

var response = await env.KV.get('success', "arrayBuffer");
assert.strictEqual(new TextDecoder().decode(response), "value-success");


// Testing .get bulk
var response = await env.KV.get(["key1", "key2"],{});
var expected = { key1: '{\"example\":\"values\"}', key2: '{\"example\":\"values\"}' };
assert.deepEqual(response, expected);


// get bulk text but it is json format
var response = await env.KV.get(["key1", "key2"], "json");
var expected = { key1: { example: 'values' }, key2: { example: 'values' } };
assert.deepEqual(response, expected);

// get bulk json but it is not json - throws error
try{
var response = await env.KV.get(["key-not-json", "key2"], "json");
assert.ok(false); // not reached
} catch ({ name, message }){
assert(message.includes("500"))
assert.ok(true);
}
// requested type is invalid for bulk get
try{
var response = await env.KV.get(["key-not-json", "key2"], "arrayBuffer");
assert.ok(false); // not reached
} catch ({ name, message }){
// assert(message.includes("invalid")) // this message is not processed, should it?
assert.ok(true);
}
try{
var response = await env.KV.get(["key-not-json", "key2"], {type: "banana"});
assert.ok(false); // not reached
} catch ({ name, message }){
// assert(message.includes("invalid")) // this message is not processed, should it?
assert.ok(true);
}

// get with metadata
var response = await env.KV.getWithMetadata('key1',{});
var expected = {
value: 'value-key1',
metadata: { someMetadataKey: 'someMetadataValue', someUnicodeMeta: '🤓' },
cacheStatus: null
};
assert.deepEqual(response, expected);


var response = await env.KV.getWithMetadata(['key1'],{});
var expected = { key1: { metadata: 'example-metadata', value: '{"example":"values"}' } };
assert.deepEqual(response, expected);


var response = await env.KV.getWithMetadata(['key1'], "json");
var expected = { key1: { metadata: 'example-metadata', value: { example: 'values' } } };
assert.deepEqual(response, expected);

var response = await env.KV.getWithMetadata(['key1', 'key2'], "json");
var expected = { key1: { metadata: 'example-metadata', value: { example: 'values' } }, key2: { metadata: 'example-metadata', value: { example: 'values' } } };
assert.deepEqual(response, expected);
},
};
16 changes: 16 additions & 0 deletions src/workerd/api/kv-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "kv-test",
worker = (
modules = [
( name = "worker", esModule = embed "kv-test.js" )
],
bindings = [ ( name = "KV", kvNamespace = "kv-test" ), ],
compatibilityDate = "2023-07-24",
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"],
)
),
],
);
120 changes: 116 additions & 4 deletions src/workerd/api/kv.c++
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
return "kv_list"_kjc;
case LimitEnforcer::KvOpType::DELETE:
return "kv_delete"_kjc;
case LimitEnforcer::KvOpType::GET_BULK:
return "kv_get_bulk"_kjc;
}
}
}
Expand Down Expand Up @@ -155,7 +157,7 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context,
return client;
}

jsg::Promise<KvNamespace::GetResult> KvNamespace::get(
jsg::Promise<KvNamespace::GetResult> KvNamespace::getSingle(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return js.evalNow([&] {
auto resp =
Expand All @@ -165,9 +167,119 @@ jsg::Promise<KvNamespace::GetResult> KvNamespace::get(
});
}

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH);
jsg::Promise<jsg::JsRef<jsg::JsValue>> KvNamespace::getBulk(jsg::Lock& js,
kj::Array<kj::String> name,
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options,
bool withMetadata) {
return js.evalNow([&] {

auto& context = IoContext::current();

kj::Url url;
url.scheme = kj::str("https");
url.host = kj::str("fake-host");
url.path.add(kj::str("get-bulk"));

kj::String type = kj::str("");
KJ_IF_SOME(oneOfOptions, options) {
KJ_SWITCH_ONEOF(oneOfOptions) {
KJ_CASE_ONEOF(t, kj::String) {
type = kj::str(t);
}
KJ_CASE_ONEOF(options, GetOptions) {
KJ_IF_SOME(t, options.type) {
type = kj::str(t);
}
}
}
}

kj::Vector<kj::String> stringVector;
for (auto& str : name) {
stringVector.add(kj::str("\"", str, "\"")); // Wrap each string in quotes for JSON
}

// Join array elements into a JSON array format
kj::String jsonArray = kj::str("[", kj::strArray(stringVector, ", "), "]");
kj::String s = kj::str("{'key': 'value'}");
kj::String keys = kj::str("\"keys\": ", jsonArray);
kj::String typeStr = kj::str("");
kj::String metadataStr = kj::str("");
if(type != kj::str("")) {
typeStr = kj::str(",\"type\": \"", type, "\"");
}
if(withMetadata) {
metadataStr = kj::str(",\"withMetadata\": \"true\"");
}
kj::String body = kj::str("{", keys, typeStr, metadataStr, "}");

kj::Maybe<uint64_t> expectedBodySize = uint64_t(body.size());
auto headers = kj::HttpHeaders(context.getHeaderTable());

auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST);

auto client =
getHttpClient(context, headers, LimitEnforcer::KvOpType::GET_BULK, urlStr, kj::mv(options));

auto promise = context.waitForOutputLocks().then(
[client = kj::mv(client), urlStr = kj::mv(urlStr), headers = kj::mv(headers),
expectedBodySize, supportedBody = kj::mv(body)]() mutable {
auto innerReq = client->request(kj::HttpMethod::POST, urlStr, headers, expectedBodySize);
struct RefcountedWrapper: public kj::Refcounted {
explicit RefcountedWrapper(kj::Own<kj::HttpClient> client): client(kj::mv(client)) {}
kj::Own<kj::HttpClient> client;
};
auto rcClient = kj::refcounted<RefcountedWrapper>(kj::mv(client));
auto req = attachToRequest(kj::mv(innerReq), kj::mv(rcClient));

kj::Promise<void> writePromise = nullptr;
writePromise = req.body->write(supportedBody.asBytes()).attach(kj::mv(supportedBody));

return writePromise.attach(kj::mv(req.body)).then([resp = kj::mv(req.response)]() mutable {
return resp.then([](kj::HttpClient::Response&& response) mutable {
checkForErrorStatus("GET_BULK", response);
return response.body->readAllText().attach(kj::mv(response.body));
});
});
});

return context.awaitIo(js, kj::mv(promise), [&](jsg::Lock& js, kj::String text) mutable {
auto result = jsg::JsValue::fromJson(js, text);
return jsg::JsRef(js, result);
});
});
}

kj::OneOf<jsg::Promise<KvNamespace::GetResult>,jsg::Promise<jsg::JsRef<jsg::JsValue>> > KvNamespace::get(
jsg::Lock& js, kj::OneOf<kj::String, kj::Array<kj::String>> name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {

KJ_SWITCH_ONEOF(name) {
KJ_CASE_ONEOF(arr, kj::Array<kj::String>) {
return getBulk(js, kj::mv(arr), kj::mv(options), false);
}
KJ_CASE_ONEOF(str, kj::String) {
return getSingle(js, kj::mv(str), kj::mv(options));
}
}
KJ_UNREACHABLE;
};

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadataSingle(
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH);
}

kj::OneOf<jsg::Promise<KvNamespace::GetWithMetadataResult>, jsg::Promise<jsg::JsRef<jsg::JsValue>>> KvNamespace::getWithMetadata(
jsg::Lock& js, kj::OneOf<kj::Array<kj::String>, kj::String> name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) {
KJ_SWITCH_ONEOF(name) {
KJ_CASE_ONEOF(arr, kj::Array<kj::String>) {
return getBulk(js, kj::mv(arr), kj::mv(options), true);
}
KJ_CASE_ONEOF(str, kj::String) {
return getWithMetadataSingle(js, kj::mv(str), kj::mv(options));
}
}
KJ_UNREACHABLE;
}

jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadataImpl(jsg::Lock& js,
Expand Down
Loading

0 comments on commit 6e63cdc

Please sign in to comment.