Лучший опыт

Spring Boot, Kafka и WebSocket для отправки сообщений в реальном времени.

Spring Boot на основе Spring Framework упрощает загрузку и разработку новых приложений. Kafka  —  платформа распределенной потоковой передачи, позволяющая создавать конвейеры и приложения для передачи данных в реальном времени. WebSocket обеспечивает полнодуплексный канал связи через один сокет  —  идеальный вариант для обмена данными в реальном времени. Далее рассмотрим процесс настройки приложения Spring Boot, интеграции его с Kafka для постановки с?
Spring Boot, Kafka и WebSocket для отправки сообщений в реальном времени...


Spring Boot на основе Spring Framework упрощает загрузку и разработку новых приложений. Kafka  —  платформа распределенной потоковой передачи, позволяющая создавать конвейеры и приложения для передачи данных в реальном времени. WebSocket обеспечивает полнодуплексный канал связи через один сокет  —  идеальный вариант для обмена данными в реальном времени.

Далее рассмотрим процесс настройки приложения Spring Boot, интеграции его с Kafka для постановки сообщений в очередь и с WebSocket для обмена сообщениями. Вы получите четкое представление о взаимодействии этих технологий в приложении с чатом реального времени.

В конце статьи имеются ссылки на репозитории.


Вначале разберемся с настройкой приложения Spring Boot и интеграцией его с Kafka.

Spring Boot  —  мощный фреймворк для разработки и настройки приложений. Он включает функции автонастройки, автономный код и встроенные серверы, упрощающие создание автономных корпоративных приложений Spring.

Сначала нужно настроить приложение Spring Boot. Это можно сделать с помощью инструмента Spring Initializr, обеспечивающего быструю начальную загрузку таких приложений. Для работы с чатом нужно включить зависимости “Web”, “Kafka”, “WebSocket” и “Lombok”.

Spring Initializr
Зависимости

После настройки приложения можно приступать к интеграции Spring Boot и распределенной потоковой платформы Kafka, используемой для создания конвейеров передачи данных в режиме реального времени и потоковых приложений. Это идеальный выбор для живого общения благодаря масштабируемости по горизонтали, надежности и невероятно быстрой работе.

Сначала создадим классы `Message` и `MessageType`, представляющие объект message и типы отправляемых сообщений.

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Message {
private MessageType type;
private String content;
private String sender;
private String sessionId;
}public enum MessageType {
CHAT,
CONNECT,
DISCONNECT
}

Затем создаем класс `KafkaConsumerConfig`:

@Configuration public class KafkaConsumerConfig {  @Bean public ConsumerFactory<String, Message> consumerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094"); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "chat"); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(), new JsonDeserializer<>(Message.class)); }  @Bean public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }

Теперь настраиваем в классе `KafkaProducerConfig` производителя (producer) Kafka, отвечающего за отправку сообщений. Используем для этого `KafkaTemplate`. Вот как выглядит класс `KafkaProducerConfig`:

@Configuration public class KafkaProducerConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress;  @Bean public ProducerFactory<String, Message> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); }  @Bean public KafkaTemplate<String, Message> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }

Чтобы в реальном времени обмениваться сообщениями между сервером и клиентами, воспользуемся WebSocket.


WebSocket предоставляет полнодуплексный канал связи через один сокет, поэтому идеально подходит для передачи данных реального времени между клиентом и сервером.

В классе `WebSocketConfig` настраиваем брокер для обработки сообщений, отправляемых клиентам WebSocket и получаемые от них. Вот как выглядит класс `WebSocketConfig`:

@Configuration @EnableWebSocketMessageBroker public class WebSocketConfigimplementsWebSocketMessageBrokerConfigurer {  @Override public void configureMessageBroker(MessageBrokerRegistry config) { config.enableSimpleBroker("/topic"); config.setApplicationDestinationPrefixes("/app"); }  @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws").setAllowedOrigins("http://localhost:5173/").withSockJS(); } }

В составе класса:

