AI 爆火背后,Spring Boot SSE 推送该怎么学?

最近 AI 爆火,与之相关的技术也成为香饽饽,SSE 推送就是其中之一。SSE,全称 Server-Sent Events,是 HTML5 Web API 的一员。它能让网页低延迟、高效地接收服务器实时更新,通过 HTTP 协议实现服务器主动向客户端推送数据。借助持久化 HTTP 长连接,服务器可以向客户端实时推送数据,不过客户端无法通过 SSE 向服务端回传数据。
很多小伙伴想上手 SSE 推送开发,却不知从何开始。别担心!本文将基于 Spring Boot,带大家实操 SSE 推送。不仅介绍了 Spring Boot 实现 SSE 推送的多种方式,还会兼顾 Spring Boot 2.4 前后版本的差异,助你快速掌握这项热门技术。

SSE推送实战:基于定时器实现

1、引入依赖:在项目中引入 webflux,仅需添加一行代码搞定依赖。

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

2、编写控制器:构建一个定时发送数据的控制器。

  private final AtomicInteger counter = new AtomicInteger(0);

    @GetMapping(path = "/interval/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE+ "; charset=UTF-8")
    public Flux<ServerSentEvent<Integer>> streamSseMvc() {
        return Flux.interval(Duration.ofSeconds(1))
               .map(seq -> ServerSentEvent.<Integer>builder()
                       .data(counter.incrementAndGet())
                       .build()).takeUntil(event -> event.data() > 10).doOnComplete(() -> log.info("complete"));
    }

这段代码实现了每秒发送一次数据,当发送数据值大于11 时停止。

3、搭建 SSE 客户端页面:设计一个简单的 HTML 页面接收数据。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE Example</title>
</head>
<body>
    <div id="messages"></div>
    <script>
        const eventSource = new EventSource('/sse/interval/stream');
        eventSource.onmessage = function (event) {
            const messagesDiv = document.getElementById('messages');
            const newMessage = document.createElement('p');
            newMessage.textContent = `Received: ${event.data}`;
            messagesDiv.appendChild(newMessage);
        };
        eventSource.onerror = function (error) {
            console.error('EventSource failed:', error);
            eventSource.close();
        };
    </script>
</body>
</html>    

4、测试

浏览器访问客户端页面,会发现页面打印如下内容

Received: 1

Received: 2

Received: 3

Received: 4

Received: 5

Received: 6

Received: 7

Received: 8

Received: 9

Received: 10

Received: 11

看到这个内容,不知道大家会不会有疑问,为啥页面还会出现11,这主要是因为 takeUntil 操作符的工作机制。takeUntil 操作符是在满足指定条件后才停止发出元素,不过它会包含最后一个满足条件的元素。

具体而言,当 counter 的值为 10 时,下一次递增后变为 11,此时 event.data() > 10 条件成立,Flux 停止发出新的元素,但 11 这个元素已经被生成并发出了,所以客户端会接收到值为 11 的事件。

若不想让页面输出 11,可以把 takeUntil 的条件改为 event.data() >= 10,这样当 counter 的值达到 10 时,Flux 就会停止发出新的元素

更灵活的实现:基于发布订阅模式

定时器实现虽简单,但触发时间点不好控制。发布订阅模式则更灵活,生产者发送消息,消费者接收消息。

Spring Boot 2.4 之前版本

核心主要利用FluxProcessor,它有好几种实现类,我们常用大致有3种,UnicastProcessor、DirectProcessor、ReplayProcessor,这三种的实现大同小异,其主要区别在于

  • UnicastProcessor:适用于仅需一个订阅者的场景,像单个客户端接收实时数据更新的情况。
  • DirectProcessor:适合有多个订阅者的场景,比如多个客户端都要接收相同的实时消息。
  • ReplayProcessor:适合需要向新订阅者重播之前消息的场景,例如新客户端连接时需要获取历史消息。

本文就以DirectProcessor来讲解

1、创建发布订阅管理类:借助DirectProcessor实现消息发布与订阅。

public class DirectProcessorSsePublisherService implements SsePublisherService {

    private final DirectProcessor<String> processor = DirectProcessor.create();
    private final Flux<String> flux = processor.replay().autoConnect();

    @Override
    public Flux<String> getMessages() {
        return flux;
    }

    @Override
    public void publishMessage(String message) {
        processor.onNext(message);
    }

    @Override
    public void complete() {
        processor.onComplete();
    }
}

DirectProcessorSsePublisherService类的主要功能是作为一个消息发布服务,借助DirectProcessor和Flux来实现响应式消息的发布与订阅。外部类可以通过getMessages方法订阅消息流,通过publishMessage方法发布新消息,通过complete方法结束消息流。

2、设置定时器生产数据:定时生成数据并发布。

@Component
public class ProduceDataTask {

    @Autowired
    private SsePublisherService ssePublisherService;

    private final AtomicInteger counter = new AtomicInteger(0);


    @Scheduled(fixedRate = 5000)
    public void run(){
        int num = counter.incrementAndGet();
        System.out.println("num = " + num);
        ssePublisherService.publishMessage("hello-" + num);
        if (num > 10) {
            ssePublisherService.complete();
        }

    }
}

ProduceDataTask 类的主要功能是每隔 5 秒生成一个递增的整数,并将包含该整数的消息通过 SsePublisherService 发布出去,当生成的整数大于 10 时,结束消息发布。

3、编写消费控制器:接收并处理发布的消息。

@RestController
@RequestMapping("/sse")
@Slf4j
public class SseController {

    @Autowired
    private SsePublisherService ssePublisherService;

 


    @GetMapping(path = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> subscribe() {
        return ssePublisherService.getMessages()
                .map(message -> ServerSentEvent.<String>builder()
                        .data(message)
                        .build());
    }
    }

4、搭建 SSE 客户端页面:设计一个简单的 HTML 页面接收数据。

页面和方式一的页面雷同,就不贴代码了

5、测试

浏览器访问客户端页面,会发现页面打印如下内容

Received: hello-1

Received: hello-2

Received: hello-3

Received: hello-4

Received: hello-5

Received: hello-6

Received: hello-7

Received: hello-8

Received: hello-9

Received: hello-10

Spring Boot 2.4 及之后版本

1、使用 Sinks 管理发布订阅:核心类换成Sinks。

@Service
public class SinksSsePublisherService implements SsePublisherService {

    private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

    @Override
    public Flux<String> getMessages() {
        return sink.asFlux();
    }

    @Override
    public void publishMessage(String message) {
        sink.tryEmitNext(message);
    }

    @Override
    public void complete() {
        sink.tryEmitComplete();
    }
}

SinksSsePublisherService 类的主要功能是作为一个消息发布服务,使用 Sinks 来管理消息的发布和订阅。外部代码可以通过 getMessages 方法订阅消息流,通过 publishMessage 方法发布新消息,通过 complete 方法结束消息流。同时,使用 onBackpressureBuffer() 策略来处理背压,确保在订阅者处理速度较慢时数据不会丢失。

2、生产数据与消费控制器:定时器和消费控制器代码与低版本相同。

3、测试:同样能看到推送的消息。

总结与拓展

本文基于 WebFlux,详细介绍了 Spring Boot 实现 SSE 推送的多种方法。若想基于 WebMVC 实现,可使用org.springframework.web.servlet.mvc.method.annotation.SseEmitter。
为方便大家学习,本文示例代码已上传至 GitHub:https://github.com/lyb-geek/springboot-learning/tree/master/springboot-sse

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 228,702评论 6 534
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 98,615评论 3 419
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 176,606评论 0 376
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,044评论 1 314
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 71,826评论 6 410
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,227评论 1 324
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,307评论 3 442
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,447评论 0 289
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 48,992评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 40,807评论 3 355
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,001评论 1 370
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,550评论 5 361
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,243评论 3 347
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,667评论 0 26
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 35,930评论 1 287
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 51,709评论 3 393
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 47,996评论 2 374

推荐阅读更多精彩内容