`
dong_ming
  • 浏览: 13097 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

利用netty中的future获取异步执行的结果

阅读更多
      前段时间使用netty3,感受到其对于future的设计在写异步操作时的高效与便捷,通过future与futurelistener的组合实现异步的通知。这个在平时写异步执行代码的中经常用到。

      其实JDK也有Future这个接口,是active object模式的一种实现。最主要的思想就是让任务的调度和任务的执行分离。在一个主线程中发起一个任务,将这个任务有另一个线程去异步的执行,主线程继续执行其他的逻辑,当需要那个异步执行的结果的时候从Future中去get()这个结果。
public class FutureTest {
	public static void main(String[] args) throws InterruptedException {

		// 执行异步任务的线程池
		ExecutorService executor = Executors.newCachedThreadPool();
		Future<String> future = executor.submit(new Callable<String>() {
			@Override
			public String call() throws Exception {
				return "hello, world!";
			}
		});
		// 做其他的任务balabalbala...

		//需要刚刚执行的那个结果,从future中获取结果
		try {
			String result = future.get();
			System.out.println("got the resulit:" + result);
		} catch (ExecutionException e) {
			e.printStackTrace();
		}finally{
			executor.shutdown();
		}
	}
}

      上面的代码举个不恰当的例子,是这样一种情况:你在逛街的时候买奶茶,和营业员说我要一杯奶茶,营业员收到你的钱,然后给你一个小票,等奶茶好了你到时候来取。你觉得可以利用这个时间走开又去隔壁的面包店逛逛。有一点要注意的是当奶茶好的时候营业员不会打电话叫你,而是你自己去询问营业员奶茶好了没有,如果好了了你就可以立马拿走,如果没有就要麻烦你稍微等一下了,更糟糕的情况是制作奶茶的机器出了问题(产生异常了)。
       netty的future主要是配合FutureListener的使用,来达到异步通知的功能。也就是你买奶茶的时候,如果奶茶制作完成,营业员会通过电话来通知你。代码对直接吧Nettty直接拿了过来,做了少许的改动。
ComputeFuture接口
public interface ComputeFuture {

	boolean isDone();

	boolean isCancelled();

	boolean isSuccess();

	Throwable getCause();

	boolean cancel();

	boolean setSuccess(Object result);

	boolean setFailure(Throwable cause);

	void addListener(ComputeFutureListener listener);

	void removeListener(ComputeFutureListener listener);

	ComputeFuture sync() throws InterruptedException;

	ComputeFuture syncUninterruptibly();

	ComputeFuture await() throws InterruptedException;
	
	ComputeFuture awaitUninterruptibly();

	boolean await(long timeout, TimeUnit unit) throws InterruptedException;

	boolean await(long timeoutMillis) throws InterruptedException;

	boolean awaitUninterruptibly(long timeout, TimeUnit unit);

	boolean awaitUninterruptibly(long timeoutMillis);
	
	 Object getResult() ;

}

ComputeFuture实现类
public class DefaultComputeFutureImpl implements ComputeFuture {

	private static final Throwable CANCELLED = new Throwable();

	private final boolean cancellable;

	private ComputeFutureListener firstListener;
	private List<ComputeFutureListener> otherListeners;
	private boolean done;
	private Throwable cause;
	private int waiters;
	
	private Object result;

	public Object getResult() {
		return result;
	}

	public DefaultComputeFutureImpl(boolean cancellable) {
		this.cancellable = cancellable;
	}

	@Override
	public synchronized boolean isDone() {
		return done;
	}

	@Override
	public synchronized boolean isSuccess() {
		return done && cause == null;
	}

	@Override
	public synchronized Throwable getCause() {
		if (cause != CANCELLED) {
			return cause;
		} else {
			return null;
		}
	}

	@Override
	public synchronized boolean isCancelled() {
		return cause == CANCELLED;
	}

	@Override
	public void addListener(ComputeFutureListener listener) {
		if (listener == null) {
			throw new NullPointerException("listener");
		}

		boolean notifyNow = false;
		synchronized (this) {
			if (done) {
				notifyNow = true;
			} else {
				if (firstListener == null) {
					firstListener = listener;
				} else {
					if (otherListeners == null) {
						otherListeners = new ArrayList<ComputeFutureListener>(1);
					}
					otherListeners.add(listener);
				}
			}
		}

		if (notifyNow) {
			notifyListener(listener);
		}
	}

	@Override
	public void removeListener(ComputeFutureListener listener) {
		if (listener == null) {
			throw new NullPointerException("listener");
		}

		synchronized (this) {
			if (!done) {
				if (listener == firstListener) {
					if (otherListeners != null && !otherListeners.isEmpty()) {
						firstListener = otherListeners.remove(0);
					} else {
						firstListener = null;
					}
				} else if (otherListeners != null) {
					otherListeners.remove(listener);
				}
			}
		}
	}

	@Override
	public ComputeFuture sync() throws InterruptedException {
		await();
		rethrowIfFailed0();
		return this;
	}

	@Override
	public ComputeFuture syncUninterruptibly() {
		awaitUninterruptibly();
		rethrowIfFailed0();
		return this;
	}

	private void rethrowIfFailed0() {
		Throwable cause = getCause();
		if (cause == null) {
			return;
		}

		if (cause instanceof RuntimeException) {
			throw (RuntimeException) cause;
		}

		if (cause instanceof Error) {
			throw (Error) cause;
		}

		throw new RuntimeErrorException((Error) cause);
	}

	@Override
	public ComputeFuture await() throws InterruptedException {
		if (Thread.interrupted()) {
			throw new InterruptedException();
		}

		synchronized (this) {
			while (!done) {
				waiters++;
				try {
					wait();
				} finally {
					waiters--;
				}
			}
		}
		return this;
	}

	@Override
	public boolean await(long timeout, TimeUnit unit)
			throws InterruptedException {
		return await0(unit.toNanos(timeout), true);
	}

	public boolean await(long timeoutMillis) throws InterruptedException {
		return await0(MILLISECONDS.toNanos(timeoutMillis), true);
	}
	@Override
	public ComputeFuture awaitUninterruptibly() {
		boolean interrupted = false;
		synchronized (this) {
			while (!done) {
				waiters++;
				try {
					wait();
				} catch (InterruptedException e) {
					interrupted = true;
				} finally {
					waiters--;
				}
			}
		}

		if (interrupted) {
			Thread.currentThread().interrupt();
		}

		return this;
	}
	@Override
	public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
		try {
			return await0(unit.toNanos(timeout), false);
		} catch (InterruptedException e) {
			throw new InternalError();
		}
	}
	@Override
	public boolean awaitUninterruptibly(long timeoutMillis) {
		try {
			return await0(MILLISECONDS.toNanos(timeoutMillis), false);
		} catch (InterruptedException e) {
			throw new InternalError();
		}
	}

	private boolean await0(long timeoutNanos, boolean interruptable)
			throws InterruptedException {
		if (interruptable && Thread.interrupted()) {
			throw new InterruptedException();
		}

		long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
		long waitTime = timeoutNanos;
		boolean interrupted = false;

		try {
			synchronized (this) {
				if (done || waitTime <= 0) {
					return done;
				}

				waiters++;
				try {
					for (;;) {
						try {
							wait(waitTime / 1000000, (int) (waitTime % 1000000));
						} catch (InterruptedException e) {
							if (interruptable) {
								throw e;
							} else {
								interrupted = true;
							}
						}

						if (done) {
							return true;
						} else {
							waitTime = timeoutNanos
									- (System.nanoTime() - startTime);
							if (waitTime <= 0) {
								return done;
							}
						}
					}
				} finally {
					waiters--;
				}
			}
		} finally {
			if (interrupted) {
				Thread.currentThread().interrupt();
			}
		}
	}
	@Override
	public boolean setSuccess(Object result) {
		synchronized (this) {
			// Allow only once.
			if (done) {
				return false;
			}

			done = true;
			this.result = result;
			if (waiters > 0) {
				notifyAll();
			}
		}

		notifyListeners();
		return true;
	}
	@Override
	public boolean setFailure(Throwable cause) {
		if (cause == null) {
			throw new NullPointerException("cause");
		}

		synchronized (this) {
			// Allow only once.
			if (done) {
				return false;
			}

			this.cause = cause;
			done = true;
			if (waiters > 0) {
				notifyAll();
			}
		}

		notifyListeners();
		return true;
	}
	@Override
	public boolean cancel() {
		if (!cancellable) {
			return false;
		}

		synchronized (this) {
			// Allow only once.
			if (done) {
				return false;
			}

			cause = CANCELLED;
			done = true;
			if (waiters > 0) {
				notifyAll();
			}
		}

		notifyListeners();
		return true;
	}

	private void notifyListeners() {
		if (firstListener != null) {
			notifyListener(firstListener);
			firstListener = null;

			if (otherListeners != null) {
				for (ComputeFutureListener l : otherListeners) {
					notifyListener(l);
				}
				otherListeners = null;
			}
		}
	}

	private void notifyListener(ComputeFutureListener l) {
		try {
			l.operationComplete(this);
		} catch (Throwable t) {
			t.printStackTrace();
		}
	}

}


再来一个listener接口
public interface ComputeFutureListener {
	
	void operationComplete(ComputeFuture future) throws Exception;
}


一个最简单的计算接口,实现一个int的加法
public interface BasicCompute {
	
	ComputeFuture add(int a ,int b);
}

计算接口的实现类
public class BasicComputeImpl implements BasicCompute {

	private ComputeFuture future;
	// 执行异步任务的线程池
	private ExecutorService executor = Executors.newCachedThreadPool();

	public BasicComputeImpl(ComputeFuture future) {
		this.future = future;
	}

	@Override
	public ComputeFuture add(int a, int b) {
		
		executor.execute(new Runnable() {
			
			@Override
			public void run() {
				try {
					Thread.sleep(10000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(Thread.currentThread().getName() + ",start to compute......");
				int result = a+b;
				System.out.println(Thread.currentThread().getName()  +",got the result:" + result);
				future.setSuccess(result);
			}
		});
		return future;
	}
}


测试类
	public static void main(String[] args) {

		BasicCompute computeActor = new BasicComputeImpl(
				new DefaultComputeFutureImpl(false));
		ComputeFuture future = computeActor.add(2, 3);
		future.addListener(new ComputeFutureListener() {

			@Override
			public void operationComplete(ComputeFuture future)
					throws Exception {

				if (future.isSuccess()) {
					System.out.println(Thread.currentThread().getName()
							+ ",the result is:" + future.getResult());
				}
			}
		});
		
		System.out.println(Thread.currentThread().getName()+ ",let's wait the result...");
	}
}


     执行的结果如下:
     main,let's wait the result...
     pool-1-thread-1,start to compute......
     pool-1-thread-1,got the result:5
     pool-1-thread-1,the result is:5
     这里之所以把线程的名称打出来,是为了搞清楚这个监听器的执行是在哪个线程中执行的。当我们需要开发异步的程序的时候就可以使用这个类来完成通知获取结果的操作。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics