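Extended background:
inner join
A join written with WHERE first forms the Cartesian product of the two tables and then filters it by the WHERE condition, whereas INNER JOIN presumably computes the intersection directly. For an equality condition, both forms return the same rows.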
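The two evaluation strategies described above can be compared in plain Java. This is only a sketch: the class and method names (`JoinSemanticsSketch`, `crossThenFilter`, `hashJoin`, `pair`) are hypothetical, the rows are modeled as two-element string arrays, and a real SQL engine is free to plan either query form differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class, not part of the original listing.
class JoinSemanticsSketch {

    // Formats one joined row as "(k,v)=>(k,v)".
    static String pair(String[] left, String[] right) {
        return "(" + left[0] + "," + left[1] + ")=>(" + right[0] + "," + right[1] + ")";
    }

    // WHERE style: visit every (a, b) combination, then keep those whose keys match.
    static List<String> crossThenFilter(String[][] a, String[][] b) {
        List<String> out = new ArrayList<>();
        for (String[] ra : a) {
            for (String[] rb : b) {
                if (ra[0].equals(rb[0])) {
                    out.add(pair(ra, rb));
                }
            }
        }
        return out;
    }

    // INNER JOIN style: index B by key, then look up only the matching rows for each row of A.
    static List<String> hashJoin(String[][] a, String[][] b) {
        Map<String, List<String[]>> index = new HashMap<>();
        for (String[] rb : b) {
            index.computeIfAbsent(rb[0], k -> new ArrayList<>()).add(rb);
        }
        List<String> out = new ArrayList<>();
        for (String[] ra : a) {
            for (String[] rb : index.getOrDefault(ra[0], Collections.emptyList())) {
                out.add(pair(ra, rb));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] a = {{"a", "1"}, {"b", "2"}, {"a", "2"}};
        String[][] b = {{"a", "a"}, {"b", "b"}, {"a", "aaa"}};
        System.out.println(crossThenFilter(a, b));
        System.out.println(hashJoin(a, b));
    }
}
```

With the sample rows in `main`, both methods return the same five pairs; they differ only in how many combinations they inspect along the way.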
Complete code:
package day06;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
/**
* @program: bigData_learn
* @description: SELECT * FROM A INNER JOIN B ON A.id = B.id;
* @author: Mr.逗
* @create: 2021-09-24 16:59
**/
public class InnerJoin {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run with a single parallel task so the printed output is easy to follow.
        env.setParallelism(1);
        // Left input: (key, Integer) records.
        DataStreamSource<Tuple2<String, Integer>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("b", 2),
                        Tuple2.of("a", 2)
                );
        // Right input: (key, String) records.
        DataStreamSource<Tuple2<String, String>> stream2 = env
                .fromElements(
                        Tuple2.of("a", "a"),
                        Tuple2.of("b", "b"),
                        Tuple2.of("a", "aaa")
                );
        // Key both streams by field f0 so records with the same key share keyed state.
        SingleOutputStreamOperator<String> process = stream1.keyBy(v -> v.f0)
                .connect(stream2.keyBy(v -> v.f0))
                .process(new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, String>, String>() {
                    // Per-key buffers for the records seen so far on each input.
                    // Neither list is ever cleared, so state grows without bound on unbounded streams.
                    private ListState<Tuple2<String, Integer>> listState1;
                    private ListState<Tuple2<String, String>> listState2;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        listState1 = getRuntimeContext().getListState(
                                new ListStateDescriptor<Tuple2<String, Integer>>(
                                        "list1", Types.TUPLE(Types.STRING, Types.INT)));
                        listState2 = getRuntimeContext().getListState(
                                new ListStateDescriptor<Tuple2<String, String>>(
                                        "list2", Types.TUPLE(Types.STRING, Types.STRING)));
                    }

                    @Override
                    public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        // Buffer the left record, then pair it with every right record already seen for this key.
                        listState1.add(value);
                        for (Tuple2<String, String> e : listState2.get()) {
                            out.collect(value + "=>" + e);
                        }
                    }

                    @Override
                    public void processElement2(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                        // Buffer the right record, then pair it with every left record already seen for this key.
                        listState2.add(value);
                        for (Tuple2<String, Integer> e : listState1.get()) {
                            out.collect(e + "=>" + value);
                        }
                    }
                });
        process.print();
        String name = InnerJoin.class.getName();
        try {
            env.execute(name);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Output:
(a,1)=>(a,a)
(b,2)=>(b,b)
(a,2)=>(a,a)
(a,1)=>(a,aaa)
(a,2)=>(a,aaa)
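The order of these lines depends on the order in which records from the two inputs reach the operator. The following plain-Java sketch replays the buffer-and-probe logic of the listing without Flink, using ordinary maps in place of keyed `ListState`. The class name `BufferedJoinSketch`, its methods, and the alternating arrival order in `demo()` are assumptions made for illustration; that order is one that happens to be consistent with the output above, not necessarily the one the original run used.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class, not part of the original listing.
class BufferedJoinSketch {
    // Per-key buffers standing in for the two keyed ListState handles.
    private final Map<String, List<String>> left = new HashMap<>();
    private final Map<String, List<String>> right = new HashMap<>();
    final List<String> output = new ArrayList<>();

    // Mirrors processElement1: buffer the left record, then probe the right buffer.
    void onLeft(String key, int value) {
        String rec = "(" + key + "," + value + ")";
        left.computeIfAbsent(key, k -> new ArrayList<>()).add(rec);
        for (String r : right.getOrDefault(key, Collections.emptyList())) {
            output.add(rec + "=>" + r);
        }
    }

    // Mirrors processElement2: buffer the right record, then probe the left buffer.
    void onRight(String key, String value) {
        String rec = "(" + key + "," + value + ")";
        right.computeIfAbsent(key, k -> new ArrayList<>()).add(rec);
        for (String l : left.getOrDefault(key, Collections.emptyList())) {
            output.add(l + "=>" + rec);
        }
    }

    // Replays the sample records, alternating left and right arrivals (an assumed order).
    static List<String> demo() {
        BufferedJoinSketch join = new BufferedJoinSketch();
        join.onLeft("a", 1);
        join.onRight("a", "a");
        join.onLeft("b", 2);
        join.onRight("b", "b");
        join.onLeft("a", 2);
        join.onRight("a", "aaa");
        return join.output;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Because every record is paired with everything already buffered on the other side, each matching pair is emitted exactly once regardless of arrival order; only the sequence of the output lines changes.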
Original article by 九贤生活小编. If you republish it, please credit the source: http://www.wangguangwei.com/100579.html