-
Notifications
You must be signed in to change notification settings - Fork 343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add new option for bulk gets in KV #3628
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
// Copyright (c) 2023 Cloudflare, Inc. | ||
// Licensed under the Apache 2.0 license found in the LICENSE file or at: | ||
// https://opensource.org/licenses/Apache-2.0 | ||
|
||
import assert from 'node:assert'; | ||
export default { | ||
// Producer receiver (from `env.NAMESPACE`) | ||
async fetch(request, env, ctx) { | ||
let result = "example"; | ||
const { pathname } = new URL(request.url); | ||
if (pathname === '/fail-client') { | ||
return new Response(null, {status: 404}) | ||
} else if (pathname == "/fail-server") { | ||
return new Response(null, {status: 500}) | ||
} else if (pathname == "/get-json") { | ||
result = JSON.stringify({ example: "values" }); | ||
} else if (pathname == "/bulk/get") { | ||
let r = ""; | ||
const decoder = new TextDecoder(); | ||
for await (const chunk of request.body) { | ||
r += decoder.decode(chunk, { stream: true }); | ||
} | ||
r += decoder.decode(); | ||
const parsedBody = JSON.parse(r); | ||
const keys = parsedBody.keys; | ||
if (keys.length < 1 || keys.length > 100) { | ||
return new Response(null, {status: 400}) | ||
} | ||
result = {} | ||
if(parsedBody.type == "json") { | ||
for(const key of keys) { | ||
if(key == "key-not-json") { | ||
return new Response(null, {status: 500}) | ||
} | ||
const val = { example: `values-${key}`}; | ||
if (parsedBody.withMetadata) { | ||
result[key] = {value: val, metadata: "example-metadata"}; | ||
} else { | ||
result[key] = val; | ||
} | ||
} | ||
} else if (!parsedBody.type || parsedBody.type == "text") { | ||
for(const key of keys) { | ||
const val = JSON.stringify({ example: `values-${key}` });; | ||
if(key == "not-found") { | ||
result[key] = null; | ||
} else if (parsedBody.withMetadata) { | ||
result[key] = {value: val, metadata: "example-metadata"}; | ||
} else { | ||
result[key] = val; | ||
} | ||
} | ||
} else { // invalid type requested | ||
return new Response(null,{status: 500}); | ||
|
||
} | ||
result = JSON.stringify(result); | ||
} else { // generic success for get key | ||
result = "value-"+pathname.slice(1); | ||
} | ||
let response = new Response(result, {status: 200}); | ||
response.headers.set("CF-KV-Metadata", '{"someMetadataKey":"someMetadataValue","someUnicodeMeta":"🤓"}'); | ||
return response; | ||
}, | ||
|
||
|
||
async test(ctrl, env, ctx) { | ||
// Test .get() | ||
let response = await env.KV.get('success',{}); | ||
assert.strictEqual(response, "value-success"); | ||
|
||
response = await env.KV.get('fail-client'); | ||
assert.strictEqual(response, null); | ||
await assert.rejects(env.KV.get('fail-server'), { | ||
message: 'KV GET failed: 500 Internal Server Error', | ||
}); | ||
|
||
|
||
response = await env.KV.get('get-json'); | ||
assert.strictEqual(response, JSON.stringify({ example: "values" })); | ||
|
||
response = await env.KV.get('get-json', "json"); | ||
assert.deepStrictEqual(response, { example: "values" }); | ||
|
||
|
||
response = await env.KV.get('success', "stream"); | ||
let result = ""; | ||
const decoder = new TextDecoder(); | ||
for await (const chunk of response) { | ||
result += decoder.decode(chunk, { stream: true }); | ||
} | ||
result += decoder.decode(); | ||
assert.strictEqual(result, "value-success"); | ||
|
||
response = await env.KV.get('success', "arrayBuffer"); | ||
assert.strictEqual(new TextDecoder().decode(response), "value-success"); | ||
|
||
|
||
// Testing .get bulk | ||
response = await env.KV.get(["key1", "key2"]); | ||
let expected = { key1: '{\"example\":\"values-key1\"}', key2: '{\"example\":\"values-key2\"}' }; | ||
assert.deepStrictEqual(response, expected); | ||
|
||
response = await env.KV.get(["key1", "key2"],{}); | ||
expected = { key1: '{\"example\":\"values-key1\"}', key2: '{\"example\":\"values-key2\"}' }; | ||
assert.deepStrictEqual(response, expected); | ||
|
||
let fullKeysArray = []; | ||
let fullResponse = {}; | ||
for(let i = 0; i< 100; i++) { | ||
fullKeysArray.push(`key`+i); | ||
fullResponse[`key`+i] = `{\"example\":\"values-key${i}\"}`; | ||
} | ||
|
||
response = await env.KV.get(fullKeysArray,{}); | ||
assert.deepStrictEqual(response, fullResponse); | ||
|
||
//sending over 100 keys | ||
fullKeysArray.push("key100"); | ||
await assert.rejects(env.KV.get(fullKeysArray), { | ||
message: 'KV GET_BULK failed: 400 Bad Request' | ||
}); | ||
|
||
response = await env.KV.get(["key1", "not-found"],{cacheTtl: 100}); | ||
expected = { key1: '{\"example\":\"values-key1\"}', "not-found": null }; | ||
assert.deepStrictEqual(response, expected); | ||
|
||
await assert.rejects(env.KV.get([]), { | ||
message: 'KV GET_BULK failed: 400 Bad Request' | ||
}); | ||
|
||
// get bulk json | ||
response = await env.KV.get(["key1", "key2"], "json"); | ||
expected = { key1: { example: 'values-key1' }, key2: { example: 'values-key2' } }; | ||
assert.deepStrictEqual(response, expected); | ||
|
||
// get bulk json but it is not json - throws error | ||
await assert.rejects(env.KV.get(["key-not-json", "key2"], "json"), { | ||
message: 'KV GET_BULK failed: 500 Internal Server Error', | ||
}); | ||
|
||
// requested type is invalid for bulk get | ||
await assert.rejects(env.KV.get(["key-not-json", "key2"], "arrayBuffer"), { | ||
message: 'KV GET_BULK failed: 500 Internal Server Error', | ||
}); | ||
|
||
await assert.rejects(env.KV.get(["key-not-json", "key2"], {type: "banana"}), { | ||
message: 'KV GET_BULK failed: 500 Internal Server Error', | ||
}); | ||
|
||
|
||
// get with metadata | ||
response = await env.KV.getWithMetadata('key1'); | ||
expected = { | ||
value: 'value-key1', | ||
metadata: { someMetadataKey: 'someMetadataValue', someUnicodeMeta: '🤓' }, | ||
cacheStatus: null | ||
}; | ||
assert.deepStrictEqual(response, expected); | ||
|
||
response = await env.KV.getWithMetadata(['key1'],{}); | ||
expected = { key1: { metadata: 'example-metadata', value: '{"example":"values-key1"}' } }; | ||
assert.deepStrictEqual(response, expected); | ||
|
||
response = await env.KV.getWithMetadata(['key1'], "json"); | ||
expected = { key1: { metadata: 'example-metadata', value: { example: 'values-key1' } } }; | ||
assert.deepStrictEqual(response, expected); | ||
response = await env.KV.getWithMetadata(['key1', 'key2'], "json"); | ||
expected = { key1: { metadata: 'example-metadata', value: { example: 'values-key1' } }, key2: { metadata: 'example-metadata', value: { example: 'values-key2' } } }; | ||
assert.deepStrictEqual(response, expected); | ||
}, | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
using Workerd = import "/workerd/workerd.capnp"; | ||
|
||
const unitTests :Workerd.Config = ( | ||
services = [ | ||
( name = "kv-test", | ||
worker = ( | ||
modules = [ | ||
( name = "worker", esModule = embed "kv-test.js" ) | ||
], | ||
bindings = [ ( name = "KV", kvNamespace = "kv-test" ), ], | ||
compatibilityDate = "2023-07-24", | ||
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers"], | ||
) | ||
), | ||
], | ||
); |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,6 +91,8 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context, | |
return "kv_list"_kjc; | ||
case LimitEnforcer::KvOpType::DELETE: | ||
return "kv_delete"_kjc; | ||
case LimitEnforcer::KvOpType::GET_BULK: | ||
return "kv_get_bulk"_kjc; | ||
} | ||
} | ||
} | ||
|
@@ -155,7 +157,7 @@ kj::Own<kj::HttpClient> KvNamespace::getHttpClient(IoContext& context, | |
return client; | ||
} | ||
|
||
jsg::Promise<KvNamespace::GetResult> KvNamespace::get( | ||
jsg::Promise<KvNamespace::GetResult> KvNamespace::getSingle( | ||
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) { | ||
return js.evalNow([&] { | ||
auto resp = | ||
|
@@ -165,9 +167,130 @@ jsg::Promise<KvNamespace::GetResult> KvNamespace::get( | |
}); | ||
} | ||
|
||
jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadata( | ||
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) { | ||
return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH); | ||
jsg::Promise<jsg::JsRef<jsg::JsValue>> KvNamespace::getBulk(jsg::Lock& js, | ||
kj::Array<kj::String> name, | ||
jsg::Optional<kj::OneOf<kj::String, GetOptions>> options, | ||
bool withMetadata) { | ||
return js.evalNow([&] { | ||
|
||
auto& context = IoContext::current(); | ||
|
||
kj::Url url; | ||
url.scheme = kj::str("https"); | ||
url.host = kj::str("fake-host"); | ||
url.path.add(kj::str("bulk")); | ||
url.path.add(kj::str("get")); | ||
|
||
kj::String body = formBulkBodyString(name, withMetadata, options); | ||
|
||
kj::Maybe<uint64_t> expectedBodySize = uint64_t(body.size()); | ||
auto headers = kj::HttpHeaders(context.getHeaderTable()); | ||
|
||
auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST); | ||
|
||
auto client = | ||
getHttpClient(context, headers, LimitEnforcer::KvOpType::GET_BULK, urlStr, kj::mv(options)); | ||
|
||
auto promise = context.waitForOutputLocks().then( | ||
[client = kj::mv(client), urlStr = kj::mv(urlStr), headers = kj::mv(headers), | ||
expectedBodySize, supportedBody = kj::mv(body)]() mutable { | ||
auto innerReq = client->request(kj::HttpMethod::POST, urlStr, headers, expectedBodySize); | ||
struct RefcountedWrapper: public kj::Refcounted { | ||
explicit RefcountedWrapper(kj::Own<kj::HttpClient> client): client(kj::mv(client)) {} | ||
kj::Own<kj::HttpClient> client; | ||
}; | ||
auto rcClient = kj::refcounted<RefcountedWrapper>(kj::mv(client)); | ||
auto req = attachToRequest(kj::mv(innerReq), kj::mv(rcClient)); | ||
|
||
kj::Promise<void> writePromise = nullptr; | ||
writePromise = req.body->write(supportedBody.asBytes()).attach(kj::mv(supportedBody)); | ||
|
||
return writePromise.attach(kj::mv(req.body)).then([resp = kj::mv(req.response)]() mutable { | ||
return resp.then([](kj::HttpClient::Response&& response) mutable { | ||
checkForErrorStatus("GET_BULK", response); | ||
return response.body->readAllText().attach(kj::mv(response.body)); | ||
}); | ||
}); | ||
}); | ||
|
||
return context.awaitIo(js, kj::mv(promise), [&](jsg::Lock& js, kj::String text) mutable { | ||
auto result = jsg::JsValue::fromJson(js, text); | ||
return jsg::JsRef(js, result); | ||
}); | ||
}); | ||
} | ||
|
||
kj::String KvNamespace::formBulkBodyString(kj::Array<kj::String>& names, bool withMetadata, jsg::Optional<kj::OneOf<kj::String, GetOptions>>& options) { | ||
kj::Vector<kj::String> stringVector; | ||
|
||
kj::String type = kj::str(""); | ||
kj::String cacheTtlStr = kj::str(""); | ||
KJ_IF_SOME(oneOfOptions, options) { | ||
KJ_SWITCH_ONEOF(oneOfOptions) { | ||
KJ_CASE_ONEOF(t, kj::String) { | ||
type = kj::str(t); | ||
} | ||
KJ_CASE_ONEOF(options, GetOptions) { | ||
KJ_IF_SOME(t, options.type) { | ||
type = kj::str(t); | ||
} | ||
KJ_IF_SOME(cacheTtl, options.cacheTtl) { | ||
cacheTtlStr = kj::str(cacheTtl); | ||
} | ||
} | ||
} | ||
} | ||
for (auto& str : names) { | ||
stringVector.add(kj::str("\"", str, "\"")); // Wrap each string in quotes for JSON | ||
} | ||
|
||
// Join array elements into a JSON array format | ||
kj::String jsonArray = kj::str("[", kj::strArray(stringVector, ", "), "]"); | ||
kj::String s = kj::str("{'key': 'value'}"); | ||
kj::String keys = kj::str("\"keys\": ", jsonArray); | ||
kj::String typeStr = kj::str(""); | ||
kj::String metadataStr = kj::str(""); | ||
if(type != kj::str("")) { | ||
typeStr = kj::str(",\"type\": \"", type, "\""); | ||
} | ||
if(withMetadata) { | ||
metadataStr = kj::str(",\"withMetadata\": \"true\""); | ||
} | ||
if(cacheTtlStr != kj::str("")) { | ||
cacheTtlStr = kj::str(",\"cacheTtl\": \"",cacheTtlStr,"\""); | ||
} | ||
return kj::str("{", keys, typeStr, metadataStr, cacheTtlStr, "}"); | ||
} | ||
|
||
kj::OneOf<jsg::Promise<KvNamespace::GetResult>,jsg::Promise<jsg::JsRef<jsg::JsValue>> > KvNamespace::get( | ||
jsg::Lock& js, kj::OneOf<kj::String, kj::Array<kj::String>> name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) { | ||
KJ_SWITCH_ONEOF(name) { | ||
KJ_CASE_ONEOF(arr, kj::Array<kj::String>) { | ||
return getBulk(js, kj::mv(arr), kj::mv(options), false); | ||
} | ||
KJ_CASE_ONEOF(str, kj::String) { | ||
return getSingle(js, kj::mv(str), kj::mv(options)); | ||
} | ||
} | ||
KJ_UNREACHABLE; | ||
}; | ||
|
||
jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadataSingle( | ||
jsg::Lock& js, kj::String name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) { | ||
return getWithMetadataImpl(js, kj::mv(name), kj::mv(options), LimitEnforcer::KvOpType::GET_WITH); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Be sure to check the linting. Looks like the indentation on a few of these is out of whack. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That line in particular was already here, but I agree that it seems generally funky. Tried to adjust it a bit. Is there a linter I can run on top of this? |
||
|
||
kj::OneOf<jsg::Promise<KvNamespace::GetWithMetadataResult>, jsg::Promise<jsg::JsRef<jsg::JsValue>>> KvNamespace::getWithMetadata( | ||
jsg::Lock& js, kj::OneOf<kj::Array<kj::String>, kj::String> name, jsg::Optional<kj::OneOf<kj::String, GetOptions>> options) { | ||
KJ_SWITCH_ONEOF(name) { | ||
KJ_CASE_ONEOF(arr, kj::Array<kj::String>) { | ||
return getBulk(js, kj::mv(arr), kj::mv(options), true); | ||
} | ||
KJ_CASE_ONEOF(str, kj::String) { | ||
return getWithMetadataSingle(js, kj::mv(str), kj::mv(options)); | ||
} | ||
} | ||
KJ_UNREACHABLE; | ||
} | ||
|
||
jsg::Promise<KvNamespace::GetWithMetadataResult> KvNamespace::getWithMetadataImpl(jsg::Lock& js, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fwiw, We have
Response.error()
now that would end up sending a 500 response back to the eyeball.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tried this one but the error message would be more generic and less specific to our particular case, so I would rather keep it like it is, if that's ok.