Flume 实战(2)--Flume-ng-sdk源码分析_lskyne的博客-程序员资料

技术标签: Flume  

转自:http://www.cnblogs.com/mumuxinfei/p/3823266.html

具体参考: 官方用户手册和开发指南

http://flume.apache.org/FlumeDeveloperGuide.html

*) 定位和简单例子

1). Flume-ng-sdk是用于编写往flume agent发送数据的client sdk
2). 简单示例

RpcClient client = null;
try {
  client = RpcClientFactory.getDefaultInstance("127.0.0.1",41414);
  Event event = EventBuilder.withBody("hello flume", Charset.forName("UTF-8"));
  client.append(event);
}catch (EventDeliveryException e) {
  e.printStackTrace();
}finally {
  if ( client != null ) {
    client.close();
  }
}
*) Event设计和类层次结构

1. Event类设计
在Flume中Event是个接口类

1
2
3
4
5
6
public interface Event {
  public Map<String, String> getHeaders();
  public void setHeaders(Map<String, String> headers);
  public byte[] getBody();
  public void setBody(byte[] body);
}


由代码可得, Event由Header集合和消息负载两部分构成.

2. Builder设计模式
在org.apache.flume.event下, 有两个Event的具体实现类: SimpleEvent, JSonEvent.
EventBuilder类顾名思义, 采用Builder的方式来组装对象的成员, 并产生最终的对象.

public class EventBuilder {
 
  public static Event withBody(byte[] body, Map<String, String> headers) {
    Event event = new SimpleEvent();
    if(body == null) {
      body = new byte[0];
    }
    event.setBody(body);
    if (headers != null) {
      event.setHeaders(new HashMap<String, String>(headers));
    }
    return event;
  }
 
  public static Event withBody(byte[] body) {
    return withBody(body,null);
  }
 
  public static Event withBody(String body, Charset charset,
      Map<String, String> headers) {
    return withBody(body.getBytes(charset), headers);
  }
 
  public static Event withBody(String body, Charset charset) {
    return withBody(body, charset, null);
  }
 
}

java的访问控制符: public/default/protected/private, default表示同package可见
不过另人意外的是, 其对应的SimpleEvent的构造函数的修饰符是public, 即不是default, 也不是protected, 这点让EventBuilder的引入有些失败.把Builder模式, 用到极致的是Google Protocol Buffer(java), 其每个PB对象, 都是用相应的Builder类来组装和生成. 采用这种Builder模式的用途是, 把一个对象元素的修改和读取彻底分离, 使得一个PB对象,从诞生后就是一个immutable对象, 只能读取其属性信息, 而不能修改其属性.

*) RpcClient设计和类层次结构

1. RpcClient的接口定义:

public interface RpcClient {
  public int getBatchSize();
    public void append(Event event) throws EventDeliveryException;
    public void appendBatch(List<Event> events) throws EventDeliveryException;
    public boolean isActive();
    public void close()throws FlumeException;
}
2. AbstractRpcClient的抽象类定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class AbstractRpcClientimplements RpcClient {
 
    protected int batchSize = RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
    protected long connectTimeout = RpcClientConfigurationConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
    protected long requestTimeout = RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
 
    @Override
    public int getBatchSize(){
        return batchSize;
    }
 
    protected abstract void configure(Properties properties) throws FlumeException;
 
}


新增了一些常量定义, 和新的抽象函数configure(Properties prop);

3. RpcClient工厂类的使用
RpcClientFactory的定义

public class RpcClientFactory {
 
    public static RpcClient getInstance(Properties properties) throws FlumeException {
        // 1). 获取具体rpcclient的类型信息
        properties.getProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE);
        // 2). 利用反射,获取类的class
        Class<?extends AbstractRpcClient> clazz = (Class<? extends AbstractRpcClient>) Class.forName(...);
        // 3). 产生类对象
        RpcClient client = clazz.newInstance();
        // 4). 进行具体rpcclient实例的配置初始化
        client.configure(properties);
        // 5). 返回对象
        return client;
    }
 
}


