Skip to content

Commit

Permalink
add flink stream base code
Browse files Browse the repository at this point in the history
  • Loading branch information
huzhanchi committed May 14, 2024
1 parent 9f9f2e2 commit 6e0012a
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 8 deletions.
38 changes: 36 additions & 2 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,50 @@ on chain realtime threat activity matching and off chain threat intelligence dat
code to illustrate how to match these activities patterns, and also figure out what the challenges we usually encounter.

# Problems
- Common challenges
- problems specific to blockchain
In the cyberspace, We just like in the game "Red Alert," the map is shrouded in a lot of fog, allowing us to see only
certain local areas. The connections between these areas are not clear. In the blockchain world, it is also not a trivial
to figure out the relationship between protocol, cex, dex, and the wallet address, and the multi public blockchains which
previous stuff based on.
![img.png](images/img.png)

- pre attack

Here we can generate Threat intelligence.

- attacking

It is easy to find a short path pattern to detect the attack.Maybe before or after a successful attack

- post attack

In the post stage, we are busy to response the attack,and explore the previous short path pattern alert and initial threat intelligence.


# The Concepts help to solve the problems

## Signal
- One shot signal
One shot signal is a no ordinary event, it is a little suspicious event, but it is not enough to trigger the alert.

- Behavior signal
Behavior signal represented a series of events, but it is also not enough to trigger the alert. Behavior signal maybe generated by
some statistical methods, like the frequency, etc.

All these two types of signals are not enough to trigger the alert, but they are the basic building blocks of the alert.
These signals will be thought of as the input of the complex event processing. Do the pattern matching and trigger the alert.

## complex event processing (CEP)
Utilize the CEP to analyze the event sequence and match the pattern. We also could use other techniques to achieve this like:
- lag or lead function provided by Flink SQL
- Multi self table join
I have done it in Dune Sql.
- Native flink stream api like `interaljon` or `coGroup`

However, CEP also provide the same feature, and it is more flexible and programmable.


# Case supported

- [x] 1. Head and End address similar Phishing scam


