Logging from Spring Boot to Kafka with Logback

Introduction

This article is part of an ELK integration series. The overall pipeline is: application logs -> Kafka -> Logstash -> Elasticsearch -> Kibana.

Here we use Spring Boot + Logback to write the application logs to Kafka.

Logging to Kafka with Logback

Since Spring Boot already ships with Logback, all we need to do is provide a logback-spring.xml configuration file; no configuration is required in application.yml for the appender itself.

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<contextName>springboot-demo</contextName>

<!-- 定义日志文件的存储地址,勿在 LogBack 的配置中使用相对路径 -->
<property name="LOG_PATH" value="d:/logs/" />

<!--输出到控制台-->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<!-- 滚动日志 -->
<appender name="ROLLING_FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!--日志文件输出的文件名 -->
<FileNamePattern>${LOG_PATH}/springboot-demo.%d.%i.log
</FileNamePattern>
<MaxHistory>100</MaxHistory>
<TimeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<MaxFileSize>50MB</MaxFileSize>
</TimeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50}-%msg%n</pattern>
</encoder>
</appender>
<!-- This is the kafkaAppender -->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
<topic>app-log</topic>
<!-- we don't care how the log messages will be partitioned -->
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />

<!-- use async delivery. the application threads are not blocked by logging -->
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />

<!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
<!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
<!-- bootstrap.servers is the only mandatory producerConfig -->
<producerConfig>bootstrap.servers=192.168.193.100:9092</producerConfig>
<!-- don't wait for a broker to ack the reception of a batch. -->
<producerConfig>acks=0</producerConfig>
<!-- wait up to 1000ms and collect log messages before sending them as a batch -->
<producerConfig>linger.ms=1000</producerConfig>
<!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
<producerConfig>max.block.ms=0</producerConfig>
<!-- define a client-id that you use to identify yourself against the kafka broker -->
<producerConfig>client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-relaxed</producerConfig>

</appender>

<!-- 日志输出级别 ERROR,WARN,INFO,DEBUG -->
<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="ROLLING_FILE" />
<appender-ref ref="kafkaAppender" />
</root>

</configuration>

The part to focus on here is the kafkaAppender, which relies on the logback-kafka-appender library.

Maven dependencies:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- logback-kafka-appender dependency -->
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.2.0-RC2</version>
</dependency>
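
With the dependency on the classpath and the root logger above referencing kafkaAppender, no Kafka-specific code is needed in the application: any ordinary SLF4J call is shipped to the app-log topic. A minimal sketch (the class name LogDemo is just for illustration, not part of the original project):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogDemo {

    private static final Logger log = LoggerFactory.getLogger(LogDemo.class);

    public static void main(String[] args) {
        // Routed by the root logger to the console, the rolling file
        // and (asynchronously) the app-log Kafka topic.
        log.info("logback-kafka-appender test message");
    }
}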

Kafka configuration (application.properties):

#============== kafka ===================
# Kafka broker address(es); multiple brokers can be listed
spring.kafka.bootstrap-servers=192.168.193.100:9092

#=============== producer =======================

spring.kafka.producer.retries=0
# Producer batch size (in bytes)
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
# Default consumer group id
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
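
Note that these spring.kafka.* properties configure the application's own Kafka clients; the logback appender takes its producer settings from the <producerConfig> entries in logback-spring.xml. As a quick check on the consuming side, a simple listener can be added. A minimal sketch (the class name KafkaLogListener is assumed, not part of the original project):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaLogListener {

    // Consumes the log lines published by the kafkaAppender,
    // using the consumer settings from application.properties.
    @KafkaListener(topics = "app-log")
    public void onLogMessage(String record) {
        System.out.println("received from kafka: " + record);
    }
}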

Now log 10 messages and check whether they were sent to Kafka:

/**
 * Send logs to Kafka via the logback appender.
 */
private static void sendKafkaLogByLogbackAppender() {
    Gson gson = new GsonBuilder().create();
    IntStream.rangeClosed(1, 10).forEach(i -> {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        String name = Math.random() < 0.5 ? "zhangsan " + i : "李四 " + i;
        User user = User.builder().id(i).name(name).birthday(new Date()).build();
        message.setMsg(user);
        message.setSendTime(new Date());
        String msg = gson.toJson(message);
        log.info(msg);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}
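
The Message and User classes used above are not shown in the original post; a minimal sketch of what they are assumed to look like (Lombok provides the builders and accessors):

// User.java (assumed shape)
import lombok.Builder;
import lombok.Data;
import java.util.Date;

@Data
@Builder
public class User {
    private int id;
    private String name;
    private Date birthday;
}

// Message.java (assumed shape)
import lombok.Data;
import java.util.Date;

@Data
public class Message {
    private long id;       // set from System.currentTimeMillis()
    private Object msg;    // holds the User payload here
    private Date sendTime;
}

Before involving Logstash and Kibana, you can also confirm that the messages reached the topic with the standard console consumer shipped with recent Kafka distributions: kafka-console-consumer.sh --bootstrap-server 192.168.193.100:9092 --topic app-log --from-beginning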

Since we are using Kibana, the log entries can be viewed there directly (remember to configure Logstash to consume from Kafka; a sketch of such a pipeline follows below).
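
For reference, a minimal Logstash pipeline along these lines (the Elasticsearch address, index name and consumer group are assumptions; adjust them to your environment):

input {
  kafka {
    # Same broker and topic as the kafkaAppender above
    bootstrap_servers => "192.168.193.100:9092"
    topics => ["app-log"]
    group_id => "logstash"
  }
}

output {
  elasticsearch {
    # Assumed Elasticsearch address and daily index name
    hosts => ["192.168.193.100:9200"]
    index => "app-log-%{+YYYY.MM.dd}"
  }
}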
