I am unable to get a Google Pub/Sub Java async client to shut down cleanly. After calling Subscriber.stopAsync(), I get exceptions like this:
14:30:07.600 [grpc-default-worker-ELG-1-2] WARN io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise - An exception was thrown by io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$4.operationComplete()
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@724c721d rejected from java.util.concurrent.ScheduledThreadPoolExecutor@36bdb610[Shutting down, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 19]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) ~[?:1.8.0_144]
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326) ~[?:1.8.0_144]
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533) ~[?:1.8.0_144]
at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622) ~[?:1.8.0_144]
at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93) ~[grpc-core-1.15.0.jar:1.15.0]
at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86) ~[grpc-core-1.15.0.jar:1.15.0]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.onReady(ClientCallImpl.java:611) ~[grpc-core-1.15.0.jar:1.15.0]
at io.grpc.internal.ForwardingClientStreamListener.onReady(ForwardingClientStreamListener.java:49) ~[grpc-core-1.15.0.jar:1.15.0]
at io.grpc.internal.AbstractStream$TransportState.notifyIfReady(AbstractStream.java:298) ~[grpc-core-1.15.0.jar:1.15.0]
at io.grpc.internal.AbstractStream$TransportState.onStreamAllocated(AbstractStream.java:237) ~[grpc-core-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.grpc.netty.NettyClientStream$TransportState.setHttp2Stream(NettyClientStream.java:249) ~[grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$4.operationComplete(NettyClientHandler.java:521) ~[grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$4.operationComplete(NettyClientHandler.java:509) ~[grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.tryPromise(Http2CodecUtil.java:378) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.trySuccess(Http2CodecUtil.java:344) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.trySuccess(Http2CodecUtil.java:256) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:52) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:31) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:696) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:409) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1396) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.forceFlush(SslHandler.java:1776) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:775) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.flush(SslHandler.java:752) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.gracefulClose(NettyClientHandler.java:631) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.write(NettyClientHandler.java:300) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1061) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel.write(AbstractChannel.java:295) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$AbstractQueuedCommand.run(WriteQueue.java:174) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.flush(WriteQueue.java:112) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:464) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [grpc-netty-shaded-1.15.0.jar:1.15.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
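I think this is indirectly caused by my consumers trying to ack messages they have processed after the client's executor has already started shutting down (note the "Shutting down" state in the exception message).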
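To check my reading of the top frames of the trace, here is a minimal sketch that uses only java.util.concurrent and no Pub/Sub or gRPC code at all. The class and method names (RejectionDemo, rejectedAfterShutdown) are made up for illustration. It only shows that a ScheduledThreadPoolExecutor with the default AbortPolicy rejects execute() once shutdown() has been called; it does not prove that this is what the client library does internally.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical demo class, not part of the Pub/Sub client.
final class RejectionDemo {

    // Returns true if a task handed to the executor after shutdown() is rejected.
    static boolean rejectedAfterShutdown() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.shutdown(); // no new tasks are accepted from here on
        try {
            // Same entry point as in the trace: execute() -> schedule() -> delayedExecute()
            executor.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            // Default AbortPolicy: the late task is refused with this exception
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + rejectedAfterShutdown());
    }
}
```

If the same thing happens inside the client, any callback that fires after the executor has begun shutting down (for example as a side effect of a late ack) would hit this path.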
My expectation is that after calling stopAsync() no more messages would be pulled from the server, but messages already buffered on the client would still be delivered to my callback, and I would be able to ack or nack those messages along with any I am in the middle of processing. I can't seem to make this work.
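To make the semantics I am after concrete, here is a rough sketch in plain java.util.concurrent. It is an analogy only, not the Pub/Sub API: the names DrainDemo and drain, the 5 second timeout, and the use of a list of strings as the "buffer" are all assumptions of mine. The idea is the difference between ExecutorService.shutdown(), which stops accepting new work but lets already queued work finish, and shutdownNow(), which abandons it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class illustrating "stop intake, drain the buffer, then stop".
final class DrainDemo {

    // Processes every already-buffered message, then shuts the workers down.
    // Returns the messages that were "acked" (order is not guaranteed).
    static List<String> drain(List<String> buffered) {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        List<String> acked = Collections.synchronizedList(new ArrayList<>());

        // Hand the buffered messages to the workers; "ack" is just a list add here.
        for (String message : buffered) {
            workers.execute(() -> acked.add(message));
        }

        // Stop accepting new work; tasks submitted above still run to completion.
        workers.shutdown();
        try {
            if (!workers.awaitTermination(5, TimeUnit.SECONDS)) {
                workers.shutdownNow(); // give up on whatever is still pending
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            workers.shutdownNow();
        }
        return acked;
    }

    public static void main(String[] args) {
        List<String> acked = drain(Arrays.asList("m1", "m2", "m3"));
        System.out.println("acked " + acked.size() + " buffered messages");
    }
}
```

In other words, I was hoping stopAsync() would behave like shutdown() followed by waiting for termination, rather than tearing down the executor while acks are still in flight.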
I can't see any other methods on Subscriber that I could call to effect a graceful shutdown. Am I missing something?
Obviously these messages will eventually be re-delivered, but I would prefer to process the messages in my buffer before shutting down, and I would prefer to avoid "normal" exceptions in the logs.