Flume官方Static拦截器如需设置多组键值:
agent.sources.r.interceptors = i1 i2 i3 agent.sources.r.interceptors.i1.type
=
static
agent.sources.r.interceptors.i1.key = country agent.sources.r.interceptors.i1.
value
= china agent.sources.r.interceptors.i2.type =
static
agent.sources.r.interceptors.i2.key = province agent.sources.r.interceptors.i2.
value
= fujian agent.sources.r.interceptors.i3.type =
static
agent.sources.r.interceptors.i3.key = city agent.sources.r.interceptors.i3.
value
= xiamen
Query拦截器(Query Interceptor)的等效配置:
agent.sources.r.interceptors = i agent.sources.r.interceptors.i.type =
QueryInterceptor$Builder agent.sources.r.interceptors.i.query = country=china&province=fujian&city=xiamen
参数 | 说明 |
---|---|
type | 固定值:QueryInterceptor$Builder |
query | 格式:key1=val1&key2=val2&key3=val3 |
pom.xml中加入Flume依赖:
<
dependencies
>
<
dependency
>
<
groupId
>
org.apache.flume
</
groupId
>
<
artifactId
>
flume-ng-core
</
artifactId
>
<
version
>
1.7.0
</
version
>
</
dependency
>
</
dependencies
>
自定义拦截器实现 org.apache.flume.interceptor.Interceptor 接口:
import
java.util.Map;
import
java.util.List;
import
org.apache.flume.Event;
import
org.apache.flume.Context;
import
org.apache.flume.interceptor.Interceptor;
public
class
QueryInterceptor
implements
Interceptor
{
private
final
String query;
//定义key的名称为query
public
static
class
Constants
{
public
static
final
String KEY =
"query"
; }
//Builder类(用于Flume获取拦截器实例)
public
static
class
Builder
implements
Interceptor
.
Builder
{
private
String query;
//返回拦截器实例
public
Interceptor
build
()
{
return
new
QueryInterceptor(query); }
//读取Flume配置
public
void
configure
(Context context)
{ query = context.getString(Constants.KEY);
//读取query的值
} }
//得到Builder类传来的query值
private
QueryInterceptor
(String query)
{
this
.query = query; }
//初始化操作(无)
public
void
initialize
()
{ }
//遍历Events并逐个调用intercept方法
public
List<Event>
intercept
(List<Event> events)
{
for
(Event e : events) { intercept(e); }
return
events; }
//解析query数据并放入header
public
Event
intercept
(Event event)
{ Map<String, String> headers = event.getHeaders(); String[] params
= query.split(
"&"
);
for
(String param : params) { String[] kv = param.split(
"="
);
if
(kv.length ==
2
) { headers.put(kv[
0
], kv[
1
]); } }
return
event; }
//关闭时的操作(无)
public
void
close
()
{ } }
编译项目生成jar包(依赖不用打包),把jar包放进$FLUME_HOME/plugins.d/QueryInterceptor/lib目录(需创建)
自定义拦截器的type项配置如果有package包名也需写上,并用$分割加上自定义的Builder类名:
# source
agent.sources = r agent.sources.r.type =
exec
agent.sources.r.command = tail -F /tmp/test.log agent.sources.r.interceptors
= i agent.sources.r.interceptors.i.type = QueryInterceptor$Builder agent.sources.r.interceptors.i.query
= country=china&province=fujian&city=xiamen agent.sources.r.channels
= c
# channel
agent.channels = c agent.channels.c.type = memory
# sink
agent.sinks = k agent.sinks.k.channel = c agent.sinks.k.type = logger
启动Flume时设置-DFlume.root.logger参数:
$FLUME_HOME/bin/flume-ng agent -c
<
Flume配置目录
>
-f
<
配置文件路径
>
-n agent -Dflume.root.logger=INFO,console
往/tmp/test.log文件写数据,在控制台可以看到如下信息:
...
Event
: {
headers
:{country=china, province=fujian, city=xiamen}
body
: ...