Flink Streaming Source Code Trace Introduction

  • The last line of the Flink bash script reads:
    exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

When the Flink bash script runs, it uses Java (that is, $JAVA_RUN) to invoke org.apache.flink.client.cli.CliFrontend and passes the user's input (that is, $@) on to it. In this example the user's input is run examples/streaming/SocketWindowWordCount.jar --port 9000
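The hand-off described above can be sketched in plain Java. This is a simplified illustration only, not Flink's actual CliFrontend code: the class MiniCliFrontend and the method describe are hypothetical names. It assumes the first argument is the action and, for run, that the next argument is the job jar, ignoring any options that may precede the jar path.

```java
import java.util.Arrays;

// Hypothetical, simplified stand-in for a CLI entry point; NOT Flink's real CliFrontend.
class MiniCliFrontend {

    // Splits the raw arguments ("$@" in the bash script) into an action and its parameters.
    static String describe(String[] args) {
        if (args.length == 0) {
            return "no action given";
        }
        String action = args[0];
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        switch (action) {
            case "run":
                // Assumption: the first parameter is the job jar, the rest go to the job's main().
                if (params.length == 0) {
                    return "run: missing jar file";
                }
                String jar = params[0];
                String[] programArgs = Arrays.copyOfRange(params, 1, params.length);
                return "run jar=" + jar + " programArgs=" + Arrays.toString(programArgs);
            default:
                return "unsupported action: " + action;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(args));
    }
}
```

Called with the example input, describe would report the jar as examples/streaming/SocketWindowWordCount.jar and the program arguments as --port and 9000, which are the values that eventually reach the main method of SocketWindowWordCount.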

SocketWindowWordCount.jar

As described in Anatomy of a Flink Program, every Flink program consists of the same basic parts:

  • Obtain an execution environment
  • Load/create the initial data
  • Specify transformations on this data
  • Specify where to put the results of your computations
  • Trigger the program execution

The SocketWindowWordCount.java example (org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java) illustrates each of these parts well:

public static void main(String[] args) throws Exception {
    final String hostname;
    final int port;
    try {
        final ParameterTool params = ParameterTool.fromArgs(args);
        hostname = params.has("hostname") ? params.get("hostname") : "localhost";
        port = params.getInt("port");
    } catch (Exception e) {
        System.err.println("No port specified. Please run 'SocketWindowWordCount " +
            "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
            "and port is the address of the text server");
        System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
            "type the input text into the command line");
        return;
    }

    // Obtain an execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Load/create the initial data
    DataStream<String> text = env.socketTextStream(hostname, port, "\n");

    // Specify transformations on this data
    DataStream<WordWithCount> windowCounts = text
        .flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) {
                for (String word : value.split("\\s")) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .reduce(new ReduceFunction<WordWithCount>() {
            @Override
            public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                return new WordWithCount(a.word, a.count + b.count);
            }
        });

    // Specify where to put the results of your computations
    windowCounts.print().setParallelism(1);

    // Trigger the program execution
    env.execute("Socket Window WordCount");
}
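
To make the transformation chain above more concrete, here is a plain-Java sketch of what a single 5-second window would compute. It uses no Flink APIs. The class WindowWordCountSketch and the method countWords are hypothetical names, and the list of lines stands in for whatever text arrived on the socket during one window.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of one window's result; it does NOT use or reproduce Flink's runtime.
class WindowWordCountSketch {

    // Counts words across the lines assumed to have arrived within a single window.
    static Map<String, Long> countWords(List<String> linesInWindow) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : linesInWindow) {
            // flatMap step: one line becomes many (word, 1) records
            for (String word : line.split("\\s")) {
                // keyBy("word") + reduce step: records sharing a word are summed
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two lines standing in for the socket input of one window
        System.out.println(countWords(Arrays.asList("to be or", "not to be")));
    }
}
```

The real job differs in that the grouping and summing happen incrementally and in parallel across keyed partitions, and a result is emitted each time a window closes rather than once for a fixed list.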