Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataFlow as first class object #98

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
package io.github.jpmorganchase.fusion.packaging;

import com.github.tomakehurst.wiremock.client.WireMock;
import io.github.jpmorganchase.fusion.model.Application;
import io.github.jpmorganchase.fusion.model.DataFlow;
import io.github.jpmorganchase.fusion.model.Dataset;
import io.github.jpmorganchase.fusion.model.Flow;
import io.github.jpmorganchase.fusion.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson;
import static io.github.jpmorganchase.fusion.test.TestUtils.listOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class DataFlowOperationsIT extends BaseOperationsIT {

@Test
public void testCreateInputDataFlow() {
// Given
wireMockRule.stubFor(WireMock.post(WireMock.urlEqualTo("/catalogs/common/datasets/SIF0001"))
.withRequestBody(equalToJson(TestUtils.loadJsonForIt("flow/dataset-flow-SIF0001-create-request.json")))
.withHeader("Content-Type", WireMock.equalTo("application/json"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)));


DataFlow flow = flow("SIF0001", "Input");

// When & Then
Assertions.assertDoesNotThrow(flow::create);
}

@Test
public void testCreateOutputDataFlow() {
// Given
wireMockRule.stubFor(WireMock.post(WireMock.urlEqualTo("/catalogs/common/datasets/SOF0001"))
.withRequestBody(equalToJson(TestUtils.loadJsonForIt("flow/dataset-flow-SOF0001-create-request.json")))
.withHeader("Content-Type", WireMock.equalTo("application/json"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)));

DataFlow flow = flow("SOF0001", "Output");

// When & Then
Assertions.assertDoesNotThrow(flow::create);
}

@Test
public void testCreateDataFlowWithAddedFeedDetails() {
// Given
wireMockRule.stubFor(WireMock.post(WireMock.urlEqualTo("/catalogs/common/datasets/SIF0002"))
.withRequestBody(equalToJson(TestUtils.loadJsonForIt("flow/dataset-flow-SIF0002-create-request.json")))
.withHeader("Content-Type", WireMock.equalTo("application/json"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)));


DataFlow flow = flow("SIF0002", "Input")
.toBuilder()
.flow(Flow.builder()
.flowDirection("Input")
.producerApplicationId(Application.builder().sealId("123456").build())
.consumerApplicationId(Application.builder().sealId("456789").build())
.consumerApplicationId(Application.builder().sealId("901234").build())
.flowType("raw")
.startTime("10:00")
.endTime("13:00")
.timeZone("UTC+1")
.build())
.build();

// When & Then
Assertions.assertDoesNotThrow(flow::create);
}

@Test
public void testCreateFlowOverrideDefaultCatalog() {
// Given
wireMockRule.stubFor(WireMock.post(WireMock.urlEqualTo("/catalogs/foobar/datasets/SIF0001"))
.withRequestBody(equalToJson(TestUtils.loadJsonForIt("flow/dataset-flow-SIF0001-create-request.json")))
.withHeader("Content-Type", WireMock.equalTo("application/json"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)));

DataFlow flow = flow("SIF0001", "Input")
.toBuilder()
.catalogIdentifier("foobar")
.build();
// When & Then
Assertions.assertDoesNotThrow(flow::create);
}

@Test
public void testUpdateDataFlow() {
// Given
wireMockRule.stubFor(WireMock.put(WireMock.urlEqualTo("/catalogs/common/datasets/SIF0001"))
.withRequestBody(equalToJson(TestUtils.loadJsonForIt("flow/dataset-flow-SIF0001-update-request.json")))
.withHeader("Content-Type", WireMock.equalTo("application/json"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)));

DataFlow flow = flow("SIF0001", "Input")
.toBuilder()
.description("Updated Sample input flow description SIF0001")
.build();


// When & Then
Assertions.assertDoesNotThrow(flow::update);
}

@Test
public void testUpdateDataFlowRetrievedFromListDataFlows() {
// Given
wireMockRule.stubFor(WireMock.get(WireMock.urlEqualTo("/catalogs/common/datasets"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)
.withBodyFile("flow/multiple-flow-response.json")));

wireMockRule.stubFor(WireMock.put(WireMock.urlEqualTo("/catalogs/common/datasets/SIF0001"))
.withRequestBody(equalToJson(TestUtils.loadJsonForIt("flow/dataset-flow-SIF0001-update-request.json")))
.withHeader("Content-Type", WireMock.equalTo("application/json"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)));

Map<String, DataFlow> flows = getSdk().listDataFlows("common", "SIF0001", true);
DataFlow original = flows.get("SIF0001");

// When
DataFlow amended = original
.toBuilder()
.description("Updated Sample input flow description SIF0001")
.build();

// When & Then
Assertions.assertDoesNotThrow(amended::update);
}


@Test
public void testDeleteDataset() {
// Given
wireMockRule.stubFor(WireMock.delete(WireMock.urlEqualTo("/catalogs/common/datasets/SIF0001"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)));

DataFlow flow = getSdk().builders().dataFlow()
.identifier("SIF0001")
.build();

// When & Then
Assertions.assertDoesNotThrow(flow::delete);
}

@Test
public void testDeleteDatasetWithCatalogOverride() {
// Given
wireMockRule.stubFor(WireMock.delete(WireMock.urlEqualTo("/catalogs/foobar/datasets/SIF0001"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)));

DataFlow flow = getSdk().builders().dataFlow()
.identifier("SIF0001")
.catalogIdentifier("foobar")
.build();

// When & Then
Assertions.assertDoesNotThrow(flow::delete);
}

@Test
public void testListDataFlows() {
// Given
wireMockRule.stubFor(WireMock.get(WireMock.urlEqualTo("/catalogs/common/datasets"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)
.withBodyFile("flow/multiple-flow-response.json")));

// When
Map<String, DataFlow> flows = getSdk().listDataFlows();

// Then Verify the response
assertThat(flows.containsKey("SIF0001"), is(equalTo(true)));
assertThat(flows.containsKey("SOF0001"), is(equalTo(true)));
}

@Test
public void testListDatasetsUsingIdContains() {
// Given
wireMockRule.stubFor(WireMock.get(WireMock.urlEqualTo("/catalogs/common/datasets"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)
.withBodyFile("flow/multiple-flow-response.json")));

// When
Map<String, DataFlow> datasets = getSdk().listDataFlows("common", "SIF0001", true);

// Then Verify the response
assertThat(datasets.size(), is(equalTo(1)));
assertThat(datasets.containsKey("SIF0001"), is(equalTo(true)));
}

private DataFlow flow(String identifier, String direction){
return getSdk().builders().dataFlow()
.identifier(identifier)
.description("Sample flow description " + identifier)
.linkedEntity(identifier + "/")
.title("Sample Flow | North America " + identifier)
.frequency("Daily")
.publisher("Publisher " + identifier)
.varArg("category", listOf("Category " + identifier))
.varArg("createdDate", "2022-02-06")
.varArg("coverageStartDate", "2022-02-06")
.varArg("coverageEndDate", "2023-03-09")
.varArg("isThirdPartyData", Boolean.FALSE)
.varArg("isInternalOnlyDataset", Boolean.FALSE)
.varArg("language", "English")
.varArg("maintainer", "Maintainer " + identifier)
.varArg("modifiedDate", "2023-03-09")
.varArg("region", listOf("North America"))
.varArg("source", listOf("Source System " + identifier))
.varArg("subCategory", listOf("Subcategory " + identifier))
.varArg("tag", listOf("Tag" + identifier))
.varArg("isRestricted", Boolean.FALSE)
.varArg("isRawData", Boolean.FALSE)
.varArg("hasSample", Boolean.FALSE)
.applicationId(Application.builder().sealId("12345").build())
.flow(
Flow.builder()
.flowDirection(direction)
.producerApplicationId(Application.builder().sealId("123456").build())
.consumerApplicationId(Application.builder().sealId("456789").build())
.consumerApplicationId(Application.builder().sealId("901234").build())
.build()
)
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,102 +105,6 @@ public void testCreateDatasetOverrideDefaultCatalog() {
Assertions.assertDoesNotThrow(dataset::create);
}

@Test
public void testCreateDatasetOfTypeFlowInput() {
// Given
wireMockRule.stubFor(WireMock.post(WireMock.urlEqualTo("/catalogs/common/datasets/SIF0001"))
.withRequestBody(equalToJson(TestUtils.loadJsonForIt("dataset/dataset-flow-SIF0001-create-request.json")))
.withHeader("Content-Type", WireMock.equalTo("application/json"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)
.withBodyFile("dataset/dataset-create-response.json")));

Dataset dataset = getSdk().builders().dataset()
.identifier("SIF0001")
.description("Sample input flow description 1")
.linkedEntity("SIF0001/")
.title("Sample Input Flow 1 | North America")
.frequency("Daily")
.publisher("Publisher 1")
.varArg("category", listOf("Category 1"))
.varArg("createdDate", "2022-02-06")
.varArg("coverageStartDate", "2022-02-06")
.varArg("coverageEndDate", "2023-03-09")
.varArg("isThirdPartyData", Boolean.FALSE)
.varArg("isInternalOnlyDataset", Boolean.FALSE)
.varArg("language", "English")
.varArg("maintainer", "Maintainer 1")
.varArg("modifiedDate", "2023-03-09")
.varArg("region", listOf("North America"))
.varArg("source", listOf("Source System 1"))
.varArg("subCategory", listOf("Subcategory 1"))
.varArg("tag", listOf("Tag1"))
.varArg("isRestricted", Boolean.FALSE)
.varArg("isRawData", Boolean.FALSE)
.varArg("hasSample", Boolean.FALSE)
.applicationId(Application.builder().sealId("12345").build())
.flow(
Flow.builder()
.flowDirection("Input")
.producerApplicationId(Application.builder().sealId("123456").build())
.consumerApplicationId(Application.builder().sealId("456789").build())
.build()
)
.build();

// When & Then
Assertions.assertDoesNotThrow(dataset::create);
}

@Test
public void testCreateDatasetOfTypeFlowOutput() {
// Given
wireMockRule.stubFor(WireMock.post(WireMock.urlEqualTo("/catalogs/common/datasets/SOF0001"))
.withRequestBody(equalToJson(TestUtils.loadJsonForIt("dataset/dataset-flow-SOF0001-create-request.json")))
.withHeader("Content-Type", WireMock.equalTo("application/json"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)
.withBodyFile("dataset/dataset-create-response.json")));

Dataset dataset = getSdk().builders().dataset()
.identifier("SOF0001")
.description("Sample output flow description 1")
.linkedEntity("SOF0001/")
.title("Sample Output Flow 1 | North America")
.frequency("Daily")
.publisher("Publisher 1")
.varArg("category", listOf("Category 1"))
.varArg("createdDate", "2022-02-06")
.varArg("coverageStartDate", "2022-02-06")
.varArg("coverageEndDate", "2023-03-09")
.varArg("isThirdPartyData", Boolean.FALSE)
.varArg("isInternalOnlyDataset", Boolean.FALSE)
.varArg("language", "English")
.varArg("maintainer", "Maintainer 1")
.varArg("modifiedDate", "2023-03-09")
.varArg("region", listOf("North America"))
.varArg("source", listOf("Source System 1"))
.varArg("subCategory", listOf("Subcategory 1"))
.varArg("tag", listOf("Tag1"))
.varArg("isRestricted", Boolean.FALSE)
.varArg("isRawData", Boolean.FALSE)
.varArg("hasSample", Boolean.FALSE)
.applicationId(Application.builder().sealId("12345").build())
.flow(
Flow.builder()
.flowDirection("Output")
.producerApplicationId(Application.builder().sealId("123456").build())
.consumerApplicationId(Application.builder().sealId("456789").build())
.consumerApplicationId(Application.builder().sealId("901234").build())
.build()
)
.build();

// When & Then
Assertions.assertDoesNotThrow(dataset::create);
}

@Test
public void testUpdateDataset() {
Expand Down Expand Up @@ -257,37 +161,6 @@ public void testUpdateDatasetRetrievedFromListDatasets() {
Assertions.assertDoesNotThrow(amendedDataset::update);
}


@Test
public void testUpdateDatasetOfTypeFlowRetrievedFromListDatasets() {
// Given
wireMockRule.stubFor(WireMock.get(WireMock.urlEqualTo("/catalogs/common/datasets"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)
.withBodyFile("dataset/multiple-dataset-response.json")));

wireMockRule.stubFor(WireMock.put(WireMock.urlEqualTo("/catalogs/common/datasets/SIF0001"))
.withRequestBody(equalToJson(TestUtils.loadJsonForIt("dataset/dataset-flow-SIF0001-update-request.json")))
.withHeader("Content-Type", WireMock.equalTo("application/json"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)
.withBodyFile("dataset/dataset-create-response.json")));

Map<String, Dataset> datasets = getSdk().listDatasets("common", "SIF0001", true);
Dataset originalDataset = datasets.get("SIF0001");

// When
Dataset amendedDataset = originalDataset
.toBuilder()
.description("Updated Sample input flow description 1")
.build();

// When & Then
Assertions.assertDoesNotThrow(amendedDataset::update);
}

@Test
public void testDeleteDataset() {
// Given
Expand Down
Loading