RpcClientFactory借助静态方法getInstance, 其依据Properties里的相应key/value来, 来产生不同的对象实例, 配置不同的属性. 同时RpcClient的具体实例, 其构造方法的访问限定符都是protected, 这一点做的, 比之前EventBuilder设计和实现要规范和清晰.

clazz = Class.forName(...);
client = class.newInstance();
client.configure(...);

是种非常好的实践代码, 把面向对象的多态性用到极致

4. 具体的RpcClient类的实现
其SDK提供了两大类, 具体的实现类ThriftRpcClient和AvroRpcClient
4.1. 对ThriftRpcClient的解读
4.1.1 thrift idl的定义
idl文件(src/main/thrift/flume.thrift)的定义

namespace java org.apache.flume.thrift
 
struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}
 
enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}
 
service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),
}


分别对应源码包org.apache.flume.thrift下

Status, ThriftFlumeEvent, ThriftSourceProtocol类
4.1.2 ThriftRpcClient的实现
ThriftRpcClient并不是简单对ThriftSourceProtocol的客户端的简单封装

1
2
3
4
5
6
public class ThriftRpcClientextends AbstractRpcClient {
  private ConnectionPoolManager connectionManager;
  private final ExecutorService callTimeoutPool;
  private final AtomicLong threadCounter;
  // ......
}

评注: 粗略观察其类成员, 其借助线程池(ExecutorService)和连接池(ConnectionManager)管理, 来实现RpcClient的发送接口, 这样append(), appendBatch()的接口都是线程安全的, 该客户端的实例能用于多线程并发使用.

AvroRpcClient代码结构差不多, 先一笔带过.

5. 两个重要的实现类
FailOverRpcClient的源码解析:
这边采用装饰模式(Decorator Pattern), FailOverRpcClient继承自RpcClient, 同时又拥有实际的RpcClient实例, 只是在实际RpcClient基础上, 添加了失败后重试的能力.

FailOver是失败后重试的机制, 通常借助带尝试次数的重试来实现
其append(Event e)方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int tries = 0;
while (tries < maxTries) {
  try {
    tries++;
    localClient = getClient();
    localClient.append(event);
    return;
  }catch (EventDeliveryException e) {
    localClient.close();
    localClient = null;
  }catch (Exception e2) {
    throw new EventDeliveryException(
        "Failed to send event. Exception follows: ", e2);
   }
}

这段代码采用相对简单的.

getNextClient()的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
for (int count = lastCheckedhost + 1; count < limit; count++) {
  HostInfo hostInfo = hosts.get(count);
  try {
    setDefaultProperties(hostInfo, props);
    localClient = RpcClientFactory.getInstance(props);
    lastCheckedhost = count;
    return localClient;
  }catch (FlumeException e) {
    logger.info("Could not connect to " + hostInfo, e);
    continue;
  }
}
for(int count = 0; count <= lastCheckedhost; count++) {
  HostInfo hostInfo = hosts.get(count);
  try {
    setDefaultProperties(hostInfo, props);
    localClient = RpcClientFactory.getInstance(props);
    lastCheckedhost = count;
    return localClient;
  }catch (FlumeException e) {
    logger.info("Could not connect to " + hostInfo, e);
    continue;
  }
}

HostInfo封装了一个远端服务的ip地址
FailOver简单的轮询了各个服务地址.

LoadBalancingRpcClient的源码解析:
LoadBalancingRpcClient顾名思义, 采用负载均衡的策略来实现, 其还是采用遍历(轮询/随机)+反馈的机制, 来动态的调整服务列表的候选顺序.
在append(Event)方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Iterator<HostInfo> it = selector.createHostIterator();
while (it.hasNext()) {
  HostInfo host = it.next();
  try {
    RpcClient client = getClient(host);
    client.append(event);
    eventSent = true;
    break;
  }catch (Exception ex) {
    selector.informFailure(host);
    LOGGER.warn("Failed to send event to host " + host, ex);
  }
}
if (!eventSent) {
  throw new EventDeliveryException("Unable to send event to any host");
}

