=== modified file 'src/Makefile.am'
--- src/Makefile.am	2014-04-30 10:50:09 +0000
+++ src/Makefile.am	2014-05-11 11:00:34 +0000
@@ -282,41 +282,40 @@
 	AuthReg.h \
 	YesNoNone.h \
 	YesNoNone.cc \
 	RefreshPattern.h \
 	cache_cf.cc \
 	CacheDigest.h \
 	CacheDigest.cc \
 	cache_manager.cc \
 	NeighborTypeDomainList.h \
 	CachePeerDomainList.h \
 	CachePeer.h \
 	CacheManager.h \
 	carp.h \
 	carp.cc \
 	cbdata.cc \
 	cbdata.h \
 	ChunkedCodingParser.cc \
 	ChunkedCodingParser.h \
 	client_db.h \
 	client_db.cc \
-	client_side.h \
 	client_side.cc \
 	client_side.h \
 	client_side_reply.cc \
 	client_side_reply.h \
 	client_side_request.cc \
 	client_side_request.h \
 	ClientInfo.h \
 	BodyPipe.cc \
 	BodyPipe.h \
 	ClientInfo.h \
 	ClientRequestContext.h \
 	clientStream.cc \
 	clientStream.h \
 	CollapsedForwarding.cc \
 	CollapsedForwarding.h \
 	CompletionDispatcher.cc \
 	CompletionDispatcher.h \
 	CommRead.h \
 	ConfigOption.cc \
 	ConfigParser.cc \
@@ -459,40 +458,42 @@
 	Packer.cc \
 	Packer.h \
 	Parsing.cc \
 	Parsing.h \
 	$(XPROF_STATS_SOURCE) \
 	pconn.cc \
 	pconn.h \
 	PeerDigest.h \
 	peer_digest.cc \
 	peer_proxy_negotiate_auth.h \
 	peer_proxy_negotiate_auth.cc \
 	peer_select.cc \
 	peer_sourcehash.h \
 	peer_sourcehash.cc \
 	peer_userhash.h \
 	peer_userhash.cc \
 	PeerPoolMgr.h \
 	PeerPoolMgr.cc \
 	PeerSelectState.h \
 	PingData.h \
+	Pipeline.cc \
+	Pipeline.h \
 	protos.h \
 	redirect.h \
 	redirect.cc \
 	refresh.h \
 	refresh.cc \
 	RemovalPolicy.cc \
 	RemovalPolicy.h \
 	send-announce.h \
 	send-announce.cc \
 	$(SBUF_SOURCE) \
 	SBufDetailedStats.h \
 	SBufDetailedStats.cc \
 	SBufStatsAction.h \
 	SBufStatsAction.cc \
 	$(SNMP_SOURCE) \
 	SquidMath.h \
 	SquidMath.cc \
 	SquidNew.cc \
 	IoStats.h \
 	stat.h \
@@ -1403,40 +1404,42 @@
 	tests/stub_EventLoop.cc \
 	time.cc \
 	BodyPipe.cc \
 	cache_manager.cc \
 	cache_cf.h \
 	AuthReg.h \
 	YesNoNone.h \
 	YesNoNone.cc \
 	RefreshPattern.h \
 	cache_cf.cc \
 	CacheDigest.h \
 	tests/stub_CacheDigest.cc \
 	carp.h \
 	tests/stub_carp.cc \
 	cbdata.cc \
 	ChunkedCodingParser.cc \
 	client_db.h \
 	client_db.cc \
 	client_side.h \
 	client_side.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	client_side_reply.cc \
 	client_side_request.cc \
 	ClientInfo.h \
 	clientStream.cc \
 	tests/stub_CollapsedForwarding.cc \
 	ConfigOption.cc \
 	ConfigParser.cc \
 	CpuAffinityMap.cc \
 	CpuAffinityMap.h \
 	CpuAffinitySet.cc \
 	CpuAffinitySet.h \
 	$(DELAY_POOL_SOURCE) \
 	$(DISKIO_SOURCE) \
 	disk.h \
 	disk.cc \
 	dlink.h \
 	dlink.cc \
 	$(DNSSOURCE) \
 	errorpage.cc \
 	tests/stub_ETag.cc \
@@ -1818,40 +1821,42 @@
 tests_testEvent_SOURCES = \
 	AccessLogEntry.cc \
 	BodyPipe.cc \
 	CacheDigest.h \
 	tests/stub_CacheDigest.cc \
 	cache_cf.h \
 	AuthReg.h \
 	YesNoNone.h \
 	YesNoNone.cc \
 	RefreshPattern.h \
 	cache_cf.cc \
 	cache_manager.cc \
 	carp.h \
 	tests/stub_carp.cc \
 	cbdata.cc \
 	ChunkedCodingParser.cc \
 	client_db.h \
 	client_db.cc \
 	client_side.h \
 	client_side.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	client_side_reply.cc \
 	client_side_request.cc \
 	ClientInfo.h \
 	clientStream.cc \
 	tests/stub_CollapsedForwarding.cc \
 	ConfigOption.cc \
 	ConfigParser.cc \
 	CpuAffinityMap.cc \
 	CpuAffinityMap.h \
 	CpuAffinitySet.cc \
 	CpuAffinitySet.h \
 	debug.cc \
 	$(DELAY_POOL_SOURCE) \
 	$(DISKIO_SOURCE) \
 	disk.h \
 	disk.cc \
 	dlink.h \
 	dlink.cc \
 	$(DNSSOURCE) \
 	errorpage.cc \
@@ -2068,40 +2073,42 @@
 tests_testEventLoop_SOURCES = \
 	AccessLogEntry.cc \
 	BodyPipe.cc \
 	CacheDigest.h \
 	tests/stub_CacheDigest.cc \
 	cache_manager.cc \
 	cache_cf.h \
 	AuthReg.h \
 	YesNoNone.h \
 	YesNoNone.cc \
 	RefreshPattern.h \
 	cache_cf.cc \
 	carp.h \
 	tests/stub_carp.cc \
 	cbdata.cc \
 	ChunkedCodingParser.cc \
 	client_db.h \
 	client_db.cc \
 	client_side.h \
 	client_side.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	client_side_reply.cc \
 	client_side_request.cc \
 	ClientInfo.h \
 	clientStream.cc \
 	tests/stub_CollapsedForwarding.cc \
 	ConfigOption.cc \
 	ConfigParser.cc \
 	CpuAffinityMap.cc \
 	CpuAffinityMap.h \
 	CpuAffinitySet.cc \
 	CpuAffinitySet.h \
 	debug.cc \
 	$(DELAY_POOL_SOURCE) \
 	$(DISKIO_SOURCE) \
 	disk.h \
 	disk.cc \
 	dlink.h \
 	dlink.cc \
 	$(DNSSOURCE) \
 	errorpage.cc \
@@ -2317,40 +2324,42 @@
 tests_test_http_range_SOURCES = \
 	AccessLogEntry.cc \
 	BodyPipe.cc \
 	cache_cf.h \
 	AuthReg.h \
 	YesNoNone.h \
 	YesNoNone.cc \
 	RefreshPattern.h \
 	cache_cf.cc \
 	cache_manager.cc \
 	CacheDigest.h \
 	tests/stub_CacheDigest.cc \
 	carp.h \
 	tests/stub_carp.cc \
 	cbdata.cc \
 	ChunkedCodingParser.cc \
 	client_db.h \
 	client_db.cc \
 	client_side.h \
 	client_side.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	client_side_reply.cc \
 	client_side_request.cc \
 	ClientInfo.h \
 	clientStream.cc \
 	tests/stub_CollapsedForwarding.cc \
 	ConfigOption.cc \
 	ConfigParser.cc \
 	CpuAffinityMap.cc \
 	CpuAffinityMap.h \
 	CpuAffinitySet.cc \
 	CpuAffinitySet.h \
 	debug.cc \
 	$(DELAY_POOL_SOURCE) \
 	$(DISKIO_SOURCE) \
 	disk.h \
 	disk.cc \
 	dlink.h \
 	dlink.cc \
 	$(DNSSOURCE) \
 	errorpage.cc \
