使用Pulsar替换Kafka

This commit is contained in:
wangxiaoyang 2022-02-12 20:46:11 +08:00
parent b94746a2c9
commit e5c324598a
9 changed files with 147 additions and 52 deletions

View File

@ -30,8 +30,9 @@
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
@ -45,6 +46,11 @@
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,62 @@
package cn.linter.oasys.chat.config;
import cn.linter.oasys.chat.handler.ChatMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author wangxiaoyang
*/
@Slf4j
@Configuration
public class PulsarConfig {
@Value("${pulsar.host}")
private String pulsarHost;
@Value("${pulsar.topic}")
private String pulsarTopic;
@Value("${pulsar.subscription}")
private String pulsarSubscription;
@Bean
public PulsarClient pulsarClient() {
try {
return PulsarClient.builder()
.serviceUrl("pulsar://" + pulsarHost)
.build();
} catch (PulsarClientException e) {
log.error("Pulsar连接失败", e);
return null;
}
}
@Bean
public Producer<String> producer(PulsarClient pulsarClient) {
try {
return pulsarClient.newProducer(Schema.STRING)
.topic(pulsarTopic)
.create();
} catch (PulsarClientException e) {
log.error("Pulsar生产者创建失败", e);
return null;
}
}
@Bean
public Consumer<String> consumer(PulsarClient pulsarClient, ChatMessageListener messageListener) {
try {
return pulsarClient.newConsumer(Schema.STRING)
.topic(pulsarTopic)
.subscriptionName(pulsarSubscription)
.messageListener(messageListener)
.subscribe();
} catch (PulsarClientException e) {
log.error("Pulsar生产者创建失败", e);
return null;
}
}
}

View File

@ -0,0 +1,41 @@
package cn.linter.oasys.chat.handler;
import cn.linter.oasys.chat.container.SessionContainer;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import java.io.IOException;
/**
* 消息监听器
*
* @author wangxiaoyang
* @since 2021/1/2
*/
@Slf4j
@Component
public class ChatMessageListener implements MessageListener<String> {
@Override
public void received(Consumer<String> consumer, Message<String> msg) {
TextMessage textMessage = new TextMessage(msg.getValue());
SessionContainer.values().forEach(session -> {
try {
session.sendMessage(textMessage);
} catch (IOException e) {
log.error("消息发送失败", e);
}
});
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
log.error("消息应答失败", e);
}
}
}

View File

@ -5,7 +5,7 @@ import cn.linter.oasys.chat.repository.MessageRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.apache.pulsar.client.api.Producer;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@ -19,15 +19,15 @@ import java.util.Map;
*/
@Slf4j
@Component
public class MessagePublisher {
public class ChatMessagePublisher {
private final MessageRepository messageRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
private final Producer<String> pulsarPublisher;
private final ObjectMapper objectMapper;
public MessagePublisher(MessageRepository messageRepository, KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
public ChatMessagePublisher(MessageRepository messageRepository, Producer<String> pulsarPublisher, ObjectMapper objectMapper) {
this.messageRepository = messageRepository;
this.kafkaTemplate = kafkaTemplate;
this.pulsarPublisher = pulsarPublisher;
this.objectMapper = objectMapper;
}
@ -45,7 +45,7 @@ public class MessagePublisher {
}
try {
String messageString = objectMapper.writeValueAsString(message);
kafkaTemplate.send("public-chat", messageString);
pulsarPublisher.sendAsync(messageString);
} catch (JsonProcessingException e) {
log.error("Json process error", e);
}

View File

@ -22,10 +22,10 @@ import java.util.Map;
@Component
public class ChatWebSocketHandler extends TextWebSocketHandler {
private final MessagePublisher messagePublisher;
private final ChatMessagePublisher chatMessagePublisher;
public ChatWebSocketHandler(MessagePublisher messagePublisher) {
this.messagePublisher = messagePublisher;
public ChatWebSocketHandler(ChatMessagePublisher chatMessagePublisher) {
this.chatMessagePublisher = chatMessagePublisher;
}
@Override
@ -33,12 +33,12 @@ public class ChatWebSocketHandler extends TextWebSocketHandler {
SessionContainer.add(session);
Map<String, Object> attributes = session.getAttributes();
String content = attributes.get("fullName") + "进入了聊天室";
messagePublisher.publish(attributes, Message.Type.SYSTEM, content);
chatMessagePublisher.publish(attributes, Message.Type.SYSTEM, content);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
messagePublisher.publish(session.getAttributes(), Message.Type.PUBLIC, message.getPayload());
chatMessagePublisher.publish(session.getAttributes(), Message.Type.PUBLIC, message.getPayload());
}
@Override
@ -46,7 +46,7 @@ public class ChatWebSocketHandler extends TextWebSocketHandler {
SessionContainer.remove(session);
Map<String, Object> attributes = session.getAttributes();
String content = attributes.get("fullName") + "离开了聊天室";
messagePublisher.publish(attributes, Message.Type.SYSTEM, content);
chatMessagePublisher.publish(attributes, Message.Type.SYSTEM, content);
}
}

View File

@ -1,34 +0,0 @@
package cn.linter.oasys.chat.handler;
import cn.linter.oasys.chat.container.SessionContainer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import java.io.IOException;
/**
* 消息监听器
*
* @author wangxiaoyang
* @since 2021/1/2
*/
@Slf4j
@Component
public class MessageListener {
@KafkaListener(topics = "public-chat")
public void listen(ConsumerRecord<String, String> record) {
TextMessage textMessage = new TextMessage(record.value());
SessionContainer.values().parallelStream().forEach(session -> {
try {
session.sendMessage(textMessage);
} catch (IOException e) {
log.error("Message sending error", e);
}
});
}
}

View File

@ -0,0 +1,19 @@
{
"properties": [
{
"name": "pulsar.host",
"type": "java.lang.String",
"description": "Pulsar服务IP."
},
{
"name": "pulsar.topic",
"type": "java.lang.String",
"description": "topic."
},
{
"name": "pulsar.subscription",
"type": "java.lang.String",
"description": "subscription."
}
]
}

View File

@ -15,7 +15,7 @@ spring:
jackson:
default-property-inclusion: non_null
time-zone: GMT+8
kafka:
consumer:
group-id: chat
auto-offset-reset: earliest
pulsar:
host: localhost:6650
topic: chat
subscription: chat

View File

@ -30,6 +30,7 @@
<spring-security-oauth2.version>2.5.0.RELEASE</spring-security-oauth2.version>
<mybatis.version>2.2.2</mybatis.version>
<pagehelper.version>1.4.1</pagehelper.version>
<pulsar.version>2.9.1</pulsar.version>
<minio.version>8.3.5</minio.version>
<nimbus.version>9.18</nimbus.version>
</properties>