Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jkunle committed Jan 28, 2025
1 parent 8019cbb commit b45c2b7
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 1 deletion.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ subprojects {

soap "com.sun.xml.messaging.saaj:saaj-impl:1.5.3",
"org.apache.cxf:cxf-rt-frontend-jaxws:3.4.5",
"org.apache.cxf:cxf-rt-transports-http:3.4.5"
"org.apache.cxf:cxf-rt-transports-http-hc:3.4.5",
"org.apache.httpcomponents:httpasyncclient:4.1.4"

powertools "software.amazon.lambda:powertools-logging:${dependencyVersions.powertools_version}",
"software.amazon.lambda:powertools-metrics:${dependencyVersions.powertools_version}",
Expand Down
2 changes: 2 additions & 0 deletions lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ repositories {
}

dependencies {
implementation configurations.otel

implementation configurations.cri_common_lib,
configurations.aws,
configurations.dynamodb,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package uk.gov.di.ipv.cri.kbv.api.service;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.httpclient.JavaHttpClientTelemetry;
import org.apache.http.HttpException;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.HttpContext;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class OtelAsyncHttpClientWrapper extends CloseableHttpAsyncClient {
private final HttpClient telemetryHttpClient;

public OtelAsyncHttpClientWrapper() {
this.telemetryHttpClient =
JavaHttpClientTelemetry.builder(GlobalOpenTelemetry.get())
.build()
.newHttpClient(HttpClient.newBuilder().build());
}

@Override
public <T> Future<T> execute(
HttpAsyncRequestProducer requestProducer,
HttpAsyncResponseConsumer<T> responseConsumer,
HttpContext context,
FutureCallback<T> callback) {

try {
HttpRequest request = convertToHttpRequest(requestProducer);

CompletableFuture<HttpResponse<String>> futureResponse =
telemetryHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());

// Adapt future to CXF's response handling expectations
return futureResponse.handle(
(response, throwable) -> {
if (throwable != null) {
if (callback != null) {
callback.failed((Exception) throwable);
}
throw new RuntimeException(throwable);
}
try {
// Pass the response to the consumer and return the result
T result =
adaptAndConsumeResponse(response, responseConsumer, callback);
if (callback != null) {
callback.completed(result);
}
return result;
} catch (Exception e) {
if (callback != null) {
callback.failed(e);
}
throw new RuntimeException(e);
}
});
} catch (Exception e) {
// Handle synchronous conversion errors
if (callback != null) {
callback.failed(e);
}
CompletableFuture<T> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(e);
return failedFuture;
}
}

private HttpRequest convertToHttpRequest(HttpAsyncRequestProducer requestProducer)
throws HttpException, IOException {
return HttpRequest.newBuilder()
.uri(URI.create(requestProducer.generateRequest().getRequestLine().getUri()))
.build();
}

private <T> T adaptAndConsumeResponse(
HttpResponse<String> response,
HttpAsyncResponseConsumer<T> responseConsumer,
FutureCallback<T> callback)
throws Exception {

var apacheResponse =
new org.apache.http.message.BasicHttpResponse(
new org.apache.http.ProtocolVersion("HTTP", 1, 1),
response.statusCode(),
response.body());

response.headers()
.map()
.forEach(
(name, values) ->
values.forEach(value -> apacheResponse.addHeader(name, value)));

try {
responseConsumer.responseReceived(apacheResponse);
T result = responseConsumer.getResult();

if (callback != null) {
callback.completed(result);
}
return result;
} catch (Exception e) {
if (callback != null) {
callback.failed(e);
}
throw e;
}
}

@Override
public void close() {
// No action needed for java.net.http.HttpClient
}

@Override
public boolean isRunning() {
// java.net.http.HttpClient does not have a "running" state
return true;
}

@Override
public void start() {
// No explicit start action needed for java.net.http.HttpClient
}

@Override
public <T> Future<T> execute(
HttpAsyncRequestProducer requestProducer,
HttpAsyncResponseConsumer<T> responseConsumer,
FutureCallback<T> callback) {
// Use the main execute method with null context
return execute(requestProducer, responseConsumer, null, callback);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package uk.gov.di.ipv.cri.kbv.api.service;

import org.apache.cxf.Bus;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit;
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;

import java.io.IOException;

public class OtelAsyncHttpConduit extends AsyncHTTPConduit {

public OtelAsyncHttpConduit(
Bus bus,
EndpointInfo endpointInfo,
EndpointReferenceType endpointReferenceType,
AsyncHTTPConduitFactory factory)
throws IOException {
super(bus, endpointInfo, endpointReferenceType, factory);
}

@Override
public CloseableHttpAsyncClient getHttpAsyncClient() {
return new OtelAsyncHttpClientWrapper();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package uk.gov.di.ipv.cri.kbv.api.service;

import org.apache.cxf.Bus;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit;
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory;
import org.apache.cxf.ws.addressing.EndpointReferenceType;

import java.io.IOException;

public class OtelAsyncHttpConduitFactory extends AsyncHTTPConduitFactory {
public OtelAsyncHttpConduitFactory(Bus b) {
super(b);
}

@Override
public AsyncHTTPConduit createConduit(
Bus bus, EndpointInfo endpointInfo, EndpointReferenceType endpointReferenceType)
throws IOException {

return new OtelAsyncHttpConduit(bus, endpointInfo, endpointReferenceType, this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.experian.uk.schema.experian.identityiq.services.webservice.IdentityIQWebService;
import com.experian.uk.wasp.TokenService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.lambda.powertools.parameters.SSMProvider;
Expand Down Expand Up @@ -89,6 +92,9 @@ KBVGateway getKbvGateway(KeyStoreLoader keyStoreLoader, KBVClientFactory kbvClie
}

private KBVClientFactory getKbvClientFactory() {
Bus bus = BusFactory.getDefaultBus();
bus.setExtension(new OtelAsyncHttpConduitFactory(bus), AsyncHTTPConduitFactory.class);

TokenService tokenService = new TokenService();
SoapToken soapToken = new SoapToken(APPLICATION, true, tokenService, configurationService);
HeaderHandler headerHandler = new HeaderHandler(soapToken);
Expand Down

0 comments on commit b45c2b7

Please sign in to comment.