@@ -2630,40 +2639,42 @@
 	time.cc \
 	BodyPipe.cc \
 	cache_manager.cc \
 	cache_cf.h \
 	AuthReg.h \
 	YesNoNone.h \
 	YesNoNone.cc \
 	RefreshPattern.h \
 	cache_cf.cc \
 	debug.cc \
 	CacheDigest.h \
 	tests/stub_CacheDigest.cc \
 	carp.h \
 	tests/stub_carp.cc \
 	cbdata.cc \
 	ChunkedCodingParser.cc \
 	client_db.h \
 	client_db.cc \
 	client_side.h \
 	client_side.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	client_side_reply.cc \
 	client_side_request.cc \
 	ClientInfo.h \
 	clientStream.cc \
 	tests/stub_CollapsedForwarding.cc \
 	ConfigOption.cc \
 	ConfigParser.cc \
 	CpuAffinityMap.cc \
 	CpuAffinityMap.h \
 	CpuAffinitySet.cc \
 	CpuAffinitySet.h \
 	$(DELAY_POOL_SOURCE) \
 	disk.h \
 	disk.cc \
 	dlink.h \
 	dlink.cc \
 	$(DNSSOURCE) \
 	errorpage.cc \
 	tests/stub_ETag.cc \
 	external_acl.cc \
@@ -3445,40 +3456,42 @@
 tests_testURL_SOURCES = \
 	AccessLogEntry.cc \
 	BodyPipe.cc \
 	cache_cf.h \
 	AuthReg.h \
 	YesNoNone.h \
 	YesNoNone.cc \
 	RefreshPattern.h \
 	cache_cf.cc \
 	tests/stub_cache_manager.cc \
 	CacheDigest.h \
 	tests/stub_CacheDigest.cc \
 	carp.h \
 	tests/stub_carp.cc \
 	cbdata.cc \
 	ChunkedCodingParser.cc \
 	client_db.h \
 	client_db.cc \
 	client_side.h \
 	client_side.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	client_side_reply.cc \
 	client_side_request.cc \
 	ClientInfo.h \
 	clientStream.cc \
 	tests/stub_CollapsedForwarding.cc \
 	ConfigOption.cc \
 	ConfigParser.cc \
 	CpuAffinityMap.cc \
 	CpuAffinityMap.h \
 	CpuAffinitySet.cc \
 	CpuAffinitySet.h \
 	$(DELAY_POOL_SOURCE) \
 	disk.h \
 	disk.cc \
 	DiskIO/ReadRequest.cc \
 	DiskIO/WriteRequest.cc \
 	dlink.h \
 	dlink.cc \
 	$(DNSSOURCE) \
 	errorpage.cc \

=== added file 'src/Pipeline.cc'
--- src/Pipeline.cc	1970-01-01 00:00:00 +0000
+++ src/Pipeline.cc	2014-05-11 11:47:19 +0000
@@ -0,0 +1,49 @@
+/*
+ * DEBUG: section 33    Client Request Pipeline
+ */
+#include "squid.h"
+#include "client_side.h"
+#include "Debug.h"
+#include "Pipeline.h"
+
+void
+Pipeline::add(const ClientSocketContextPointer &c)
+{
+    requests.push(c);
+    ++nrequests;
+    debugs(33, 3, "Pipeline " << (void*)this << " add request " << nrequests << ' ' << c);
+}
+
+ClientSocketContextPointer
+Pipeline::front() const
+{
+    if (requests.empty()) {
+        debugs(33, 3, "Pipeline " << (void*)this << " empty");
+        return ClientSocketContextPointer();
+    }
+
+    debugs(33, 3, "Pipeline " << (void*)this << " front " << requests.front());
+    return requests.front();
+}
+
+void
+Pipeline::terminateAll(int xerrno)
+{
+    while (!requests.empty()) {
+        ClientSocketContextPointer context = requests.front();
+        debugs(33, 3, "Pipeline " << (void*)this << " notify(" << xerrno << ") " << context);
+        context->noteIoError(xerrno);
+        context->connIsFinished();  // cleanup the context state and self-deregister
+        assert(context != requests.front());
+    }
+}
+
+void
+Pipeline::pop()
+{
+    if (requests.empty())
+        return;
+
+    debugs(33, 3, "Pipeline " << (void*)this << " drop " << requests.front());
+    requests.pop();
+}

=== added file 'src/Pipeline.h'
--- src/Pipeline.h	1970-01-01 00:00:00 +0000
+++ src/Pipeline.h	2014-05-11 11:47:26 +0000
@@ -0,0 +1,46 @@
+#ifndef SQUID_SRC_PIPELINE_H
+#define SQUID_SRC_PIPELINE_H
+
+#include "base/RefCount.h"
+
+#include <queue>
+
+class ClientSocketContext;
+typedef RefCount<ClientSocketContext> ClientSocketContextPointer;
+
+/// A pipeline queue of HTTP/1 transactions.
+class Pipeline {
+public:
+    Pipeline() : nrequests(0) {}
+    ~Pipeline() {terminateAll(0);}
+    Pipeline& operator =(const Pipeline&); // do not implement
+
+    /// register a new transaction context to the pipeline
+    void add(const ClientSocketContextPointer &);
+
+    /// get the first transaction context in the pipeline
+    ClientSocketContextPointer front() const;
+
+    /// how many transactions are currently pipelined
+    size_t count() const {return requests.size();}
+
+    /// whether there are none or any transactions currently pipelined
+    bool empty() const {return requests.empty();}
+
+    /// Tell everybody about the err, and abort all waiting transactions.
+    void terminateAll(const int xerrno);
+
+    /// deregister the front transaction from the pipeline
+    void pop();
+
+    /// Number of transactions seen in this pipeline.
+    /// Includes non-completed transactions.
+    uint32_t nrequests;
+
+private:
+    /// Transactions parsed from the connection but not completed processing.
+    /// For HTTP/1 these are handled as a FIFO queue of request messages.
+    std::queue<ClientSocketContextPointer> requests;
+};
+
+#endif /* SQUID_SRC_PIPELINE_H */

=== modified file 'src/client_side.cc'
--- src/client_side.cc	2014-04-27 07:59:17 +0000
+++ src/client_side.cc	2014-05-11 11:00:34 +0000
@@ -239,121 +239,93 @@
 /**
  * This routine should be called to grow the inbuf and then
  * call comm_read().
  */
 void
 ConnStateData::readSomeData()
 {
     if (reading())
         return;
 
     debugs(33, 4, HERE << clientConnection << ": reading request...");
 
     if (!in.maybeMakeSpaceAvailable())
         return;
 
     typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
     reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest);
     comm_read(clientConnection, in.buf, reader);
 }
 
