服务器发送事件(Server-Sent Events,简称SSE)可能听起来像是另一个流行词,但它是一项正在悄然革新实时通信的技术。与建立全双工连接的WebSockets不同,SSE从服务器到客户端创建了一个单向通道。这种简单性正是它的强大之处。

以下是为什么在Quarkus中使用SSE值得关注的原因:

  • 轻量且易于实现
  • 基于标准HTTP工作
  • 自动处理重连
  • 兼容现有的网络基础设施
  • 适用于不需要双向通信的场景

在Quarkus中实现SSE:快速入门指南

让我们通过一些代码来动手实践。以下是如何在Quarkus中实现一个基本的SSE端点:


@Path("/events")
public class SSEResource {

    @Inject
    @Channel("news-channel") 
    Emitter<String> emitter;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Multi<String> stream() {
        return Multi.createFrom().emitter(emitter::send);
    }

    @POST
    @Path("/push")
    public void push(String news) {
        emitter.send(news);
    }
}

这个简单的例子设置了一个SSE端点,用于发送新闻更新。客户端可以连接到/events端点以接收更新,你可以通过/events/push端点推送新事件。

扩展SSE:驯服并发的野兽

在大规模系统中实现SSE时,控制客户端并发变得至关重要。以下是一些保持系统平稳运行的策略:

1. 使用连接池

实现一个连接池来管理SSE连接。这有助于防止在处理大量并发客户端时资源耗尽。


@ApplicationScoped
public class SSEConnectionPool {
    private final ConcurrentHashMap<String, SseEventSink> connections = new ConcurrentHashMap<>();

    public void addConnection(String clientId, SseEventSink sink) {
        connections.put(clientId, sink);
    }

    public void removeConnection(String clientId) {
        connections.remove(clientId);
    }

    public void broadcast(String message) {
        connections.values().forEach(sink -> sink.send(sse.newEvent(message)));
    }
}

2. 实现背压

使用Reactive Streams实现背压,防止客户端过载导致问题:


@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    return Multi.createFrom().emitter(emitter::send)
        .onOverflow().drop()
        .onItem().transform(item -> {
            // 处理项目
            return item;
        });
}

3. 客户端限流

实现客户端限流以控制事件处理的速率:


const eventSource = new EventSource('/events');
const queue = [];
let processing = false;

eventSource.onmessage = (event) => {
    queue.push(event.data);
    if (!processing) {
        processQueue();
    }
};

function processQueue() {
    if (queue.length === 0) {
        processing = false;
        return;
    }
    processing = true;
    const item = queue.shift();
    // 处理项目
    setTimeout(processQueue, 100); // 限制为每秒10个项目
}

回退策略:当SSE不够用时

虽然SSE很棒,但并不总是完美的解决方案。以下是一些回退策略:

1. 长轮询

如果SSE不支持或失败,回退到长轮询:


function longPoll() {
    fetch('/events/poll')
        .then(response => response.json())
        .then(data => {
            // 处理数据
            longPoll(); // 立即开始下一个请求
        })
        .catch(error => {
            console.error('长轮询错误:', error);
            setTimeout(longPoll, 5000); // 5秒后重试
        });
}

2. WebSocket回退

对于需要双向通信的场景,实现WebSocket回退:


@ServerEndpoint("/websocket")
public class FallbackWebSocket {
    @OnOpen
    public void onOpen(Session session) {
        // 处理新连接
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        // 处理传入消息
    }
}

保持连接活跃:心跳间隔

为了维护SSE连接并检测断开,实现心跳间隔:


@Scheduled(every="30s")
void sendHeartbeat() {
    emitter.send("heartbeat");
}

在客户端:


let lastHeartbeat = Date.now();

eventSource.onmessage = (event) => {
    if (event.data === 'heartbeat') {
        lastHeartbeat = Date.now();
        return;
    }
    // 处理常规事件
};

setInterval(() => {
    if (Date.now() - lastHeartbeat > 60000) {
        // 60秒无心跳,重新连接
        eventSource.close();
        connectSSE();
    }
}, 5000);

大规模调试连接问题

在大规模处理SSE时,调试可能具有挑战性。以下是一些让生活更轻松的技巧:

1. 实现详细日志记录

使用Quarkus的日志功能跟踪SSE连接和事件:


@Inject
Logger logger;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream(@Context SecurityContext ctx) {
    String clientId = ctx.getUserPrincipal().getName();
    logger.infof("SSE连接已建立,客户端:%s", clientId);
    return Multi.createFrom().emitter(emitter::send)
        .onTermination().invoke(() -> {
            logger.infof("SSE连接已终止,客户端:%s", clientId);
        });
}

2. 实现指标

在Quarkus中使用Micrometer跟踪重要指标:


@Inject
MeterRegistry registry;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    registry.counter("sse.connections").increment();
    return Multi.createFrom().emitter(emitter::send)
        .onTermination().invoke(() -> {
            registry.counter("sse.disconnections").increment();
        });
}

3. 使用分布式追踪

实现分布式追踪以跟踪系统中的SSE事件:


@Inject
Tracer tracer;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    Span span = tracer.buildSpan("sse-stream").start();
    return Multi.createFrom().emitter(emitter::send)
        .onItem().invoke(item -> {
            tracer.buildSpan("sse-event")
                .asChildOf(span)
                .start()
                .finish();
        })
        .onTermination().invoke(span::finish);
}

总结:Quarkus中SSE的强大之处

Quarkus中的服务器发送事件为大规模系统中的实时通信提供了一种强大且轻量的替代方案。通过实现适当的并发控制、回退策略、心跳机制和强大的调试实践,你可以充分利用SSE的潜力。

记住,虽然WebSockets可能是更引人注目的选择,但SSE通常可以提供你所需的简单性和可扩展性。因此,下次你设计实时系统时,给SSE一个展示的机会。你的未来自我(以及你的运维团队)会感谢你!

“简单是终极的复杂。” - 莱昂纳多·达·芬奇

现在去构建一些令人惊叹的、可扩展的、实时的系统吧,使用SSE和Quarkus!