+ where conduit = runConduit $ chanSource chan .| preprocessor .| runConduitParser (catchError session handler)
+ handler e@(Unexpected "ConduitParser.empty") = do
+
+ -- Horrible way to get last item in conduit:
+ -- Add a fake message so we can tell when to stop
+ liftIO $ writeChan chan (RspShutdown (ResponseMessage "EMPTY" IdRspNull Nothing Nothing))
+ x <- peek
+ case x of
+ Just x -> do
+ lastMsg <- skipToEnd x
+ name <- getParserName
+ liftIO $ throw (UnexpectedMessageException (T.unpack name) lastMsg)
+ Nothing -> throw e
+
+ handler e = throw e
+
+ skipToEnd x = do
+ y <- peek
+ case y of
+ Just (RspShutdown (ResponseMessage "EMPTY" IdRspNull Nothing Nothing)) -> return x
+ Just _ -> await >>= skipToEnd
+ Nothing -> return x