-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
230 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
170 changes: 170 additions & 0 deletions
170
lib/src/main/java/uk/gov/di/ipv/cri/kbv/api/service/OtelAsyncHttpClientWrapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
package uk.gov.di.ipv.cri.kbv.api.service; | ||
|
||
import io.opentelemetry.api.GlobalOpenTelemetry; | ||
import io.opentelemetry.instrumentation.httpclient.JavaHttpClientTelemetry; | ||
import org.apache.http.HttpException; | ||
import org.apache.http.concurrent.FutureCallback; | ||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; | ||
import org.apache.http.nio.protocol.HttpAsyncRequestProducer; | ||
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; | ||
import org.apache.http.protocol.HttpContext; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.net.URI; | ||
import java.net.http.HttpClient; | ||
import java.net.http.HttpRequest; | ||
import java.net.http.HttpResponse; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.Future; | ||
|
||
public class OtelAsyncHttpClientWrapper extends CloseableHttpAsyncClient { | ||
private static final Logger LOGGER = LoggerFactory.getLogger(OtelAsyncHttpClientWrapper.class); | ||
private final HttpClient telemetryHttpClient; | ||
|
||
public OtelAsyncHttpClientWrapper() { | ||
this.telemetryHttpClient = | ||
JavaHttpClientTelemetry.builder(GlobalOpenTelemetry.get()) | ||
.build() | ||
.newHttpClient(HttpClient.newBuilder().build()); | ||
} | ||
|
||
@Override | ||
public <T> Future<T> execute( | ||
HttpAsyncRequestProducer requestProducer, | ||
HttpAsyncResponseConsumer<T> responseConsumer, | ||
HttpContext context, | ||
FutureCallback<T> callback) { | ||
|
||
try { | ||
LOGGER.debug("Preparing to execute request with OpenTelemetry."); | ||
HttpRequest request = convertToHttpRequest(requestProducer); | ||
LOGGER.info("Request prepared: {}", request.uri()); | ||
|
||
CompletableFuture<HttpResponse<String>> futureResponse = | ||
telemetryHttpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()); | ||
|
||
// Adapt future to CXF's response handling expectations | ||
return futureResponse.handle( | ||
(response, throwable) -> { | ||
if (throwable != null) { | ||
LOGGER.error( | ||
"Request failed with error: {}", | ||
throwable.getMessage(), | ||
throwable); | ||
if (callback != null) { | ||
callback.failed((Exception) throwable); | ||
} | ||
throw new RuntimeException(throwable); | ||
} | ||
try { | ||
LOGGER.info( | ||
"Response received with status code: {}", | ||
response.statusCode()); | ||
T result = | ||
adaptAndConsumeResponse(response, responseConsumer, callback); | ||
if (callback != null) { | ||
callback.completed(result); | ||
} | ||
return result; | ||
} catch (Exception e) { | ||
LOGGER.error("Error during response adaptation: {}", e.getMessage(), e); | ||
if (callback != null) { | ||
callback.failed(e); | ||
} | ||
throw new RuntimeException(e); | ||
} | ||
}); | ||
} catch (Exception e) { | ||
LOGGER.error("Synchronous error during request execution: {}", e.getMessage(), e); | ||
if (callback != null) { | ||
callback.failed(e); | ||
} | ||
CompletableFuture<T> failedFuture = new CompletableFuture<>(); | ||
failedFuture.completeExceptionally(e); | ||
return failedFuture; | ||
} | ||
} | ||
|
||
private HttpRequest convertToHttpRequest(HttpAsyncRequestProducer requestProducer) | ||
throws HttpException, IOException { | ||
LOGGER.debug("Converting HttpAsyncRequestProducer to HttpRequest."); | ||
HttpRequest request = | ||
HttpRequest.newBuilder() | ||
.uri( | ||
URI.create( | ||
requestProducer | ||
.generateRequest() | ||
.getRequestLine() | ||
.getUri())) | ||
.build(); | ||
LOGGER.info("Converted HttpRequest URI: {}", request.uri()); | ||
return request; | ||
} | ||
|
||
private <T> T adaptAndConsumeResponse( | ||
HttpResponse<String> response, | ||
HttpAsyncResponseConsumer<T> responseConsumer, | ||
FutureCallback<T> callback) | ||
throws Exception { | ||
|
||
LOGGER.debug("Adapting HttpResponse to Apache CXF response format."); | ||
var apacheResponse = | ||
new org.apache.http.message.BasicHttpResponse( | ||
new org.apache.http.ProtocolVersion("HTTP", 1, 1), | ||
response.statusCode(), | ||
response.body()); | ||
|
||
response.headers() | ||
.map() | ||
.forEach( | ||
(name, values) -> { | ||
values.forEach(value -> apacheResponse.addHeader(name, value)); | ||
LOGGER.debug("Added header to Apache response: {} -> {}", name, values); | ||
}); | ||
|
||
try { | ||
responseConsumer.responseReceived(apacheResponse); | ||
T result = responseConsumer.getResult(); | ||
LOGGER.info("Response successfully consumed. Result: {}", result); | ||
|
||
if (callback != null) { | ||
callback.completed(result); | ||
} | ||
return result; | ||
} catch (Exception e) { | ||
LOGGER.error("Error during response consumption: {}", e.getMessage(), e); | ||
|
||
if (callback != null) { | ||
callback.failed(e); | ||
} | ||
throw e; | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
LOGGER.info("Closing OtelAsyncHttpClientWrapper."); | ||
} | ||
|
||
@Override | ||
public boolean isRunning() { | ||
LOGGER.debug("Checking if OtelAsyncHttpClientWrapper is running."); | ||
return true; | ||
} | ||
|
||
@Override | ||
public void start() { | ||
LOGGER.info("Starting OtelAsyncHttpClientWrapper."); | ||
} | ||
|
||
@Override | ||
public <T> Future<T> execute( | ||
HttpAsyncRequestProducer requestProducer, | ||
HttpAsyncResponseConsumer<T> responseConsumer, | ||
FutureCallback<T> callback) { | ||
LOGGER.debug("Delegating execution with null HttpContext."); | ||
return execute(requestProducer, responseConsumer, null, callback); | ||
} | ||
} |
27 changes: 27 additions & 0 deletions
27
lib/src/main/java/uk/gov/di/ipv/cri/kbv/api/service/OtelAsyncHttpConduit.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package uk.gov.di.ipv.cri.kbv.api.service; | ||
|
||
import org.apache.cxf.Bus; | ||
import org.apache.cxf.service.model.EndpointInfo; | ||
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit; | ||
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory; | ||
import org.apache.cxf.ws.addressing.EndpointReferenceType; | ||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; | ||
|
||
import java.io.IOException; | ||
|
||
public class OtelAsyncHttpConduit extends AsyncHTTPConduit { | ||
|
||
public OtelAsyncHttpConduit( | ||
Bus bus, | ||
EndpointInfo endpointInfo, | ||
EndpointReferenceType endpointReferenceType, | ||
AsyncHTTPConduitFactory factory) | ||
throws IOException { | ||
super(bus, endpointInfo, endpointReferenceType, factory); | ||
} | ||
|
||
@Override | ||
public synchronized CloseableHttpAsyncClient getHttpAsyncClient() { | ||
return new OtelAsyncHttpClientWrapper(); | ||
} | ||
} |
23 changes: 23 additions & 0 deletions
23
lib/src/main/java/uk/gov/di/ipv/cri/kbv/api/service/OtelAsyncHttpConduitFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package uk.gov.di.ipv.cri.kbv.api.service; | ||
|
||
import org.apache.cxf.Bus; | ||
import org.apache.cxf.service.model.EndpointInfo; | ||
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit; | ||
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory; | ||
import org.apache.cxf.ws.addressing.EndpointReferenceType; | ||
|
||
import java.io.IOException; | ||
|
||
public class OtelAsyncHttpConduitFactory extends AsyncHTTPConduitFactory { | ||
public OtelAsyncHttpConduitFactory(Bus b) { | ||
super(b); | ||
} | ||
|
||
@Override | ||
public AsyncHTTPConduit createConduit( | ||
Bus bus, EndpointInfo endpointInfo, EndpointReferenceType endpointReferenceType) | ||
throws IOException { | ||
|
||
return new OtelAsyncHttpConduit(bus, endpointInfo, endpointReferenceType, this); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters