后台长时间任务的监控,例如,处理进度的监控,可以通过客户端轮询拉或者服务器推技术来实现。这里主要讨论服务器推技术的实现。
Comet 基于HTTP 长连接、无须在浏览器端安装插件的“服务器推”技术为“Comet”。Comet主要有两种实现方式。
基于 AJAX 的长轮询(long-polling)方式
使用 AJAX 实现“服务器推”与传统的 AJAX 应用不同之处在于
服务器端会阻塞请求直到有数据传递或超时才返回。
客户端 JavaScript 响应处理函数会在处理完服务器返回的信息后,再次发出请求,重新建立连接。
当客户端处理接收的数据、重新建立连接时,服务器端可能有新的数据到达;这些信息会被服务器端保存直到客户端重新建立连接,客户端会一次把当前服务器端所有的信息取回。
在这种长轮询方式下,客户端是在 XMLHttpRequest 的 readystate 为 4(即数据传输结束)时调用回调函数,进行信息处理。当 readystate 为 4 时,数据传输结束,连接已经关闭。Mozilla Firefox 提供了对 Streaming AJAX 的支持, 即 readystate 为 3 时(数据仍在传输中),客户端可以读取数据,从而无须关闭连接,就能读取处理服务器端返回的信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
var xhr = new XMLHttpRequest ();
xhr.previous_text = '' ;
xhr.onerror = function () { log_message("[XHR] Fatal Error." ); };
xhr.onreadystatechange = function ()
{
try
{
if (xhr.readyState > 2 )
{
var new _response = xhr.responseText.substring(xhr.previous_text.length);
var result = JSON.parse( new _response );
log_message(result.message);
document.getElementById('progressor' ).style.width = result.progress + "%" ;
xhr.previous_text = xhr.responseText;
}
}
catch (e)
{
}
};
xhr.open("GET" , "ajax_stream.php" , true );
xhr.send("Making request..." );
基于 Iframe 及 htmlfile 的流(streaming)方式
iframe 是很早就存在的一种HTML标记,通过在HTML页面里嵌入一个隐蔵帧,然后将这个隐蔵帧的SRC属性设为对一个长连接的请求,服务器端就能源源不断地往客户端输入数据。iframe 服务器端并不返回直接显示在页面的数据,而是返回对客户端 Javascript 函数的调用,如<script type="text/javascript">js_func(“data from server ”)</script>
。服务器端将返回的数据作为客户端JavaScript函数的参数传递;客户端浏览器的Javascript引擎在收到服务器返回的JavaScript调用时就会去执行代码。
Server Send Event HTML5提供了很多新的特性,其中包括的Server Send Event 和Websockets 能够更方便的实现服务器端推。Websockets 的接口实现的是客户端和服务器端之间的双向通信,对应的开销也会更大。 对于我们这里的应用场景来说,只需要单向的服务器端退即可,所以使用SSE来实现更为合适。
客户端的实现 1
2
3
4
5
6
7
8
9
10
if (!!window .EventSource)
{
var source = new EventSource('task.php' );
source.addEventListener('message' , function (e )
{
console .log(e.data);
}, false );
}
客户端的实现很简单,只需要新建一个EventSource,参数为请求的地址,然后添加上listener即可。一旦添加完message listener之后,客户端就会发送请求,在这期间,如果服务器端长时间没有响应,那么,客户端会再次发送请求,直到调用source.close()
方法。 所以客户端正常情况下,应该有个服务器端处理完,关闭source的逻辑。
服务器端 服务器端只需要在任务结束直接,不关闭response,然后然response中写符合特定格式的数据即可
Spring有对SSE的封装,下面以Spring为例,完整地演示后台任务进度的反馈
下面是Controller的代码,对SSE的调用需要在异步任务中进行,但是SSE并不属于应用层的逻辑,所以通过回调的方式传递给应用层。任务完成后需要调用complete方法,对应的客户端对complete之前返回的消息做特殊的响应,即关闭EventSource
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
SseEmitter sseEmitter = new SseEmitter();
fooService.batch((String id, String result, String message, boolean exception) -> {
String data = String.format("%s,%s,%s" , result, message, exception);
SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().id(id).data(data);
sseEmitter.send(eventBuilder);
if (BATCH_COMPLETE.equals(result)){
sseEmitter.complete();
}
}, error -> {
try {
sseEmitter.send(BATCH_COMPLETE);
sseEmitter.complete();
} catch (IOException e) {
sseEmitter.completeWithError(e);
}
});
return sseEmitter ;
应用层方法中,应该对每条批处理数据进行异步。异步使用Spring的@Async
注解即可,但是这里需要注意的是,如果通过this调用自己类方法AOP代理是会失效的,因此需要用另外一个Bean来封装异步方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void batch(FooController.SSEBatchConsumer replyStatus, Consumer<String> replyError) {
AtomicInteger count = new AtomicInteger(0 );
List<Foo> foos = this .fooRepository.findAll ();
if (foos == null || foos.size () == 0 ) {
replyError.accept(null );
return ;
}
for (Foo foo : foos) {
this .asyncService.doFoo(foo, foo.size (), count , replyStatus);
}
}
public class AsyncService {
@Async
public void asyncService(Foo fpp, int total, AtomicInteger count , RecruitmentsIndexController.SSEBatchConsumer replyStatus)
}