selector.createHostIterator() 创建当前服务候选列表的一个快照, 同时递进一个轮询单元.
selector.informFailure(host) 是对失败的服务进行降级处理

而HostSelector接口定义如下:

1
2
3
4
5
public interface HostSelector {
  void setHosts(List<HostInfo> hosts);
  Iterator<HostInfo> createHostIterator();
  void informFailure(HostInfo failedHost);
}

其具体实现类

#). RoundRobinHostSelector, 借助轮询的方式来实现
#). RandomOrderHostSelector, 借助随机的方式来实现
这两个类, 都是借助OrderSelector<T>的实现类来实现, OrderSelector封装了对错误服务机器列表的屏蔽策略
该屏蔽策略如下所示:
失败一次, 设置一个恢复时间点, 未到该恢复时间点, 则不允许获取该机器ip/port
同时为了惩罚多次失败, 减少获取该服务机器的ip/port, 采用1000 * (1 << sequentialFails), 连续失败次数, 其恢复时间的间隔要加大.

*) Properties的属性配置
基本的属性配置

1
2
3
4
5
6
client.type = default (for avro) or thrift (for thrift)
hosts = h1 # default client accepts only 1 host
hosts.h1 = host1.example.org:41414 # host and port must both be specified
batch-size = 100 # Must be >=1 (default:100)
connect-timeout = 20000 # Must be >=1000 (default:20000)
request-timeout = 20000 # Must be >=1000 (default:20000)

FailOver支持的配置

1
2
3
client.type = default_failover
hosts = h1 h2 h3 # at least one is required, but 2 or more makes better sense
max-attempts = 3 # Must be >=0 (default: number of hosts

Balancing支持的配置

1
2
3
4
5
client.type = default_loadbalance
hosts = h1 h2 h3 # At least 2 hosts are required
backoff = false # Specifies whether the client should back-off from a failed host
maxBackoff = 0 # Max timeout in millis
host-selector = round_robin # The host selection strategy used

*) 异常类定义

EventDeliveryException和FlumeException


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/lskyne/article/details/38708313

智能推荐

在RT-Thread中给STM32F413添加CAN3_stm32f413例程_linhongshou的博客-程序员资料

在RT-Thread中给STM32F413添加CAN3简介目前最新的RT-Thread v4.0.2并不能直接支持STM32F413的CAN3,从配置项添加的CAN驱动只能够支持CAN1 / CAN2,而F413的CAN3需要我们手动添加。创建STM32F413工程首先新建一个工程stm32f413-xxx-xxx复制模板代码bsp\stm32\libraries\templates\stm32f4xx,在《bsp\stm32\docs\STM32系列BSP制作教程.md》文件中有介绍在bsp\

回文数与进制转换-课程作业_阿腾木的博客-程序员资料

题目描述我们把从左往右和从右往左念起来相同的数字叫做回文数。例如,75457就是一个回文数。当然某个数用某个进制表示不是回文数,但是用别的进制表示可能就是回文数。例如,17是用十进制表示的数,显然它不是一个回文数,但是将17用二进制表示出来是10001,显然在二进制下它是一个回文数。现在给你一个用十进制表示的数,请你判断它在2~16进制下是否是回文数。输入输入包含多组测试数据。每组输入一个用十进制表示的正整数n(0&lt;n&lt;50000),当n=0时,输入结束。输出对于每组输入,如果n

Dive into Deep Learning - 动手学深度学习_Yongqiang Cheng的博客-程序员资料

Dive into Deep Learning - 动手学深度学习Dive into Deep Learninghttps://d2l.ai/https://en.d2l.ai/d2l-en.pdf动手学深度学习https://zh.d2l.ai/https://zh.d2l.ai/d2l-zh.pdfDive into Deep Learning - GitHubhttps://github.com/d2l-ai/d2l-en动手学深度学习 - GitHubhttps://github

