
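A Brief Look at the Service Framework in Google Guava's Concurrent Package

Overview

Guava's Service interface represents an object with an operational state, with methods such as start and stop. Web servers, RPC servers, and timers, for example, can implement this interface. Managing the state of such services is not trivial: startup and shutdown have to be handled carefully, and this is especially complicated in multithreaded environments. Guava provides some base classes that manage the state-transition logic and synchronization details for you.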

Using a Service

The normal lifecycle of a service is Service.State.NEW, then Service.State.STARTING, Service.State.RUNNING, Service.State.STOPPING, and finally Service.State.TERMINATED.

A stopped service cannot be restarted. If the service fails while starting, running, or stopping, it enters the Service.State.FAILED state. Calling [startAsync()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Service.html#startAsync()) starts a service asynchronously and returns this, so calls can be chained. Note: startAsync() may only be called while the service is in the NEW state, so it is best to have one central place in your application that initializes its services. Stopping a service is analogous, using the asynchronous [stopAsync()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Service.html#stopAsync()) method. Unlike startAsync(), however, it is safe to call stopAsync() more than once, which makes it easier to deal with races that can occur while shutting services down.
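The restart rule and the repeatable stopAsync() call can be illustrated with a small sketch. OneShotService and RestartDemo are hypothetical names introduced only for this example; the service extends AbstractIdleService (described later in this article) so that the example stays short.

```java
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;

/** Hypothetical no-op service, used only to exercise the lifecycle rules. */
class OneShotService extends AbstractIdleService {
  @Override protected void startUp() {}
  @Override protected void shutDown() {}
}

class RestartDemo {
  /** Returns true if a second startAsync() on a terminated service is rejected. */
  static boolean restartRejected() {
    Service service = new OneShotService();
    service.startAsync().awaitRunning();
    // Calling stopAsync() more than once is safe; the second call is a no-op.
    service.stopAsync().stopAsync().awaitTerminated();
    try {
      service.startAsync(); // the service is no longer NEW
      return false;
    } catch (IllegalStateException expected) {
      return true;
    }
  }
}
```

Calling RestartDemo.restartRejected() should return true, since startAsync() is only permitted in the NEW state.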

Service also provides several ways to wait for a state transition to complete:

Asynchronously, using [addListener()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Service.html#addListener()). This method lets you add a Service.Listener that is invoked on every state transition of the service. Note: it is best to add listeners before the service starts (while its state is NEW); otherwise, transitions that have already happened are not replayed on a newly added listener.

Synchronously, using [awaitRunning()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Service.html#awaitRunning()). This method is uninterruptible, throws no checked exceptions, and returns once the service has finished starting. If the service fails to start, it throws an IllegalStateException. Similarly, [awaitTerminated()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Service.html#awaitTerminated()) waits for the service to reach a terminal state (TERMINATED or FAILED). Both methods also have overloads that accept a timeout.

Implementing the Service interface directly is complicated and prone to subtle bugs, so we do not recommend it. Instead, extend one of the abstract base classes that Guava provides. Each base class supports a specific threading model.
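As a rough sketch of the two waiting styles, the example below registers a listener while the service is still NEW and then blocks with awaitRunning() and awaitTerminated(). NoOpService, RecordingListener, and LifecycleDemo are hypothetical names, and the code assumes a Guava release that provides MoreExecutors.directExecutor() (Guava 18 or later).

```java
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/** Hypothetical no-op service used only to demonstrate the lifecycle calls. */
class NoOpService extends AbstractIdleService {
  @Override protected void startUp() {}
  @Override protected void shutDown() {}
}

/** Records every state the service enters, in the order the callbacks arrive. */
class RecordingListener extends Service.Listener {
  final List<Service.State> seen = new CopyOnWriteArrayList<>();
  final CountDownLatch terminal = new CountDownLatch(1);

  @Override public void starting() { seen.add(Service.State.STARTING); }
  @Override public void running() { seen.add(Service.State.RUNNING); }
  @Override public void stopping(Service.State from) { seen.add(Service.State.STOPPING); }
  @Override public void terminated(Service.State from) {
    seen.add(Service.State.TERMINATED);
    terminal.countDown();
  }
  @Override public void failed(Service.State from, Throwable failure) {
    seen.add(Service.State.FAILED);
    terminal.countDown();
  }
}

class LifecycleDemo {
  static List<Service.State> runOnce() {
    Service service = new NoOpService();
    RecordingListener listener = new RecordingListener();
    // Register while the service is still NEW so that no transition is missed.
    service.addListener(listener, MoreExecutors.directExecutor());
    service.startAsync().awaitRunning();   // returns once RUNNING, throws if start-up failed
    service.stopAsync().awaitTerminated(); // returns once TERMINATED, throws if FAILED
    // Listener callbacks may arrive slightly after awaitTerminated() returns,
    // so wait for the terminal callback before reading the recorded states.
    Uninterruptibles.awaitUninterruptibly(listener.terminal);
    return new ArrayList<>(listener.seen);
  }
}
```

The latch is a design choice of this sketch: the blocking methods observe the service state, whereas listeners are notified separately, so the two are not guaranteed to complete at the same instant.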

Base Implementations

AbstractIdleService

AbstractIdleService is a skeleton implementation of Service that performs no action while in the running state, and therefore needs no thread while running, but still has startup and shutdown actions to perform. To implement such a service, extend AbstractIdleService and implement the [startUp()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractIdleService.html#startUp()) and [shutDown()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractIdleService.html#shutDown()) methods.


    // startUp() registers the servlet; nothing has to be undone on shutdown.
    protected void startUp() {
      servlets.add(new GcStatsServlet());
    }
    protected void shutDown() {}

In the example above, any request to GcStatsServlet is already handled by an existing thread, so the service has nothing further to do while it is running.
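A self-contained variant of the same idea might look like the sketch below. HandlerRegistrationService and its string registry are hypothetical stand-ins for the servlet list in the fragment above; the point is only that all the work happens in startUp() and shutDown().

```java
import com.google.common.util.concurrent.AbstractIdleService;
import java.util.Set;

/** Hypothetical service: registers a handler name on start-up and removes it on shutdown. */
class HandlerRegistrationService extends AbstractIdleService {
  private final Set<String> registry;
  private final String handlerName;

  HandlerRegistrationService(Set<String> registry, String handlerName) {
    this.registry = registry;
    this.handlerName = handlerName;
  }

  @Override protected void startUp() {
    // Runs once during the STARTING state.
    registry.add(handlerName);
  }

  @Override protected void shutDown() {
    // Runs once during the STOPPING state.
    registry.remove(handlerName);
  }
}
```

A caller would pass in a thread-safe set (for example one created with ConcurrentHashMap.newKeySet()), start the service with startAsync().awaitRunning(), and stop it with stopAsync().awaitTerminated().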

AbstractExecutionThreadService

AbstractExecutionThreadService performs startup, running, and shutdown in a single thread. You must override the run() method, and it must respond to stop requests. For example, you might do the work in a loop:


    public void run() {
      while (isRunning()) {
        // perform a unit of work
      }
    }

Alternatively, you can override [triggerShutdown()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractExecutionThreadService.html#triggerShutdown()) in whatever way causes run() to return.

Overriding startUp() and shutDown() is optional; the service state is managed for you either way.


    protected void startUp() {
      dispatcher.listenForConnections(port, queue);
    }
    protected void run() throws Exception {
      Connection connection;
      // Parenthesize the assignment so that the connection itself is compared with POISON.
      while ((connection = queue.take()) != POISON) {
        process(connection);
      }
    }
    protected void triggerShutdown() {
      dispatcher.stopListeningForConnections(queue);
      queue.put(POISON);
    }

Starting the service calls your startUp() method, creates a thread, and invokes run() in that thread. Stopping the service calls triggerShutdown() and waits for the thread to terminate.
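The poison-pill pattern in the fragment above can be written as a complete class. This is only a sketch: QueueWorkerService, its submit() method, and the processed list are hypothetical, and an unbounded LinkedBlockingQueue is assumed so that adding the sentinel never blocks.

```java
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical worker that drains a queue on its own thread until it sees a sentinel. */
class QueueWorkerService extends AbstractExecutionThreadService {
  // A distinct String instance, compared by identity, marks the end of the stream.
  private static final String POISON = new String("POISON");

  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  final List<String> processed = new CopyOnWriteArrayList<>();

  void submit(String item) {
    queue.add(item);
  }

  @Override protected void run() throws InterruptedException {
    String item;
    // take() blocks until an item is available; the loop ends at the sentinel.
    while ((item = queue.take()) != POISON) {
      processed.add(item);
    }
  }

  @Override protected void triggerShutdown() {
    // Called when a stop is requested; wakes run() so that it can return.
    queue.add(POISON);
  }
}
```

Because the queue is FIFO, items submitted before stopAsync() is called are processed before the sentinel is reached.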

AbstractScheduledService

AbstractScheduledService performs a periodic task while running. Subclasses implement [runOneIteration()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractScheduledService.html#runOneIteration()) to define one iteration of the task, along with the usual startUp() and shutDown() methods. To describe the execution schedule, you must implement the [scheduler()](http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/AbstractScheduledService.html#scheduler()) method. Typically you will use one of the two schedules provided by AbstractScheduledService.Scheduler, either newFixedRateSchedule(initialDelay, delay, TimeUnit) or newFixedDelaySchedule(initialDelay, delay, TimeUnit), which correspond to the two scheduling modes of ScheduledExecutorService in the JDK concurrency package. Custom schedules can be implemented with the help of CustomScheduler; see the Javadoc for details.
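A minimal periodic service could be sketched as follows. HeartbeatService, its counter, and the latch are hypothetical and exist only so that the iterations can be observed; the 10 ms delay is an arbitrary value chosen for the example.

```java
import com.google.common.util.concurrent.AbstractScheduledService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical service that counts how often its periodic task has run. */
class HeartbeatService extends AbstractScheduledService {
  final AtomicInteger beats = new AtomicInteger();
  final CountDownLatch threeBeats = new CountDownLatch(3);

  @Override protected void runOneIteration() {
    // One unit of periodic work; invoked repeatedly while the service is RUNNING.
    beats.incrementAndGet();
    threeBeats.countDown();
  }

  @Override protected Scheduler scheduler() {
    // No initial delay, then 10 ms between the end of one run and the start of the next.
    return Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
  }
}
```

After startAsync().awaitRunning(), a caller can wait on threeBeats to confirm that the task is being scheduled, and then call stopAsync().awaitTerminated() to cancel further iterations.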

AbstractService

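If you need to do your own thread management, you can extend AbstractService directly. The implementations above are usually sufficient, but if your service has specific thread-handling requirements while it runs, extending AbstractService is recommended.

A subclass of AbstractService must implement two methods, doStart() and doStop().

Your doStart and doStop methods should be fast, with latency as low as possible. If initialization is expensive, for example reading files, opening network connections, or any other operation that might block, consider moving that work to a separate thread.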
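One possible shape for such a subclass is sketched below. ThreadedInitService and its openResources() and closeResources() helpers are hypothetical; the sketch assumes the usual AbstractService contract in which the subclass reports the outcome through notifyStarted(), notifyStopped(), or notifyFailed(Throwable).

```java
import com.google.common.util.concurrent.AbstractService;

/** Hypothetical service that moves its start-up and shutdown work onto separate threads. */
class ThreadedInitService extends AbstractService {
  volatile boolean initialized;

  // Stand-ins for expensive or blocking work such as opening files or connections.
  private void openResources() { initialized = true; }
  private void closeResources() { initialized = false; }

  @Override protected void doStart() {
    // Return quickly; the potentially slow initialization runs on its own thread.
    new Thread(() -> {
      try {
        openResources();
      } catch (Throwable t) {
        notifyFailed(t); // moves the service to FAILED
        return;
      }
      notifyStarted();   // moves the service from STARTING to RUNNING
    }, "threaded-init-start").start();
  }

  @Override protected void doStop() {
    new Thread(() -> {
      try {
        closeResources();
      } catch (Throwable t) {
        notifyFailed(t);
        return;
      }
      notifyStopped();   // moves the service from STOPPING to TERMINATED
    }, "threaded-init-stop").start();
  }
}
```

With this layout the calling thread's startAsync() and stopAsync() return almost immediately, and callers that need to block can still use awaitRunning() and awaitTerminated().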

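Using ServiceManager

In addition to the base implementations of Service, Guava provides the ServiceManager class, which makes operations involving a collection of Service instances easier. Create a ServiceManager from a collection of services, and then manage them with methods such as startAsync(), stopAsync(), addListener(), awaitHealthy(), and awaitStopped().

Methods for inspecting the managed services include isHealthy(), servicesByState(), and startupTimes().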

We recommend managing the whole service lifecycle through ServiceManager, but state transitions triggered by other mechanisms do not affect the correctness of its methods. For example, if a service is started by some mechanism other than startAsync(), listeners are still invoked and awaitHealthy() still works as expected. The only requirement ServiceManager enforces is that all services are in the NEW state when the manager is constructed.
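Managing several services together might look like the following sketch. IdleStubService and ServiceManagerDemo are hypothetical names; the example assumes only the ServiceManager methods startAsync(), awaitHealthy(), stopAsync(), awaitStopped(), isHealthy(), and servicesByState().

```java
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ServiceManager;

/** Hypothetical no-op service standing in for a real application component. */
class IdleStubService extends AbstractIdleService {
  @Override protected void startUp() {}
  @Override protected void shutDown() {}
}

class ServiceManagerDemo {
  /** Builds a manager over two NEW services, starts them, and waits until all are RUNNING. */
  static ServiceManager startAll() {
    ServiceManager manager =
        new ServiceManager(ImmutableList.of(new IdleStubService(), new IdleStubService()));
    manager.startAsync().awaitHealthy(); // throws IllegalStateException if any service fails
    return manager;
  }

  /** Requests shutdown of every service and waits until each reaches a terminal state. */
  static void stopAll(ServiceManager manager) {
    manager.stopAsync().awaitStopped();
  }
}
```

Between startAll() and stopAll(), manager.isHealthy() reports whether every service is RUNNING, and manager.servicesByState() groups the services by their current state.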

Appendix: test cases, which can also serve as practice demos

ServiceTest


    /*
     * Copyright (C) 2013 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import static com.google.common.util.concurrent.Service.State.FAILED;
    import static com.google.common.util.concurrent.Service.State.NEW;
    import static com.google.common.util.concurrent.Service.State.RUNNING;
    import static com.google.common.util.concurrent.Service.State.STARTING;
    import static com.google.common.util.concurrent.Service.State.STOPPING;
    import static com.google.common.util.concurrent.Service.State.TERMINATED;

    import junit.framework.TestCase;

    /**
     * Unit tests for {@link Service}
     */
    public class ServiceTest extends TestCase {

      /** Assert on the comparison ordering of the State enum since we guarantee it. */
      public void testStateOrdering() {
        // List every valid (direct) state transition.
        assertLessThan(NEW, STARTING);
        assertLessThan(NEW, TERMINATED);

        assertLessThan(STARTING, RUNNING);
        assertLessThan(STARTING, STOPPING);
        assertLessThan(STARTING, FAILED);

        assertLessThan(RUNNING, STOPPING);
        assertLessThan(RUNNING, FAILED);

        assertLessThan(STOPPING, FAILED);
        assertLessThan(STOPPING, TERMINATED);
      }

      private static <T extends Comparable<? super T>> void assertLessThan(T a, T b) {
        if (a.compareTo(b) >= 0) {
          fail(String.format("Expected %s to be less than %s", a, b));
        }
      }
    }

AbstractIdleServiceTest


    /*
     * Copyright (C) 2009 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import static org.truth0.Truth.ASSERT;

    import com.google.common.collect.Lists;

    import junit.framework.TestCase;

    import java.util.List;
    import java.util.concurrent.Executor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    /**
     * Tests for {@link AbstractIdleService}.
     *
     * @author Chris Nokleberg
     * @author Ben Yu
     */
    public class AbstractIdleServiceTest extends TestCase {

      // Functional tests using real thread. We only verify publicly visible state.
      // Interaction assertions are done by the single-threaded unit tests.

      public static class FunctionalTest extends TestCase {

        private static class DefaultService extends AbstractIdleService {
          @Override protected void startUp() throws Exception {}
          @Override protected void shutDown() throws Exception {}
        }

        public void testServiceStartStop() throws Exception {
          AbstractIdleService service = new DefaultService();
          service.startAsync().awaitRunning();
          assertEquals(Service.State.RUNNING, service.state());
          service.stopAsync().awaitTerminated();
          assertEquals(Service.State.TERMINATED, service.state());
        }

        public void testStart_failed() throws Exception {
          final Exception exception = new Exception("deliberate");
          AbstractIdleService service = new DefaultService() {
            @Override protected void startUp() throws Exception {
              throw exception;
            }
          };
          try {
            service.startAsync().awaitRunning();
            fail();
          } catch (RuntimeException e) {
            assertSame(exception, e.getCause());
          }
          assertEquals(Service.State.FAILED, service.state());
        }

        public void testStop_failed() throws Exception {
          final Exception exception = new Exception("deliberate");
          AbstractIdleService service = new DefaultService() {
            @Override protected void shutDown() throws Exception {
              throw exception;
            }
          };
          service.startAsync().awaitRunning();
          try {
            service.stopAsync().awaitTerminated();
            fail();
          } catch (RuntimeException e) {
            assertSame(exception, e.getCause());
          }
          assertEquals(Service.State.FAILED, service.state());
        }
      }

      public void testStart() {
        TestService service = new TestService();
        assertEquals(0, service.startUpCalled);
        service.startAsync().awaitRunning();
        assertEquals(1, service.startUpCalled);
        assertEquals(Service.State.RUNNING, service.state());
        ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
      }

      public void testStart_failed() {
        final Exception exception = new Exception("deliberate");
        TestService service = new TestService() {
          @Override protected void startUp() throws Exception {
            super.startUp();
            throw exception;
          }
        };
        assertEquals(0, service.startUpCalled);
        try {
          service.startAsync().awaitRunning();
          fail();
        } catch (RuntimeException e) {
          assertSame(exception, e.getCause());
        }
        assertEquals(1, service.startUpCalled);
        assertEquals(Service.State.FAILED, service.state());
        ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
      }

      public void testStop_withoutStart() {
        TestService service = new TestService();
        service.stopAsync().awaitTerminated();
        assertEquals(0, service.startUpCalled);
        assertEquals(0, service.shutDownCalled);
        assertEquals(Service.State.TERMINATED, service.state());
        ASSERT.that(service.transitionStates).isEmpty();
      }

      public void testStop_afterStart() {
        TestService service = new TestService();
        service.startAsync().awaitRunning();
        assertEquals(1, service.startUpCalled);
        assertEquals(0, service.shutDownCalled);
        service.stopAsync().awaitTerminated();
        assertEquals(1, service.startUpCalled);
        assertEquals(1, service.shutDownCalled);
        assertEquals(Service.State.TERMINATED, service.state());
        ASSERT.that(service.transitionStates)
            .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
      }

      public void testStop_failed() {
        final Exception exception = new Exception("deliberate");
        TestService service = new TestService() {
          @Override protected void shutDown() throws Exception {
            super.shutDown();
            throw exception;
          }
        };
        service.startAsync().awaitRunning();
        assertEquals(1, service.startUpCalled);
        assertEquals(0, service.shutDownCalled);
        try {
          service.stopAsync().awaitTerminated();
          fail();
        } catch (RuntimeException e) {
          assertSame(exception, e.getCause());
        }
        assertEquals(1, service.startUpCalled);
        assertEquals(1, service.shutDownCalled);
        assertEquals(Service.State.FAILED, service.state());
        ASSERT.that(service.transitionStates)
            .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
      }

      public void testServiceToString() {
        AbstractIdleService service = new TestService();
        assertEquals("TestService [NEW]", service.toString());
        service.startAsync().awaitRunning();
        assertEquals("TestService [RUNNING]", service.toString());
        service.stopAsync().awaitTerminated();
        assertEquals("TestService [TERMINATED]", service.toString());
      }

      public void testTimeout() throws Exception {
        // Create a service whose executor will never run its commands
        Service service = new TestService() {
          @Override protected Executor executor() {
            return new Executor() {
              @Override public void execute(Runnable command) {}
            };
          }
        };
        try {
          service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
          fail("Expected timeout");
        } catch (TimeoutException e) {
          ASSERT.that(e.getMessage()).contains(Service.State.STARTING.toString());
        }
      }

      private static class TestService extends AbstractIdleService {
        int startUpCalled = 0;
        int shutDownCalled = 0;
        final List<State> transitionStates = Lists.newArrayList();

        @Override protected void startUp() throws Exception {
          assertEquals(0, startUpCalled);
          assertEquals(0, shutDownCalled);
          startUpCalled++;
          assertEquals(State.STARTING, state());
        }

        @Override protected void shutDown() throws Exception {
          assertEquals(1, startUpCalled);
          assertEquals(0, shutDownCalled);
          shutDownCalled++;
          assertEquals(State.STOPPING, state());
        }

        @Override protected Executor executor() {
          transitionStates.add(state());
          return MoreExecutors.sameThreadExecutor();
        }
      }
    }


AbstractScheduledServiceTest


    /*
     * Copyright (C) 2011 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
    import com.google.common.util.concurrent.Service.State;

    import junit.framework.TestCase;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * Unit test for {@link AbstractScheduledService}.
     *
     * @author Luke Sandberg
     */

    public class AbstractScheduledServiceTest extends TestCase {

      volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
      volatile ScheduledFuture<?> future = null;

      volatile boolean atFixedRateCalled = false;
      volatile boolean withFixedDelayCalled = false;
      volatile boolean scheduleCalled = false;

      final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
            long delay, TimeUnit unit) {
          return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
      };

      public void testServiceStartStop() throws Exception {
        NullService service = new NullService();
        service.startAsync().awaitRunning();
        assertFalse(future.isDone());
        service.stopAsync().awaitTerminated();
        assertTrue(future.isCancelled());
      }

      private class NullService extends AbstractScheduledService {
        @Override protected void runOneIteration() throws Exception {}
        @Override protected Scheduler scheduler() { return configuration; }
        @Override protected ScheduledExecutorService executor() { return executor; }
      }

      public void testFailOnExceptionFromRun() throws Exception {
        TestService service = new TestService();
        service.runException = new Exception();
        service.startAsync().awaitRunning();
        service.runFirstBarrier.await();
        service.runSecondBarrier.await();
        try {
          future.get();
          fail();
        } catch (ExecutionException e) {
          // An execution exception holds a runtime exception (from Throwables.propagate) that holds
          // our original exception.
          assertEquals(service.runException, e.getCause().getCause());
        }
        assertEquals(service.state(), Service.State.FAILED);
      }

      public void testFailOnExceptionFromStartUp() {
        TestService service = new TestService();
        service.startUpException = new Exception();
        try {
          service.startAsync().awaitRunning();
          fail();
        } catch (IllegalStateException e) {
          assertEquals(service.startUpException, e.getCause());
        }
        assertEquals(0, service.numberOfTimesRunCalled.get());
        assertEquals(Service.State.FAILED, service.state());
      }

      public void testFailOnExceptionFromShutDown() throws Exception {
        TestService service = new TestService();
        service.shutDownException = new Exception();
        service.startAsync().awaitRunning();
        service.runFirstBarrier.await();
        service.stopAsync();
        service.runSecondBarrier.await();
        try {
          service.awaitTerminated();
          fail();
        } catch (IllegalStateException e) {
          assertEquals(service.shutDownException, e.getCause());
        }
        assertEquals(Service.State.FAILED, service.state());
      }

      public void testRunOneIterationCalledMultipleTimes() throws Exception {
        TestService service = new TestService();
        service.startAsync().awaitRunning();
        for (int i = 1; i < 10; i++) {
          service.runFirstBarrier.await();
          assertEquals(i, service.numberOfTimesRunCalled.get());
          service.runSecondBarrier.await();
        }
        service.runFirstBarrier.await();
        service.stopAsync();
        service.runSecondBarrier.await();
        service.stopAsync().awaitTerminated();
      }

      public void testExecutorOnlyCalledOnce() throws Exception {
        TestService service = new TestService();
        service.startAsync().awaitRunning();
        // It should be called once during startup.
        assertEquals(1, service.numberOfTimesExecutorCalled.get());
        for (int i = 1; i < 10; i++) {
          service.runFirstBarrier.await();
          assertEquals(i, service.numberOfTimesRunCalled.get());
          service.runSecondBarrier.await();
        }
        service.runFirstBarrier.await();
        service.stopAsync();
        service.runSecondBarrier.await();
        service.stopAsync().awaitTerminated();
        // Only called once overall.
        assertEquals(1, service.numberOfTimesExecutorCalled.get());
      }

      public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
        final CountDownLatch terminationLatch = new CountDownLatch(1);
        AbstractScheduledService service = new AbstractScheduledService() {
          volatile ScheduledExecutorService executorService;
          @Override protected void runOneIteration() throws Exception {}

          @Override protected ScheduledExecutorService executor() {
            if (executorService == null) {
              executorService = super.executor();
              // Add a listener that will be executed after the listener that shuts down the executor.
              addListener(new Listener() {
                @Override public void terminated(State from) {
                  terminationLatch.countDown();
                }
              }, MoreExecutors.sameThreadExecutor());
            }
            return executorService;
          }

          @Override protected Scheduler scheduler() {
            return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
          }};

        service.startAsync();
        assertFalse(service.executor().isShutdown());
        service.awaitRunning();
        service.stopAsync();
        terminationLatch.await();
        assertTrue(service.executor().isShutdown());
        assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
      }

      public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
        final CountDownLatch failureLatch = new CountDownLatch(1);
        AbstractScheduledService service = new AbstractScheduledService() {
          volatile ScheduledExecutorService executorService;
          @Override protected void runOneIteration() throws Exception {}

          @Override protected void startUp() throws Exception {
            throw new Exception("Failed");
          }

          @Override protected ScheduledExecutorService executor() {
            if (executorService == null) {
              executorService = super.executor();
              // Add a listener that will be executed after the listener that shuts down the executor.
              addListener(new Listener() {
                @Override public void failed(State from, Throwa