How should you configure logging in Flink 1.11?

Flink 1.11 upgrades logging to Log4j2 and improves log viewing in the Web UI.

Before Flink 1.11, Flink used Log4j 1, and the log4j.properties configuration file looked like this:

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n


# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

The ${log.file} property is not set in this file: Flink's startup scripts pass it to each JobManager and TaskManager JVM as a system property, pointing at that process's own file under the log/ directory, which is why the two components log to separate files. Each file, however, grows indefinitely. If you want the log files to roll over by size, use a RollingFileAppender and change the configuration as follows:

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, RFA

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO


log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${log.file}
log4j.appender.RFA.MaxFileSize=256MB
log4j.appender.RFA.Append=true
log4j.appender.RFA.MaxBackupIndex=10
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %t %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, RFA

Why should log files roll by size in production?

The reason is simple: in production the data stream is large. You might call print() on a stream to verify results, or add log statements for debugging, and then forget to remove them before deploying. The entire stream then gets written to the logs, the TaskManager log file becomes enormous, and the Web UI becomes very slow, or fails to load entirely, when you try to view it. So if the Web UI is ever painfully slow when opening a log, an oversized log file is the most likely cause.
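As a minimal sketch of how this happens (the job below is hypothetical, not from the original post), a print() left in a DataStream pipeline writes one line per record to the TaskManager's output files:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintFloodDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // print() writes every record to the TaskManager's stdout file.
        // Handy for verifying results locally, but on a production stream it
        // emits one line per event, so the file grows as fast as the stream.
        env.fromElements("event-1", "event-2", "event-3")
           .print();

        env.execute("print-flood-demo");
    }
}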

Some readers may instead want to ship Flink job logs to Kafka for centralized collection, then run log analysis and alerting on them, or consume them into Elasticsearch for log search. To send logs to Kafka you can use KafkaLog4jAppender, configured like this:

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, kafka


# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO


# log send to kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.topic=flink_logs
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.requiredNumAcks=0
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.appender.kafka.level=INFO


# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, kafka
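Note that KafkaLog4jAppender is not bundled with Flink; it ships in Kafka's kafka-log4j-appender artifact, which must be on Flink's classpath (for example, dropped into the lib/ directory). With Maven it would be declared roughly as follows (the version shown is an assumption; match it to your Kafka cluster):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <!-- assumption: pick the version matching your Kafka brokers -->
    <version>2.4.1</version>
</dependency>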

Flink 1.11 upgrades Log4j to Log4j2; a look at FLINK-15672 shows that the change touched a large number of files.

The new configuration file is still named log4j.properties, with the contents shown below. Oddly, the FLINK-15672 commits rename many configuration files to log4j2.properties, yet the conf directory in the final Flink distribution keeps the old name; you might expect it to be renamed to log4j2.properties as well. (In practice the name should not matter: Flink's startup scripts pass the file explicitly via the log4j.configurationFile system property, so Log4j2 does not rely on its default log4j2.properties lookup.)

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = File
appender.main.append = false
appender.main.fileName = ${sys:log.file}
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
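The main syntactic change is that each logger is now declared as a name/level property pair. For example, to tune the level of a single package of your own (com.example.myjob is a hypothetical placeholder), you would add:

# Hypothetical example of the Log4j2 properties syntax for one logger
logger.myjob.name = com.example.myjob
logger.myjob.level = DEBUG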

By default this configuration does not roll the log file by size either. To keep the same behavior as before (rolling the log file by size), change the configuration as follows:

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.rolling.ref = RollingFileAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 200MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

If you upgrade to 1.11, you cannot simply keep the pre-1.11 rolling-file configuration; it has to be rewritten as above. With this configuration the log file is split every 200 MB, and each rolled file is named ${sys:log.file}.%i, i.e. the original name with a numeric suffix.
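For illustration, assuming log.file resolves to flink-root-taskexecutor-0-host.log (a hypothetical name), the log directory would contain files like:

flink-root-taskexecutor-0-host.log      # current file being written
flink-root-taskexecutor-0-host.log.1
flink-root-taskexecutor-0-host.log.2
flink-root-taskexecutor-0-host.log.10   # at most 10 rolled files are kept (strategy.max = 10)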

The effect of this configuration in 1.11 is shown in the figures below:

[Figure: TaskManager log file list in the Web UI]

[Figure: JobManager log file list in the Web UI]

As the figures show, log viewing in 1.11 is considerably friendlier than before: all log files are shown in a list, instead of only a single log file being viewable.

Note: to demonstrate the rolling effect, appender.rolling.policies.size.size was set to 1KB for the test.


GitHub repository

https://github.com/zhisheng17/flink-learning/

All code for this project lives in this repository, including my Flink learning demos and blog posts.