  • Метод `configureMessageBroker` настраивает брокер сообщений. Он включает простой брокер в памяти и устанавливает префикс для сообщений, которые привязаны к методам с аннотацией @MessageMapping.
  • Метод `registerStompEndpoints` регистрирует конечную точку “/ws”, разрешая резервные опции SockJS для альтернативной транспортировки, если WebSocket недоступен.

В настроенном WebSocket можно создать `MessageController` для обработки отправляемых и принимаемых сообщений. `MessageController` использует аннотацию `@MessageMapping` для сопоставления назначения входящих сообщений с определенными методами.

Сначала создадим класс Sender:

@Service public class Sender {  private final KafkaTemplate<String, Message> kafkaTemplate;  public Sender(KafkaTemplate<String, Message> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; }  public void send(String topic, Message message) { kafkaTemplate.send(topic, message); } }

Вот так выглядит класс `MessageController`:

@ControllerpublicclassMessageController {  private final Sender sender; private final SimpMessageSendingOperations messagingTemplate; private static final Logger logger = LoggerFactory.getLogger(MessageController.class);  public MessageController(Sender sender, SimpMessageSendingOperations messagingTemplate) { this.sender = sender; this.messagingTemplate = messagingTemplate; }  @MessageMapping("/chat.send-message") public void sendMessage(@Payload Message chatMessage, SimpMessageHeaderAccessor headerAccessor) { chatMessage.setSessionId(headerAccessor.getSessionId()); sender.send("messaging", chatMessage); logger.info("Sending message to /topic/public: " + chatMessage); messagingTemplate.convertAndSend("/topic/public", chatMessage); logger.info("Message sent to /topic/public: " + chatMessage); }  @MessageMapping("/chat.add-user") @SendTo("/topic/public") public Message addUser( @Payload Message chatMessage, SimpMessageHeaderAccessor headerAccessor ) { if (headerAccessor.getSessionAttributes() != null) { headerAccessor.getSessionAttributes().put("username", chatMessage.getSender()); }  return chatMessage; } }

В этом классе:

  • Метод SendMessage предназначен для обработки сообщений, отправляемых в место назначения “/chat.send-message”. Он отправляет полученное `Message` по адресу “/topic/public”, транслируя его всем подключенным клиентам WebSocket.

В заключение рассмотрим, как обрабатывать события WebSocket и как использовать сообщения из Kafka.

Обработка событий WebSocket необходима для поддержки состояния приложения. Например, при отключении/подключении пользователя можно передавать сообщение/уведомление об этом остальным пользователям. Spring позволяет обрабатывать эти события, создавая класс прослушивателя событий (event listener).

Соответствующие методы класса `WebSocketEventListener`помечены как `@EventListener`. Они вызываются автоматически в ответ на соответствующее событие. Пример класса `WebSocketEventListener`:

@Component public class WebSocketEventListener {  private static final Logger logger = LoggerFactory.getLogger(WebSocketEventListener.class); private final SimpMessageSendingOperations messagingTemplate;  public WebSocketEventListener(SimpMessageSendingOperations messagingTemplate) { this.messagingTemplate = messagingTemplate; }  @EventListener public void handleWebSocketConnectListener(SessionConnectedEvent event) { logger.info("Received a new web socket connection"); }  @EventListener public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) { StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage()); String username = (String) headerAccessor.getSessionAttributes().get("username");  if (username != null) { logger.info("User Disconnected: " + username); Message chatMessage = new Message(); chatMessage.setType(MessageType.DISCONNECT); chatMessage.setSender(username); messagingTemplate.convertAndSend("/topic/public", chatMessage); } } }

В этом классе:

  • Метод handleWebSocketConnectListener вызывается при открытии нового соединения WebSocket.
  • Метод handleWebSocketDisconnectListener вызывается при закрытии соединения WebSocket. Он отправляет сообщение ‘DISCONNECT’ всем остальным подключенным клиентам WebSocket.

Наконец, нужно использовать сообщения от Kafka. В классе Receiver имеется метод с аннотацией @KafkaListener. Он вызывается автоматически при получении сообщения из топика Kafka. Вот как выглядит этот класс `Receiver`:

@Service public class Receiver {  private static final Logger logger = LoggerFactory.getLogger(Receiver.class); private final SimpMessageSendingOperations messagingTemplate; private final SimpUserRegistry userRegistry;  public Receiver(SimpMessageSendingOperations messagingTemplate, SimpUserRegistry userRegistry) { this.messagingTemplate = messagingTemplate; this.userRegistry = userRegistry; }  @KafkaListener(topics = "messaging", groupId = "chat") public void consume(Message chatMessage) { logger.info("Received message from Kafka: " + chatMessage); for (SimpUser user : userRegistry.getUsers()) { for (SimpSession session : user.getSessions()) { if (!session.getId().equals(chatMessage.getSessionId())) { messagingTemplate.convertAndSendToUser(session.getId(), "/topic/public", chatMessage); } } } } }

