Flume Query拦截器组件

发布时间: 2017-11-13热度: 1933

拦截器效果

Flume官方Static拦截器如需设置多组键值:

    
        agent.sources.r.interceptors = i1 i2 i3 agent.sources.r.interceptors.i1.type
        =
        
            static
        
        agent.sources.r.interceptors.i1.key = country agent.sources.r.interceptors.i1.
        
            value
        
        = china agent.sources.r.interceptors.i2.type =
        
            static
        
        agent.sources.r.interceptors.i2.key = province agent.sources.r.interceptors.i2.
        
            value
        
        = fujian agent.sources.r.interceptors.i3.type =
        
            static
        
        agent.sources.r.interceptors.i3.key = city agent.sources.r.interceptors.i3.
        
            value
        
        = xiamen
    

Query拦截器(Query Interceptor)的等效配置:

    
        agent.sources.r.interceptors = i agent.sources.r.interceptors.i.type =
        QueryInterceptor$Builder agent.sources.r.interceptors.i.query = country=china&province=fujian&city=xiamen
    

拦截器配置

参数说明
type固定值:QueryInterceptor$Builder
query格式:key1=val1&key2=val2&key3=val3

创建Maven项目

pom.xml中加入Flume依赖:

    
        
            <
            
                dependencies
            
            >
        
        
            <
            
                dependency
            
            >
        
        
            <
            
                groupId
            
            >
        
        org.apache.flume
        
            </
            
                groupId
            
            >
        
        
            <
            
                artifactId
            
            >
        
        flume-ng-core
        
            </
            
                artifactId
            
            >
        
        
            <
            
                version
            
            >
        
        1.7.0
        
            </
            
                version
            
            >
        
        
            </
            
                dependency
            
            >
        
        
            </
            
                dependencies
            
            >
        
    

开发拦截器

自定义拦截器实现 org.apache.flume.interceptor.Interceptor 接口:

    
        
            import
        
        java.util.Map;
        
            import
        
        java.util.List;
        
            import
        
        org.apache.flume.Event;
        
            import
        
        org.apache.flume.Context;
        
            import
        
        org.apache.flume.interceptor.Interceptor;
        
            public
        
        
            
                class
            
            
                QueryInterceptor
            
            
                implements
            
            
                Interceptor
            
        
        {
        
            private
        
        
            final
        
        String query;
        
            //定义key的名称为query
        
        
            public
        
        
            static
        
        
            
                class
            
            
                Constants
            
        
        {
        
            public
        
        
            static
        
        
            final
        
        String KEY =
        
            "query"
        
        ; }
        
            //Builder类(用于Flume获取拦截器实例)
        
        
            public
        
        
            static
        
        
            
                class
            
            
                Builder
            
            
                implements
            
            
                Interceptor
            
            .
            
                Builder
            
        
        {
        
            private
        
        String query;
        
            //返回拦截器实例
        
        
            
                public
            
            Interceptor
            
                build
            
            
                ()
            
        
        {
        
            return
        
        
            new
        
        QueryInterceptor(query); }
        
            //读取Flume配置
        
        
            
                public
            
            
                void
            
            
                configure
            
            
                (Context context)
            
        
        { query = context.getString(Constants.KEY);
        
            //读取query的值
        
        } }
        
            //得到Builder类传来的query值
        
        
            
                private
            
            
                QueryInterceptor
            
            
                (String query)
            
        
        {
        
            this
        
        .query = query; }
        
            //初始化操作(无)
        
        
            
                public
            
            
                void
            
            
                initialize
            
            
                ()
            
        
        { }
        
            //遍历Events并逐个调用intercept方法
        
        
            
                public
            
            List<Event>
            
                intercept
            
            
                (List<Event> events)
            
        
        {
        
            for
        
        (Event e : events) { intercept(e); }
        
            return
        
        events; }
        
            //解析query数据并放入header
        
        
            
                public
            
            Event
            
                intercept
            
            
                (Event event)
            
        
        { Map<String, String> headers = event.getHeaders(); String[] params
        = query.split(
        
            "&"
        
        );
        
            for
        
        (String param : params) { String[] kv = param.split(
        
            "="
        
        );
        
            if
        
        (kv.length ==
        
            2
        
        ) { headers.put(kv[
        
            0
        
        ], kv[
        
            1
        
        ]); } }
        
            return
        
        event; }
        
            //关闭时的操作(无)
        
        
            
                public
            
            
                void
            
            
                close
            
            
                ()
            
        
        { } }
    

启用拦截器

生成Jar包

编译项目生成jar包(依赖不用打包),把jar包放进$FLUME_HOME/plugins.d/QueryInterceptor/lib目录(需创建)

Flume配置

自定义拦截器的type项配置如果有package包名也需写上,并用$分割加上自定义的Builder类名:

    
        
            # source
        
        agent.sources = r agent.sources.r.type =
        
            exec
        
        agent.sources.r.command = tail -F /tmp/test.log agent.sources.r.interceptors
        = i agent.sources.r.interceptors.i.type = QueryInterceptor$Builder agent.sources.r.interceptors.i.query
        = country=china&province=fujian&city=xiamen agent.sources.r.channels
        = c
        
            # channel
        
        agent.channels = c agent.channels.c.type = memory
        
            # sink
        
        agent.sinks = k agent.sinks.k.channel = c agent.sinks.k.type = logger
    

启动Flume

启动Flume时设置-DFlume.root.logger参数:

    
        $FLUME_HOME/bin/flume-ng agent -c
        
            <
            
                Flume配置目录
            
            >
        
        -f
        
            <
            
                配置文件路径
            
            >
        
        -n agent -Dflume.root.logger=INFO,console
    

往/tmp/test.log文件写数据,在控制台可以看到如下信息:

    
        ...
        
            Event
        
        : {
        
            headers
        
        :{country=china, province=fujian, city=xiamen}
        
            body
        
        : ...
    

请在下方留下您的评论.加入TG吹水群