=== modified file 'doc/release-notes/release-3.4.sgml'
--- doc/release-notes/release-3.4.sgml	2013-05-14 17:54:30 +0000
+++ doc/release-notes/release-3.4.sgml	2013-05-16 03:38:06 +0000
@@ -179,6 +179,9 @@
 	<p>New format code <em>%note</em> to log a transaction annotation linked to the
 	   transaction by ICAP, eCAP, a helper, or the <em>note</em> squid.conf directive.
 
+	<tag>pipeline_prefetch</tag>
+	<p>Updated to take a numeric count of prefetched pipeline requests instead of ON/OFF.
+
 	<tag>unlinkd_program</tag>
 	<p>New helper response format utilizing result codes <em>OK</em> and <em>BH</em>,
 	   to signal helper lookup results. Also, key-value response values to return

=== modified file 'src/Parsing.cc'
--- src/Parsing.cc	2013-05-04 13:14:23 +0000
+++ src/Parsing.cc	2013-05-16 13:55:49 +0000
@@ -33,6 +33,7 @@
 #include "squid.h"
 #include "cache_cf.h"
 #include "compat/strtoll.h"
+#include "ConfigParser.h"
 #include "Parsing.h"
 #include "globals.h"
 #include "Debug.h"
@@ -161,7 +162,7 @@
 int
 GetInteger(void)
 {
-    char *token = strtok(NULL, w_space);
+    char *token = ConfigParser::strtokFile();
     int i;
 
     if (token == NULL)

=== modified file 'src/SquidConfig.h'
--- src/SquidConfig.h	2013-05-13 03:57:03 +0000
+++ src/SquidConfig.h	2013-05-16 03:21:38 +0000
@@ -332,7 +332,6 @@
 
         int ie_refresh;
         int vary_ignore_expire;
-        int pipeline_prefetch;
         int surrogate_is_remote;
         int request_entities;
         int detect_broken_server_pconns;
@@ -361,6 +360,8 @@
         int client_dst_passthru;
     } onoff;
 
+    int pipeline_max_prefetch;
+
     int forward_max_tries;
     int connect_retries;
 

=== modified file 'src/cache_cf.cc'
--- src/cache_cf.cc	2013-05-14 18:36:45 +0000
+++ src/cache_cf.cc	2013-05-22 12:17:28 +0000
@@ -965,6 +965,16 @@
                (uint32_t)Config.maxRequestBufferSize, (uint32_t)Config.maxRequestHeaderSize);
     }
 
+    /*
+     * Disable client side request pipelining if client_persistent_connections OFF.
+     * Waste of resources queueing any pipelined requests when the first will close the connection.
+     */
+    if (Config.pipeline_max_prefetch > 0 && !Config.onoff.client_pconns) {
+        debugs(3, DBG_PARSE_NOTE(DBG_IMPORTANT), "WARNING: pipeline_prefetch " << Config.pipeline_max_prefetch <<
+                   " requires client_persistent_connections ON. Forced pipeline_prefetch 0.");
+        Config.pipeline_max_prefetch = 0;
+    }
+
 #if USE_AUTH
     /*
      * disable client side request pipelining. There is a race with
@@ -973,12 +983,12 @@
      * pipelining OFF, the client may fail to authenticate, but squid's
      * state will be preserved.
      */