-void
-ClientSocketContext::removeFromConnectionList(ConnStateData * conn)
-{
-    ClientSocketContext::Pointer *tempContextPointer;
-    assert(conn != NULL && cbdataReferenceValid(conn));
-    assert(conn->getCurrentContext() != NULL);
-    /* Unlink us from the connection request list */
-    tempContextPointer = & conn->currentobject;
-
-    while (tempContextPointer->getRaw()) {
-        if (*tempContextPointer == this)
-            break;
-
-        tempContextPointer = &(*tempContextPointer)->next;
-    }
-
-    assert(tempContextPointer->getRaw() != NULL);
-    *tempContextPointer = next;
-    next = NULL;
-}
-
 ClientSocketContext::~ClientSocketContext()
 {
     clientStreamNode *node = getTail();
 
     if (node) {
         ClientSocketContext *streamContext = dynamic_cast<ClientSocketContext *> (node->data.getRaw());
 
         if (streamContext) {
             /* We are *always* the tail - prevent recursive free */
             assert(this == streamContext);
             node->data = NULL;
         }
     }
 
-    if (connRegistered_)
-        deRegisterWithConn();
-
     httpRequestFree(http);
 
-    /* clean up connection links to us */
-    assert(this != next.getRaw());
+    debugs(33, 3, "destruct, this=" << (void*)this);
 }
 
 void
 ClientSocketContext::registerWithConn()
 {
     assert (!connRegistered_);
     assert (http);
     assert (http->getConn() != NULL);
     connRegistered_ = true;
-    http->getConn()->addContextToQueue(this);
-}
-
-void
-ClientSocketContext::deRegisterWithConn()
-{
-    assert (connRegistered_);
-    removeFromConnectionList(http->getConn());
-    connRegistered_ = false;
+    http->getConn()->pipeline.add(ClientSocketContext::Pointer(this));
 }
 
 void
 ClientSocketContext::connIsFinished()
 {
     assert (http);
     assert (http->getConn() != NULL);
-    deRegisterWithConn();
+    ConnStateData *conn = http->getConn();
+    connRegistered_ = false;
     /* we can't handle any more stream data - detach */
     clientStreamDetach(getTail(), http);
+
+    assert(conn->pipeline.front() == this);
+    conn->pipeline.pop();
+    conn->kick(); // kick anything which was waiting for us to finish
 }
 
 ClientSocketContext::ClientSocketContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
         clientConnection(aConn),
         http(aReq),
         reply(NULL),
-        next(NULL),
         writtenToSocket(0),
         mayUseConnection_ (false),
         connRegistered_ (false)
 {
+    debugs(33, 3, "constructed, this=" << (void*)this);
     assert(http != NULL);
     memset (reqbuf, '\0', sizeof (reqbuf));
     flags.deferred = 0;
     flags.parsed_ok = 0;
     deferredparams.node = NULL;
     deferredparams.rep = NULL;
 }
 
 void
 ClientSocketContext::writeControlMsg(HttpControlMsg &msg)
 {
     const HttpReply::Pointer rep(msg.reply);
     Must(rep != NULL);
 
     // apply selected clientReplyContext::buildReplyHeader() mods
     // it is not clear what headers are required for control messages
     rep->header.removeHopByHopEntries();
     rep->header.putStr(HDR_CONNECTION, "keep-alive");
     httpHdrMangleList(&rep->header, http->request, ROR_REPLY);
 
@@ -692,78 +664,40 @@
 ClientHttpRequest::freeResources()
 {
     safe_free(uri);
     safe_free(log_uri);
     safe_free(redirect.location);
     range_iter.boundary.clean();
     HTTPMSGUNLOCK(request);
 
     if (client_stream.tail)
         clientStreamAbort((clientStreamNode *)client_stream.tail->data, this);
 }
 
 void
 httpRequestFree(void *data)
 {
     ClientHttpRequest *http = (ClientHttpRequest *)data;
     assert(http != NULL);
     delete http;
 }
 
-bool
-ConnStateData::areAllContextsForThisConnection() const
-{
-    assert(this != NULL);
-    ClientSocketContext::Pointer context = getCurrentContext();
-
-    while (context.getRaw()) {
-        if (context->http->getConn() != this)
-            return false;
-
-        context = context->next;
-    }
-
-    return true;
-}
-
-void
-ConnStateData::freeAllContexts()
-{
-    ClientSocketContext::Pointer context;
-
-    while ((context = getCurrentContext()).getRaw() != NULL) {
-        assert(getCurrentContext() !=
-               getCurrentContext()->next);
-        context->connIsFinished();
-        assert (context != currentobject);
-    }
-}
-
-/// propagates abort event to all contexts
-void
-ConnStateData::notifyAllContexts(int xerrno)
-{
-    typedef ClientSocketContext::Pointer CSCP;
-    for (CSCP c = getCurrentContext(); c.getRaw(); c = c->next)
-        c->noteIoError(xerrno);
-}
-
 /* This is a handler normally called by comm_close() */
 void ConnStateData::connStateClosed(const CommCloseCbParams &io)
 {
     deleteThis("ConnStateData::connStateClosed");
 }
 
 #if USE_AUTH
 void
 ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char *by)
 {
     if (auth_ == NULL) {
         if (aur != NULL) {
             debugs(33, 2, "Adding connection-auth to " << clientConnection << " from " << by);
             auth_ = aur;
         }
         return;
     }
 
     // clobered with self-pointer
     // NP: something nasty is going on in Squid, but harmless.
@@ -824,42 +758,41 @@
         debugs(33, 2, "ERROR: Closing " << clientConnection << " due to change of connection-auth from " << by);
         auth_->releaseAuthServer();
         auth_ = NULL;
         // this is a fatal type of problem.
         // Close the connection immediately with TCP RST to abort all traffic flow
         comm_reset_close(clientConnection);
         return;
     }
 
     /* NOT REACHABLE */
 }
 #endif
 
 // cleans up before destructor is called
 void
 ConnStateData::swanSong()
 {
     debugs(33, 2, HERE << clientConnection);
     flags.readMore = false;
     clientdbEstablished(clientConnection->remote, -1);	/* decrement */
-    assert(areAllContextsForThisConnection());
-    freeAllContexts();
+    pipeline.terminateAll(0);
 
     unpinConnection();
 
     if (Comm::IsConnOpen(clientConnection))
         clientConnection->close();
 
 #if USE_AUTH
     // NP: do this bit after closing the connections to avoid side effects from unwanted TCP RST
     setAuth(NULL, "ConnStateData::SwanSong cleanup");
 #endif
 
     BodyProducer::swanSong();
     flags.swanSang = true;
 }
 
 bool
 ConnStateData::isOpen() const
 {
     return cbdataReferenceValid(this) && // XXX: checking "this" in a method
            Comm::IsConnOpen(clientConnection) &&
@@ -929,48 +862,40 @@
 {
     if (Config.maxRequestBodySize &&
             bodyLength > Config.maxRequestBodySize)
         return 1;		/* too large */
 
     return 0;
 }
 
 #ifndef PURIFY
 bool
 connIsUsable(ConnStateData * conn)
 {
     if (conn == NULL || !cbdataReferenceValid(conn) || !Comm::IsConnOpen(conn->clientConnection))
         return false;
 
     return true;
 }
 
 #endif
 
-// careful: the "current" context may be gone if we wrote an early response
-ClientSocketContext::Pointer
-ConnStateData::getCurrentContext() const
-{
-    assert(this);
-    return currentobject;
-}
-
 void
 ClientSocketContext::deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData)
 {
     debugs(33, 2, "clientSocketRecipient: Deferring request " << http->uri);
     assert(flags.deferred == 0);
     flags.deferred = 1;
     deferredparams.node = node;
     deferredparams.rep = rep;
     deferredparams.queuedBuffer = receivedData;
     return;
 }
 
 int
 responseFinishedOrFailed(HttpReply * rep, StoreIOBuffer const & receivedData)
 {
     if (rep == NULL && receivedData.data == NULL && receivedData.length == 0)
         return 1;
 
     return 0;
 }
@@ -1474,41 +1399,41 @@
 static void
 clientSocketRecipient(clientStreamNode * node, ClientHttpRequest * http,
                       HttpReply * rep, StoreIOBuffer receivedData)
 {
     /* Test preconditions */
     assert(node != NULL);
     PROF_start(clientSocketRecipient);
     /* TODO: handle this rather than asserting
      * - it should only ever happen if we cause an abort and
      * the callback chain loops back to here, so we can simply return.
      * However, that itself shouldn't happen, so it stays as an assert for now.
      */
     assert(cbdataReferenceValid(node));
     assert(node->node.next == NULL);
     ClientSocketContext::Pointer context = dynamic_cast<ClientSocketContext *>(node->data.getRaw());
     assert(context != NULL);
     assert(connIsUsable(http->getConn()));
 
     /* TODO: check offset is what we asked for */
 
-    if (context != http->getConn()->getCurrentContext()) {
+    if (context != http->getConn()->pipeline.front()) {
         context->deferRecipientForLater(node, rep, receivedData);
         PROF_stop(clientSocketRecipient);
         return;
     }
 
     // After sending Transfer-Encoding: chunked (at least), always send
     // the last-chunk if there was no error, ignoring responseFinishedOrFailed.
     const bool mustSendLastChunk = http->request->flags.chunkedReply &&
                                    !http->request->flags.streamError && !context->startOfOutput();
     if (responseFinishedOrFailed(rep, receivedData) && !mustSendLastChunk) {
         context->writeComplete(context->clientConnection, NULL, 0, COMM_OK);
         PROF_stop(clientSocketRecipient);
         return;
     }
 
     if (!context->startOfOutput())
         context->sendBody(rep, receivedData);
     else {
         assert(rep);
         http->al->reply = rep;
@@ -1582,114 +1507,119 @@
         assert(deferredRequest->http->out.size == 0);
         /** defer now. */
         clientSocketRecipient(deferredRequest->deferredparams.node,
                               deferredRequest->http,
                               deferredRequest->deferredparams.rep,
                               deferredRequest->deferredparams.queuedBuffer);
     }
 
     /** otherwise, the request is still active in a callbacksomewhere,
      * and we are done
      */
 }
 
 /// called when we have successfully finished writing the response
 void
 ClientSocketContext::keepaliveNextRequest()
 {
     ConnStateData * conn = http->getConn();
 
     debugs(33, 3, HERE << "ConnnStateData(" << conn->clientConnection << "), Context(" << clientConnection << ")");
+
+    // mark ourselves as completed
     connIsFinished();
+}
 
-    if (conn->pinning.pinned && !Comm::IsConnOpen(conn->pinning.serverConnection)) {
-        debugs(33, 2, HERE << conn->clientConnection << " Connection was pinned but server side gone. Terminating client connection");
-        conn->clientConnection->close();
+void
+ConnStateData::kick()
+{
+    if (pinning.pinned && !Comm::IsConnOpen(pinning.serverConnection)) {
+        debugs(33, 2, clientConnection << " Connection was pinned but server side gone. Terminating client connection");
+        clientConnection->close();
         return;
     }
 
     /** \par
      * We are done with the response, and we are either still receiving request
      * body (early response!) or have already stopped receiving anything.
      *
      * If we are still receiving, then clientParseRequest() below will fail.
      * (XXX: but then we will call readNextRequest() which may succeed and
      * execute a smuggled request as we are not done with the current request).
      *
      * If we stopped because we got everything, then try the next request.
      *
      * If we stopped receiving because of an error, then close now to avoid
      * getting stuck and to prevent accidental request smuggling.
      */
 
-    if (const char *reason = conn->stoppedReceiving()) {
-        debugs(33, 3, HERE << "closing for earlier request error: " << reason);
-        conn->clientConnection->close();
+    if (const char *reason = stoppedReceiving()) {
+        debugs(33, 3, "closing for earlier request error: " << reason);
+        clientConnection->close();
         return;
     }
 
     /** \par
      * Attempt to parse a request from the request buffer.
      * If we've been fed a pipelined request it may already
      * be in our read buffer.
      *
      \par
      * This needs to fall through - if we're unlucky and parse the _last_ request
      * from our read buffer we may never re-register for another client read.
      */
 
-    if (conn->clientParseRequests()) {
-        debugs(33, 3, HERE << conn->clientConnection << ": parsed next request from buffer");
+    if (clientParseRequests()) {
+        debugs(33, 3, clientConnection << ": parsed next request from buffer");
     }
 
     /** \par
      * Either we need to kick-start another read or, if we have
      * a half-closed connection, kill it after the last request.
      * This saves waiting for half-closed connections to finished being
      * half-closed _AND_ then, sometimes, spending "Timeout" time in
      * the keepalive "Waiting for next request" state.
      */
-    if (commIsHalfClosed(conn->clientConnection->fd) && (conn->getConcurrentRequestCount() == 0)) {
-        debugs(33, 3, "ClientSocketContext::keepaliveNextRequest: half-closed client with no pending requests, closing");
-        conn->clientConnection->close();
+    if (commIsHalfClosed(clientConnection->fd) && pipeline.empty()) {
+        debugs(33, 3, "half-closed client with no pending requests, closing");
+        clientConnection->close();
         return;
     }
 
-    ClientSocketContext::Pointer deferredRequest;
-
     /** \par
      * At this point we either have a parsed request (which we've
      * kicked off the processing for) or not. If we have a deferred
      * request (parsed but deferred for pipeling processing reasons)
      * then look at processing it. If not, simply kickstart
      * another read.
      */
 
-    if ((deferredRequest = conn->getCurrentContext()).getRaw()) {
-        debugs(33, 3, HERE << conn->clientConnection << ": calling PushDeferredIfNeeded");
-        ClientSocketContextPushDeferredIfNeeded(deferredRequest, conn);
-    } else if (conn->flags.readMore) {
-        debugs(33, 3, HERE << conn->clientConnection << ": calling conn->readNextRequest()");
-        conn->readNextRequest();
+    ClientSocketContext::Pointer deferredRequest = pipeline.front();
+    if (deferredRequest != NULL) {
+        debugs(33, 3, clientConnection << ": calling PushDeferredIfNeeded");
+        ClientSocketContextPushDeferredIfNeeded(deferredRequest, this);
+    } else if (flags.readMore) {
+        debugs(33, 3, clientConnection << ": calling conn->readNextRequest()");
+        readNextRequest();
     } else {
         // XXX: Can this happen? CONNECT tunnels have deferredRequest set.
-        debugs(33, DBG_IMPORTANT, HERE << "abandoning " << conn->clientConnection);
+        debugs(33, DBG_IMPORTANT, HERE << "abandoning " << clientConnection);
     }
 }
 
 void
 clientUpdateSocketStats(LogTags logType, size_t size)
 {
     if (size == 0)
         return;
 
     kb_incr(&statCounter.client_http.kbytes_out, size);
 
     if (logTypeIsATcpHit(logType))
         kb_incr(&statCounter.client_http.hit_kbytes_out, size);
 }
 
 /**
  * increments iterator "i"
  * used by clientPackMoreRanges
  *
  \retval true    there is still data available to pack more ranges
@@ -2376,95 +2306,72 @@
     xfree(url);
     return result;
 }
 
 bool
 ConnStateData::In::maybeMakeSpaceAvailable()
 {
     if (buf.spaceSize() < 2) {
         const SBuf::size_type haveCapacity = buf.length() + buf.spaceSize();
         if (haveCapacity >= Config.maxRequestBufferSize) {
             debugs(33, 4, "request buffer full: client_request_buffer_max_size=" << Config.maxRequestBufferSize);
             return false;
         }
         const SBuf::size_type wantCapacity = min(static_cast<SBuf::size_type>(Config.maxRequestBufferSize), haveCapacity*2);
         buf.reserveCapacity(wantCapacity);
         debugs(33, 2, "growing request buffer: available=" << buf.spaceSize() << " used=" << buf.length());
     }
     return (buf.spaceSize() >= 2);
 }
 
-void
-ConnStateData::addContextToQueue(ClientSocketContext * context)
-{
-    ClientSocketContext::Pointer *S;
-
-    for (S = (ClientSocketContext::Pointer *) & currentobject; S->getRaw();
-            S = &(*S)->next);
-    *S = context;
-
-    ++nrequests;
-}
-
-int
-ConnStateData::getConcurrentRequestCount() const
-{
-    int result = 0;
-    ClientSocketContext::Pointer *T;
-
-    for (T = (ClientSocketContext::Pointer *) &currentobject;
-            T->getRaw(); T = &(*T)->next, ++result);
-    return result;
-}
-
 int
 ConnStateData::connReadWasError(comm_err_t flag, int size, int xerrno)
 {
     if (flag != COMM_OK) {
         debugs(33, 2, "connReadWasError: FD " << clientConnection << ": got flag " << flag);
         return 1;
     }
 
     if (size < 0) {
         if (!ignoreErrno(xerrno)) {
             debugs(33, 2, "connReadWasError: FD " << clientConnection << ": " << xstrerr(xerrno));
             return 1;
         } else if (in.buf.isEmpty()) {
             debugs(33, 2, "connReadWasError: FD " << clientConnection << ": no data to process (" << xstrerr(xerrno) << ")");
         }
     }
 
     return 0;
 }
 
 int
 ConnStateData::connFinishedWithConn(int size)
 {
     if (size == 0) {
-        if (getConcurrentRequestCount() == 0 && in.buf.isEmpty()) {
+        if (pipeline.empty() && in.buf.isEmpty()) {
             /* no current or pending requests */
             debugs(33, 4, HERE << clientConnection << " closed");
             return 1;
         } else if (!Config.onoff.half_closed_clients) {
             /* admin doesn't want to support half-closed client sockets */
             debugs(33, 3, HERE << clientConnection << " aborted (half_closed_clients disabled)");
-            notifyAllContexts(0); // no specific error implies abort
+            pipeline.terminateAll(0);
             return 1;
         }
     }
 
     return 0;
 }
 
 void
 connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount)
 {
     assert(byteCount > 0 && byteCount <= conn->in.buf.length());
     conn->in.buf.consume(byteCount);
     debugs(33, 5, "conn->in.buf has " << conn->in.buf.length() << " bytes unused.");
 }
 
 /// respond with ERR_TOO_BIG if request header exceeds request_header_max_size
 void
 ConnStateData::checkHeaderLimits()
 {
     if (in.buf.length() < Config.maxRequestHeaderSize)
@@ -2883,41 +2790,41 @@
     }
 }
 
 static void
 connStripBufferWhitespace (ConnStateData * conn)
 {
     // XXX: kill this whole function.
     while (!conn->in.buf.isEmpty() && xisspace(conn->in.buf.at(0))) {
         conn->in.buf.consume(1);
     }
 }
 
 /**
  * Limit the number of concurrent requests.
  * \return true  when there are available position(s) in the pipeline queue for another request.
  * \return false when the pipeline queue is full or disabled.
  */
 bool
 ConnStateData::concurrentRequestQueueFilled() const
 {
-    const int existingRequestCount = getConcurrentRequestCount();
+    const int existingRequestCount = pipeline.count();
 
     // default to the configured pipeline size.
     // add 1 because the head of pipeline is counted in concurrent requests and not prefetch queue
     const int concurrentRequestLimit = Config.pipeline_max_prefetch + 1;
 
     // when queue filled already we cant add more.
     if (existingRequestCount >= concurrentRequestLimit) {
         debugs(33, 3, clientConnection << " max concurrent requests reached (" << concurrentRequestLimit << ")");
         debugs(33, 5, clientConnection << " deferring new request until one is done");
         return true;
     }
 
     return false;
 }
 
 /**
  * Attempt to parse one or more requests from the input buffer.
  * If a request is successfully parsed, even if the next request
  * is only partially parsed, it will return TRUE.
  */
@@ -2989,92 +2896,92 @@
     reader = NULL;
 
     /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */
 
     if (io.flag == COMM_ERR_CLOSING) {
         debugs(33,5, HERE << io.conn << " closing Bailout.");
         return;
     }
 
     assert(Comm::IsConnOpen(clientConnection));
     assert(io.conn->fd == clientConnection->fd);
 
     /*
      * Don't reset the timeout value here.  The timeout value will be
      * set to Config.Timeout.request by httpAccept() and
      * clientWriteComplete(), and should apply to the request as a
      * whole, not individual read() calls.  Plus, it breaks our
      * lame half-close detection
      */
     if (connReadWasError(io.flag, io.size, io.xerrno)) {
-        notifyAllContexts(io.xerrno);
+        pipeline.terminateAll(io.xerrno);
         io.conn->close();
         return;
     }
 
     if (io.flag == COMM_OK) {
         if (io.size > 0) {
             kb_incr(&(statCounter.client_http.kbytes_in), io.size);
 
             // may comm_close or setReplyToError
             if (!handleReadData(io.buf2))
                 return;
 
         } else if (io.size == 0) {
             debugs(33, 5, HERE << io.conn << " closed?");
 
             if (connFinishedWithConn(io.size)) {
                 clientConnection->close();
                 return;
             }
 
             /* It might be half-closed, we can't tell */
             fd_table[io.conn->fd].flags.socket_eof = true;
 
             commMarkHalfClosed(io.conn->fd);
 
             fd_note(io.conn->fd, "half-closed");
 
             /* There is one more close check at the end, to detect aborted
              * (partial) requests. At this point we can't tell if the request
              * is partial.
              */
             /* Continue to process previously read data */
         }
     }
 
     /* Process next request */
-    if (getConcurrentRequestCount() == 0)
+    if (pipeline.empty())
         fd_note(io.fd, "Reading next request");
 
     if (!clientParseRequests()) {
         if (!isOpen())
             return;
         /*
          * If the client here is half closed and we failed
          * to parse a request, close the connection.
          * The above check with connFinishedWithConn() only
          * succeeds _if_ the buffer is empty which it won't
          * be if we have an incomplete request.
-         * XXX: This duplicates ClientSocketContext::keepaliveNextRequest
+         * XXX: This duplicates ConnStateData::kick
          */
-        if (getConcurrentRequestCount() == 0 && commIsHalfClosed(io.fd)) {
+        if (pipeline.empty() && commIsHalfClosed(io.fd)) {
             debugs(33, 5, HERE << io.conn << ": half-closed connection, no completed request parsed, connection closing.");
             clientConnection->close();
             return;
         }
     }
 
     if (!isOpen())
         return;
 
     clientAfterReadingRequests();
 }
 
 /**
  * called when new request data has been read from the socket
  *
  * \retval false called comm_close or setReplyToError (the caller should bail)
  * \retval true  we did not call comm_close or setReplyToError
  */
 bool
 ConnStateData::handleReadData(SBuf *buf)
@@ -3173,41 +3080,41 @@
         Must(!in.bodyParser->needsMoreSpace() || bodyPipe->buf().hasContent());
     } catch (...) { // TODO: be more specific
         debugs(33, 3, HERE << "malformed chunks" << bodyPipe->status());
         return ERR_INVALID_REQ;
     }
 
     debugs(33, 7, HERE << "need more chunked data" << *bodyPipe->status());
     return ERR_NONE;
 }
 
 /// quit on errors related to chunked request body handling
 void
 ConnStateData::abortChunkedRequestBody(const err_type error)
 {
     finishDechunkingRequest(false);
 
     // XXX: The code below works if we fail during initial request parsing,
     // but if we fail when the server-side works already, the server may send
     // us its response too, causing various assertions. How to prevent that?
 #if WE_KNOW_HOW_TO_SEND_ERRORS
-    ClientSocketContext::Pointer context = getCurrentContext();
+    ClientSocketContext::Pointer context = pipeline.get();
     if (context != NULL && !context->http->out.offset) { // output nothing yet
         clientStreamNode *node = context->getClientReplyContext();
         clientReplyContext *repContext = dynamic_cast<clientReplyContext*>(node->data.getRaw());
         assert(repContext);
         const Http::StatusCode scode = (error == ERR_TOO_BIG) ?
                                        Http::scPayloadTooLarge : HTTP_BAD_REQUEST;
         repContext->setReplyToError(error, scode,
                                     repContext->http->request->method,
                                     repContext->http->uri,
                                     CachePeer,
                                     repContext->http->request,
                                     in.buf, NULL);
         context->pullData();
     } else {
         // close or otherwise we may get stuck as nobody will notice the error?
         comm_reset_close(clientConnection);
     }
 #else
     debugs(33, 3, HERE << "aborting chunked request without error " << error);
     comm_reset_close(clientConnection);
@@ -3807,43 +3714,44 @@
 
     assert(certProperties.signAlgorithm != Ssl::algSignEnd);
 
     if (certProperties.signAlgorithm == Ssl::algSignUntrusted) {
         assert(port->untrustedSigningCert.get());
         certProperties.signWithX509.resetAndLock(port->untrustedSigningCert.get());
         certProperties.signWithPkey.resetAndLock(port->untrustedSignPkey.get());
     } else {
         assert(port->signingCert.get());
         certProperties.signWithX509.resetAndLock(port->signingCert.get());
 
         if (port->signPkey.get())
             certProperties.signWithPkey.resetAndLock(port->signPkey.get());
     }
     signAlgorithm = certProperties.signAlgorithm;
 }
 
 void
 ConnStateData::getSslContextStart()
 {
-    assert(areAllContextsForThisConnection());
-    freeAllContexts();
-    /* careful: freeAllContexts() above frees request, host, etc. */
+    // XXX starting SSL with a pipeline of requests still waiting for non-SSL replies?
+    assert(pipeline.size() < 2); // the CONNECT being there us okay. Anything else is a bug.
+    pipeline.terminateAll(0);
+    /* careful: pipeline.terminateAll(0) above frees request, host, etc. */
 
     if (port->generateHostCertificates) {
         Ssl::CertificateProperties certProperties;
         buildSslCertGenerationParams(certProperties);
         sslBumpCertKey = certProperties.dbKey().c_str();
         assert(sslBumpCertKey.size() > 0 && sslBumpCertKey[0] != '\0');
 
         debugs(33, 5, HERE << "Finding SSL certificate for " << sslBumpCertKey << " in cache");
         Ssl::LocalContextStorage *ssl_ctx_cache = Ssl::TheGlobalContextStorage.getLocalStorage(port->s);
         SSL_CTX * dynCtx = NULL;
         Ssl::SSL_CTX_Pointer *cachedCtx = ssl_ctx_cache ? ssl_ctx_cache->get(sslBumpCertKey.termedBuf()) : NULL;
         if (cachedCtx && (dynCtx = cachedCtx->get())) {
             debugs(33, 5, HERE << "SSL certificate for " << sslBumpCertKey << " have found in cache");
             if (Ssl::verifySslCertificate(dynCtx, certProperties)) {
                 debugs(33, 5, HERE << "Cached SSL certificate for " << sslBumpCertKey << " is valid");
                 getSslContextDone(dynCtx);
                 return;
             } else {
                 debugs(33, 5, HERE << "Cached SSL certificate for " << sslBumpCertKey << " is out of date. Delete this certificate from cache");
                 if (ssl_ctx_cache)
@@ -4368,75 +4276,75 @@
 {
     Must(bodyPipe != NULL);
     debugs(33, 5, HERE << "start dechunking" << bodyPipe->status());
     assert(!in.bodyParser);
     in.bodyParser = new ChunkedCodingParser;
 }
 
 /// put parsed content into input buffer and clean up
 void
 ConnStateData::finishDechunkingRequest(bool withSuccess)
 {
     debugs(33, 5, HERE << "finish dechunking: " << withSuccess);
 
     if (bodyPipe != NULL) {
         debugs(33, 7, HERE << "dechunked tail: " << bodyPipe->status());
         BodyPipe::Pointer myPipe = bodyPipe;
         stopProducingFor(bodyPipe, withSuccess); // sets bodyPipe->bodySize()
         Must(!bodyPipe); // we rely on it being nil after we are done with body
         if (withSuccess) {
             Must(myPipe->bodySizeKnown());
-            ClientSocketContext::Pointer context = getCurrentContext();
+            ClientSocketContext::Pointer context = pipeline.front();
             if (context != NULL && context->http && context->http->request)
                 context->http->request->setContentLength(myPipe->bodySize());
         }
     }
 
     delete in.bodyParser;
     in.bodyParser = NULL;
 }
 
 ConnStateData::In::In() :
         bodyParser(NULL),
         buf()
 {}
 
 ConnStateData::In::~In()
 {
     delete bodyParser; // TODO: pool
 }
 
 void
 ConnStateData::sendControlMsg(HttpControlMsg msg)
 {
     if (!isOpen()) {
         debugs(33, 3, HERE << "ignoring 1xx due to earlier closure");
         return;
     }
 
-    ClientSocketContext::Pointer context = getCurrentContext();
-    if (context != NULL) {
-        context->writeControlMsg(msg); // will call msg.cbSuccess
+    if (!pipeline.empty()) {
+        pipeline.front()->writeControlMsg(msg); // will call msg.cbSuccess
         return;
     }
 
     debugs(33, 3, HERE << " closing due to missing context for 1xx");
+    pipeline.terminateAll(0);
     clientConnection->close();
 }
 
 /// Our close handler called by Comm when the pinned connection is closed
 void
 ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io)
 {
     // FwdState might repin a failed connection sooner than this close
     // callback is called for the failed connection.
     assert(pinning.serverConnection == io.conn);
     pinning.closeHandler = NULL; // Comm unregisters handlers before calling
     const bool sawZeroReply = pinning.zeroReply; // reset when unpinning
     unpinConnection();
     if (sawZeroReply && clientConnection != NULL) {
         debugs(33, 3, "Closing client connection on pinned zero reply.");
         clientConnection->close();
     }
 }
 
 void
@@ -4505,42 +4413,41 @@
 
 void
 ConnStateData::stopPinnedConnectionMonitoring()
 {
     if (pinning.readHandler != NULL) {
         comm_read_cancel(pinning.serverConnection->fd, pinning.readHandler);
         pinning.readHandler = NULL;
     }
 }
 
 /// Our read handler called by Comm when the server either closes an idle pinned connection or
 /// perhaps unexpectedly sends something on that idle (from Squid p.o.v.) connection.
 void
 ConnStateData::clientPinnedConnectionRead(const CommIoCbParams &io)
 {
     pinning.readHandler = NULL; // Comm unregisters handlers before calling
 
     if (io.flag == COMM_ERR_CLOSING)
         return; // close handler will clean up
 
-    // We could use getConcurrentRequestCount(), but this may be faster.
-    const bool clientIsIdle = !getCurrentContext();
+    const bool clientIsIdle = pipeline.empty();
 
     debugs(33, 3, "idle pinned " << pinning.serverConnection << " read " <<
            io.size << (clientIsIdle ? " with idle client" : ""));
 
     assert(pinning.serverConnection == io.conn);
     pinning.serverConnection->close();
 
     // If we are still sending data to the client, do not close now. When we are done sending,
     // ClientSocketContext::keepaliveNextRequest() checks pinning.serverConnection and will close.
     // However, if we are idle, then we must close to inform the idle client and minimize races.
     if (clientIsIdle && clientConnection != NULL)
         clientConnection->close();
 }
 
 const Comm::ConnectionPointer
 ConnStateData::validatePinnedConnection(HttpRequest *request, const CachePeer *aPeer)
 {
     debugs(33, 7, HERE << pinning.serverConnection);
 
     bool valid = true;

=== modified file 'src/client_side.h'
--- src/client_side.h	2014-03-30 12:00:34 +0000
+++ src/client_side.h	2014-05-11 11:00:50 +0000
@@ -19,40 +19,41 @@
  *  the Free Software Foundation; either version 2 of the License, or
  *  (at your option) any later version.
  *
  *  This program is distributed in the hope that it will be useful,
  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  *  GNU General Public License for more details.
  *
  *  You should have received a copy of the GNU General Public License
  *  along with this program; if not, write to the Free Software
  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
  *
  */
 
 #ifndef SQUID_CLIENTSIDE_H
 #define SQUID_CLIENTSIDE_H
 
 #include "comm.h"
 #include "HttpControlMsg.h"
 #include "HttpParser.h"
+#include "Pipeline.h"
 #include "SBuf.h"
 #if USE_AUTH
 #include "auth/UserRequest.h"
 #endif
 #if USE_OPENSSL
 #include "ssl/support.h"
 #endif
 
 class ConnStateData;
 class ClientHttpRequest;
 class clientStreamNode;
 class ChunkedCodingParser;
 class HelperReply;
 namespace AnyP
 {
 class PortCfg;
 } // namespace Anyp
 
 /**
  * Badly named.
@@ -77,148 +78,142 @@
  * buffer.
  *
  * The individual processing actions are done by other Jobs which we
  * kick off as needed.
  */
 class ClientSocketContext : public RefCountable
 {
 
 public:
     typedef RefCount<ClientSocketContext> Pointer;
     ClientSocketContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq);
     ~ClientSocketContext();
     bool startOfOutput() const;
     void writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, comm_err_t errflag);
     void keepaliveNextRequest();
 
     Comm::ConnectionPointer clientConnection; /// details about the client connection socket.
     ClientHttpRequest *http;	/* we own this */
     HttpReply *reply;
     char reqbuf[HTTP_REQBUF_SZ];
-    Pointer next;
 
     struct {
 
         unsigned deferred:1; /* This is a pipelined request waiting for the current object to complete */
 
         unsigned parsed_ok:1; /* Was this parsed correctly? */
     } flags;
     bool mayUseConnection() const {return mayUseConnection_;}
 
     void mayUseConnection(bool aBool) {
         mayUseConnection_ = aBool;
         debugs(33,3, HERE << "This " << this << " marked " << aBool);
     }
 
     class DeferredParams
     {
 
     public:
         clientStreamNode *node;
         HttpReply *rep;
         StoreIOBuffer queuedBuffer;
     };
 
     DeferredParams deferredparams;
     int64_t writtenToSocket;
     void pullData();
     int64_t getNextRangeOffset() const;
     bool canPackMoreRanges() const;
     clientStream_status_t socketState();
     void sendBody(HttpReply * rep, StoreIOBuffer bodyData);
     void sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData);
     size_t lengthToSend(Range<int64_t> const &available);
     void noteSentBodyBytes(size_t);
     void buildRangeHeader(HttpReply * rep);
     clientStreamNode * getTail() const;
     clientStreamNode * getClientReplyContext() const;
     void connIsFinished();
-    void removeFromConnectionList(ConnStateData * conn);
     void deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData);
     bool multipartRangeRequest() const;
     void registerWithConn();
     void noteIoError(const int xerrno); ///< update state to reflect I/O error
 
     /// starts writing 1xx control message to the client
     void writeControlMsg(HttpControlMsg &msg);
 
 protected:
     static IOCB WroteControlMsg;
     void wroteControlMsg(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, comm_err_t errflag, int xerrno);
 
 private:
     void prepareReply(HttpReply * rep);
     void packChunk(const StoreIOBuffer &bodyData, MemBuf &mb);
     void packRange(StoreIOBuffer const &, MemBuf * mb);
-    void deRegisterWithConn();
     void doClose();
     void initiateClose(const char *reason);
 
     AsyncCall::Pointer cbControlMsgSent; ///< notifies HttpControlMsg Source
 
     bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */
     bool connRegistered_;
 
     CBDATA_CLASS2(ClientSocketContext);
 };
 
 class ConnectionDetail;
 #if USE_OPENSSL
 namespace Ssl
 {
 class ServerBump;
 }
 #endif
 /**
  * Manages a connection to a client.
  *
  * Multiple requests (up to pipeline_prefetch) can be pipelined. This object is responsible for managing
  * which one is currently being fulfilled and what happens to the queue if the current one
  * causes the client connection to be closed early.
  *
  * Act as a manager for the connection and passes data in buffer to the current parser.
  * the parser has ambiguous scope at present due to being made from global functions
  * I believe this object uses the parser to identify boundaries and kick off the
  * actual HTTP request handling objects (ClientSocketContext, ClientHttpRequest, HttpRequest)
  *
  * If the above can be confirmed accurate we can call this object PipelineManager or similar
  */
 class ConnStateData : public BodyProducer, public HttpControlMsgSink
 {
 
 public:
     explicit ConnStateData(const MasterXaction::Pointer &xact);
     ~ConnStateData();
 
     void readSomeData();
-    bool areAllContextsForThisConnection() const;
-    void freeAllContexts();
-    void notifyAllContexts(const int xerrno); ///< tell everybody about the err
     /// Traffic parsing
     bool clientParseRequests();
     void readNextRequest();
-    ClientSocketContext::Pointer getCurrentContext() const;
-    void addContextToQueue(ClientSocketContext * context);
-    int getConcurrentRequestCount() const;
     bool isOpen() const;
     void checkHeaderLimits();
 
+    /// try to complete a transaction or read more I/O
+    void kick();
+
     // HttpControlMsgSink API
     virtual void sendControlMsg(HttpControlMsg msg);
 
     // Client TCP connection details from comm layer.
     Comm::ConnectionPointer clientConnection;
 
     struct In {
         In();
         ~In();
         bool maybeMakeSpaceAvailable();
 
         ChunkedCodingParser *bodyParser; ///< parses chunked request body
         SBuf buf;
     } in;
 
     /** number of body bytes we need to comm_read for the "current" request
      *
      * \retval 0         We do not need to read any [more] body bytes
      * \retval negative  May need more but do not know how many; could be zero!
      * \retval positive  Need to read exactly that many more body bytes
@@ -226,48 +221,44 @@
     int64_t mayNeedToReadMoreBody() const;
 
 #if USE_AUTH
     /**
      * Fetch the user details for connection based authentication
      * NOTE: this is ONLY connection based because NTLM and Negotiate is against HTTP spec.
      */
     const Auth::UserRequest::Pointer &getAuth() const { return auth_; }
 
     /**
      * Set the user details for connection-based authentication to use from now until connection closure.
      *
      * Any change to existing credentials shows that something invalid has happened. Such as:
      * - NTLM/Negotiate auth was violated by the per-request headers missing a revalidation token
      * - NTLM/Negotiate auth was violated by the per-request headers being for another user
      * - SSL-Bump CONNECT tunnel with persistent credentials has ended
      */
     void setAuth(const Auth::UserRequest::Pointer &aur, const char *cause);
 #endif
 
-    /**
-     * used by the owner of the connection, opaque otherwise
-     * TODO: generalise the connection owner concept.
-     */
-    ClientSocketContext::Pointer currentobject;
+    /// pipeline/set of transactions waiting to be serviced
+    Pipeline pipeline;
 
     Ip::Address log_addr;
-    int nrequests;
 
     struct {
         bool readMore; ///< needs comm_read (for this request or new requests)
         bool swanSang; // XXX: temporary flag to check proper cleanup
     } flags;
     struct {
         Comm::ConnectionPointer serverConnection; /* pinned server side connection */
         char *host;             /* host name of pinned connection */
         int port;               /* port of pinned connection */
         bool pinned;             /* this connection was pinned */
         bool auth;               /* pinned for www authentication */
         bool zeroReply; ///< server closed w/o response (ERR_ZERO_SIZE_OBJECT)
         CachePeer *peer;             /* CachePeer the connection goes via */
         AsyncCall::Pointer readHandler; ///< detects serverConnection closure
         AsyncCall::Pointer closeHandler; /*The close handler for pinned server side connection*/
     } pinning;
 
     /// Squid listening port details where this connection arrived.
     AnyP::PortCfg *port;
 

=== modified file 'src/stat.cc'
--- src/stat.cc	2014-03-30 12:00:34 +0000
+++ src/stat.cc	2014-05-11 11:00:34 +0000
@@ -1869,42 +1869,41 @@
     char buf[MAX_IPSTRLEN];
 
     for (i = ClientActiveRequests.head; i; i = i->next) {
         const char *p = NULL;
         http = static_cast<ClientHttpRequest *>(i->data);
         assert(http);
         ConnStateData * conn = http->getConn();
         storeAppendPrintf(s, "Connection: %p\n", conn);
 
         if (conn != NULL) {
             const int fd = conn->clientConnection->fd;
             storeAppendPrintf(s, "\tFD %d, read %" PRId64 ", wrote %" PRId64 "\n", fd,
                               fd_table[fd].bytes_read, fd_table[fd].bytes_written);
             storeAppendPrintf(s, "\tFD desc: %s\n", fd_table[fd].desc);
             storeAppendPrintf(s, "\tin: buf %p, used %ld, free %ld\n",
                               conn->in.buf.c_str(), (long int) conn->in.buf.length(), (long int) conn->in.buf.spaceSize());
             storeAppendPrintf(s, "\tremote: %s\n",
                               conn->clientConnection->remote.toUrl(buf,MAX_IPSTRLEN));
             storeAppendPrintf(s, "\tlocal: %s\n",
                               conn->clientConnection->local.toUrl(buf,MAX_IPSTRLEN));
-            storeAppendPrintf(s, "\tnrequests: %d\n",
-                              conn->nrequests);
+            storeAppendPrintf(s, "\tnrequests: %u\n", conn->pipeline.nrequests);
         }
 
         storeAppendPrintf(s, "uri %s\n", http->uri);
         storeAppendPrintf(s, "logType %s\n", LogTags_str[http->logType]);
         storeAppendPrintf(s, "out.offset %ld, out.size %lu\n",
                           (long int) http->out.offset, (unsigned long int) http->out.size);
         storeAppendPrintf(s, "req_sz %ld\n", (long int) http->req_sz);
         e = http->storeEntry();
         storeAppendPrintf(s, "entry %p/%s\n", e, e ? e->getMD5Text() : "N/A");
         storeAppendPrintf(s, "start %ld.%06d (%f seconds ago)\n",
                           (long int) http->al->cache.start_time.tv_sec,
                           (int) http->al->cache.start_time.tv_usec,
                           tvSubDsec(http->al->cache.start_time, current_time));
 #if USE_AUTH
         if (http->request->auth_user_request != NULL)
             p = http->request->auth_user_request->username();
         else
 #endif
             if (http->request->extacl_user.size() > 0) {
                 p = http->request->extacl_user.termedBuf();

=== modified file 'src/tests/stub_client_side.cc'
--- src/tests/stub_client_side.cc	2014-03-30 12:00:34 +0000
+++ src/tests/stub_client_side.cc	2014-05-11 11:00:34 +0000
@@ -4,57 +4,52 @@
 #define STUB_API "client_side.cc"
 #include "tests/STUB.h"
 
 //ClientSocketContext::ClientSocketContext(const ConnectionPointer&, ClientHttpRequest*) STUB
 //ClientSocketContext::~ClientSocketContext() STUB
 bool ClientSocketContext::startOfOutput() const STUB_RETVAL(false)
 void ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, comm_err_t errflag) STUB
 void ClientSocketContext::keepaliveNextRequest() STUB
 void ClientSocketContext::pullData() STUB
 int64_t ClientSocketContext::getNextRangeOffset() const STUB_RETVAL(0)
 bool ClientSocketContext::canPackMoreRanges() const STUB_RETVAL(false)
 clientStream_status_t ClientSocketContext::socketState() STUB_RETVAL(STREAM_NONE)
 void ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData) STUB
 void ClientSocketContext::sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData) STUB
 size_t ClientSocketContext::lengthToSend(Range<int64_t> const &available) STUB_RETVAL(0)
 void ClientSocketContext::noteSentBodyBytes(size_t) STUB
 void ClientSocketContext::buildRangeHeader(HttpReply * rep) STUB
 clientStreamNode * ClientSocketContext::getTail() const STUB_RETVAL(NULL)
 clientStreamNode * ClientSocketContext::getClientReplyContext() const STUB_RETVAL(NULL)
 void ClientSocketContext::connIsFinished() STUB
-void ClientSocketContext::removeFromConnectionList(ConnStateData * conn) STUB
 void ClientSocketContext::deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData) STUB
 bool ClientSocketContext::multipartRangeRequest() const STUB_RETVAL(false)
 void ClientSocketContext::registerWithConn() STUB
 void ClientSocketContext::noteIoError(const int xerrno) STUB
 void ClientSocketContext::writeControlMsg(HttpControlMsg &msg) STUB
 
 void ConnStateData::readSomeData() STUB
