Skip to content

Commit

Permalink
Fixing tests after the latest Spring Cloud Gateway version upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
ilgrosso committed Jul 3, 2019
1 parent 4eccdae commit d8fd4c9
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,8 @@
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.util.MultiValueMap;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -81,121 +76,80 @@ public ModifyResponseGatewayFilter(final Config config) {

@Override
public Mono<Void> filter(final ServerWebExchange exchange, final GatewayFilterChain chain) {
return chain.filter(exchange.mutate().response(decorate(exchange)).build());
}

private ServerHttpResponse decorate(final ServerWebExchange exchange) {
ServerHttpResponse originalResponse = exchange.getResponse();

DataBufferFactory bufferFactory = originalResponse.bufferFactory();
ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(originalResponse) {
return new ServerHttpResponseDecorator(originalResponse) {

@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Flux<? extends DataBuffer> flux = (Flux<? extends DataBuffer>) body;

return super.writeWith(flux.buffer().map(dataBuffers -> {
ByteArrayOutputStream payload = new ByteArrayOutputStream();
dataBuffers.forEach(buffer -> {
byte[] array = new byte[buffer.readableByteCount()];
buffer.read(array);
try {
payload.write(array);
} catch (IOException e) {
LOG.error("While reading original body content", e);
}
});

byte[] input = payload.toByteArray();

InputStream is = null;
boolean compressed = false;
byte[] output;
return super.writeWith(Flux.from(body).buffer().map(dataBuffers -> {
ByteArrayOutputStream payload = new ByteArrayOutputStream();
dataBuffers.forEach(buffer -> {
byte[] array = new byte[buffer.readableByteCount()];
buffer.read(array);
try {
if (isCompressed(input)) {
compressed = true;
is = new GZIPInputStream(new ByteArrayInputStream(input));
} else {
is = new ByteArrayInputStream(input);
}

ObjectNode content = (ObjectNode) MAPPER.readTree(is);
String[] kv = config.getData().split("=");
content.put(kv[0], kv[1]);

output = MAPPER.writeValueAsBytes(content);
payload.write(array);
} catch (IOException e) {
LOG.error("While (de)serializing as JSON", e);
output = ArrayUtils.clone(input);
} finally {
IOUtils.closeStream(is);
LOG.error("While reading original body content", e);
}
});

byte[] input = payload.toByteArray();

InputStream is = null;
boolean compressed = false;
byte[] output;
try {
if (isCompressed(input)) {
compressed = true;
is = new GZIPInputStream(new ByteArrayInputStream(input));
} else {
is = new ByteArrayInputStream(input);
}

if (compressed) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(output.length);
GZIPOutputStream gzipos = new GZIPOutputStream(baos)) {

gzipos.write(output);
gzipos.close();
output = baos.toByteArray();
} catch (IOException e) {
LOG.error("While GZIP-encoding output", e);
}
ObjectNode content = (ObjectNode) MAPPER.readTree(is);
String[] kv = config.getData().split("=");
content.put(kv[0], kv[1]);

output = MAPPER.writeValueAsBytes(content);
} catch (IOException e) {
LOG.error("While (de)serializing as JSON", e);
output = ArrayUtils.clone(input);
} finally {
IOUtils.closeStream(is);
}

if (compressed) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(output.length);
GZIPOutputStream gzipos = new GZIPOutputStream(baos)) {

gzipos.write(output);
gzipos.close();
output = baos.toByteArray();
} catch (IOException e) {
LOG.error("While GZIP-encoding output", e);
}
}

return bufferFactory.wrap(output);
}));
}
return bufferFactory.wrap(output);
}));
}

return super.writeWith(body);
@Override
public Mono<Void> writeAndFlushWith(final Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMapSequential(p -> p));
}
};

return chain.filter(exchange.mutate().response(responseDecorator).build());
}

@Override
public int getOrder() {
return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
}
}

public class ResponseAdapter implements ClientHttpResponse {

private final Flux<DataBuffer> flux;

private final HttpHeaders headers;

@SuppressWarnings("unchecked")
public ResponseAdapter(final Publisher<? extends DataBuffer> body, final HttpHeaders headers) {
this.headers = headers;
if (body instanceof Flux) {
flux = (Flux) body;
} else {
flux = ((Mono) body).flux();
}
}

@Override
public Flux<DataBuffer> getBody() {
return flux;
}

@Override
public HttpHeaders getHeaders() {
return headers;
}

@Override
public HttpStatus getStatusCode() {
return null;
}

@Override
public int getRawStatusCode() {
return 0;
}

@Override
public MultiValueMap<String, ResponseCookie> getCookies() {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,12 @@

import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter;
import org.springframework.cloud.gateway.handler.AsyncPredicate;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
Expand All @@ -48,31 +43,13 @@ public AsyncPredicate<ServerWebExchange> applyAsync(final Config config) {
return exchange -> {
JsonNode cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
if (cachedBody == null) {
// Join all the DataBuffers so we have a single DataBuffer for the body
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
// Update the retain counts so we can read the body twice, once to parse into an object
// that we can test the predicate against and a second time when the HTTP client sends
// the request downstream
// Note: if we end up reading the body twice we will run into a problem, but as of right
// now there is no good use case for doing this
DataBufferUtils.retain(dataBuffer);
// Make a slice for each read so each read has its own read/write indexes
Flux<DataBuffer> cachedFlux = Flux.defer(() -> Flux.just(
dataBuffer.slice(0, dataBuffer.readableByteCount())));

ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), MESSAGE_READERS).
bodyToMono(JsonNode.class).doOnNext(value -> {
exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, value);
exchange.getAttributes().put(AdaptCachedBodyGlobalFilter.CACHED_REQUEST_BODY_KEY, cachedFlux);
}).map(objectValue -> objectValue.has(config.getData()));
});
return ServerWebExchangeUtils.cacheRequestBodyAndRequest(
exchange, serverHttpRequest -> ServerRequest.create(
exchange.mutate().request(serverHttpRequest).build(), MESSAGE_READERS).
bodyToMono(JsonNode.class).
doOnNext(objectValue -> exchange.getAttributes().
put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue)).
map(objectValue -> objectValue.has(config.getData())));
} else {
return Mono.just(cachedBody.has(config.getData()));
}
Expand Down

0 comments on commit d8fd4c9

Please sign in to comment.