文章目录
  1. 1. Comet
    1. 1.1. 基于 AJAX 的长轮询(long-polling)方式
    2. 1.2. 基于 Iframe 及 htmlfile 的流(streaming)方式
  2. 2. Server Send Event
    1. 2.1. 客户端的实现
    2. 2.2. 服务器端

后台长时间任务的监控,例如,处理进度的监控,可以通过客户端轮询拉或者服务器推技术来实现。这里主要讨论服务器推技术的实现。

Comet

基于HTTP 长连接、无须在浏览器端安装插件的“服务器推”技术为“Comet”。Comet主要有两种实现方式。

基于 AJAX 的长轮询(long-polling)方式

使用 AJAX 实现“服务器推”与传统的 AJAX 应用不同之处在于

  1. 服务器端会阻塞请求直到有数据传递或超时才返回。
  2. 客户端 JavaScript 响应处理函数会在处理完服务器返回的信息后,再次发出请求,重新建立连接。
  3. 当客户端处理接收的数据、重新建立连接时,服务器端可能有新的数据到达;这些信息会被服务器端保存直到客户端重新建立连接,客户端会一次把当前服务器端所有的信息取回。

在这种长轮询方式下,客户端是在 XMLHttpRequest 的 readystate 为 4(即数据传输结束)时调用回调函数,进行信息处理。当 readystate 为 4 时,数据传输结束,连接已经关闭。Mozilla Firefox 提供了对 Streaming AJAX 的支持, 即 readystate 为 3 时(数据仍在传输中),客户端可以读取数据,从而无须关闭连接,就能读取处理服务器端返回的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
var xhr = new XMLHttpRequest();
xhr.previous_text = '';
//xhr.onload = function() { log_message("[XHR] Done. responseText: <i>" + xhr.responseText + "</i>"); };
xhr.onerror = function() { log_message("[XHR] Fatal Error."); };
xhr.onreadystatechange = function()
{
try
{
if (xhr.readyState > 2)
{
var new_response = xhr.responseText.substring(xhr.previous_text.length);
var result = JSON.parse( new_response );
log_message(result.message);
//update the progressbar
document.getElementById('progressor').style.width = result.progress + "%";
xhr.previous_text = xhr.responseText;
}
}
catch (e)
{
//log_message("<b>[XHR] Exception: " + e + "</b>");
}
};
xhr.open("GET", "ajax_stream.php", true);
xhr.send("Making request...");

基于 Iframe 及 htmlfile 的流(streaming)方式

iframe 是很早就存在的一种HTML标记,通过在HTML页面里嵌入一个隐蔵帧,然后将这个隐蔵帧的SRC属性设为对一个长连接的请求,服务器端就能源源不断地往客户端输入数据。iframe 服务器端并不返回直接显示在页面的数据,而是返回对客户端 Javascript 函数的调用,如<script type="text/javascript">js_func(“data from server ”)</script>。服务器端将返回的数据作为客户端JavaScript函数的参数传递;客户端浏览器的Javascript引擎在收到服务器返回的JavaScript调用时就会去执行代码。

Server Send Event

HTML5提供了很多新的特性,其中包括的Server Send EventWebsockets能够更方便的实现服务器端推。Websockets的接口实现的是客户端和服务器端之间的双向通信,对应的开销也会更大。 对于我们这里的应用场景来说,只需要单向的服务器端退即可,所以使用SSE来实现更为合适。

客户端的实现

1
2
3
4
5
6
7
8
9
10
if (!!window.EventSource)
{
var source = new EventSource('task.php');
source.addEventListener('message', function(e)
{
console.log(e.data);
//Do whatever with e.data
}, false);
}

客户端的实现很简单,只需要新建一个EventSource,参数为请求的地址,然后添加上listener即可。一旦添加完message listener之后,客户端就会发送请求,在这期间,如果服务器端长时间没有响应,那么,客户端会再次发送请求,直到调用source.close()方法。 所以客户端正常情况下,应该有个服务器端处理完,关闭source的逻辑。

服务器端

服务器端只需要在任务结束直接,不关闭response,然后然response中写符合特定格式的数据即可

1
2
id:xxx
data:xxx

Spring有对SSE的封装,下面以Spring为例,完整地演示后台任务进度的反馈

下面是Controller的代码,对SSE的调用需要在异步任务中进行,但是SSE并不属于应用层的逻辑,所以通过回调的方式传递给应用层。任务完成后需要调用complete方法,对应的客户端对complete之前返回的消息做特殊的响应,即关闭EventSource

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
SseEmitter sseEmitter = new SseEmitter();
fooService.batch((String id, String result, String message, boolean exception) -> {
String data = String.format("%s,%s,%s" , result, message, exception);
SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().id(id).data(data);
sseEmitter.send(eventBuilder);
if (BATCH_COMPLETE.equals(result)){
sseEmitter.complete();
}
}, error -> {
try {
sseEmitter.send(BATCH_COMPLETE);
sseEmitter.complete();
} catch (IOException e) {
sseEmitter.completeWithError(e);
}
});
return sseEmitter;

应用层方法中,应该对每条批处理数据进行异步。异步使用Spring的@Async注解即可,但是这里需要注意的是,如果通过this调用自己类方法AOP代理是会失效的,因此需要用另外一个Bean来封装异步方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void batch(FooController.SSEBatchConsumer replyStatus, Consumer<String> replyError) {
AtomicInteger count = new AtomicInteger(0);
List<Foo> foos = this.fooRepository.findAll();
if (foos == null || foos.size() == 0) {
replyError.accept(null);
return;
}
for (Foo foo : foos) {
this.asyncService.doFoo(foo, foo.size(), count, replyStatus);
}
}
public class AsyncService {
@Async
public void asyncService(Foo fpp, int total, AtomicInteger count, RecruitmentsIndexController.SSEBatchConsumer replyStatus)
}
文章目录
  1. 1. Comet
    1. 1.1. 基于 AJAX 的长轮询(long-polling)方式
    2. 1.2. 基于 Iframe 及 htmlfile 的流(streaming)方式
  2. 2. Server Send Event
    1. 2.1. 客户端的实现
    2. 2.2. 服务器端
Fork me on GitHub