В этом классе метод `consume` вызывается при получении сообщения из раздела (топика) Kafka “messaging”. Он отправляет полученное `сообщение` в “/topic/public” (раздел WebSocket), транслируя его всем подключенным к WebSocket клиентам.


Фронтенд: обмен сообщениями реального времени через WebSocket с помощью React и STOMP

Теперь создадим с помощью React фронтенд-приложение для взаимодействия с бэкенд-сервисом. Для установки протокола STOMP поверх подключения WebSocket используем библиотеку @stomp/stompjs.

Сначала создадим проект, используя IDE WebStorm.

Затем устанавливаем необходимые библиотеки:npm install @stomp/stompjs sockjs-client @mui/material react-avatar

Здесь @stomp/stompjs используется для клиента STOMP, клиент sockjs используется для подключения к WebSocket, @mui/material используется для компонентов пользовательского интерфейса, а react-avatar нужен для отображения аватаров пользователей.

Создание Username Page

В `UsernamePage.jsx` обычно создают форму, где пользователь вводит свое имя. После отправки имени пользователя его можно сохранить в состоянии и передать в компонент `ChatPage` в качестве пропа. Затем это имя пользователя будет использоваться как отправитель сообщений.

import { useState } from 'react'; import { Button, TextField, Container, Box } from '@mui/material'; import PropTypes from 'prop-types';  function UsernamePage({ setUsername }) { UsernamePage.propTypes = { setUsername: PropTypes.func.isRequired, };  const [inputUsername, setInputUsername] = useState('');  const handleUsernameSubmit = (event) => { event.preventDefault(); if (inputUsername) { setUsername(inputUsername); } };  return ( <Container> <Box display="flex" flexDirection="column" justifyContent="center" alignItems="center" mt={2}> <h1>Type your username</h1> <form onSubmit={handleUsernameSubmit}> <Box display="flex" alignItems="stretch"> <TextField sx={{ color: 'white', '& .MuiOutlinedInput-notchedOutline': { borderColor: 'gray' }, width: '300px', '& .MuiOutlinedInput-root': { borderRadius: '36px', '& fieldset': { borderColor: 'gray', }, '& input': { height: '8px', }, }, }} inputProps={{ style: { color: 'white' } }} variant="outlined" placeholder="Username" value={inputUsername} onChange={(e) => setInputUsername(e.target.value)} /> <Box marginLeft={2}> <Button variant="contained" sx={{ width: '94px', height: '42px', borderRadius: '36px', }} color="primary" type="submit"> Enter </Button> </Box> </Box> </form> </Box> </Container> ); }  export default UsernamePage;

В этом коде UsernamePage принимает проп setUsername, который является функцией для обновления имени пользователя в родительском компоненте (App.jsx). Он содержит форму с текстовым полем для имени пользователя и кнопку отправки. Когда форма отправлена, она вызывает функцию `setUsername` с введенным именем пользователя.

В `App.jsx` обычно сохраняют состояние имени пользователя для условного рендеринга либо `UsernamePage`, либо `ChatPage` в зависимости от того, было ли задано имя пользователя.

import { useState } from 'react'; import UsernamePage from './component/UsernamePage.jsx'; import ChatPage from './component/ChatPage.jsx';  function App() { const [username, setUsername] = useState(null);  return ( <div> {username ? <ChatPage username={username} /> : <UsernamePage setUsername={setUsername} />} </div> ); }  export default App;

В этом коде приложение хранит и поддерживает состояние имени пользователя. Если это имя равно null, оно отображается (рендерится) как `UsernamePage` и передает функцию `setUsername` в качестве пропа. Если имя пользователя не равно null, оно передается в качестве пропа и отображается `ChatPage`.

Создание Chat Page

Создадим в файле “ChatPage.jsx` компонент “ChatPage` . Он будет обрабатывать подключение к WebSocket, отправлять и получать сообщения и рендерить интерфейс чата.