-bool ConnStateData::areAllContextsForThisConnection() const STUB_RETVAL(false)
-void ConnStateData::freeAllContexts() STUB
-void ConnStateData::notifyAllContexts(const int xerrno) STUB
 bool ConnStateData::clientParseRequests() STUB_RETVAL(false)
 void ConnStateData::readNextRequest() STUB
-void ConnStateData::addContextToQueue(ClientSocketContext * context) STUB
-int ConnStateData::getConcurrentRequestCount() const STUB_RETVAL(0)
 bool ConnStateData::isOpen() const STUB_RETVAL(false)
 void ConnStateData::checkHeaderLimits() STUB
+void ConnStateData::kick() STUB
 void ConnStateData::sendControlMsg(HttpControlMsg msg) STUB
 int64_t ConnStateData::mayNeedToReadMoreBody() const STUB_RETVAL(0)
 #if USE_AUTH
 void ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char *cause) STUB
 #endif
 bool ConnStateData::transparent() const STUB_RETVAL(false)
 bool ConnStateData::reading() const STUB_RETVAL(false)
 void ConnStateData::stopReading() STUB
 void ConnStateData::stopReceiving(const char *error) STUB
 void ConnStateData::stopSending(const char *error) STUB
 void ConnStateData::expectNoForwarding() STUB
 void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer) STUB
 void ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer) STUB
 bool ConnStateData::handleReadData(SBuf *buf) STUB_RETVAL(false)
 bool ConnStateData::handleRequestBodyData() STUB_RETVAL(false)
 void ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServerConn, HttpRequest *request, CachePeer *peer, bool auth) STUB
 void ConnStateData::unpinConnection() STUB
 const Comm::ConnectionPointer ConnStateData::validatePinnedConnection(HttpRequest *request, const CachePeer *peer) STUB_RETVAL(NULL)
 void ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io) STUB
 void ConnStateData::clientReadRequest(const CommIoCbParams &io) STUB


