目录

Flume

Flume

1 概述

  • 本文软体版本:
    • apache-flume-1.8.0-bin
    • kafka_2.11-0.11.0.1

Kafka 的相关内容将在另一篇文章中更新。

2 实践

2.1 Flume: data from log file to Kafka (ACL)

# File ===> flume.conf
#
# Flume agent definition for the agent named
# "agent-se-prometheus-alertinfo-prod". Every key below is prefixed with the
# agent name; it declares one source, one channel and one sink.
agent-se-prometheus-alertinfo-prod.sources = source1
agent-se-prometheus-alertinfo-prod.channels = channel1
agent-se-prometheus-alertinfo-prod.sinks = sink1

# Source: exec source that runs `tail -F` on the alert log and turns each
# output line into an event.
# NOTE(review): the Flume user guide warns that the exec source gives no
# delivery guarantee if the agent or the tail process dies; the Taildir source
# is the documented alternative when loss or duplication matters -- confirm
# this trade-off is acceptable here.
agent-se-prometheus-alertinfo-prod.sources.source1.type = exec
agent-se-prometheus-alertinfo-prod.sources.source1.command = tail -F /data/logs/alertinfo.log


# Wiring: source1 writes into channel1, sink1 reads from channel1.
# (A source takes the plural key "channels"; a sink takes the singular "channel".)
agent-se-prometheus-alertinfo-prod.sources.source1.channels = channel1
agent-se-prometheus-alertinfo-prod.sinks.sink1.channel = channel1


# Channel: file channel with its checkpoint and data directories under the
# Flume install directory. capacity / transactionCapacity are the channel size
# limits; see the Flume File Channel documentation for their exact semantics.
agent-se-prometheus-alertinfo-prod.channels.channel1.type   = file
agent-se-prometheus-alertinfo-prod.channels.channel1.checkpointDir = /data/services/se-prometheus-alertinfo-prod/apache-flume-1.8.0-bin/channel1/checkpoint
agent-se-prometheus-alertinfo-prod.channels.channel1.dataDirs = /data/services/se-prometheus-alertinfo-prod/apache-flume-1.8.0-bin/channel1/data/
agent-se-prometheus-alertinfo-prod.channels.channel1.capacity = 100000
agent-se-prometheus-alertinfo-prod.channels.channel1.transactionCapacity = 100


# Sink: Kafka sink publishing to topic "prometheus-alertinfo" on three brokers.
# Keys under "kafka.producer." are Kafka producer settings (acks, linger.ms,
# compression.type, security.protocol, sasl.mechanism).
# NOTE(review): the Flume Kafka Sink property table lists the batch-size key as
# "flumeBatchSize" (without the "kafka." prefix); the value here equals the
# documented default of 100, so behaviour is presumably unaffected -- confirm.
# NOTE(review): SASL_PLAINTEXT with the PLAIN mechanism authenticates without
# TLS, so the username/password presumably cross the network unencrypted --
# confirm this is acceptable for the target network.
agent-se-prometheus-alertinfo-prod.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent-se-prometheus-alertinfo-prod.sinks.sink1.kafka.topic = prometheus-alertinfo
agent-se-prometheus-alertinfo-prod.sinks.sink1.kafka.bootstrap.servers = kafka01:9092,kafka02:9092,kafka03:9092
agent-se-prometheus-alertinfo-prod.sinks.sink1.kafka.flumeBatchSize = 100
agent-se-prometheus-alertinfo-prod.sinks.sink1.kafka.producer.acks = 1
agent-se-prometheus-alertinfo-prod.sinks.sink1.kafka.producer.linger.ms = 1
agent-se-prometheus-alertinfo-prod.sinks.sink1.kafka.producer.compression.type = snappy
agent-se-prometheus-alertinfo-prod.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
agent-se-prometheus-alertinfo-prod.sinks.sink1.kafka.producer.sasl.mechanism = PLAIN  
# File ===> flume-env.sh
#
# Environment for the Flume JVM: selects the JDK and appends JVM options to
# whatever JAVA_OPTS already holds.