Binary file added images/img.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<testcontainers-java-module-ganache.version>0.0.4</testcontainers-java-module-ganache.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -79,6 +80,12 @@
<version>5.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.ganchix</groupId>
<artifactId>testcontainers-java-module-ganache</artifactId>
<version>${testcontainers-java-module-ganache.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
19 changes: 13 additions & 6 deletions src/main/java/com/oarmour/Main.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
package com.oarmour;

import com.oarmour.datasource.EthereumSource;
import com.oarmour.datasource.WrappedTransaction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.web3j.protocol.core.methods.response.Transaction;

public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create an instance of EthereumSource
EthereumSource source = new EthereumSource();

// Create a DataStream using the EthereumSource
DataStream<Transaction> stream = env.addSource(source);
stream.process(new ProcessFunction<Transaction, Object>() {
DataStream keyedStream = stream.process(new ProcessFunction<Transaction, WrappedTransaction>() {
@Override
public void processElement(Transaction value, ProcessFunction<Transaction, Object>.Context ctx, Collector<Object> out) throws Exception {
System.out.println(value.getFrom() + " -> " + value.getTo() + " : " + value.getValue());
public void processElement(Transaction value, ProcessFunction<Transaction, WrappedTransaction>.Context ctx, Collector<WrappedTransaction> out) throws Exception {
out.collect(WrappedTransaction.FromKey(value, value.getFrom()));
out.collect(WrappedTransaction.FromKey(value, value.getTo()));
}
}).print();
}).keyBy(tx -> tx.Key).process(new KeyedProcessFunction<String, WrappedTransaction, Object>() {
@Override
public void processElement(WrappedTransaction value, Context ctx, Collector<Object> out) throws Exception {
out.collect(value);
System.out.println(value);
}
});
env.execute();
}
}
12 changes: 12 additions & 0 deletions src/main/java/com/oarmour/datasource/EthereumSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,22 @@ public class EthereumSource extends RichSourceFunction<Transaction> {
Disposable disposable;
Web3j web3j;
Object waitLock;
String rpcUrl;

public EthereumSource() {
}

public EthereumSource(String rpcUrl) {
this.rpcUrl = rpcUrl;
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
if (this.rpcUrl != null) {
web3j = Web3j.build(new HttpService(this.rpcUrl));
return;
}
web3j = Web3j.build(new HttpService("https://little-few-paper.quiknode.pro/b5d1d2678912de9078cba3c29d6180a685732418/"));
}

Expand Down
28 changes: 28 additions & 0 deletions src/main/java/com/oarmour/datasource/MockSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.oarmour.datasource;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.web3j.protocol.core.methods.response.Transaction;

public class MockSource {
public static DataStream<Transaction> MockTransactions(StreamExecutionEnvironment env) {
return env.fromData(
SimpleTransaction("0x123", "0x456", "1000", "0x", "0x"),
SimpleTransaction("0x456", "0x123", "1000", "0x","0x")
);
}

public static Transaction SimpleTransaction(String from, String to, String value, String input, String creates) {
Transaction tx = new Transaction();
tx.setFrom(from);
tx.setTo(to);
tx.setValue(value);
tx.setInput(input);
tx.setCreates(creates);
return tx;
}

public static String StringOfTX(Transaction tx) {
return String.format("from: %s, to: %s, value: %s, input: %s", tx.getFrom(), tx.getTo(), tx.getValue(), tx.getInput());
}
}
28 changes: 28 additions & 0 deletions src/main/java/com/oarmour/datasource/WrappedTransaction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.oarmour.datasource;

import org.web3j.protocol.core.methods.response.Transaction;

public class WrappedTransaction {
public Transaction transaction;
public String Key;

public WrappedTransaction() {
}

public Transaction getTransaction() {
return transaction;
}

public WrappedTransaction(Transaction transaction, String Key) {
this.transaction = transaction;
this.Key = Key;
}
public static WrappedTransaction FromKey(Transaction transaction, String key) {
return new WrappedTransaction(transaction, key);
}

public String toString() {
return String.format("from: %s, to: %s, value: %s, input: %s createAt: %s", this.getTransaction().getFrom(), this.getTransaction().getTo(), this.getTransaction().getValue(), this.getTransaction().getInput(), this.getTransaction().getCreates());
}

}
31 changes: 31 additions & 0 deletions src/main/java/com/oarmour/pattern/PFishing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.oarmour.pattern;

import com.oarmour.datasource.WrappedTransaction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;

public class PFishing {
public static Pattern getPattern() {
return Pattern.<WrappedTransaction>begin("token-send")
.where(new IterativeCondition<WrappedTransaction>() {
@Override
public boolean filter(WrappedTransaction value, Context<WrappedTransaction> ctx) throws Exception {
if (value.getTransaction().getValue().longValue() > 1000L)
return true;
return false;
}
}).followedBy("poisoning-address").where(new IterativeCondition<WrappedTransaction>() {
@Override
public boolean filter(WrappedTransaction value, Context<WrappedTransaction> ctx) throws Exception {
if (value.transaction.getTo().equals("0x123") && value.getTransaction().getValue().longValue() == 0)
return true;
return false;
}
}).followedBy("fishing-success").where(new IterativeCondition<WrappedTransaction>() {
@Override
public boolean filter(WrappedTransaction value, Context<WrappedTransaction> ctx) throws Exception {
return false;
}
});
}
}
58 changes: 58 additions & 0 deletions src/test/java/com/oarmour/datasource/EthereumSourceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.oarmour.datasource;

import com.oarmour.pattern.PFishing;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;
import org.web3j.protocol.core.methods.response.Transaction;

import java.util.List;
import java.util.Map;

import static org.apache.flink.cep.CEP.pattern;

public class EthereumSourceTest {

StreamExecutionEnvironment env;
DataStream<Transaction> mockStream;
@Before
public void setUpClass() throws Exception {
env = StreamExecutionEnvironment.getExecutionEnvironment();
mockStream = env.fromData(
MockSource.SimpleTransaction("0x123", "0x456", "1000", "0x", "0x"),
MockSource.SimpleTransaction("0x456", "0x123", "1000", "0x", "0x" )
);
}

@Test
public void testConnectStream() throws Exception {
DataStream keyedStream = mockStream.process(new ProcessFunction<Transaction, WrappedTransaction>() {
@Override
public void processElement(Transaction value, ProcessFunction<Transaction, WrappedTransaction>.Context ctx, Collector<WrappedTransaction> out) throws Exception {
out.collect(WrappedTransaction.FromKey(value, value.getFrom()));
out.collect(WrappedTransaction.FromKey(value, value.getTo()));
}
}).keyBy(tx -> tx.Key).process(new KeyedProcessFunction<String, WrappedTransaction, WrappedTransaction>() {
@Override
public void processElement(WrappedTransaction value, Context ctx, Collector<WrappedTransaction> out) throws Exception {
out.collect(value);
}
});


CEP.pattern(keyedStream, PFishing.getPattern()).select(new PatternSelectFunction<WrappedTransaction, String>() {
@Override
public String select(Map<String, List<WrappedTransaction>> pattern) throws Exception {
return "Fishing Alert";
}
}).print();
env.execute();

}
}
62 changes: 62 additions & 0 deletions src/test/java/com/oarmour/datasource/ProviderTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.oarmour.datasource;

import io.reactivex.disposables.Disposable;
import org.junit.Before;
import org.junit.Test;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;

public class ProviderTest {
Object waitLock;
Disposable disposable;
Web3j web3j;

@Before
public void setUpClass() throws Exception {
web3j = Web3j.build(new HttpService("https://little-few-paper.quiknode.pro/b5d1d2678912de9078cba3c29d6180a685732418/"));
}

@Test
public void testBlock() throws InterruptedException {
waitLock = new Object();
disposable = web3j.blockFlowable(true).subscribe(
blk -> {
System.out.println(blk.getBlock().getNumber());
},
error -> {
System.out.println(error.getMessage());
},
() -> {
System.out.println("complete");
}
);

while (!disposable.isDisposed()) {
synchronized (waitLock) {
waitLock.wait(100L);
}
}
}

@Test
public void testTransaction() throws InterruptedException {
waitLock = new Object();
disposable = web3j.transactionFlowable().subscribe(
tx -> {
System.out.println(tx.getFrom() + " -> " + tx.getTo() + " : " + tx.getValue());
},
error -> {
System.out.println(error.getMessage());
},
() -> {
System.out.println("complete");
}
);

while (!disposable.isDisposed()) {
synchronized (waitLock) {
waitLock.wait(100L);
}
}
}
}

0 comments on commit 6e0012a

Please sign in to comment.