-    if (Config.onoff.pipeline_prefetch) {
+    if (Config.pipeline_max_prefetch > 0) {
         Auth::Config *nego = Auth::Config::Find("Negotiate");
         Auth::Config *ntlm = Auth::Config::Find("NTLM");
         if ((nego && nego->active()) || (ntlm && ntlm->active())) {
-            debugs(3, DBG_IMPORTANT, "WARNING: pipeline_prefetch breaks NTLM and Negotiate authentication. Forced OFF.");
-            Config.onoff.pipeline_prefetch = 0;
+            debugs(3, DBG_PARSE_NOTE(DBG_IMPORTANT), "WARNING: pipeline_prefetch breaks NTLM and Negotiate authentication. Forced pipeline_prefetch 0.");
+            Config.pipeline_max_prefetch = 0;
         }
     }
 #endif
@@ -2691,6 +2701,29 @@
 
 #define free_tristate free_int
 
+void
+parse_pipelinePrefetch(int *var)
+{
+    char *token = ConfigParser::strtokFile();
+
+    if (token == NULL)
+        self_destruct();
+
+    if (!strcmp(token, "on")) {
+        debugs(0, DBG_PARSE_NOTE(DBG_IMPORTANT), "WARNING: 'pipeline_prefetch on' is deprecated. Please update to use 1 (or a higher number).");
+        *var = 1;
+    } else if (!strcmp(token, "off")) {
+        debugs(0, DBG_PARSE_NOTE(2), "WARNING: 'pipeline_prefetch off' is deprecated. Please update to use '0'.");
+        *var = 0;
+    } else {
+        ConfigParser::strtokFileUndo();
+        parse_int(var);
+    }
+}
+
+#define free_pipelinePrefetch free_int
+#define dump_pipelinePrefetch dump_int
+
 static void
 dump_refreshpattern(StoreEntry * entry, const char *name, RefreshPattern * head)
 {

=== modified file 'src/cf.data.depend'
--- src/cf.data.depend	2013-05-12 05:04:14 +0000
+++ src/cf.data.depend	2013-05-16 04:00:13 +0000
@@ -51,6 +51,7 @@
 onoff
 peer
 peer_access		cache_peer acl
+pipelinePrefetch
 PortCfg
 QosConfig
 refreshpattern

=== modified file 'src/cf.data.pre'
--- src/cf.data.pre	2013-05-14 17:53:18 +0000
+++ src/cf.data.pre	2013-05-21 02:50:03 +0000
@@ -8701,17 +8701,23 @@
 DOC_END
 
 NAME: pipeline_prefetch
-TYPE: onoff
-LOC: Config.onoff.pipeline_prefetch
-DEFAULT: off
+TYPE: pipelinePrefetch
+LOC: Config.pipeline_max_prefetch
+DEFAULT: 0
+DEFAULT_DOC: Do not pre-parse pipelined requests.
 DOC_START
-	To boost the performance of pipelined requests to closer
-	match that of a non-proxied environment Squid can try to fetch
-	up to two requests in parallel from a pipeline.
+	HTTP clients may send a pipeline of 1+N requests to Squid using a
+	single connection, without waiting for Squid to respond to the first
+	of those requests. This option limits the number of concurrent
+	requests Squid will try to handle in parallel. If set to N, Squid
+	will try to receive and process up to 1+N requests on the same
+	connection concurrently.
 
-	Defaults to off for bandwidth management and access logging
+	Defaults to 0 (off) for bandwidth management and access logging
 	reasons.
 
+	NOTE: pipelining requires persistent connections to clients.
+
 	WARNING: pipelining breaks NTLM and Negotiate/Kerberos authentication.
 DOC_END
 

=== modified file 'src/client_side.cc'
--- src/client_side.cc	2013-05-23 08:18:09 +0000
+++ src/client_side.cc	2013-05-24 14:48:34 +0000
@@ -919,6 +919,7 @@
 #if USE_SSL
     delete sslServerBump;
 #endif
+    xfree(pipelineBlockReason_);
 }
 
 /**
@@ -1991,6 +1992,12 @@
 static ClientSocketContext *
 parseHttpRequestAbort(ConnStateData * csd, const char *uri)
 {
+    // if the parser detected header frame problems and cannot even parse it
+    // we cannot pipeline anything following the possibly invalid frame end.
+    csd->pipelineDisable(uri);
+
+    // XXX: why is this not using quitAfterError()?
+
     ClientHttpRequest *http;
     ClientSocketContext *context;
     StoreIOBuffer tempBuffer;
@@ -2302,6 +2309,10 @@
     /* Set method_p */
     *method_p = HttpRequestMethod(&hp->buf[hp->req.m_start], &hp->buf[hp->req.m_end]+1);
 
+    /* CONNECT requests block pipelining if allowed. */
+    if (*method_p == Http::METHOD_CONNECT)
+        csd->pipelineBlock("CONNECT request");
+
     /* deny CONNECT via accelerated ports */
     if (*method_p == Http::METHOD_CONNECT && csd->port && csd->port->flags.accelSurrogate) {
         debugs(33, DBG_IMPORTANT, "WARNING: CONNECT method received on " << csd->port->protocol << " Accelerator port " << csd->port->s.GetPort() );
@@ -2567,6 +2578,7 @@
     if (request)
         request->flags.proxyKeepalive = false;
     flags.readMore = false;
+    pipelineDisable("will close after error");
     debugs(33,4, HERE << "Will close after error: " << clientConnection);
 }
 
@@ -2747,6 +2759,10 @@
         goto finish;
     }
 
+    // if client disabled persistence we should stop pipelining more requests.
+    if (!request->persistent())
+        conn->pipelineDisable("client will close");
+
     request->clientConnectionManager = conn;
 
     request->flags.accelerated = http->flags.accel;
@@ -2887,6 +2903,7 @@
         // consume header early so that body pipe gets just the body
         connNoteUseOfBuffer(conn, http->req_sz);
         notedUseOfBuffer = true;
+        conn->pipelineBlock("request payload");
 
         /* Is it too large? */
         if (!chunked && // if chunked, we will check as we accumulate
@@ -2933,6 +2950,7 @@
     if (request != NULL && request->flags.resetTcp && Comm::IsConnOpen(conn->clientConnection)) {
         debugs(33, 3, HERE << "Sending TCP RST on " << conn->clientConnection);
         conn->flags.readMore = false;
+        conn->pipelineDisable("Sending TCP RST");
         comm_reset_close(conn->clientConnection);
     }
 }
@@ -2946,17 +2964,79 @@
     }
 }
 