export JAVA_HOME=/usr/java/jdk1.8.0_181-amd64/

# Heap: 100 MB initial, 2000 MB maximum.
export JAVA_OPTS="${JAVA_OPTS:-} -Xms100m -Xmx2000m"

# JAAS login file holding the Kafka SASL credentials (see flume_jass.conf).
# The path uses the same install directory as the channel directories in
# flume.conf (/data/services/se-prometheus-alertinfo-prod/...); a path that
# does not exist leaves the Kafka client without a login configuration.
export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/data/services/se-prometheus-alertinfo-prod/apache-flume-1.8.0-bin/conf/flume_jass.conf"

# Remote JMX on port 6001.
# NOTE(review): authentication and SSL are both disabled, so anything that can
# reach this port can attach to the JVM -- restrict it with a firewall or
# enable JMX authentication before exposing it beyond a trusted network.
export JAVA_OPTS="$JAVA_OPTS -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=6001 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

# File ===> flume_jass.conf
// JAAS login configuration; flume-env.sh passes this file to the JVM through
// -Djava.security.auth.login.config.
// The "KafkaClient" entry supplies the username/password used for the Kafka
// SASL/PLAIN login.
// NOTE(review): "xxxx" are presumably placeholders -- substitute the real
// credentials and keep this file readable only by the Flume service account.
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xxxx"
    password="xxxx";
};
# File ===> log4j.properties

# Level and appender for the root logger (consumed by log4j.rootLogger below).
# The start command sends the agent's stdout/stderr to /dev/null, so a console
# appender would silently discard every log line; log to the size-rotated
# LOGFILE appender at INFO instead.
# For an interactive debugging session, run the agent in the foreground and
# switch to the commented-out DEBUG,console line.
flume.root.logger=INFO,LOGFILE
#flume.root.logger=DEBUG,console
# Directory and file name used by the file appenders.
flume.log.dir=./logs
flume.log.file=flume.log

# Per-package log levels; these apply regardless of the root logger level.
log4j.logger.org.apache.flume.lifecycle = INFO
log4j.logger.org.jboss = WARN
log4j.logger.org.mortbay = INFO
log4j.logger.org.apache.avro.ipc.NettyTransceiver = WARN
log4j.logger.org.apache.hadoop = INFO
log4j.logger.org.apache.hadoop.hive = ERROR

# Root logger: takes its "<level>,<appender>" value from the
# "flume.root.logger" property.
log4j.rootLogger=${flume.root.logger}


# LOGFILE appender: size-based rotation.
# Writes to ${flume.log.dir}/${flume.log.file}, rolls at 100MB and keeps up to
# 10 backup files.
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=100MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n


# DAILY appender: time-based rotation, one file per day
# (<log file>.yyyy-MM-dd). Defined here but not selected by either
# flume.root.logger line in this file.
# NOTE(review): the org.apache.log4j.rolling.* classes presumably come from the
# log4j "extras" jar -- confirm it is on the classpath before enabling DAILY.
log4j.appender.DAILY=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.DAILY.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.DAILY.rollingPolicy.ActiveFileName=${flume.log.dir}/${flume.log.file}
log4j.appender.DAILY.rollingPolicy.FileNamePattern=${flume.log.dir}/${flume.log.file}.%d{yyyy-MM-dd}
log4j.appender.DAILY.layout=org.apache.log4j.PatternLayout
log4j.appender.DAILY.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n


# console appender: writes to System.err.
# Add "console" to flume.root.logger above if you want to use this.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
# Start command -- run from the Flume install directory (bin/ and conf/ are
# relative paths).
#   -n  agent name; must match the key prefix used in conf/flume.conf
#   -c  configuration directory
#   -f  agent configuration file
# stdout and stderr are discarded, so anything logged to a console appender is
# lost. The redirect is written as "> /dev/null 2>&1" rather than the bash-only
# "&>", which a POSIX /bin/sh parses as "run in background" followed by a
# separate redirection, leaving the output un-redirected.
nohup bin/flume-ng agent -n agent-se-prometheus-alertinfo-prod -c conf -f conf/flume.conf > /dev/null 2>&1 &