Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jkunle committed Jan 28, 2025
1 parent 8019cbb commit 633d853
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 2 deletions.
9 changes: 7 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ subprojects {

soap "com.sun.xml.messaging.saaj:saaj-impl:1.5.3",
"org.apache.cxf:cxf-rt-frontend-jaxws:3.4.5",
"org.apache.cxf:cxf-rt-transports-http:3.4.5"
"org.apache.cxf:cxf-rt-transports-http-hc:3.4.5",
"org.apache.httpcomponents:httpasyncclient:4.1.4"

powertools "software.amazon.lambda:powertools-logging:${dependencyVersions.powertools_version}",
"software.amazon.lambda:powertools-metrics:${dependencyVersions.powertools_version}",
Expand Down Expand Up @@ -136,7 +137,11 @@ subprojects {
"uk.org.webcompere:system-stubs-jupiter:${dependencyVersions.webcompere_version}"

otel "io.opentelemetry.instrumentation:opentelemetry-aws-sdk-2.2-autoconfigure:2.12.0-alpha",
"io.opentelemetry.instrumentation:opentelemetry-java-http-client:2.12.0-alpha"
"io.opentelemetry.instrumentation:opentelemetry-java-http-client:2.12.0-alpha",

'io.opentelemetry:opentelemetry-sdk:1.29.0',
'io.opentelemetry:opentelemetry-sdk-logs:1.29.0',
'io.opentelemetry:opentelemetry-exporter-otlp:1.29.0'
}

apply plugin: 'java'
Expand Down
2 changes: 2 additions & 0 deletions lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ repositories {
}

dependencies {
implementation configurations.otel

implementation configurations.cri_common_lib,
configurations.aws,
configurations.dynamodb,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package uk.gov.di.ipv.cri.kbv.api.service;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.httpclient.JavaHttpClientTelemetry;
import org.apache.http.HttpException;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.HttpContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class OtelAsyncHttpClientWrapper extends CloseableHttpAsyncClient {
private static final Logger LOGGER = LoggerFactory.getLogger(OtelAsyncHttpClientWrapper.class);
private final HttpClient telemetryHttpClient;

public OtelAsyncHttpClientWrapper() {
this.telemetryHttpClient =
JavaHttpClientTelemetry.builder(GlobalOpenTelemetry.get())
.build()
.newHttpClient(HttpClient.newBuilder().build());
}

@Override
public <T> Future<T> execute(
HttpAsyncRequestProducer requestProducer,
HttpAsyncResponseConsumer<T> responseConsumer,
HttpContext context,
FutureCallback<T> callback) {

try {
LOGGER.debug("Preparing to execute request with OpenTelemetry.");
HttpRequest request = convertToHttpRequest(requestProducer);
LOGGER.info("Request prepared: {}", request.uri());

CompletableFuture<HttpResponse<String>> futureResponse =
telemetryHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());

// Adapt future to CXF's response handling expectations
return futureResponse.handle(
(response, throwable) -> {
if (throwable != null) {
LOGGER.error(
"Request failed with error: {}",
throwable.getMessage(),
throwable);
if (callback != null) {
callback.failed((Exception) throwable);
}
throw new RuntimeException(throwable);
}
try {
LOGGER.info(
"Response received with status code: {}",
response.statusCode());
T result =
adaptAndConsumeResponse(response, responseConsumer, callback);
if (callback != null) {
callback.completed(result);
}
return result;
} catch (Exception e) {
LOGGER.error("Error during response adaptation: {}", e.getMessage(), e);
if (callback != null) {
callback.failed(e);
}
throw new RuntimeException(e);
}
});
} catch (Exception e) {
LOGGER.error("Synchronous error during request execution: {}", e.getMessage(), e);
if (callback != null) {
callback.failed(e);
}
CompletableFuture<T> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(e);
return failedFuture;
}
}

private HttpRequest convertToHttpRequest(HttpAsyncRequestProducer requestProducer)
throws HttpException, IOException {
LOGGER.debug("Converting HttpAsyncRequestProducer to HttpRequest.");
HttpRequest request =
HttpRequest.newBuilder()
.uri(
URI.create(
requestProducer
.generateRequest()
.getRequestLine()
.getUri()))
.build();
LOGGER.info("Converted HttpRequest URI: {}", request.uri());
return request;
}

private <T> T adaptAndConsumeResponse(
HttpResponse<String> response,
HttpAsyncResponseConsumer<T> responseConsumer,
FutureCallback<T> callback)
throws Exception {

LOGGER.debug("Adapting HttpResponse to Apache CXF response format.");
var apacheResponse =
new org.apache.http.message.BasicHttpResponse(
new org.apache.http.ProtocolVersion("HTTP", 1, 1),
response.statusCode(),
response.body());

response.headers()
.map()
.forEach(
(name, values) -> {
values.forEach(value -> apacheResponse.addHeader(name, value));
LOGGER.debug("Added header to Apache response: {} -> {}", name, values);
});

try {
responseConsumer.responseReceived(apacheResponse);
T result = responseConsumer.getResult();
LOGGER.info("Response successfully consumed. Result: {}", result);

if (callback != null) {
callback.completed(result);
}
return result;
} catch (Exception e) {
LOGGER.error("Error during response consumption: {}", e.getMessage(), e);

if (callback != null) {
callback.failed(e);
}
throw e;
}
}

@Override
public void close() {
LOGGER.info("Closing OtelAsyncHttpClientWrapper.");
}

@Override
public boolean isRunning() {
LOGGER.debug("Checking if OtelAsyncHttpClientWrapper is running.");
return true;
}

@Override
public void start() {
LOGGER.info("Starting OtelAsyncHttpClientWrapper.");
}

@Override
public <T> Future<T> execute(
HttpAsyncRequestProducer requestProducer,
HttpAsyncResponseConsumer<T> responseConsumer,
FutureCallback<T> callback) {
LOGGER.debug("Delegating execution with null HttpContext.");
return execute(requestProducer, responseConsumer, null, callback);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package uk.gov.di.ipv.cri.kbv.api.service;

import org.apache.cxf.Bus;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit;
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;

import java.io.IOException;

public class OtelAsyncHttpConduit extends AsyncHTTPConduit {

public OtelAsyncHttpConduit(
Bus bus,
EndpointInfo endpointInfo,
EndpointReferenceType endpointReferenceType,
AsyncHTTPConduitFactory factory)
throws IOException {
super(bus, endpointInfo, endpointReferenceType, factory);
}

@Override
public synchronized CloseableHttpAsyncClient getHttpAsyncClient() {
return new OtelAsyncHttpClientWrapper();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package uk.gov.di.ipv.cri.kbv.api.service;

import org.apache.cxf.Bus;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit;
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory;
import org.apache.cxf.ws.addressing.EndpointReferenceType;

import java.io.IOException;

public class OtelAsyncHttpConduitFactory extends AsyncHTTPConduitFactory {
public OtelAsyncHttpConduitFactory(Bus b) {
super(b);
}

@Override
public AsyncHTTPConduit createConduit(
Bus bus, EndpointInfo endpointInfo, EndpointReferenceType endpointReferenceType)
throws IOException {

return new OtelAsyncHttpConduit(bus, endpointInfo, endpointReferenceType, this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.experian.uk.schema.experian.identityiq.services.webservice.IdentityIQWebService;
import com.experian.uk.wasp.TokenService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.lambda.powertools.parameters.SSMProvider;
Expand Down Expand Up @@ -89,6 +92,9 @@ KBVGateway getKbvGateway(KeyStoreLoader keyStoreLoader, KBVClientFactory kbvClie
}

private KBVClientFactory getKbvClientFactory() {
Bus bus = BusFactory.getDefaultBus();
bus.setExtension(new OtelAsyncHttpConduitFactory(bus), AsyncHTTPConduitFactory.class);

TokenService tokenService = new TokenService();
SoapToken soapToken = new SoapToken(APPLICATION, true, tokenService, configurationService);
HeaderHandler headerHandler = new HeaderHandler(soapToken);
Expand Down

0 comments on commit 633d853

Please sign in to comment.