学习资料是官网文档 What is Apache Flink? ,简单了解下使用场景和原理。
用于处理在「一段时间内」逐渐产生的数据,即数据流,数据流中的单个数据称为事件/event。
处理流式数据有两种思路:
方式1存在的问题:
永续生成没有截止的数据,flink 将其称为「Unbounded streams」,与之相对的是「Bounded streams」,如下图所示:
flink 是一个专门用于处理流式数据的开发框架,同时支持 unbounded streams 和 bounded streams。
flink 可以自行管理服务器的资源,也可以部署到其它资源调度系统中,从第三方资源调度系统申请资源,支持以下系统:
flink 有三个基本概念:
开发者基于 flink 开发运行在 flink 上的流式处理应用,stream 是应用的输入,应用处理事件的中间态是 state(即有状态服务),开发者在应用代码中事件处理的时间策略。
整个 flink 就是围绕 state 构建的,简单说就是如何保持住中间结果。
事件到达应用的顺序和它的产生顺序可能不一致,并且事件产生和到达之间有时延,所以需要设置事件处理的时间策略。flink 支持两种时间策略:
方式1可以保证中间结果和实际情况一致,但是可能要过度等待,避免漏掉还在传输中的事件。
方式2收到事件时即处理,延迟低,但是中间输出的结果可能和实际不符。
为了协调方式1和方式2各自的优缺点,flink 提供了 Watermark Support 和 Late Data Handing。
flink 提供了三个层面的操作接口:
控制粒度最细的是 ProcessFunction,即编写事件的处理代码,直接操作到达的事件。
其次是 DataStream API,DataStream API 提供了一些汇聚函数。
最后是 SQL & Table API,提供类似 SQL 的操作接口 。
/**
* Matches keyed START and END events and computes the difference between
* both elements' timestamps. The first String field is the key attribute,
* the second String attribute marks START and END events.
*/
public static class StartEndDuration
extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {
private ValueState<Long> startTime;
@Override
public void open(Configuration conf) {
// obtain state handle
startTime = getRuntimeContext()
.getState(new ValueStateDescriptor<Long>("startTime", Long.class));
}
/** Called for each processed event. */
@Override
public void processElement(
Tuple2<String, String> in,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
switch (in.f1) {
case "START":
// set the start time if we receive a start event.
startTime.update(ctx.timestamp());
// register a timer in four hours from the start event.
ctx.timerService()
.registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
break;
case "END":
// emit the duration between start and end event
Long sTime = startTime.value();
if (sTime != null) {
out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
// clear the state
startTime.clear();
}
default:
// do nothing
}
}
/** Called when a timer fires. */
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) {
// Timeout interval exceeded. Cleaning up the state.
startTime.clear();
}
}
// a stream of website clicks
DataStream<Click> clicks = ...
DataStream<Tuple2<String, Long>> result = clicks
// project clicks to userId and add a 1 for counting
.map(
// define function by implementing the MapFunction interface.
new MapFunction<Click, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Click click) {
return Tuple2.of(click.userId, 1L);
}
})
// key by userId (field 0)
.keyBy(0)
// define session window with 30 minute gap
.window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
// count clicks per session. Define function as lambda function.
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId
这里只简单了解下 flink 是干嘛的,至于怎么搭建、怎么使用,使用时注意些什么,以后有时间再研究。