MFC绘图(转载)_mfc创建分布图_sufwei的博客-程序员资料

 1 几何对象的结构和类为了使用绘图函数,应该先了解绘图所用到的几种表示几何对象的结构和类。这些结构和类分别定义在头文件windef.h和afxwin.h中。1.点1)点结构POINT点数据结构POINT用来表示一点的x、y坐标:typedef struct tagPOINT {     LONG x;     LONG y; } POINT;2)点类CPo

使用eclipse练习java_浪里小黑狼的博客-程序员资料

package E;public class E { public static void main(String args[]) { char a1='十',a2='点',a3='进',a4='攻'; char secret='A'; a1=(char)(a1^secret); a2=(char)(a2^secret); a3=(char)...

随便推点

中英文维基百科语料上的Word2Vec实验_chvalrous的博客-程序员资料

最近试了一下Word2Vec, GloVe 以及对应的python版本 gensim word2vec 和 python-glove,就有心在一个更大规模的语料上测试一下,自然而然维基百科的语料进入了视线。维基百科官方提供了一个很好的维基百科数据源:https://dumps.wikimedia.org,可以方便的下载多种语言多种格式的维基百科数据。此前通过gensim的玩过英文的维基百科语料

通达OA 2011-2013 通杀GETSHELL_god_7z1的博客-程序员资料

转载自T00ls此程序应用于非常多的政府机构, 教育机构, 以及各大一流公司(中国电信等)!请各位看完本文后不要试图对任何使用本程序的网站进行破坏攻击入侵等…本人发此贴纯属技术交流探讨, 并无怂恿别人去尝试攻击的意思!若不接受以上条件的请自觉关闭, 由此产生的后果与作者(本人)无关!EXP:第一步:1[GET]http://sit

第二章_pingguo0123的博客-程序员资料

第 2 章 程序的版式 版式虽然不会影响程序的功能,但会影响可读性。程序的版式追求清晰、美观,是程序风格的重要构成因素。 可以把程序的版式比喻为“书法”。好的“书法”可让人对程序一目了然,看得兴致勃勃。差的程序“书法”如螃蟹爬行,让人看得索然无味,更令维护者烦恼有加。请程序员们学习程序的“书法”,弥补大学计算机教育的漏洞,实在很有必要。 2.1 空行 空行起着分隔程序段落的作用。空行得体(不过多也不过少)将使程序的布局更加清晰。空行不会浪费内存,虽然打印含有空行的程序是会多消耗一些纸张,但是值得。所以不要舍

Tensorboard:PermissionError: [Errno 13] Permission denied: ‘/tmp/.tensorboard-info/pid-46614.info‘_时光碎了天的博客-程序员资料

在使用Tensorboard过程中,突然遇到报错现象:PermissionError: [Errno 13] Permission denied: '/tmp/.tensorboard-info/pid-46614.info' 解决办法: 直接在linux命令行输入两行命令串:export TMPDIR=/tmp/$USER;mkdir -p $TMPDIR; 然后就可以正常 tensorboard --logdir=文件夹名称...

vmvare搭建k8s集群_wmware 搭建k8s集群_喜欢coding的谢同学的博客-程序员资料

文章目录虚拟机安装新建虚拟机centos安装环境准备子网组建宿主机网卡配置虚拟机网卡配置集群搭建基础配置kube相关配置虚拟机安装新建虚拟机自定义安装就用默认空白光盘,然后选自己将用的版本;建议centos,而且得centos7以上,我用的是CentOS-7-x86_64-Minimal-1810.iso如果上面搜狐镜像的链接失效了,点击进入另一篇博客,选一个就行。虚拟机名称自定义...

MySQL数学函数大全_林愿星的博客-程序员资料

ABS(N) 返回N的绝对值mysql> select ABS(2);   -> 2 mysql> select ABS(-32);   -> 32 SIGN(N) 返回参数的符号(为-1、0或1)mysql> select SIGN(-32);   -> -1 mysql> select SIGN(0);   -> 0 mysql> select SIGN(234);   -> 1 MOD(N,