-static int
-connOkToAddRequest(ConnStateData * conn)
-{
-    int result = conn->getConcurrentRequestCount() < (Config.onoff.pipeline_prefetch ? 2 : 1);
-
-    if (!result) {
-        debugs(33, 3, HERE << conn->clientConnection << " max concurrent requests reached");
-        debugs(33, 5, HERE << conn->clientConnection << " defering new request until one is done");
-    }
-
-    return result;
+bool
+ConnStateData::pipelineEnabled() const
+{
+    return !pipelineTerminated_ && Config.pipeline_max_prefetch > 0;
+}
+
+void
+ConnStateData::pipelineDisable(const char *reason)
+{
+    debugs(33, 4, clientConnection << " pipeline terminated because " << reason);
+
+    if (pipelineTerminated_) {
+        debugs(33, 3, clientConnection << " pipeline already terminated by: " << pipelineBlockReason_);
+        return;
+    }
+
+    pipelineTerminated_ = true;
+    xfree(pipelineBlockReason_); // may have been temporary-blocked. Terminate overrides that.
+    pipelineBlockReason_ = xstrdup(reason);
+}
+
+void
+ConnStateData::pipelineBlock(const char *reason)
+{
+    debugs(33, 4, clientConnection << " pipeline queue blocked because " << reason);
+
+    if (pipelineBlockReason_) {
+        debugs(33, 4, "pipeline queue already blocked due to: " << pipelineBlockReason_);
+        return;
+    }
+
+    pipelineBlockReason_ = xstrdup(reason);
+}
+
+bool
+ConnStateData::pipelineOkToAddRequest()
+{
+    // if client already sent Connection:close or some equivalent, we stop prefetching
+    if (pipelineTerminated_) {
+        debugs(33, 3, clientConnection << " defering new concurrent request due to " << pipelineBlockReason_);
+        return false;
+    }
+
+    const int existingRequestCount = getConcurrentRequestCount();
+
+    // if whatever request caused the temporary block has been completed, re-open the pipeline
+    if (existingRequestCount < 1) {
+        safe_free(pipelineBlockReason_)
+        return true;
+    }
+
+    // when still blocked
+    if (pipelineBlockReason_ != NULL) {
+        debugs(33, 3, clientConnection << " defering new concurrent request due to " << pipelineBlockReason_);
+        return true;
+    }
+
+    // default to the configured pipeline size.
+    // add 1 because the head of pipeline is counted in concurrent requests and not prefetch queue
+    const int concurrentRequestLimit = Config.pipeline_max_prefetch + 1;
+
+    // when queue filled already we cant add more.
+    if (existingRequestCount >= concurrentRequestLimit) {
+        debugs(33, 3, clientConnection << " max concurrent requests reached (" << concurrentRequestLimit << ")");
+        debugs(33, 5, clientConnection << " defering new request until one is done");
+        return false;
+    }
+
+    // TODO: check that pipeline queue length is no more than M seconds long already.
+    // For long-delay queues we need to push back at the client via leaving things in TCP buffers.
+    // By time M all queued objects are already at risk of getting client-timeout errors.
+
+    return true;
 }
 
 /**
@@ -2981,8 +3061,8 @@
         if (in.notYetUsed == 0)
             break;
 
-        /* Limit the number of concurrent requests to 2 */
-        if (!connOkToAddRequest(this)) {
+        /* Limit the number of concurrent requests */
+        if (!pipelineOkToAddRequest()) {
             break;
         }
 

=== modified file 'src/client_side.h'
--- src/client_side.h	2013-04-04 06:15:00 +0000
+++ src/client_side.h	2013-05-24 11:24:55 +0000
@@ -178,7 +178,7 @@
 /**
  * Manages a connection to a client.
  *
- * Multiple requests (up to 2) can be pipelined. This object is responsible for managing
+ * Multiple requests (up to pipeline_prefetch) can be pipelined. This object is responsible for managing
  * which one is currently being fulfilled and what happens to the queue if the current one
  * causes the client connection to be closed early.
  *
@@ -209,6 +209,17 @@
     ClientSocketContext::Pointer getCurrentContext() const;
     void addContextToQueue(ClientSocketContext * context);
     int getConcurrentRequestCount() const;
+
+    /// whether pipelining on this connection is enabled
+    /// Note that this will return true even if the pipeline is temporarily blocked or queue is full.
+    bool pipelineEnabled() const;
+    /// whether it is okay to parse and queue another pipelined request right now
+    bool pipelineOkToAddRequest();
+    /// temporarily block pipelining until the currently queued requests are all completed.
+    void pipelineBlock(const char *reason);
+    /// terminate pipelining any further requests due to eth reason given.
+    void pipelineDisable(const char *reason);
+
     bool isOpen() const;
     void checkHeaderLimits();
 
@@ -419,6 +430,11 @@
     /// the reason why we no longer read the request or nil
     const char *stoppedReceiving_;
 
+    /// whether pipelining on this connection has reached a state where it must stop completely.
+    bool pipelineTerminated_;
+    /// the reason why we will not prefetch new pipeline requests, or nil
+    const char *pipelineBlockReason_;
+
     AsyncCall::Pointer reader; ///< set when we are reading
     BodyPipe::Pointer bodyPipe; // set when we are reading request body
 