import { useState, useEffect, useRef } from 'react'; import { Client } from '@stomp/stompjs'; import SockJS from 'sockjs-client/dist/sockjs'; import ChatMessage from "./ChatMessage.jsx"; import { Button, TextField, Container, Box } from '@mui/material';  function ChatPage({ username }) { const [messages, setMessages] = useState([]); const [client, setClient] = useState(null); const messageInputRef = useRef();  useEffect(() => { const newClient = new Client({ webSocketFactory: () => new SockJS('http://localhost:8080/ws'), onConnect: () => { const joinMessage = { sender: username, type: 'CONNECT', }; newClient.publish({ destination: '/app/chat.addUser', body: JSON.stringify(joinMessage) }); newClient.subscribe('/topic/public', message => { const newMessage = JSON.parse(message.body); setMessages(prevMessages => [...prevMessages, newMessage]); }); }, onDisconnect: () => { if (newClient.connected) { const leaveMessage = { sender: username, type: 'DISCONNECT', }; newClient.publish({ destination: '/app/chat.addUser', body: JSON.stringify(leaveMessage) }); } }, });  newClient.activate(); setClient(newClient);  return () => { newClient.deactivate(); }; }, [username]);  const sendMessage = () => { if (messageInputRef.current.value && client) { const chatMessage = { sender: username, content: messageInputRef.current.value, type: 'CHAT', }; client.publish({ destination: '/app/chat.sendMessage', body: JSON.stringify(chatMessage) }); messageInputRef.current.value = ''; } };  return ( <Container> <Box> {messages.map((message, index) => ( <ChatMessage key={index} message={message} username={username} /> ))} </Box> <form onSubmit={sendMessage}> <TextField inputRef={messageInputRef} placeholder="Type a message..." /> <Button type="submit">Send</Button> </form> </Container> ); }  export default ChatPage;

В этом коде используются хуки useState и useEffect из React для управления состоянием и побочными эффектами компонента. Класс Client из @stomp/stompjs используется для создания клиента STOMP через подключение к WebSocket. Метод publish используется для отправки сообщений на сервер, а метод subscribe для получения сообщений с сервера.

Создание компонента Chat Message

В файле `ChatMessage.jsx` создадим компонент ChatMessage. Он будет отображать одно сообщение чата.

import Avatar from 'react-avatar'; import { Box } from '@mui/material';  function ChatMessage({ message, username }) { return ( <Box sx={{ display: 'flex', flexDirection: 'column', alignItems: message.sender === username ? 'flex-end' : 'flex-start', margin: '10px 0' }}> <Box sx={{ display: 'flex', flexDirection: message.sender === username ? 'row-reverse' : 'row', alignItems: 'center', gap: 1 }}> <Avatar name={message.sender} size="35" round={true} /> <h4>{message.sender}</h4> </Box> <Box sx={{ backgroundColor: message.sender === username ? 'primary.main' : 'secondary.main', color: 'white', borderRadius: '12px', padding: '10px', maxWidth: '80%', }}> <p>{message.content}</p> </Box> </Box> ); }  export default ChatMessage;

В этом коде используется компонент Box из @mui/material для создания гибкого макета и компонент Avatar из react-avatar для отображения аватара пользователя. Проп sx используется для применения стилей к компонентам.

В этом разделе мы создали с использованием React фронтенд-приложение, которое взаимодействует с бэкенд-сервисом через соединение WebSocket. При этом использована библиотека @stomp/stompjs для определения протокола STOMP через подключение к WebSocket и библиотека @mui/material для создания пользовательского интерфейса.


Попробуем сделать это с командной строкой, создав класс `CommandController`:

@RestController public class CommandController {  private final KafkaTemplate<String, Message> kafkaTemplate; private final SimpMessageSendingOperations messagingTemplate;  public CommandController(KafkaTemplate<String, Message> kafkaTemplate, SimpMessageSendingOperations messagingTemplate) { this.kafkaTemplate = kafkaTemplate; this.messagingTemplate = messagingTemplate; }  @PostMapping("/send") public void send(@RequestBody Message message) { kafkaTemplate.send("messaging", message); messagingTemplate.convertAndSend("/topic/public", message); } }

Выполняем эти команды в терминале:

curl -X POST -H "Content-Type: application/json" -d "{\"type\":\"CONNECT\",\"content\":\"Hello, World!\",\"sender\":\"Command Line\"}" http://localhost:8080/send  curl -X POST -H "Content-Type: application/json" -d "{\"type\":\"CHAT\",\"content\":\"Hello, World!\",\"sender\":\"Command Line\"}" http://localhost:8080/send

Получим визуальный результат:


Используя Spring Boot, Kafka и WebSocket мы создали масштабируемое и эффективное приложение для чата, работающее в реальном времени.

Репозитории на GitHub: