Guava 包里的 Service 接口用于封裝一個(gè)服務(wù)對(duì)象的運(yùn)行狀態(tài)、包括 start 和 stop 等方法。例如 web 服務(wù)器,RPC 服務(wù)器、計(jì)時(shí)器等可以實(shí)現(xiàn)這個(gè)接口。對(duì)此類服務(wù)的狀態(tài)管理并不輕松、需要對(duì)服務(wù)的開啟/關(guān)閉進(jìn)行妥善管理、特別是在多線程環(huán)境下尤為復(fù)雜。Guava 包提供了一些基礎(chǔ)類幫助你管理復(fù)雜的狀態(tài)轉(zhuǎn)換邏輯和同步細(xì)節(jié)。
一個(gè)服務(wù)正常生命周期有:
服務(wù)一旦被停止就無法再重新啟動(dòng)了。如果服務(wù)在 starting、running、stopping 狀態(tài)出現(xiàn)問題、會(huì)進(jìn)入 Service.State.FAILED.狀態(tài)。調(diào)用 [startAsync()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Service.html#startAsync())方法可以異步開啟一個(gè)服務(wù),同時(shí)返回 this 對(duì)象形成方法調(diào)用鏈。注意:只有在當(dāng)前服務(wù)的狀態(tài)是 NEW 時(shí)才能調(diào)用 startAsync()方法,因此最好在應(yīng)用中有一個(gè)統(tǒng)一的地方初始化相關(guān)服務(wù)。停止一個(gè)服務(wù)也是類似的、使用異步方法 [stopAsync()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Service.html#stopAsync()) 。但是不像 startAsync(),多次調(diào)用這個(gè)方法是安全的。這是為了方便處理關(guān)閉服務(wù)時(shí)候的鎖競(jìng)爭(zhēng)問題。
通過 [addListener()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Service.html#addListener())方法異步添加監(jiān)聽器。此方法允許你添加一個(gè) Service.Listener 、它會(huì)在每次服務(wù)狀態(tài)轉(zhuǎn)換的時(shí)候被調(diào)用。注意:最好在服務(wù)啟動(dòng)之前添加 Listener(這時(shí)的狀態(tài)是 NEW)、否則之前已發(fā)生的狀態(tài)轉(zhuǎn)換事件是無法在新添加的 Listener上被重新觸發(fā)的。
同步使用 [awaitRunning()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Service.html#awaitRunning())。這個(gè)方法不能被打斷、不強(qiáng)制捕獲異常、一旦服務(wù)啟動(dòng)就會(huì)返回。如果服務(wù)沒有成功啟動(dòng),會(huì)拋出 IllegalStateException 異常。同樣的, [awaitTerminated()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Service.html#awaitTerminated()) 方法會(huì)等待服務(wù)達(dá)到終止?fàn)顟B(tài)(TERMINATED 或者 FAILED)。兩個(gè)方法都有重載方法允許傳入超時(shí)時(shí)間。
Service 接口本身實(shí)現(xiàn)起來會(huì)比較復(fù)雜、且容易碰到一些捉摸不透的問題。因此我們不推薦直接實(shí)現(xiàn)這個(gè)接口。而是請(qǐng)繼承 Guava 包里已經(jīng)封裝好的基礎(chǔ)抽象類。每個(gè)基礎(chǔ)類支持一種特定的線程模型。
AbstractIdleService 類簡(jiǎn)單實(shí)現(xiàn)了 Service 接口、其在 running 狀態(tài)時(shí)不會(huì)執(zhí)行任何動(dòng)作–因此在 running 時(shí)也不需要啟動(dòng)線程–但需要處理開啟/關(guān)閉動(dòng)作。要實(shí)現(xiàn)一個(gè)此類的服務(wù),只需繼承 AbstractIdleService 類,然后自己實(shí)現(xiàn) [startUp()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractIdleService.html#startUp()) 和 [shutDown()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractIdleService.html#shutDown())方法就可以了。
protected void startUp() {
servlets.add(new GcStatsServlet());
}
protected void shutDown() {}
如上面的例子、由于任何請(qǐng)求到 GcStatsServlet 時(shí)已經(jīng)會(huì)有現(xiàn)成線程處理了,所以在服務(wù)運(yùn)行時(shí)就不需要做什么額外動(dòng)作了。
AbstractExecutionThreadService 通過單線程處理啟動(dòng)、運(yùn)行、和關(guān)閉等操作。你必須重載 run()方法,同時(shí)需要能響應(yīng)停止服務(wù)的請(qǐng)求。具體的實(shí)現(xiàn)可以在一個(gè)循環(huán)內(nèi)做處理:
public void run() {
while (isRunning()) {
// perform a unit of work
}
}
另外,你還可以重載 [triggerShutdown()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractExecutionThreadService.html#triggerShutdown())方法讓 run()方法結(jié)束返回。
重載 startUp()和 shutDown()方法是可選的,不影響服務(wù)本身狀態(tài)的管理
protected void startUp() {
dispatcher.listenForConnections(port, queue);
}
protected void run() {
Connection connection;
while ((connection = queue.take() != POISON)) {
process(connection);
}
}
protected void triggerShutdown() {
dispatcher.stopListeningForConnections(queue);
queue.put(POISON);
}
start()內(nèi)部會(huì)調(diào)用 startUp()方法,創(chuàng)建一個(gè)線程、然后在線程內(nèi)調(diào)用 run()方法。stop()會(huì)調(diào)用 triggerShutdown()方法并且等待線程終止。
AbstractScheduledService 類用于在運(yùn)行時(shí)處理一些周期性的任務(wù)。子類可以實(shí)現(xiàn) [runOneIteration()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractScheduledService.html#runOneIteration())方法定義一個(gè)周期執(zhí)行的任務(wù),以及相應(yīng)的 startUp()和 shutDown()方法。為了能夠描述執(zhí)行周期,你需要實(shí)現(xiàn) [scheduler()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractScheduledService.html#scheduler())方法。通常情況下,你可以使用 AbstractScheduledService.Scheduler 類提供的兩種調(diào)度器:newFixedRateSchedule(initialDelay, delay, TimeUnit) 和newFixedDelaySchedule(initialDelay, delay, TimeUnit),類似于 JDK 并發(fā)包中 ScheduledExecutorService 類提供的兩種調(diào)度方式。如要自定義 schedules 則可以使用 CustomScheduler 類來輔助實(shí)現(xiàn);具體用法見 javadoc。
如需要自定義的線程管理、可以通過擴(kuò)展 AbstractService 類來實(shí)現(xiàn)。一般情況下、使用上面的幾個(gè)實(shí)現(xiàn)類就已經(jīng)滿足需求了,但如果在服務(wù)執(zhí)行過程中有一些特定的線程處理需求、則建議繼承 AbstractService 類。
繼承 AbstractService 方法必須實(shí)現(xiàn)兩個(gè)方法.
doStart 和 doStop 方法的實(shí)現(xiàn)需要考慮下性能,盡可能的低延遲。如果初始化的開銷較大,如讀文件,打開網(wǎng)絡(luò)連接,或者其他任何可能引起阻塞的操作,建議移到另外一個(gè)單獨(dú)的線程去處理。
除了對(duì) Service 接口提供基礎(chǔ)的實(shí)現(xiàn)類,Guava 還提供了 ServiceManager 類使得涉及到多個(gè) Service 集合的操作更加容易。通過實(shí)例化 ServiceManager 類來創(chuàng)建一個(gè) Service 集合,你可以通過以下方法來管理它們:
檢測(cè)類的方法有:
我們建議整個(gè)服務(wù)的生命周期都能通過 ServiceManager 來管理,不過即使?fàn)顟B(tài)轉(zhuǎn)換是通過其他機(jī)制觸發(fā)的、也不影響 ServiceManager 方法的正確執(zhí)行。例如:當(dāng)一個(gè)服務(wù)不是通過 startAsync()、而是其他機(jī)制啟動(dòng)時(shí),listeners 仍然可以被正常調(diào)用、awaitHealthy()也能夠正常工作。ServiceManager 唯一強(qiáng)制的要求是當(dāng)其被創(chuàng)建時(shí)所有的服務(wù)必須處于 New 狀態(tài)。
附:TestCase、也可以作為練習(xí) Demo
ServiceTest
</pre>
/*
* Copyright (C) 2013 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.util.concurrent.Service.State.FAILED;
import static com.google.common.util.concurrent.Service.State.NEW;
import static com.google.common.util.concurrent.Service.State.RUNNING;
import static com.google.common.util.concurrent.Service.State.STARTING;
import static com.google.common.util.concurrent.Service.State.STOPPING;
import static com.google.common.util.concurrent.Service.State.TERMINATED;
import junit.framework.TestCase;
/**
* Unit tests for {@link Service}
*/
public class ServiceTest extends TestCase {
/** Assert on the comparison ordering of the State enum since we guarantee it. */
public void testStateOrdering() {
// List every valid (direct) state transition.
assertLessThan(NEW, STARTING);
assertLessThan(NEW, TERMINATED);
assertLessThan(STARTING, RUNNING);
assertLessThan(STARTING, STOPPING);
assertLessThan(STARTING, FAILED);
assertLessThan(RUNNING, STOPPING);
assertLessThan(RUNNING, FAILED);
assertLessThan(STOPPING, FAILED);
assertLessThan(STOPPING, TERMINATED);
}
private static <T extends Comparable<? super T>> void assertLessThan(T a, T b) {
if (a.compareTo(b) >= 0) {
fail(String.format("Expected %s to be less than %s", a, b));
}
}
}
<pre>
AbstractIdleServiceTest
/*
* Copyright (C) 2009 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import static org.truth0.Truth.ASSERT;
import com.google.common.collect.Lists;
import junit.framework.TestCase;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Tests for {@link AbstractIdleService}.
*
* @author Chris Nokleberg
* @author Ben Yu
*/
public class AbstractIdleServiceTest extends TestCase {
// Functional tests using real thread. We only verify publicly visible state.
// Interaction assertions are done by the single-threaded unit tests.
public static class FunctionalTest extends TestCase {
private static class DefaultService extends AbstractIdleService {
@Override protected void startUp() throws Exception {}
@Override protected void shutDown() throws Exception {}
}
public void testServiceStartStop() throws Exception {
AbstractIdleService service = new DefaultService();
service.startAsync().awaitRunning();
assertEquals(Service.State.RUNNING, service.state());
service.stopAsync().awaitTerminated();
assertEquals(Service.State.TERMINATED, service.state());
}
public void testStart_failed() throws Exception {
final Exception exception = new Exception("deliberate");
AbstractIdleService service = new DefaultService() {
@Override protected void startUp() throws Exception {
throw exception;
}
};
try {
service.startAsync().awaitRunning();
fail();
} catch (RuntimeException e) {
assertSame(exception, e.getCause());
}
assertEquals(Service.State.FAILED, service.state());
}
public void testStop_failed() throws Exception {
final Exception exception = new Exception("deliberate");
AbstractIdleService service = new DefaultService() {
@Override protected void shutDown() throws Exception {
throw exception;
}
};
service.startAsync().awaitRunning();
try {
service.stopAsync().awaitTerminated();
fail();
} catch (RuntimeException e) {
assertSame(exception, e.getCause());
}
assertEquals(Service.State.FAILED, service.state());
}
}
public void testStart() {
TestService service = new TestService();
assertEquals(0, service.startUpCalled);
service.startAsync().awaitRunning();
assertEquals(1, service.startUpCalled);
assertEquals(Service.State.RUNNING, service.state());
ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
}
public void testStart_failed() {
final Exception exception = new Exception("deliberate");
TestService service = new TestService() {
@Override protected void startUp() throws Exception {
super.startUp();
throw exception;
}
};
assertEquals(0, service.startUpCalled);
try {
service.startAsync().awaitRunning();
fail();
} catch (RuntimeException e) {
assertSame(exception, e.getCause());
}
assertEquals(1, service.startUpCalled);
assertEquals(Service.State.FAILED, service.state());
ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
}
public void testStop_withoutStart() {
TestService service = new TestService();
service.stopAsync().awaitTerminated();
assertEquals(0, service.startUpCalled);
assertEquals(0, service.shutDownCalled);
assertEquals(Service.State.TERMINATED, service.state());
ASSERT.that(service.transitionStates).isEmpty();
}
public void testStop_afterStart() {
TestService service = new TestService();
service.startAsync().awaitRunning();
assertEquals(1, service.startUpCalled);
assertEquals(0, service.shutDownCalled);
service.stopAsync().awaitTerminated();
assertEquals(1, service.startUpCalled);
assertEquals(1, service.shutDownCalled);
assertEquals(Service.State.TERMINATED, service.state());
ASSERT.that(service.transitionStates)
.has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
}
public void testStop_failed() {
final Exception exception = new Exception("deliberate");
TestService service = new TestService() {
@Override protected void shutDown() throws Exception {
super.shutDown();
throw exception;
}
};
service.startAsync().awaitRunning();
assertEquals(1, service.startUpCalled);
assertEquals(0, service.shutDownCalled);
try {
service.stopAsync().awaitTerminated();
fail();
} catch (RuntimeException e) {
assertSame(exception, e.getCause());
}
assertEquals(1, service.startUpCalled);
assertEquals(1, service.shutDownCalled);
assertEquals(Service.State.FAILED, service.state());
ASSERT.that(service.transitionStates)
.has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
}
public void testServiceToString() {
AbstractIdleService service = new TestService();
assertEquals("TestService [NEW]", service.toString());
service.startAsync().awaitRunning();
assertEquals("TestService [RUNNING]", service.toString());
service.stopAsync().awaitTerminated();
assertEquals("TestService [TERMINATED]", service.toString());
}
public void testTimeout() throws Exception {
// Create a service whose executor will never run its commands
Service service = new TestService() {
@Override protected Executor executor() {
return new Executor() {
@Override public void execute(Runnable command) {}
};
}
};
try {
service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
fail("Expected timeout");
} catch (TimeoutException e) {
ASSERT.that(e.getMessage()).contains(Service.State.STARTING.toString());
}
}
private static class TestService extends AbstractIdleService {
int startUpCalled = 0;
int shutDownCalled = 0;
final List<State> transitionStates = Lists.newArrayList();
@Override protected void startUp() throws Exception {
assertEquals(0, startUpCalled);
assertEquals(0, shutDownCalled);
startUpCalled++;
assertEquals(State.STARTING, state());
}
@Override protected void shutDown() throws Exception {
assertEquals(1, startUpCalled);
assertEquals(0, shutDownCalled);
shutDownCalled++;
assertEquals(State.STOPPING, state());
}
@Override protected Executor executor() {
transitionStates.add(state());
return MoreExecutors.sameThreadExecutor();
}
}
}
<pre>
AbstractScheduledServiceTest
</pre>
/*
* Copyright (C) 2011 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import com.google.common.util.concurrent.Service.State;
import junit.framework.TestCase;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Unit test for {@link AbstractScheduledService}.
*
* @author Luke Sandberg
*/
public class AbstractScheduledServiceTest extends TestCase {
volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
volatile ScheduledFuture<?> future = null;
volatile boolean atFixedRateCalled = false;
volatile boolean withFixedDelayCalled = false;
volatile boolean scheduleCalled = false;
final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
};
public void testServiceStartStop() throws Exception {
NullService service = new NullService();
service.startAsync().awaitRunning();
assertFalse(future.isDone());
service.stopAsync().awaitTerminated();
assertTrue(future.isCancelled());
}
private class NullService extends AbstractScheduledService {
@Override protected void runOneIteration() throws Exception {}
@Override protected Scheduler scheduler() { return configuration; }
@Override protected ScheduledExecutorService executor() { return executor; }
}
public void testFailOnExceptionFromRun() throws Exception {
TestService service = new TestService();
service.runException = new Exception();
service.startAsync().awaitRunning();
service.runFirstBarrier.await();
service.runSecondBarrier.await();
try {
future.get();
fail();
} catch (ExecutionException e) {
// An execution exception holds a runtime exception (from throwables.propogate) that holds our
// original exception.
assertEquals(service.runException, e.getCause().getCause());
}
assertEquals(service.state(), Service.State.FAILED);
}
public void testFailOnExceptionFromStartUp() {
TestService service = new TestService();
service.startUpException = new Exception();
try {
service.startAsync().awaitRunning();
fail();
} catch (IllegalStateException e) {
assertEquals(service.startUpException, e.getCause());
}
assertEquals(0, service.numberOfTimesRunCalled.get());
assertEquals(Service.State.FAILED, service.state());
}
public void testFailOnExceptionFromShutDown() throws Exception {
TestService service = new TestService();
service.shutDownException = new Exception();
service.startAsync().awaitRunning();
service.runFirstBarrier.await();
service.stopAsync();
service.runSecondBarrier.await();
try {
service.awaitTerminated();
fail();
} catch (IllegalStateException e) {
assertEquals(service.shutDownException, e.getCause());
}
assertEquals(Service.State.FAILED, service.state());
}
public void testRunOneIterationCalledMultipleTimes() throws Exception {
TestService service = new TestService();
service.startAsync().awaitRunning();
for (int i = 1; i < 10; i++) {
service.runFirstBarrier.await();
assertEquals(i, service.numberOfTimesRunCalled.get());
service.runSecondBarrier.await();
}
service.runFirstBarrier.await();
service.stopAsync();
service.runSecondBarrier.await();
service.stopAsync().awaitTerminated();
}
public void testExecutorOnlyCalledOnce() throws Exception {
TestService service = new TestService();
service.startAsync().awaitRunning();
// It should be called once during startup.
assertEquals(1, service.numberOfTimesExecutorCalled.get());
for (int i = 1; i < 10; i++) {
service.runFirstBarrier.await();
assertEquals(i, service.numberOfTimesRunCalled.get());
service.runSecondBarrier.await();
}
service.runFirstBarrier.await();
service.stopAsync();
service.runSecondBarrier.await();
service.stopAsync().awaitTerminated();
// Only called once overall.
assertEquals(1, service.numberOfTimesExecutorCalled.get());
}
public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
final CountDownLatch terminationLatch = new CountDownLatch(1);
AbstractScheduledService service = new AbstractScheduledService() {
volatile ScheduledExecutorService executorService;
@Override protected void runOneIteration() throws Exception {}
@Override protected ScheduledExecutorService executor() {
if (executorService == null) {
executorService = super.executor();
// Add a listener that will be executed after the listener that shuts down the executor.
addListener(new Listener() {
@Override public void terminated(State from) {
terminationLatch.countDown();
}
}, MoreExecutors.sameThreadExecutor());
}
return executorService;
}
@Override protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
}};
service.startAsync();
assertFalse(service.executor().isShutdown());
service.awaitRunning();
service.stopAsync();
terminationLatch.await();
assertTrue(service.executor().isShutdown());
assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
}
public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
final CountDownLatch failureLatch = new CountDownLatch(1);
AbstractScheduledService service = new AbstractScheduledService() {
volatile ScheduledExecutorService executorService;
@Override protected void runOneIteration() throws Exception {}
@Override protected void startUp() throws Exception {
throw new Exception("Failed");
}
@Override protected ScheduledExecutorService executor() {
if (executorService == null) {
executorService = super.executor();
// Add a listener that will be executed after the listener that shuts down the executor.
addListener(new Listener() {
@Override public void failed(State from, Throwa