=== modified file 'src/CommCalls.cc'
--- src/CommCalls.cc	2009-07-12 22:56:47 +0000
+++ src/CommCalls.cc	2011-01-09 01:36:41 +0000
@@ -113,40 +113,46 @@
 {
 }
 
 /* CommTimeoutCbParams */
 
 CommTimeoutCbParams::CommTimeoutCbParams(void *aData):
         CommCommonCbParams(aData)
 {
 }
 
 
 /* CommAcceptCbPtrFun */
 
 CommAcceptCbPtrFun::CommAcceptCbPtrFun(IOACB *aHandler,
                                        const CommAcceptCbParams &aParams):
         CommDialerParamsT<CommAcceptCbParams>(aParams),
         handler(aHandler)
 {
 }
 
+CommAcceptCbPtrFun::CommAcceptCbPtrFun(const CommAcceptCbPtrFun &o):
+        CommDialerParamsT<CommAcceptCbParams>(o.params),
+        handler(o.handler)
+{
+}
+
 void
 CommAcceptCbPtrFun::dial()
 {
     handler(params.fd, params.nfd, &params.details, params.flag, params.xerrno, params.data);
 }
 
 void
 CommAcceptCbPtrFun::print(std::ostream &os) const
 {
     os << '(';
     params.print(os);
     os << ')';
 }
 
 
 /* CommConnectCbPtrFun */
 
 CommConnectCbPtrFun::CommConnectCbPtrFun(CNCB *aHandler,
         const CommConnectCbParams &aParams):
         CommDialerParamsT<CommConnectCbParams>(aParams),

=== modified file 'src/CommCalls.h'
--- src/CommCalls.h	2010-08-24 00:12:54 +0000
+++ src/CommCalls.h	2011-01-09 04:56:21 +0000
@@ -159,42 +159,45 @@
     virtual void print(std::ostream &os) const {
         os << '(';
         this->params.print(os);
         os << ')';
     }
 
 public:
     Method method;
 
 protected:
     virtual void doDial() { ((&(*this->job))->*method)(this->params); }
 };
 
 
 // accept (IOACB) dialer
 class CommAcceptCbPtrFun: public CallDialer,
         public CommDialerParamsT<CommAcceptCbParams>
 {
 public:
     typedef CommAcceptCbParams Params;
+    typedef RefCount<CommAcceptCbPtrFun> Pointer;
 
     CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams);
+    CommAcceptCbPtrFun(const CommAcceptCbPtrFun &o);
+
     void dial();
 
     virtual void print(std::ostream &os) const;
 
 public:
     IOACB *handler;
 };
 
 // connect (CNCB) dialer
 class CommConnectCbPtrFun: public CallDialer,
         public CommDialerParamsT<CommConnectCbParams>
 {
 public:
     typedef CommConnectCbParams Params;
 
     CommConnectCbPtrFun(CNCB *aHandler, const Params &aParams);
     void dial();
 
     virtual void print(std::ostream &os) const;
 
@@ -242,45 +245,51 @@
 public:
     typedef CommTimeoutCbParams Params;
 
     CommTimeoutCbPtrFun(PF *aHandler, const Params &aParams);
     void dial();
 
     virtual void print(std::ostream &os) const;
 
 public:
     PF *handler;
 };
 
 // AsyncCall to comm handlers implemented as global functions.
 // The dialer is one of the Comm*CbPtrFunT above
 // TODO: Get rid of this class by moving canFire() to canDial() method
 // of dialers.
 template <class Dialer>
 class CommCbFunPtrCallT: public AsyncCall
 {
 public:
+    typedef RefCount<CommCbFunPtrCallT<Dialer> > Pointer;
     typedef typename Dialer::Params Params;
 
     inline CommCbFunPtrCallT(int debugSection, int debugLevel,
                              const char *callName, const Dialer &aDialer);
 
+    inline CommCbFunPtrCallT(const Pointer &p) :
+            AsyncCall(p->debugSection, p->debugLevel, p->name),
+            dialer(p->dialer)
+        {}
+
     virtual CallDialer* getDialer() { return &dialer; }
 
 public:
     Dialer dialer;
 
 protected:
     inline virtual bool canFire();
     inline virtual void fire();
 };
 
 // Conveninece wrapper: It is often easier to call a templated function than
 // to create a templated class.
 template <class Dialer>
 inline
 CommCbFunPtrCallT<Dialer> *commCbCall(int debugSection, int debugLevel,
                                       const char *callName, const Dialer &dialer)
 {
     return new CommCbFunPtrCallT<Dialer>(debugSection, debugLevel, callName,
                                          dialer);
 }

=== modified file 'src/ProtoPort.cc'
--- src/ProtoPort.cc	2010-11-18 08:01:53 +0000
+++ src/ProtoPort.cc	2011-01-09 00:58:29 +0000
@@ -1,42 +1,47 @@
 /*
  * $Id$
  */
 
 #include "squid.h"
+#include "comm.h"
 #include "ProtoPort.h"
 #if HAVE_LIMITS
 #include <limits>
 #endif
 
-http_port_list::http_port_list(const char *aProtocol)
+http_port_list::http_port_list(const char *aProtocol) :
+        listenFd(-1)
 #if USE_SSL
-        :
-        http(*this), dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
+        , http(*this)
+        , dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
 #endif
 {
     protocol = xstrdup(aProtocol);
 }
 
 http_port_list::~http_port_list()
 {
-    delete listener;
+    if (listenFd >= 0) {
+        comm_close(listenFd);
+        listenFd = -1;
+    }
 
     safe_free(name);
     safe_free(defaultsite);
     safe_free(protocol);
 
 #if USE_SSL
     safe_free(cert);
     safe_free(key);
     safe_free(options);
     safe_free(cipher);
     safe_free(cafile);
     safe_free(capath);
     safe_free(dhfile);
     safe_free(sslflags);
     safe_free(sslContextSessionId);
 #endif
 }
 
 
 #if USE_SSL

=== modified file 'src/ProtoPort.h'
--- src/ProtoPort.h	2010-11-18 08:01:53 +0000
+++ src/ProtoPort.h	2011-01-09 00:55:34 +0000
@@ -1,70 +1,68 @@
 /*
  * $Id$
  */
 #ifndef SQUID_PROTO_PORT_H
 #define SQUID_PROTO_PORT_H
 
-//#include "typedefs.h"
 #include "cbdata.h"
-#include "comm/ListenStateData.h"
 
 #if USE_SSL
 #include "ssl/gadgets.h"
 #endif
 
 struct http_port_list {
     http_port_list(const char *aProtocol);
     ~http_port_list();
 
     http_port_list *next;
 
     Ip::Address s;
     char *protocol;            /* protocol name */
     char *name;                /* visible name */
     char *defaultsite;         /* default web site */
 
     unsigned int intercepted:1;        /**< intercepting proxy port */
     unsigned int spoof_client_ip:1;    /**< spoof client ip if possible */
     unsigned int accel:1;              /**< HTTP accelerator */
     unsigned int allow_direct:1;       /**< Allow direct forwarding in accelerator mode */
     unsigned int vhost:1;              /**< uses host header */
     unsigned int sslBump:1;            /**< intercepts CONNECT requests */
     unsigned int ignore_cc:1;          /**< Ignore request Cache-Control directives */
 
     int vport;                 /* virtual port support, -1 for dynamic, >0 static*/
     bool connection_auth_disabled;     /* Don't support connection oriented auth */
     int disable_pmtu_discovery;
 
     struct {
         unsigned int enabled;
         unsigned int idle;
         unsigned int interval;
         unsigned int timeout;
     } tcp_keepalive;
 
     /**
-     * The FD listening socket handler.
-     * If not NULL we are actively listening for client requests.
-     * delete to close the socket.
+     * The FD listening socket.
+     * If >= 0 we are actively listening for client requests.
+     * use comm_close(listenFd) to stop.
      */
-    Comm::ListenStateData *listener;
+    int listenFd;
 
 #if USE_SSL
     // XXX: temporary hack to ease move of SSL options to http_port
     http_port_list &http;
 
     char *cert;
     char *key;
     int version;
     char *cipher;
     char *options;
     char *clientca;
     char *cafile;
     char *capath;
     char *crlfile;
     char *dhfile;
     char *sslflags;
     char *sslContextSessionId; ///< "session id context" for staticSslContext
     bool generateHostCertificates; ///< dynamically make host cert for sslBump
     size_t dynamicCertMemCacheSize; ///< max size of generated certificates memory cache
 

=== modified file 'src/base/AsyncCall.h'
--- src/base/AsyncCall.h	2010-12-02 23:33:27 +0000
+++ src/base/AsyncCall.h	2011-01-09 04:58:01 +0000
@@ -28,40 +28,42 @@
  * You do not have to use the macros below to make or receive asynchronous
  * method calls, but they give you a uniform interface and handy call
  * debugging.
  */
 
 class CallDialer;
 class AsyncCallQueue;
 
 /**
  \todo add unique call IDs
  \todo CBDATA_CLASS2 kids
  \ingroup AsyncCallsAPI
  */
 class AsyncCall: public RefCountable
 {
 public:
     typedef RefCount <AsyncCall> Pointer;
     friend class AsyncCallQueue;
 
     AsyncCall(int aDebugSection, int aDebugLevel, const char *aName);
+    AsyncCall();
+    AsyncCall(const AsyncCall &);
     virtual ~AsyncCall();
 
     void make(); // fire if we can; handles general call debugging
 
     // can be called from canFire() for debugging; always returns false
     bool cancel(const char *reason);
 
     bool canceled() { return isCanceled != NULL; }
 
     virtual CallDialer *getDialer() = 0;
 
     void print(std::ostream &os);
 
     /// remove us from the queue; we are head unless we are queued after prev
     void dequeue(AsyncCall::Pointer &head, AsyncCall::Pointer &prev);
 
     void setNext(AsyncCall::Pointer aNext) {
         theNext = aNext;
     }
 
@@ -105,40 +107,44 @@
 
     // TODO: Add these for clarity when CommCbFunPtrCallT is gone
     //virtual bool canDial(AsyncCall &call) = 0;
     //virtual void dial(AsyncCall &call) = 0;
 
     virtual void print(std::ostream &os) const = 0;
 };
 
 /**
  \ingroup AsyncCallAPI
  * This template implements an AsyncCall using a specified Dialer class
  */
 template <class Dialer>
 class AsyncCallT: public AsyncCall
 {
 public:
     AsyncCallT(int aDebugSection, int aDebugLevel, const char *aName,
                const Dialer &aDialer): AsyncCall(aDebugSection, aDebugLevel, aName),
             dialer(aDialer) {}
 
+    AsyncCallT(const RefCount<AsyncCallT<Dialer> > &o):
+            AsyncCall(o->debugSection, o->debugLevel, o->name),
+            dialer(o->dialer) {}
+
     CallDialer *getDialer() { return &dialer; }
 
 protected:
     virtual bool canFire() {
         return AsyncCall::canFire() &&
                dialer.canDial(*this);
     }
     virtual void fire() { dialer.dial(*this); }
 
     Dialer dialer;
 };
 
 template <class Dialer>
 inline
 AsyncCall *
 asyncCall(int aDebugSection, int aDebugLevel, const char *aName,
           const Dialer &aDialer)
 {
     return new AsyncCallT<Dialer>(aDebugSection, aDebugLevel, aName, aDialer);
 }

=== modified file 'src/client_side.cc'
--- src/client_side.cc	2011-01-10 09:43:43 +0000
+++ src/client_side.cc	2011-01-10 10:08:21 +0000
@@ -79,106 +79,104 @@
  * ClientKeepAliveNextRequest will then detect the presence of data in
  * the next ClientHttpRequest, and will send it, restablishing the
  * data flow.
  */
 
 #include "squid.h"
 
 #include "acl/FilledChecklist.h"
 #include "auth/UserRequest.h"
 #include "base/TextException.h"
 #include "ChunkedCodingParser.h"
 #include "client_side.h"
 #include "client_side_reply.h"
 #include "client_side_request.h"
 #if USE_DELAY_POOLS
 #include "ClientInfo.h"
 #endif
 #include "ClientRequestContext.h"
 #include "clientStream.h"
 #include "comm.h"
-#include "comm/Write.h"
-#include "comm/ListenStateData.h"
+#include "CommCalls.h"
 #include "comm/Loops.h"
+#include "comm/Write.h"
+#include "comm/TcpAcceptor.h"
 #include "ConnectionDetail.h"
 #include "eui/Config.h"
 #include "fde.h"
 #include "HttpHdrContRange.h"
 #include "HttpReply.h"
 #include "HttpRequest.h"
 #include "ident/Config.h"
 #include "ident/Ident.h"
 #include "ip/Intercept.h"
 #include "ipc/StartListening.h"
 #include "MemBuf.h"
 #include "MemObject.h"
 #include "ProtoPort.h"
 #include "rfc1738.h"
 #include "SquidTime.h"
 #if USE_SSL
 #include "ssl/context_storage.h"
 #include "ssl/helper.h"
 #include "ssl/support.h"
 #include "ssl/gadgets.h"
 #endif
 #if USE_SSL_CRTD
 #include "ssl/crtd_message.h"
 #include "ssl/certificate_db.h"
 #endif
 #include "Store.h"
 
 #if HAVE_LIMITS
 #include <limits>
 #endif
 
 #if LINGERING_CLOSE
 #define comm_close comm_lingering_close
 #endif
 
-/// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call
+/// dials clientListenerConnectionOpened call
 class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
 {
 public:
-    typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg);
-    ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg):
-            handler(aHandler), portCfg(aPortCfg) {}
+    typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg, bool uses_ssl);
+    ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg, bool aSslFlag):
+            handler(aHandler), portCfg(aPortCfg), uses_ssl(aSslFlag) {}
 
     virtual void print(std::ostream &os) const {
         startPrint(os) <<
         ", port=" << (void*)portCfg << ')';
     }
 
     virtual bool canDial(AsyncCall &) const { return true; }
-    virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); }
+    virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg, uses_ssl); }
 
 public:
     Handler handler;
 
 private:
     http_port_list *portCfg; ///< from Config.Sockaddr.http
+    bool uses_ssl;
 };
 
-
-static void clientHttpConnectionOpened(int fd, int errNo, http_port_list *s);
-#if USE_SSL
-static void clientHttpsConnectionOpened(int fd, int errNo, http_port_list *s);
-#endif
+static void clientListenerConnectionOpened(int fd, int errNo, http_port_list *s, bool uses_ssl);
 
 /* our socket-related context */
 
 
 CBDATA_CLASS_INIT(ClientSocketContext);
 
 void *
 ClientSocketContext::operator new (size_t byteCount)
 {
     /* derived classes with different sizes must implement their own new */
     assert (byteCount == sizeof (ClientSocketContext));
     CBDATA_INIT_TYPE(ClientSocketContext);
     return cbdataAlloc(ClientSocketContext);
 }
 
 void
 ClientSocketContext::operator delete (void *address)
 {
     cbdataFree (address);
 }
@@ -3104,48 +3102,49 @@
 
 #else
 
         static int reported = 0;
 
         if (!reported) {
             debugs(33, 1, "Notice: httpd_accel_no_pmtu_disc not supported on your platform");
             reported = 1;
         }
 
 #endif
 
     }
 
     result->flags.readMoreRequests = true;
     return result;
 }
 
 /** Handle a new connection on HTTP socket. */
 void
-httpAccept(int sock, int newfd, ConnectionDetail *details,
-           comm_err_t flag, int xerrno, void *data)
+httpAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
 {
     http_port_list *s = (http_port_list *)data;
     ConnStateData *connState = NULL;
 
     if (flag != COMM_OK) {
-        debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
+        // This should not occur with TcpAcceptor.
+        // However its possible the call was still queued when the client disconnected
+        debugs(33, 1, "httpAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
         return;
     }
 
     debugs(33, 4, "httpAccept: FD " << newfd << ": accepted");
     fd_note(newfd, "client http connect");
     connState = connStateCreate(&details->peer, &details->me, newfd, s);
 
     typedef CommCbMemFunT<ConnStateData, CommCloseCbParams> Dialer;
     AsyncCall::Pointer call = JobCallback(33, 5,
                                           Dialer, connState, ConnStateData::connStateClosed);
     comm_add_close_handler(newfd, call);
 
     if (Config.onoff.log_fqdn)
         fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS);
 
     typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
     AsyncCall::Pointer timeoutCall =  JobCallback(33, 5,
                                       TimeoutDialer, connState, ConnStateData::requestTimeout);
     commSetTimeout(newfd, Config.Timeout.read, timeoutCall);
 
@@ -3350,49 +3349,50 @@
         debugs(83, 3, "clientNegotiateSSL: FD " << fd <<
                " client certificate: subject: " <<
                X509_NAME_oneline(X509_get_subject_name(client_cert), 0, 0));
 
         debugs(83, 3, "clientNegotiateSSL: FD " << fd <<
                " client certificate: issuer: " <<
                X509_NAME_oneline(X509_get_issuer_name(client_cert), 0, 0));
 
 
         X509_free(client_cert);
     } else {
         debugs(83, 5, "clientNegotiateSSL: FD " << fd <<
                " has no certificate.");
     }
 
     conn->readSomeData();
 }
 
 /** handle a new HTTPS connection */
 static void
-httpsAccept(int sock, int newfd, ConnectionDetail *details,
-            comm_err_t flag, int xerrno, void *data)
+httpsAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
 {
     https_port_list *s = (https_port_list *)data;
     SSL_CTX *sslContext = s->staticSslContext.get();
 
     if (flag != COMM_OK) {
+        // This should not occur with TcpAcceptor.
+        // However its possible the call was still queued when the client disconnected
         errno = xerrno;
-        debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
+        debugs(33, 1, "httpsAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
         return;
     }
 
     SSL *ssl = NULL;
     if (!(ssl = httpsCreate(newfd, details, sslContext)))
         return;
 
     debugs(33, 5, "httpsAccept: FD " << newfd << " accepted, starting SSL negotiation.");
     fd_note(newfd, "client https connect");
     ConnStateData *connState = connStateCreate(details->peer, details->me,
                                newfd, &s->http);
     typedef CommCbMemFunT<ConnStateData, CommCloseCbParams> Dialer;
     AsyncCall::Pointer call = JobCallback(33, 5,
                                           Dialer, connState, ConnStateData::connStateClosed);
     comm_add_close_handler(newfd, call);
 
     if (Config.onoff.log_fqdn)
         fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS);
 
     typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
@@ -3605,161 +3605,148 @@
                 !s->staticSslContext && !s->generateHostCertificates) {
             debugs(1, 1, "Will not bump SSL at http_port " <<
                    s->http.s << " due to SSL initialization failure.");
             s->sslBump = 0;
         }
         if (s->sslBump) {
             ++bumpCount;
             // Create ssl_ctx cache for this port.
             Ssl::TheGlobalContextStorage.addLocalStorage(s->s, s->dynamicCertMemCacheSize == std::numeric_limits<size_t>::max() ? 4194304 : s->dynamicCertMemCacheSize);
         }
 #endif
 #if USE_SSL_CRTD
         Ssl::Helper::GetInstance();
 #endif //USE_SSL_CRTD
 
         /* AYJ: 2009-12-27: bit bumpy. new ListenStateData(...) should be doing all the Comm:: stuff ... */
 
         const int openFlags = COMM_NONBLOCKING |
                               (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
 
-        AsyncCall::Pointer callback = asyncCall(33,2,
-                                                "clientHttpConnectionOpened",
-                                                ListeningStartedDialer(&clientHttpConnectionOpened, s));
-        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags,
-                            Ipc::fdnHttpSocket, callback);
+        // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP
+        typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+        RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s));
+        Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
+
+        AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened",
+                                                  ListeningStartedDialer(&clientListenerConnectionOpened, s, false));
+        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpSocket, listenCall, sub);
 
-        HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened
+        HttpSockets[NHttpSockets++] = -1; // set in clientListenerConnectionOpened
     }
 
 #if USE_SSL
     if (bumpCount && !Config.accessList.ssl_bump)
         debugs(33, 1, "WARNING: http_port(s) with SslBump found, but no " <<
                std::endl << "\tssl_bump ACL configured. No requests will be " <<
                "bumped.");
 #endif
 }
 
 /// process clientHttpConnectionsOpen result
 static void
-clientHttpConnectionOpened(int fd, int, http_port_list *s)
+clientListenerConnectionOpened(int fd, int errNo, http_port_list *s, bool uses_ssl)
 {
-    if (!OpenedHttpSocket(fd, "Cannot open HTTP Port"))
+    s->listenFd = fd;
+    if (!OpenedHttpSocket(s->listenFd, (uses_ssl?"Cannot open HTTPS Port":"Cannot open HTTP Port")))
         return;
 
     Must(s);
+    Must(s->listenFd >= 0);
 
-    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)",
-                                         CommAcceptCbPtrFun(httpAccept, s));
-
-    s->listener = new Comm::ListenStateData(fd, call, true);
-
-    debugs(1, 1, "Accepting " <<
+    debugs(1, 1, "Accepting" <<
            (s->intercepted ? " intercepted" : "") <<
            (s->spoof_client_ip ? " spoofing" : "") <<
            (s->sslBump ? " bumpy" : "") <<
            (s->accel ? " accelerated" : "")
-           << " HTTP connections at " << s->s
-           << ", FD " << fd << "." );
+           << " HTTP" << (uses_ssl?"S":"") << " connections at "
+           << " FD " << s->listenFd << " on " << s->s);
 
-    Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
+    Must(AddOpenedHttpSocket(s->listenFd)); // otherwise, we have received a fd we did not ask for
 }
 
 #if USE_SSL
 static void
 clientHttpsConnectionsOpen(void)
 {
     https_port_list *s;
 
     for (s = Config.Sockaddr.https; s; s = (https_port_list *)s->http.next) {
         if (MAXHTTPPORTS == NHttpSockets) {
             debugs(1, 1, "Ignoring 'https_port' lines exceeding the limit.");
             debugs(1, 1, "The limit is " << MAXHTTPPORTS << " HTTPS ports.");
             continue;
         }
 
         if (!s->staticSslContext) {
             debugs(1, 1, "Ignoring https_port " << s->http.s <<
                    " due to SSL initialization failure.");
             continue;
         }
 
-        AsyncCall::Pointer call = asyncCall(33, 2, "clientHttpsConnectionOpened",
-                                            ListeningStartedDialer(&clientHttpsConnectionOpened, &s->http));
-
-        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->http.s, COMM_NONBLOCKING,
-                            Ipc::fdnHttpsSocket, call);
-
-        HttpSockets[NHttpSockets++] = -1;
-    }
-}
-
-/// process clientHttpsConnectionsOpen result
-static void
-clientHttpsConnectionOpened(int fd, int, http_port_list *s)
-{
-    if (!OpenedHttpSocket(fd, "Cannot open HTTPS Port"))
-        return;
-
-    Must(s);
+        const int openFlags = COMM_NONBLOCKING |
+                              (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
 
-    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)",
-                                         CommAcceptCbPtrFun(httpsAccept, s));
+        // setup the subscriptions such that new connections accepted by listenConn are handled by HTTPS
+        typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+        RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s));
+        Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
 
-    s->listener = new Comm::ListenStateData(fd, call, true);
+        AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened",
+                                                  ListeningStartedDialer(&clientListenerConnectionOpened, &s->http, true));
 
-    debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << ".");
+        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpsSocket, listenCall, sub);
 
-    Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
+        HttpSockets[NHttpSockets++] = -1;
+    }
 }
-
 #endif
 
 void
 clientOpenListenSockets(void)
 {
     clientHttpConnectionsOpen();
 #if USE_SSL
     clientHttpsConnectionsOpen();
 #endif
 
     if (NHttpSockets < 1)
         fatal("No HTTP or HTTPS ports configured");
 }
 
 void
 clientHttpConnectionsClose(void)
 {
     for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) {
-        if (s->listener) {
-            debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection");
-            delete s->listener;
-            s->listener = NULL;
+        if (s->listenFd >= 0) {
+            debugs(1, 1, "FD " << s->listenFd << " Closing HTTP connection");
+            comm_close(s->listenFd);
+            s->listenFd = -1;
         }
     }
 
 #if USE_SSL
     for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) {
-        if (s->listener) {
-            debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection");
-            delete s->listener;
-            s->listener = NULL;
+        if (s->listenFd >= 0) {
+            debugs(1, 1, "FD " << s->listenFd << " Closing HTTPS connection");
+            comm_close(s->listenFd);
+            s->listenFd = -1;
         }
     }
 #endif
 
     // TODO see if we can drop HttpSockets array entirely */
     for (int i = 0; i < NHttpSockets; i++) {
         HttpSockets[i] = -1;
     }
 
     NHttpSockets = 0;
 }
 
 int
 varyEvaluateMatch(StoreEntry * entry, HttpRequest * request)
 {
     const char *vary = request->vary_headers;
     int has_vary = entry->getReply()->header.has(HDR_VARY);
 #if X_ACCELERATOR_VARY
 
     has_vary |=

=== modified file 'src/comm.cc'
--- src/comm.cc	2011-01-10 09:43:43 +0000
+++ src/comm.cc	2011-01-10 12:31:49 +0000
@@ -23,43 +23,43 @@
  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  *  GNU General Public License for more details.
  *
  *  You should have received a copy of the GNU General Public License
  *  along with this program; if not, write to the Free Software
  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
  *
  *
  * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
  */
 
 #include "squid.h"
 #include "StoreIOBuffer.h"
 #include "comm.h"
 #include "event.h"
 #include "fde.h"
 #include "comm/AcceptLimiter.h"
 #include "comm/comm_internal.h"
 #include "comm/IoCallback.h"
-#include "comm/Write.h"
-#include "comm/ListenStateData.h"
 #include "comm/Loops.h"
+#include "comm/Write.h"
+#include "comm/TcpAcceptor.h"
 #include "CommIO.h"
 #include "CommRead.h"
 #include "ConnectionDetail.h"
 #include "MemBuf.h"
 #include "pconn.h"
 #include "SquidTime.h"
 #include "CommCalls.h"
 #include "DescriptorSet.h"
 #include "icmp/net_db.h"
 #include "ip/Address.h"
 #include "ip/Intercept.h"
 #include "ip/QosConfig.h"
 #include "ip/tools.h"
 #include "ClientInfo.h"
 #if USE_SSL
 #include "ssl/support.h"
 #endif
 
 #include "cbdata.h"
 #if defined(_SQUID_CYGWIN_)
@@ -127,41 +127,41 @@
 static void commSetReuseAddr(int);
 static void commSetNoLinger(int);
 #ifdef TCP_NODELAY
 static void commSetTcpNoDelay(int);
 #endif
 static void commSetTcpRcvbuf(int, int);
 static PF commConnectFree;
 static IPH commConnectDnsHandle;
 
 typedef enum {
     COMM_CB_READ = 1,
     COMM_CB_DERIVED
 } comm_callback_t;
 
 static MemAllocator *conn_close_pool = NULL;
 fd_debug_t *fdd_table = NULL;
 
 bool
 isOpen(const int fd)
 {
-    return fd_table[fd].flags.open != 0;
+    return fd >= 0 && fd_table[fd].flags.open != 0;
 }
 
 /**
  * Attempt a read
  *
  * If the read attempt succeeds or fails, call the callback.
  * Else, wait for another IO notification.
  */
 void
 commHandleRead(int fd, void *data)
 {
     Comm::IoCallback *ccb = (Comm::IoCallback *) data;
 
     assert(data == COMMIO_FD_READCB(fd));
     assert(ccb->active());
     /* Attempt a read */
     statCounter.syscalls.sock.reads++;
     errno = 0;
     int retval;
     retval = FD_READ_METHOD(fd, ccb->buf, ccb->size);

=== modified file 'src/comm/AcceptLimiter.cc'
--- src/comm/AcceptLimiter.cc	2009-12-31 02:35:01 +0000
+++ src/comm/AcceptLimiter.cc	2011-01-10 00:52:51 +0000
@@ -1,32 +1,51 @@
 #include "config.h"
 #include "comm/AcceptLimiter.h"
-#include "comm/ListenStateData.h"
+#include "comm/TcpAcceptor.h"
 #include "fde.h"
 
 Comm::AcceptLimiter Comm::AcceptLimiter::Instance_;
 
 Comm::AcceptLimiter &Comm::AcceptLimiter::Instance()
 {
     return Instance_;
 }
 
 void
-Comm::AcceptLimiter::defer(Comm::ListenStateData *afd)
+Comm::AcceptLimiter::defer(Comm::TcpAcceptor *afd)
 {
     afd->isLimited++;
     debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
     deferred.push_back(afd);
 }
 
 void
+Comm::AcceptLimiter::removeDead(const Comm::TcpAcceptor *afd)
+{
+    for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) {
+        if (deferred[i] == afd) {
+            deferred[i]->isLimited--;
+            deferred[i] = NULL; // fast. kick() will skip empty entries later.
+            debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
+        }
+    }
+}
+
+void
 Comm::AcceptLimiter::kick()
 {
+    // TODO: this could be optimized further with an iterator to search
+    //       looking for first non-NULL, followed by dumping the first N
+    //       with only one shift()/pop_front operation
+
     debugs(5, 5, HERE << " size=" << deferred.size());
-    if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
-        debugs(5, 5, HERE << " doing one.");
+    while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
         /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */
-        ListenStateData *temp = deferred.shift();
-        temp->isLimited--;
-        temp->acceptNext();
+        TcpAcceptor *temp = deferred.shift();
+        if (temp != NULL) {
+            debugs(5, 5, HERE << " doing one.");
+            temp->isLimited--;
+            temp->acceptNext();
+            break;
+        }
     }
 }

=== modified file 'src/comm/AcceptLimiter.h'
--- src/comm/AcceptLimiter.h	2010-01-13 01:13:17 +0000
+++ src/comm/AcceptLimiter.h	2011-01-08 14:09:20 +0000
@@ -1,42 +1,45 @@
 #ifndef _SQUID_SRC_COMM_ACCEPT_LIMITER_H
 #define _SQUID_SRC_COMM_ACCEPT_LIMITER_H
 
 #include "Array.h"
 
 namespace Comm
 {
 
-class ListenStateData;
+class TcpAcceptor;
 
 /**
  * FIFO Queue holding listener socket handlers which have been activated
  * ready to dupe their FD and accept() a new client connection.
  * But when doing so there were not enough FD available to handle the
  * new connection. These handlers are awaiting some FD to become free.
  *
  * defer - used only by Comm layer ListenStateData adding themselves when FD are limited.
  * kick - used by Comm layer when FD are closed.
  */
 class AcceptLimiter
 {
 
 public:
     /** retrieve the global instance of the queue. */
     static AcceptLimiter &Instance();
 
     /** delay accepting a new client connection. */
-    void defer(Comm::ListenStateData *afd);
+    void defer(Comm::TcpAcceptor *afd);
+
+    /** remove all records of an acceptor. Only to be called by the ConnAcceptor::swanSong() */
+    void removeDead(const Comm::TcpAcceptor *afd);
 
     /** try to accept and begin processing any delayed client connections. */
     void kick();
 
 private:
     static AcceptLimiter Instance_;
 
     /** FIFO queue */
-    Vector<Comm::ListenStateData*> deferred;
+    Vector<Comm::TcpAcceptor*> deferred;
 };
 
 }; // namepace Comm
 
 #endif /* _SQUID_SRC_COMM_ACCEPT_LIMITER_H */

=== modified file 'src/comm/Makefile.am'
--- src/comm/Makefile.am	2011-01-10 09:43:43 +0000
+++ src/comm/Makefile.am	2011-01-10 12:32:06 +0000
@@ -1,25 +1,25 @@
 include $(top_srcdir)/src/Common.am
 include $(top_srcdir)/src/TestHeaders.am
 
 noinst_LTLIBRARIES = libcomm.la
 
 ## Library holding comm socket handlers
 libcomm_la_SOURCES= \
 	AcceptLimiter.cc \
 	AcceptLimiter.h \
-	ListenStateData.cc \
-	ListenStateData.h \
 	Loops.h \
 	ModDevPoll.cc \
 	ModEpoll.cc \
 	ModKqueue.cc \
 	ModPoll.cc \
 	ModSelect.cc \
 	ModSelectWin32.cc \
+	TcpAcceptor.cc \
+	TcpAcceptor.h \
 	\
 	IoCallback.cc \
 	IoCallback.h \
 	Write.cc \
 	Write.h \
 	\
 	comm_internal.h

=== renamed file 'src/comm/ListenStateData.cc' => 'src/comm/TcpAcceptor.cc'
--- src/comm/ListenStateData.cc	2011-01-10 09:43:43 +0000
+++ src/comm/TcpAcceptor.cc	2011-01-10 12:33:59 +0000
@@ -16,265 +16,351 @@
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
  *  the Free Software Foundation; either version 2 of the License, or
  *  (at your option) any later version.
  *
  *  This program is distributed in the hope that it will be useful,
  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  *  GNU General Public License for more details.
  *
  *  You should have received a copy of the GNU General Public License
  *  along with this program; if not, write to the Free Software
  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
  *
  *
  * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
  */
 
 #include "squid.h"
+#include "base/TextException.h"
 #include "CommCalls.h"
 #include "comm/AcceptLimiter.h"
 #include "comm/comm_internal.h"
-#include "comm/ListenStateData.h"
 #include "comm/Loops.h"
+#include "comm/TcpAcceptor.h"
 #include "ConnectionDetail.h"
 #include "fde.h"
 #include "protos.h"
 #include "SquidTime.h"
 
+namespace Comm {
+    CBDATA_CLASS_INIT(TcpAcceptor);
+};
+
+Comm::TcpAcceptor::TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags,
+                               const char *note, const Subscription::Pointer &aSub) :
+        AsyncJob("Comm::TcpAcceptor"),
+        errcode(0),
+        isLimited(0),
+        theCallSub(aSub),
+        fd(listenFd),
+        local_addr(laddr),
+        newFd_(-1)
+{
+    /* open the conn if its not already open */
+    if (fd < 0) {
+        fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, local_addr, flags, note);
+        errcode = errno;
+
+        if (fd < 0) {
+            debugs(5, DBG_CRITICAL, HERE << "comm_open failed: FD " << fd << ", " << local_addr << " error: " << errcode);
+            return;
+        }
+        debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd << ", " << local_addr);
+    }
+}
+
+void
+Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub)
+{
+    debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription: " << aSub);
+    unsubscribe("subscription change");
+    theCallSub = aSub;
+}
+
+void
+Comm::TcpAcceptor::unsubscribe(const char *reason)
+{
+    debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription " << theCallSub << " removed: " << reason);
+    theCallSub = NULL;
+}
+
+void
+Comm::TcpAcceptor::start()
+{
+    debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription: " << theCallSub);
+
+    Must(isOpen(fd));
+
+    setListen();
+
+    // if no error so far start accepting connections.
+    if (errcode == 0)
+        SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+bool
+Comm::TcpAcceptor::doneAll() const
+{
+    // stop when FD is closed
+    if (!isOpen(fd)) {
+        return AsyncJob::doneAll();
+    }
+
+    // stop when handlers are gone
+    if (theCallSub == NULL) {
+        return AsyncJob::doneAll();
+    }
+
+    // open FD with handlers...keep accepting.
+    return false;
+}
+
+void
+Comm::TcpAcceptor::swanSong()
+{
+    debugs(5,5, HERE);
+    unsubscribe("swanSong");
+    fd = -1;
+    AcceptLimiter::Instance().removeDead(this);
+    AsyncJob::swanSong();
+}
+
 /**
  * New-style listen and accept routines
  *
  * setListen simply registers our interest in an FD for listening.
  * The constructor takes a callback to call when an FD has been
  * accept()ed some time later.
  */
 void
-Comm::ListenStateData::setListen()
+Comm::TcpAcceptor::setListen()
 {
     errcode = 0; // reset local errno copy.
     if (listen(fd, Squid_MaxFD >> 2) < 0) {
-        debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+        debugs(50, DBG_CRITICAL, "ERROR: listen(FD " << fd << ", " << local_addr << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
         errcode = errno;
         return;
     }
 
     if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
 #ifdef SO_ACCEPTFILTER
         struct accept_filter_arg afa;
         bzero(&afa, sizeof(afa));
         debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
         xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
         if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
-            debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+            debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
 #elif defined(TCP_DEFER_ACCEPT)
         int seconds = 30;
         if (strncmp(Config.accept_filter, "data=", 5) == 0)
             seconds = atoi(Config.accept_filter + 5);
         if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
-            debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+            debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
 #else
-        debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+        debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS");
 #endif
     }
 }
 
-Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
-        fd(aFd),
-        theCallback(call),
-        mayAcceptMore(accept_many)
-{
-    assert(aFd >= 0);
-    debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
-    assert(isOpen(aFd));
-    setListen();
-    SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
-}
-
-Comm::ListenStateData::~ListenStateData()
-{
-    comm_close(fd);
-    fd = -1;
-}
-
 /**
  * This private callback is called whenever a filedescriptor is ready
  * to dupe itself and fob off an accept()ed connection
  *
  * It will either do that accept operation. Or if there are not enough FD
  * available to do the clone safely will push the listening FD into a list
  * of deferred operations. The list gets kicked and the dupe/accept() actually
  * done later when enough sockets become available.
  */
 void
-Comm::ListenStateData::doAccept(int fd, void *data)
+Comm::TcpAcceptor::doAccept(int fd, void *data)
 {
-    debugs(5, 2, HERE << "New connection on FD " << fd);
+    try {
+        debugs(5, 2, HERE << "New connection on FD " << fd);
 
-    assert(isOpen(fd));
-    ListenStateData *afd = static_cast<ListenStateData*>(data);
+        Must(isOpen(fd));
+        TcpAcceptor *afd = static_cast<TcpAcceptor*>(data);
 
-    if (!okToAccept()) {
-        AcceptLimiter::Instance().defer(afd);
-    } else {
-        afd->acceptNext();
+        if (!okToAccept()) {
+            AcceptLimiter::Instance().defer(afd);
+        } else {
+            afd->acceptNext();
+        }
+        SetSelect(fd, COMM_SELECT_READ, Comm::TcpAcceptor::doAccept, afd, 0);
+
+    } catch(const TextException &e) {
+        fatalf("FATAL: error while accepting new client connection: %s\n", e.message);
+    } catch(...) {
+        fatal("FATAL: error while accepting new client connection: [unkown]\n");
     }
-    SetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0);
 }
 
 bool
-Comm::ListenStateData::okToAccept()
+Comm::TcpAcceptor::okToAccept()
 {
     static time_t last_warn = 0;
 
     if (fdNFree() >= RESERVED_FD)
         return true;
 
     if (last_warn + 15 < squid_curtime) {
         debugs(5, DBG_CRITICAL, "WARNING! Your cache is running out of filedescriptors");
         last_warn = squid_curtime;
     }
 
     return false;
 }
 
 void
-Comm::ListenStateData::acceptOne()
+Comm::TcpAcceptor::acceptOne()
 {
     /*
      * We don't worry about running low on FDs here.  Instead,
      * doAccept() will use AcceptLimiter if we reach the limit
      * there.
      */
 
     /* Accept a new connection */
-    ConnectionDetail connDetails;
-    int newfd = oldAccept(connDetails);
+    ConnectionDetail newConnDetails;
+    comm_err_t status = oldAccept(newConnDetails);
 
     /* Check for errors */
-    if (newfd < 0) {
+    if (!isOpen(newFd_)) {
 
-        if (newfd == COMM_NOMESSAGE) {
+        if (status == COMM_NOMESSAGE) {
             /* register interest again */
-            debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback);
+            debugs(5, 5, HERE << "try later: FD " << fd << " handler Subscription: " << theCallSub);
             SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
             return;
         }
 
         // A non-recoverable error; notify the caller */
-        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback);
-        notify(-1, COMM_ERROR, connDetails);
-        mayAcceptMore = false;
+        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << ", " << local_addr << " handler Subscription: " << theCallSub);
+        notify(status, newConnDetails);
+        mustStop("Listener socket closed");
         return;
     }
 
-    debugs(5, 5, HERE << "accepted: FD " << fd <<
-           " newfd: " << newfd << " from: " << connDetails.peer <<
-           " handler: " << theCallback);
-    notify(newfd, COMM_OK, connDetails);
+    debugs(5, 5, HERE << "Listener: FD " << fd <<
+           " accepted new connection from " << newConnDetails.peer <<
+           " handler Subscription: " << theCallSub);
+    notify(status, newConnDetails);
 }
 
 void
-Comm::ListenStateData::acceptNext()
+Comm::TcpAcceptor::acceptNext()
 {
-    assert(isOpen(fd));
+    Must(isOpen(fd));
     debugs(5, 2, HERE << "connection on FD " << fd);
     acceptOne();
 }
 
+// XXX: obsolete comment?
+// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback().
 void
-Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails)
+Comm::TcpAcceptor::notify(comm_err_t flag, const ConnectionDetail &connDetails)
 {
     // listener socket handlers just abandon the port with COMM_ERR_CLOSING
     // it should only happen when this object is deleted...
     if (flag == COMM_ERR_CLOSING) {
         return;
     }
 
-    if (theCallback != NULL) {
-        typedef CommAcceptCbParams Params;
-        Params &params = GetCommParams<Params>(theCallback);
+    if (theCallSub != NULL) {
+        AsyncCall::Pointer call = theCallSub->callback();
+        CommAcceptCbParams &params = GetCommParams<CommAcceptCbParams>(call);
         params.fd = fd;
-        params.nfd = newfd;
+        params.nfd = newFd_;
         params.details = connDetails;
         params.flag = flag;
         params.xerrno = errcode;
-        ScheduleCallHere(theCallback);
-        if (!mayAcceptMore)
-            theCallback = NULL;
+        ScheduleCallHere(call);
     }
+
+    // drop the temporary recent accepted socket FD details.
+    // this prevents information crossover on calls.
+    newFd_ = -1;
 }
 
 /**
  * accept() and process
- * Wait for an incoming connection on FD.
+ * Wait for an incoming connection on our listener socket.
+ *
+ * \retval COMM_OK         success. details parameter filled.
+ * \retval COMM_NOMESSAGE  attempted accept() but nothing useful came in.
+ * \retval COMM_ERROR      an outright failure occured.
+ *                         Or if this client has too many connections already.
  */
-int
-Comm::ListenStateData::oldAccept(ConnectionDetail &details)
+comm_err_t
+Comm::TcpAcceptor::oldAccept(ConnectionDetail &details)
 {
     PROF_start(comm_accept);
     statCounter.syscalls.sock.accepts++;
     int sock;
     struct addrinfo *gai = NULL;
     details.me.InitAddrInfo(gai);
 
     errcode = 0; // reset local errno copy.
     if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
         errcode = errno; // store last accept errno locally.
 
         details.me.FreeAddrInfo(gai);
 
         PROF_stop(comm_accept);
 
         if (ignoreErrno(errno)) {
             debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror());
             return COMM_NOMESSAGE;
         } else if (ENFILE == errno || EMFILE == errno) {
             debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror());
             return COMM_ERROR;
         } else {
             debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror());
             return COMM_ERROR;
         }
     }
 
+    Must(sock >= 0);
+    newFd_ = sock;
     details.peer = *gai;
 
     if ( Config.client_ip_max_connections >= 0) {
         if (clientdbEstablished(details.peer, 0) > Config.client_ip_max_connections) {
             debugs(50, DBG_IMPORTANT, "WARNING: " << details.peer << " attempting more than " << Config.client_ip_max_connections << " connections.");
             details.me.FreeAddrInfo(gai);
             return COMM_ERROR;
         }
     }
 
+    // lookup the local-end details of this new connection
     details.me.InitAddrInfo(gai);
-
     details.me.SetEmpty();
     getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
     details.me = *gai;
-
-    commSetCloseOnExec(sock);
+    details.me.FreeAddrInfo(gai);
 
     /* fdstat update */
+    // XXX : these are not all HTTP requests. use a note about type and ip:port details->
+    // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port"
     fd_open(sock, FD_SOCKET, "HTTP Request");
 
     fdd_table[sock].close_file = NULL;
     fdd_table[sock].close_line = 0;
 
     fde *F = &fd_table[sock];
     details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
     F->remote_port = details.peer.GetPort();
-    F->local_addr.SetPort(details.me.GetPort());
+    F->local_addr = details.me;
     F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET;
-    details.me.FreeAddrInfo(gai);
 
+    // set socket flags
+    commSetCloseOnExec(sock);
     commSetNonBlocking(sock);
 
     /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
     F->flags.transparent = fd_table[fd].flags.transparent;
 
     PROF_stop(comm_accept);
-    return sock;
+    return COMM_OK;
 }

=== renamed file 'src/comm/ListenStateData.h' => 'src/comm/TcpAcceptor.h'
--- src/comm/ListenStateData.h	2010-11-27 01:58:38 +0000
+++ src/comm/TcpAcceptor.h	2011-01-09 00:24:09 +0000
@@ -1,54 +1,102 @@
-#ifndef SQUID_LISTENERSTATEDATA_H
-#define SQUID_LISTENERSTATEDATA_H
+#ifndef SQUID_COMM_TCPACCEPTOR_H
+#define SQUID_COMM_TCPACCEPTOR_H
 
 #include "base/AsyncCall.h"
-#include "comm.h"
+#include "base/Subscription.h"
+#include "CommCalls.h"
+#include "comm_err_t.h"
+#include "comm/TcpAcceptor.h"
+#include "ip/Address.h"
+
 #if HAVE_MAP
 #include <map>
 #endif
 
-class ConnectionDetail;
-
 namespace Comm
 {
 
-class ListenStateData
+class AcceptLimiter;
+
+/**
+ * Listens on an FD for new incoming connections and
+ * emits an active FD descriptor for the new client.
+ *
+ * Handles all event limiting required to quash inbound connection
+ * floods within the global FD limits of available Squid_MaxFD and
+ * client_ip_max_connections.
+ *
+ * Fills the emitted connection with all connection details able to
+ * be looked up. Currently these are the local/remote IP:port details
+ * and the listening socket transparent-mode flag.
+ */
+class TcpAcceptor : public AsyncJob
 {
+private:
+    virtual void start();
+    virtual bool doneAll() const;
+    virtual void swanSong();
 
 public:
-    ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many);
-    ListenStateData(const ListenStateData &r); // not implemented.
-    ~ListenStateData();
+    TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags,
+                const char *note, const Subscription::Pointer &aSub);
 
-    void subscribe(AsyncCall::Pointer &call);
+    TcpAcceptor(const TcpAcceptor &r); // not implemented.
+
+    /** Subscribe a handler to receive calls back about new connections.
+     * Replaces any existing subscribed handler.
+     */
+    void subscribe(const Subscription::Pointer &aSub);
+
+    /** Remove the currently waiting callback subscription.
+     * Pending calls will remain scheduled.
+     */
+    void unsubscribe(const char *reason);
+
+    /** Try and accept another connection (synchronous).
+     * If one is pending already the subscribed callback handler will be scheduled
+     * to handle it before this method returns.
+     */
     void acceptNext();
-    void notify(int newfd, comm_err_t flag, const ConnectionDetail &details);
 
-    int fd;
+    /// Call the subscribed callback handler with details about a new connection.
+    void notify(comm_err_t flags, const ConnectionDetail &newConnDetails);
 
     /// errno code of the last accept() or listen() action if one occurred.
     int errcode;
 
-    /// whether this socket is delayed and on the AcceptLimiter queue.
-    int32_t isLimited;
+private:
+    friend class AcceptLimiter;
+    int32_t isLimited;                   ///< whether this socket is delayed and on the AcceptLimiter queue.
+    Subscription::Pointer theCallSub;    ///< used to generate AsyncCalls handling our events.
+
+public:
+    /// conn being listened on for new connections
+    /// Reserved for read-only use.
+    // NP: public only until we can hide it behind connection handles
+    int fd;
+
+private:
+    /// IP Address and port being listened on
+    Ip::Address local_addr;
+
+    /// temporary holder for newely accepted client FD
+    int newFd_;
 
 private:
-    /// Method to test if there are enough file escriptors to open a new client connection
+    /// Method to test if there are enough file descriptors to open a new client connection
     /// if not the accept() will be postponed
     static bool okToAccept();
 
     /// Method callback for whenever an FD is ready to accept a client connection.
     static void doAccept(int fd, void *data);
 
     void acceptOne();
-    int oldAccept(ConnectionDetail &details);
-
-    AsyncCall::Pointer theCallback;
-    bool mayAcceptMore;
-
+    comm_err_t oldAccept(ConnectionDetail &newConnDetails);
     void setListen();
+
+    CBDATA_CLASS2(TcpAcceptor);
 };
 
 } // namespace Comm
 
-#endif /* SQUID_LISTENERSTATEDATA_H */
+#endif /* SQUID_COMM_TCPACCEPTOR_H */

=== modified file 'src/ftp.cc'
--- src/ftp.cc	2011-01-14 14:10:21 +0000
+++ src/ftp.cc	2011-01-15 02:33:05 +0000
@@ -17,42 +17,43 @@
  *  sources; see the CREDITS file for full details.
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
  *  the Free Software Foundation; either version 2 of the License, or
  *  (at your option) any later version.
  *
  *  This program is distributed in the hope that it will be useful,
  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  *  GNU General Public License for more details.
  *
  *  You should have received a copy of the GNU General Public License
  *  along with this program; if not, write to the Free Software
  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
  *
  */
 
 #include "squid.h"
 #include "comm.h"
+#include "CommCalls.h"
+#include "comm/TcpAcceptor.h"
 #include "comm/Write.h"
-#include "comm/ListenStateData.h"
 #include "compat/strtoll.h"
 #include "ConnectionDetail.h"
 #include "errorpage.h"
 #include "fde.h"
 #include "forward.h"
 #include "html_quote.h"
 #include "HttpHdrContRange.h"
 #include "HttpHeaderRange.h"
 #include "HttpHeader.h"
 #include "HttpRequest.h"
 #include "HttpReply.h"
 #include "ip/tools.h"
 #include "MemBuf.h"
 #include "rfc1738.h"
 #include "Server.h"
 #include "SquidString.h"
 #include "SquidTime.h"
 #include "Store.h"
 #include "URLScheme.h"
 #include "wordlist.h"
@@ -136,47 +137,45 @@
 /// \ingroup ServerProtocolFTPInternal
 typedef void (FTPSM) (FtpStateData *);
 
 /// common code for FTP control and data channels
 // does not own the channel descriptor, which is managed by FtpStateData
 class FtpChannel
 {
 public:
     FtpChannel(): fd(-1) {}
 
     /// called after the socket is opened, sets up close handler
     void opened(int aFd, const AsyncCall::Pointer &aCloser);
 
     /** Handles all operations needed to properly close the active channel FD.
      * clearing the close handler, clearing the listen socket properly, and calling comm_close
      */
     void close();
 
     void clear(); /// just resets fd and close handler. does not close active connections.
 
-    int fd; /// channel descriptor; \todo: remove because the closer has it
+    int fd; /// channel descriptor
 
-    /** Current listening socket handler. delete on shutdown or abort.
-     * FTP stores a copy of the FD in the field fd above.
-     * Use close() to properly close the channel.
-     */
-    Comm::ListenStateData *listener;
+    Ip::Address local; ///< The local IP address:port this channel is using
+
+    int flags; ///< socket flags used when opening.
 
 private:
     AsyncCall::Pointer closer; /// Comm close handler callback
 };
 
 /// \ingroup ServerProtocolFTPInternal
 class FtpStateData : public ServerStateData
 {
 
 public:
     void *operator new (size_t);
     void operator delete (void *);
     void *toCbdata() { return this; }
 
     FtpStateData(FwdState *);
     ~FtpStateData();
     char user[MAX_URL];
     char password[MAX_URL];
     int password_url;
     char *reply_hdr;
@@ -228,40 +227,46 @@
     CBDATA_CLASS(FtpStateData);
 
 public:
     // these should all be private
     void start();
     void loginParser(const char *, int escaped);
     int restartable();
     void appendSuccessHeader();
     void hackShortcut(FTPSM * nextState);
     void failed(err_type, int xerrno);
     void failedErrorMessage(err_type, int xerrno);
     void unhack();
     void scheduleReadControlReply(int);
     void handleControlReply();
     void readStor();
     void parseListing();
     MemBuf *htmlifyListEntry(const char *line);
     void completedListing(void);
     void dataComplete();
     void dataRead(const CommIoCbParams &io);
+
+    /// ignore timeout on CTRL channel. set read timeout on DATA channel.
+    void switchTimeoutToDataChannel();
+    /// create a data channel acceptor and start listening.
+    void listenForDataChannel(const int fd, const char *note);
+
     int checkAuth(const HttpHeader * req_hdr);
     void checkUrlpath();
     void buildTitleUrl();
     void writeReplyBody(const char *, size_t len);
     void printfReplyBody(const char *fmt, ...);
     virtual int dataDescriptor() const;
     virtual void maybeReadVirginBody();
     virtual void closeServer();
     virtual void completeForwarding();
     virtual void abortTransaction(const char *reason);
     void processHeadResponse();
     void processReplyBody();
     void writeCommand(const char *buf);
     void setCurrentOffset(int64_t offset) { currentOffset = offset; }
     int64_t getCurrentOffset() const { return currentOffset; }
 
     static CNCB ftpPasvCallback;
     static PF ftpDataWrite;
     void ftpTimeout(const CommTimeoutCbParams &io);
     void ctrlClosed(const CommCloseCbParams &io);
@@ -426,52 +431,53 @@
     ftpReadEPSV,		/* SENT_EPSV_ALL */
     ftpReadEPSV,		/* SENT_EPSV_1 */
     ftpReadEPSV,		/* SENT_EPSV_2 */
     ftpReadPasv,		/* SENT_PASV */
     ftpReadCwd,		/* SENT_CWD */
     ftpReadList,		/* SENT_LIST */
     ftpReadList,		/* SENT_NLST */
     ftpReadRest,		/* SENT_REST */
     ftpReadRetr,		/* SENT_RETR */
     ftpReadStor,		/* SENT_STOR */
     ftpReadQuit,		/* SENT_QUIT */
     ftpReadTransferDone,	/* READING_DATA (RETR,LIST,NLST) */
     ftpWriteTransferDone,	/* WRITING_DATA (STOR) */
     ftpReadMkdir		/* SENT_MKDIR */
 };
 
 /// handler called by Comm when FTP control channel is closed unexpectedly
 void
 FtpStateData::ctrlClosed(const CommCloseCbParams &io)
 {
+    debugs(9, 4, HERE);
     ctrl.clear();
     deleteThis("FtpStateData::ctrlClosed");
 }
 
 /// handler called by Comm when FTP data channel is closed unexpectedly
 void
 FtpStateData::dataClosed(const CommCloseCbParams &io)
 {
-    if (data.listener) {
-        delete data.listener;
-        data.listener = NULL;
-        data.fd = -1;
+    debugs(9, 4, HERE);
+    if (data.fd >= 0) {
+        comm_close(data.fd);
+        // NP clear() does the: data.fd = -1;
     }
     data.clear();
     failed(ERR_FTP_FAILURE, 0);
     /* failed closes ctrl.fd and frees ftpState */
 
     /* NP: failure recovery may be possible when its only a data.fd failure.
      *     is the ctrl.fd is still fine, we can send ABOR down it and retry.
      *     Just need to watch out for wider Squid states like shutting down or reconfigure.
      */
 }
 
 FtpStateData::FtpStateData(FwdState *theFwdState) : AsyncJob("FtpStateData"), ServerStateData(theFwdState)
 {
     const char *url = entry->url();
     debugs(9, 3, HERE << "'" << url << "'" );
     statCounter.server.all.requests++;
     statCounter.server.ftp.requests++;
     theSize = -1;
     mdtm = -1;
 
@@ -589,40 +595,67 @@
                 rfc1738_unescape(password);
                 password_url = 1;
             }
             debugs(9, 9, HERE << ": found password='" << password << "'(" << len <<") unescaped.");
         }
     } else if (login[0]) {
         /* no password, just username */
         if (total_len > MAX_URL)
             total_len = MAX_URL -1;
         xstrncpy(user, login, total_len +1);
         debugs(9, 9, HERE << ": found user='" << user << "'(" << total_len <<"), escaped=" << escaped);
         if (escaped)
             rfc1738_unescape(user);
         debugs(9, 9, HERE << ": found user='" << user << "'(" << total_len <<") unescaped.");
     }
 
     debugs(9, 9, HERE << ": OUT: login='" << login << "', escaped=" << escaped << ", user=" << user << ", password=" << password);
 }
 
 void
+FtpStateData::switchTimeoutToDataChannel()
+{
+    commSetTimeout(ctrl.fd, -1, NULL, NULL);
+
+    typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+    AsyncCall::Pointer timeoutCall = JobCallback(9, 5, TimeoutDialer, this, FtpStateData::ftpTimeout);
+    commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
+}
+
+void
+FtpStateData::listenForDataChannel(const int fd, const char *note)
+{
+    typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> AcceptDialer;
+    typedef AsyncCallT<AcceptDialer> AcceptCall;
+    RefCount<AcceptCall> call = static_cast<AcceptCall*>(JobCallback(11, 5, AcceptDialer, this, FtpStateData::ftpAcceptDataConnection));
+    Subscription::Pointer sub = new CallSubscription<AcceptCall>(call);
+    Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(fd, data.local, data.flags, note, sub);
+
+    // Ensure we have a copy of the FD opened for listening and a close handler on it.
+    assert(data.fd == -1 || data.fd == tmp->fd);
+    data.fd = -1;
+    data.opened(tmp->fd, dataCloser());
+
+    AsyncJob::Start(tmp);
+}
+
+void
 FtpStateData::ftpTimeout(const CommTimeoutCbParams &io)
 {
     debugs(9, 4, "ftpTimeout: FD " << io.fd << ": '" << entry->url() << "'" );
 
     if (SENT_PASV == state && io.fd == data.fd) {
         /* stupid ftp.netscape.com */
         fwd->dontRetry(false);
         fwd->ftpPasvFailed(true);
         debugs(9, DBG_IMPORTANT, "ftpTimeout: timeout in SENT_PASV state" );
     }
 
     failed(ERR_READ_TIMEOUT, 0);
     /* failed() closes ctrl.fd and frees ftpState */
 }
 
 #if DEAD_CODE // obsoleted by ERR_DIR_LISTING
 void
 FtpStateData::listingFinish()
 {
     // TODO: figure out what this means and how to show it ...
@@ -1049,44 +1082,50 @@
     size_t usable;
     size_t len = data.readBuf->contentSize();
 
     if (!len) {
         debugs(9, 3, HERE << "no content to parse for " << entry->url()  );
         return;
     }
 
     /*
      * We need a NULL-terminated buffer for scanning, ick
      */
     sbuf = (char *)xmalloc(len + 1);
     xstrncpy(sbuf, buf, len + 1);
     end = sbuf + len - 1;
 
     while (*end != '\r' && *end != '\n' && end > sbuf)
         end--;
 
     usable = end - sbuf;
 
-    debugs(9, 3, HERE << "usable = " << usable);
+    debugs(9, 3, HERE << "usable = " << usable << " of " << len << " bytes.");
 
     if (usable == 0) {
-        debugs(9, 3, HERE << "didn't find end for " << entry->url()  );
+        if (buf[0] == '\0' && len == 1) {
+            debugs(9, 3, HERE << "NIL ends data from " << entry->url() << " transfer problem?");
+            data.readBuf->consume(len);
+        } else {
+            debugs(9, 3, HERE << "didn't find end for " << entry->url());
+            debugs(9, 3, HERE << "buffer remains (" << len << " bytes) '" << rfc1738_do_escape(buf,0) << "'");
+        }
         xfree(sbuf);
         return;
     }
 
     debugs(9, 3, HERE << (unsigned long int)len << " bytes to play with");
 
     line = (char *)memAllocate(MEM_4K_BUF);
     end++;
     s = sbuf;
     s += strspn(s, crlf);
 
     for (; s < end; s += strcspn(s, crlf), s += strspn(s, crlf)) {
         debugs(9, 7, HERE << "s = {" << s << "}");
         linelen = strcspn(s, crlf) + 1;
 
         if (linelen < 2)
             break;
 
         if (linelen > 4096)
             linelen = 4096;
@@ -1121,41 +1160,48 @@
 
 void
 FtpStateData::dataComplete()
 {
     debugs(9, 3,HERE);
 
     /* Connection closed; transfer done. */
 
     /// Close data channel, if any, to conserve resources while we wait.
     data.close();
 
     /* expect the "transfer complete" message on the control socket */
     /*
      * DPW 2007-04-23
      * Previously, this was the only place where we set the
      * 'buffered_ok' flag when calling scheduleReadControlReply().
      * It caused some problems if the FTP server returns an unexpected
      * status code after the data command.  FtpStateData was being
      * deleted in the middle of dataRead().
      */
-    scheduleReadControlReply(0);
+//    scheduleReadControlReply(0);
+    /* AYJ: 2011-01-13: 226 status possibly waiting in the ctrl buffer.
+     * The connection will hang is we DONT send buffered_ok.
+     * This happens on all transfers which can be completly sent by the
+     * server before the 150 started status message is read in by Squid.
+     * ie all transfers of about one packet hang.
+     */
+    scheduleReadControlReply(1);
 }
 
 void
 FtpStateData::maybeReadVirginBody()
 {
     if (data.fd < 0)
         return;
 
     if (data.read_pending)
         return;
 
     const int read_sz = replyBodySpace(*data.readBuf, 0);
 
     debugs(11,9, HERE << "FTP may read up to " << read_sz << " bytes");
 
     if (read_sz < 2)	// see http.cc
         return;
 
     data.read_pending = true;
 
@@ -1657,41 +1703,41 @@
  */
 void
 FtpStateData::scheduleReadControlReply(int buffered_ok)
 {
     debugs(9, 3, HERE << "FD " << ctrl.fd);
 
     if (buffered_ok && ctrl.offset > 0) {
         /* We've already read some reply data */
         handleControlReply();
     } else {
         /* XXX What about Config.Timeout.read? */
         typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
         AsyncCall::Pointer reader = JobCallback(9, 5,
                                                 Dialer, this, FtpStateData::ftpReadControlReply);
         comm_read(ctrl.fd, ctrl.buf + ctrl.offset, ctrl.size - ctrl.offset, reader);
         /*
          * Cancel the timeout on the Data socket (if any) and
          * establish one on the control socket.
          */
 
-        if (data.fd > -1) {
+        if (data.fd >= 0) {
             AsyncCall::Pointer nullCall =  NULL;
             commSetTimeout(data.fd, -1, nullCall);
         }
 
         typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
         AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
                                          TimeoutDialer, this, FtpStateData::ftpTimeout);
 
         commSetTimeout(ctrl.fd, Config.Timeout.read, timeoutCall);
     }
 }
 
 void FtpStateData::ftpReadControlReply(const CommIoCbParams &io)
 {
     debugs(9, 3, "ftpReadControlReply: FD " << io.fd << ", Read " << io.size << " bytes");
 
     if (io.size > 0) {
         kb_incr(&statCounter.server.all.kbytes_in, io.size);
         kb_incr(&statCounter.server.ftp.kbytes_in, io.size);
     }
@@ -2705,104 +2751,78 @@
     FtpStateData *ftpState = (FtpStateData *)data;
     debugs(9, 3, HERE);
     ftpState->request->recordLookup(dns);
 
     if (status != COMM_OK) {
         debugs(9, 2, HERE << "Failed to connect. Retrying without PASV.");
         ftpState->fwd->dontRetry(false);	/* this is a retryable error */
         ftpState->fwd->ftpPasvFailed(true);
         ftpState->failed(ERR_NONE, 0);
         /* failed closes ctrl.fd and frees ftpState */
         return;
     }
 
     ftpRestOrList(ftpState);
 }
 
 /// \ingroup ServerProtocolFTPInternal
 static int
 ftpOpenListenSocket(FtpStateData * ftpState, int fallback)
 {
-    int fd;
-    Ip::Address addr;
     struct addrinfo *AI = NULL;
-    int on = 1;
     int x = 0;
 
     /// Close old data channels, if any. We may open a new one below.
-    ftpState->data.close();
+    if (!(ftpState->data.flags & COMM_REUSEADDR))
+        ftpState->data.close();
 
     /*
      * Set up a listen socket on the same local address as the
      * control connection.
      */
-
-    addr.InitAddrInfo(AI);
-
+    ftpState->data.local.InitAddrInfo(AI);
     x = getsockname(ftpState->ctrl.fd, AI->ai_addr, &AI->ai_addrlen);
-
-    addr = *AI;
-
-    addr.FreeAddrInfo(AI);
+    ftpState->data.local = *AI;
+    ftpState->data.local.FreeAddrInfo(AI);
 
     if (x) {
         debugs(9, DBG_CRITICAL, HERE << "getsockname(" << ftpState->ctrl.fd << ",..): " << xstrerror());
         return -1;
     }
 
     /*
      * REUSEADDR is needed in fallback mode, since the same port is
      * used for both control and data.
      */
     if (fallback) {
+        int on = 1;
         setsockopt(ftpState->ctrl.fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on));
+        ftpState->ctrl.flags |= COMM_REUSEADDR;
+        ftpState->data.flags |= COMM_REUSEADDR;
     } else {
         /* if not running in fallback mode a new port needs to be retrieved */
-        addr.SetPort(0);
-    }
-
-    fd = comm_open(SOCK_STREAM,
-                   IPPROTO_TCP,
-                   addr,
-                   COMM_NONBLOCKING | (fallback ? COMM_REUSEADDR : 0),
-                   ftpState->entry->url());
-    debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd  );
-
-    if (fd < 0) {
-        debugs(9, DBG_CRITICAL, HERE << "comm_open failed");
-        return -1;
+        ftpState->data.local.SetPort(0);
+        ftpState->data.flags = COMM_NONBLOCKING;
     }
 
-    typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-    AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                    acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
-    ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false);
-
-    if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) {
-        comm_close(fd);
-        return -1;
-    }
-
-    ftpState->data.opened(fd, ftpState->dataCloser());
-    ftpState->data.port = comm_local_port(fd);
-    ftpState->data.host = NULL;
-    return fd;
+    ftpState->listenForDataChannel((fallback?ftpState->ctrl.fd:-1), ftpState->entry->url());
+    return ftpState->data.fd;
 }
 
 /// \ingroup ServerProtocolFTPInternal
 static void
 ftpSendPORT(FtpStateData * ftpState)
 {
     int fd;
     Ip::Address ipa;
     struct addrinfo *AI = NULL;
     unsigned char *addrptr;
     unsigned char *portptr;
 
     /* check the server control channel is still available */
     if (!ftpState || !ftpState->haveControlChannel("ftpSendPort"))
         return;
 
     if (Config.Ftp.epsv_all && ftpState->flags.epsv_all_sent) {
         debugs(9, DBG_IMPORTANT, "FTP does not allow PORT method after 'EPSV ALL' has been sent.");
         return;
     }
@@ -2864,40 +2884,41 @@
     if (Config.Ftp.epsv_all && ftpState->flags.epsv_all_sent) {
         debugs(9, DBG_IMPORTANT, "FTP does not allow EPRT method after 'EPSV ALL' has been sent.");
         return;
     }
 
     if (!Config.Ftp.eprt) {
         /* Disabled. Switch immediately to attempting old PORT command. */
         debugs(9, 3, "EPRT disabled by local administrator");
         ftpSendPORT(ftpState);
         return;
     }
 
     int fd;
     Ip::Address addr;
     struct addrinfo *AI = NULL;
     char buf[MAX_IPSTRLEN];
 
     debugs(9, 3, HERE);
     ftpState->flags.pasv_supported = 0;
     fd = ftpOpenListenSocket(ftpState, 0);
+    debugs(9, 3, "Listening for FTP data connection with FD " << fd);
 
     Ip::Address::InitAddrInfo(AI);
 
     if (getsockname(fd, AI->ai_addr, &AI->ai_addrlen)) {
         Ip::Address::FreeAddrInfo(AI);
         debugs(9, DBG_CRITICAL, HERE << "getsockname(" << fd << ",..): " << xstrerror());
 
         /* XXX Need to set error message */
         ftpFail(ftpState);
         return;
     }
 
     addr = *AI;
 
     /* RFC 2428 defines EPRT as IPv6 equivalent to IPv4 PORT command. */
     /* Which can be used by EITHER protocol. */
     snprintf(cbuf, 1024, "EPRT |%d|%s|%d|\r\n",
              ( addr.IsIPv6() ? 2 : 1 ),
              addr.NtoA(buf,MAX_IPSTRLEN),
              addr.GetPort() );
@@ -2916,111 +2937,102 @@
 
     if (code != 200) {
         /* Failover to attempting old PORT command. */
         debugs(9, 3, "EPRT not supported by remote end");
         ftpSendPORT(ftpState);
         return;
     }
 
     ftpRestOrList(ftpState);
 }
 
 /**
  \ingroup ServerProtocolFTPInternal
  \par
  * "read" handler to accept FTP data connections.
  *
  \param io    comm accept(2) callback parameters
  */
 void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io)
 {
-    char ntoapeer[MAX_IPSTRLEN];
-    debugs(9, 3, "ftpAcceptDataConnection");
-
-    // one connection accepted. the handler has stopped listening. drop our local pointer to it.
-    data.listener = NULL;
+    debugs(9, 3, HERE);
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
         abortTransaction("entry aborted when accepting data conn");
         return;
     }
 
+    if (io.flag != COMM_OK) {
+        data.close();
+        debugs(9, DBG_IMPORTANT, "FTP AcceptDataConnection: FD " << io.fd << ": " << xstrerr(io.xerrno));
+        /** \todo Need to send error message on control channel*/
+        ftpFail(this);
+        return;
+    }
+
+    /* data listening conn is no longer even open. abort. */
+    if (data.fd <= 0 || fd_table[data.fd].flags.open == 0) {
+        data.clear(); // ensure that it's cleared and not just closed.
+        return;
+    }
+
     /** \par
      * When squid.conf ftp_sanitycheck is enabled, check the new connection is actually being
      * made by the remote client which is connected to the FTP control socket.
+     * Or the one which we were told to listen for by control channel messages (may differ under NAT).
      * This prevents third-party hacks, but also third-party load balancing handshakes.
      */
     if (Config.Ftp.sanitycheck) {
+        char ntoapeer[MAX_IPSTRLEN];
         io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN);
 
-        if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) {
+        if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0 &&
+                strcmp(fd_table[data.fd].ipaddr, ntoapeer) != 0) {
             debugs(9, DBG_IMPORTANT,
                    "FTP data connection from unexpected server (" <<
                    io.details.peer << "), expecting " <<
-                   fd_table[ctrl.fd].ipaddr);
+                   fd_table[ctrl.fd].ipaddr << " or " << fd_table[data.fd].ipaddr);
 
-            /* close the bad soures connection down ASAP. */
+            /* close the bad sources connection down ASAP. */
             comm_close(io.nfd);
 
-            /* we are ony accepting once, so need to re-open the listener socket. */
-            typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-            AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                            acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
-            data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
+            /* drop the bad connection (io) by ignoring the attempt. */
             return;
         }
     }
 
-    if (io.flag != COMM_OK) {
-        debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno));
-        /** \todo XXX Need to set error message */
-        ftpFail(this);
-        return;
-    }
-
     /**\par
-     * Replace the Listen socket with the accepted data socket */
+     * Replace the Listening socket with the accepted data socket */
     data.close();
     data.opened(io.nfd, dataCloser());
     data.port = io.details.peer.GetPort();
-    io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN);
+    data.host = xstrdup(fd_table[io.nfd].ipaddr);
 
     debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " <<
            "FD " << io.nfd << " to " << io.details.peer << " FD table says: " <<
            "ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " <<
            "data-peer= " << fd_table[data.fd].ipaddr);
 
+    assert(haveControlChannel("ftpAcceptDataConnection"));
+    assert(ctrl.message == NULL);
 
-    AsyncCall::Pointer nullCall = NULL;
-    commSetTimeout(ctrl.fd, -1, nullCall);
-
-    typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
-    AsyncCall::Pointer timeoutCall =  JobCallback(9, 5,
-                                      TimeoutDialer, this, FtpStateData::ftpTimeout);
-    commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
-
-    /*\todo XXX We should have a flag to track connect state...
-     *    host NULL -> not connected, port == local port
-     *    host set  -> connected, port == remote port
-     */
-    /* Restart state (SENT_NLST/LIST/RETR) */
-    FTP_SM_FUNCS[state] (this);
+    // Ctrl channel operations will determine what happens to this data connection
 }
 
 /// \ingroup ServerProtocolFTPInternal
 static void
 ftpRestOrList(FtpStateData * ftpState)
 {
     debugs(9, 3, HERE);
 
     if (ftpState->typecode == 'D') {
         ftpState->flags.isdir = 1;
 
         if (ftpState->flags.put) {
             ftpSendMkdir(ftpState);	/* PUT name;type=d */
         } else {
             ftpSendNlst(ftpState);	/* GET name;type=d  sec 3.2.2 of RFC 1738 */
         }
     } else if (ftpState->flags.put) {
         ftpSendStor(ftpState);
     } else if (ftpState->flags.isdir)
         ftpSendList(ftpState);
@@ -3058,68 +3070,52 @@
 
 /// \ingroup ServerProtocolFTPInternal
 /// \deprecated use ftpState->readStor() instead.
 static void
 ftpReadStor(FtpStateData * ftpState)
 {
     ftpState->readStor();
 }
 
 void FtpStateData::readStor()
 {
     int code = ctrl.replycode;
     debugs(9, 3, HERE);
 
     if (code == 125 || (code == 150 && data.host)) {
         if (!startRequestBodyFlow()) { // register to receive body data
             ftpFail(this);
             return;
         }
 
-        /*\par
-         * When client status is 125, or 150 without a hostname, Begin data transfer. */
+        /* When client status is 125, or 150 without a hostname, Begin data transfer. */
         debugs(9, 3, HERE << "starting data transfer");
+        switchTimeoutToDataChannel();
         sendMoreRequestBody();
-        /** \par
-         * Cancel the timeout on the Control socket and
-         * establish one on the data socket.
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ctrl.fd, -1, nullCall);
-
-        typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
-        AsyncCall::Pointer timeoutCall =  JobCallback(9, 5,
-                                          TimeoutDialer, this, FtpStateData::ftpTimeout);
-
-        commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
-
         state = WRITING_DATA;
         debugs(9, 3, HERE << "writing data channel");
     } else if (code == 150) {
         /*\par
-         * When client code is 150 with a hostname, Accept data channel. */
+         * When client code is 150 without a hostname, Accept data channel. */
         debugs(9, 3, "ftpReadStor: accepting data channel");
-        typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-        AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                        acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
-
-        data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
+        switchTimeoutToDataChannel();
+        listenForDataChannel(data.fd, data.host);
     } else {
         debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
         ftpFail(this);
     }
 }
 
 /// \ingroup ServerProtocolFTPInternal
 static void
 ftpSendRest(FtpStateData * ftpState)
 {
     /* check the server control channel is still available */
     if (!ftpState || !ftpState->haveControlChannel("ftpSendRest"))
         return;
 
     debugs(9, 3, HERE);
 
     snprintf(cbuf, 1024, "REST %"PRId64"\r\n", ftpState->restart_offset);
     ftpState->writeCommand(cbuf);
     ftpState->state = SENT_REST;
 }
@@ -3205,129 +3201,92 @@
 
     if (ftpState->filepath) {
         snprintf(cbuf, 1024, "NLST %s\r\n", ftpState->filepath);
     } else {
         snprintf(cbuf, 1024, "NLST\r\n");
     }
 
     ftpState->writeCommand(cbuf);
     ftpState->state = SENT_NLST;
 }
 
 /// \ingroup ServerProtocolFTPInternal
 static void
 ftpReadList(FtpStateData * ftpState)
 {
     int code = ftpState->ctrl.replycode;
     debugs(9, 3, HERE);
 
     if (code == 125 || (code == 150 && ftpState->data.host)) {
         /* Begin data transfer */
-        /* XXX what about Config.Timeout.read? */
+        debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.host << " (" << ftpState->data.local << ")");
+        ftpState->switchTimeoutToDataChannel();
         ftpState->maybeReadVirginBody();
         ftpState->state = READING_DATA;
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
         return;
     } else if (code == 150) {
         /* Accept data channel */
-        typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-        AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                        acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
-
-        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
-
-        typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
-        AsyncCall::Pointer timeoutCall =  JobCallback(9, 5,
-                                          TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
-        commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
+        debugs(9, 3, HERE << "accept data channel from " << ftpState->data.host << " (" << ftpState->data.local << ")");
+        ftpState->switchTimeoutToDataChannel();
+        ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host);
         return;
     } else if (!ftpState->flags.tried_nlst && code > 300) {
         ftpSendNlst(ftpState);
     } else {
         ftpFail(ftpState);
         return;
     }
 }
 
 /// \ingroup ServerProtocolFTPInternal
 static void
 ftpSendRetr(FtpStateData * ftpState)
 {
     /* check the server control channel is still available */
     if (!ftpState || !ftpState->haveControlChannel("ftpSendRetr"))
         return;
 
     debugs(9, 3, HERE);
 
     assert(ftpState->filepath != NULL);
     snprintf(cbuf, 1024, "RETR %s\r\n", ftpState->filepath);
     ftpState->writeCommand(cbuf);
     ftpState->state = SENT_RETR;
 }
 
 /// \ingroup ServerProtocolFTPInternal
 static void
 ftpReadRetr(FtpStateData * ftpState)
 {
     int code = ftpState->ctrl.replycode;
     debugs(9, 3, HERE);
 
     if (code == 125 || (code == 150 && ftpState->data.host)) {
         /* Begin data transfer */
         debugs(9, 3, HERE << "reading data channel");
-        /* XXX what about Config.Timeout.read? */
+        ftpState->switchTimeoutToDataChannel();
         ftpState->maybeReadVirginBody();
         ftpState->state = READING_DATA;
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
     } else if (code == 150) {
         /* Accept data channel */
-        typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-        AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                        acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
-        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
-
-        typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
-        AsyncCall::Pointer timeoutCall =  JobCallback(9, 5,
-                                          TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
-        commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
+        ftpState->switchTimeoutToDataChannel();
+        ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host);
     } else if (code >= 300) {
         if (!ftpState->flags.try_slash_hack) {
             /* Try this as a directory missing trailing slash... */
             ftpState->hackShortcut(ftpSendCwd);
         } else {
             ftpFail(ftpState);
         }
     } else {
         ftpFail(ftpState);
     }
 }
 
 /**
  * Generate the HTTP headers and template fluff around an FTP
  * directory listing display.
  */
 void
 FtpStateData::completedListing()
 {
     assert(entry);
@@ -3948,48 +3907,51 @@
 AsyncCall::Pointer
 FtpStateData::dataCloser()
 {
     typedef CommCbMemFunT<FtpStateData, CommCloseCbParams> Dialer;
     return JobCallback(9, 5, Dialer, this, FtpStateData::dataClosed);
 }
 
 /// configures the channel with a descriptor and registers a close handler
 void
 FtpChannel::opened(int aFd, const AsyncCall::Pointer &aCloser)
 {
     assert(fd < 0);
     assert(closer == NULL);
 
     assert(aFd >= 0);
     assert(aCloser != NULL);
 
     fd = aFd;
     closer = aCloser;
     comm_add_close_handler(fd, closer);
+
+    // grab the local IP address:port details for this connection
+    struct addrinfo *AI = NULL;
+    local.InitAddrInfo(AI);
+    getsockname(aFd, AI->ai_addr, &AI->ai_addrlen);
+    local = *AI;
+    local.FreeAddrInfo(AI);
 }
 
 /// planned close: removes the close handler and calls comm_close
 void
 FtpChannel::close()
 {
     // channels with active listeners will be closed when the listener handler dies.
-    if (listener) {
-        delete listener;
-        listener = NULL;
-        comm_remove_close_handler(fd, closer);
-        closer = NULL;
-        fd = -1;
-    } else if (fd >= 0) {
-        comm_remove_close_handler(fd, closer);
-        closer = NULL;
+    if (fd >= 0) {
+        if (closer != NULL) {
+            comm_remove_close_handler(fd, closer);
+            closer = NULL;
+        }
         comm_close(fd); // we do not expect to be called back
         fd = -1;
     }
 }
 
 /// just resets fd and close handler
 void
 FtpChannel::clear()
 {
     fd = -1;
     closer = NULL;
 }

=== modified file 'src/htcp.cc'
--- src/htcp.cc	2011-01-10 09:43:43 +0000
+++ src/htcp.cc	2011-01-10 09:56:40 +0000
@@ -1500,46 +1500,47 @@
     if (Config.Port.htcp <= 0) {
         debugs(31, 1, "HTCP Disabled.");
         return;
     }
 
     Ip::Address incomingAddr = Config.Addrs.udp_incoming;
     incomingAddr.SetPort(Config.Port.htcp);
 
     if (!Ip::EnableIpv6 && !incomingAddr.SetIPv4()) {
         debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << incomingAddr << " is not an IPv4 address.");
         fatal("HTCP port cannot be opened.");
     }
     /* split-stack for now requires default IPv4-only HTCP */
     if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && incomingAddr.IsAnyAddr()) {
         incomingAddr.SetIPv4();
     }
 
     AsyncCall::Pointer call = asyncCall(31, 2,
                                         "htcpIncomingConnectionOpened",
                                         HtcpListeningStartedDialer(&htcpIncomingConnectionOpened));
+    Subscription::Pointer nilSub;
 
     Ipc::StartListening(SOCK_DGRAM,
                         IPPROTO_UDP,
                         incomingAddr,
                         COMM_NONBLOCKING,
-                        Ipc::fdnInHtcpSocket, call);
+                        Ipc::fdnInHtcpSocket, call, nilSub);
 
     if (!Config.Addrs.udp_outgoing.IsNoAddr()) {
         Ip::Address outgoingAddr = Config.Addrs.udp_outgoing;
         outgoingAddr.SetPort(Config.Port.htcp);
 
         if (!Ip::EnableIpv6 && !outgoingAddr.SetIPv4()) {
             debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoingAddr << " is not an IPv4 address.");
             fatal("HTCP port cannot be opened.");
         }
         /* split-stack for now requires default IPv4-only HTCP */
         if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && outgoingAddr.IsAnyAddr()) {
             outgoingAddr.SetIPv4();
         }
 
         enter_suid();
         htcpOutSocket = comm_open_listener(SOCK_DGRAM,
                                            IPPROTO_UDP,
                                            outgoingAddr,
                                            COMM_NONBLOCKING,
                                            "Outgoing HTCP Socket");

=== modified file 'src/icp_v2.cc'
--- src/icp_v2.cc	2011-01-10 09:43:43 +0000
+++ src/icp_v2.cc	2011-01-10 09:56:40 +0000
@@ -682,45 +682,47 @@
 
     if ((port = Config.Port.icp) <= 0)
         return;
 
     addr = Config.Addrs.udp_incoming;
     addr.SetPort(port);
 
     if (!Ip::EnableIpv6 && !addr.SetIPv4()) {
         debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << addr << " is not an IPv4 address.");
         fatal("ICP port cannot be opened.");
     }
     /* split-stack for now requires default IPv4-only ICP */
     if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && addr.IsAnyAddr()) {
         addr.SetIPv4();
     }
 
     AsyncCall::Pointer call = asyncCall(12, 2,
                                         "icpIncomingConnectionOpened",
                                         IcpListeningStartedDialer(&icpIncomingConnectionOpened, addr));
 
+    Subscription::Pointer nilSub;
+
     Ipc::StartListening(SOCK_DGRAM,
                         IPPROTO_UDP,
                         addr,
                         COMM_NONBLOCKING,
-                        Ipc::fdnInIcpSocket, call);
+                        Ipc::fdnInIcpSocket, call, nilSub);
 
     addr.SetEmpty(); // clear for next use.
     addr = Config.Addrs.udp_outgoing;
     if ( !addr.IsNoAddr() ) {
         enter_suid();
         addr.SetPort(port);
 
         if (!Ip::EnableIpv6 && !addr.SetIPv4()) {
             debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << addr << " is not an IPv4 address.");
             fatal("ICP port cannot be opened.");
         }
         /* split-stack for now requires default IPv4-only ICP */
         if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && addr.IsAnyAddr()) {
             addr.SetIPv4();
         }
 
         theOutIcpConnection = comm_open_listener(SOCK_DGRAM,
                               IPPROTO_UDP,
                               addr,
                               COMM_NONBLOCKING,

=== modified file 'src/ipc/SharedListen.cc'
--- src/ipc/SharedListen.cc	2010-10-28 18:52:59 +0000
+++ src/ipc/SharedListen.cc	2011-01-09 00:21:05 +0000
@@ -133,22 +133,23 @@
     Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
     PendingOpenRequest por = TheSharedListenRequestMap[response.mapId];
     Must(por.callback != NULL);
     TheSharedListenRequestMap.erase(response.mapId);
 
     if (fd >= 0) {
         OpenListenerParams &p = por.params;
         struct addrinfo *AI = NULL;
         p.addr.GetAddrInfo(AI);
         AI->ai_socktype = p.sock_type;
         AI->ai_protocol = p.proto;
         comm_import_opened(fd, p.addr, p.flags, FdNote(p.fdNote), AI);
         p.addr.FreeAddrInfo(AI);
     }
 
     StartListeningCb *cbd =
         dynamic_cast<StartListeningCb*>(por.callback->getDialer());
     Must(cbd);
     cbd->fd = fd;
     cbd->errNo = response.errNo;
+    cbd->handlerSubscription = por.params.handlerSubscription;
     ScheduleCallHere(por.callback);
 }

=== modified file 'src/ipc/SharedListen.h'
--- src/ipc/SharedListen.h	2010-07-06 23:09:44 +0000
+++ src/ipc/SharedListen.h	2011-01-09 00:18:50 +0000
@@ -1,50 +1,54 @@
 /*
  * $Id$
  *
  * DEBUG: section 54    Interprocess Communication
  *
  */
 
 #ifndef SQUID_IPC_SHARED_LISTEN_H
 #define SQUID_IPC_SHARED_LISTEN_H
 
 #include "base/AsyncCall.h"
+#include "base/Subscription.h"
 
 namespace Ipc
 {
 
 /// "shared listen" is when concurrent processes are listening on the same fd
 
 /// comm_open_listener() parameters holder
 class OpenListenerParams
 {
 public:
     OpenListenerParams();
 
     bool operator <(const OpenListenerParams &p) const; ///< useful for map<>
 
     int sock_type;
     int proto;
     Ip::Address addr; ///< will be memset and memcopied
     int flags;
     int fdNote; ///< index into fd_note() comment strings
+
+    /// handler to subscribe to Comm::TcpAcceptor
+    Subscription::Pointer handlerSubscription;
 };
 
 class TypedMsgHdr;
 
 /// a request for a listen socket with given parameters
 class SharedListenRequest
 {
 public:
     SharedListenRequest(); ///< from OpenSharedListen() which then sets public data
     explicit SharedListenRequest(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
     void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
 
 public:
     int requestorId; ///< kidId of the requestor
 
     OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
 
     int mapId; ///< to map future response to the requestor's callback
 };
 

=== modified file 'src/ipc/StartListening.cc'
--- src/ipc/StartListening.cc	2010-07-06 23:09:44 +0000
+++ src/ipc/StartListening.cc	2011-01-10 00:52:21 +0000
@@ -1,58 +1,66 @@
 /*
  * $Id$
  *
  * DEBUG: section 54    Interprocess Communication
  *
  */
 
 #include "config.h"
-#include "comm.h"
 #include "base/TextException.h"
+#include "comm.h"
+#include "comm/TcpAcceptor.h"
 #include "ipc/SharedListen.h"
 #include "ipc/StartListening.h"
 
 
 Ipc::StartListeningCb::StartListeningCb(): fd(-1), errNo(0)
 {
 }
 
 Ipc::StartListeningCb::~StartListeningCb()
 {
 }
 
 std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const
 {
     return os << "(FD " << fd << ", err=" << errNo;
 }
 
-
-void Ipc::StartListening(int sock_type, int proto, Ip::Address &addr,
-                         int flags, FdNoteId fdNote, AsyncCall::Pointer &callback)
+void
+Ipc::StartListening(int sock_type, int proto, Ip::Address &addr, int flags,
+                    FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub)
 {
-    OpenListenerParams p;
-    p.sock_type = sock_type;
-    p.proto = proto;
-    p.addr = addr;
-    p.flags = flags;
-    p.fdNote = fdNote;
-
-    if (UsingSmp()) { // if SMP is on, share
+   if (UsingSmp()) { // if SMP is on, share
+        OpenListenerParams p;
+        p.sock_type = sock_type;
+        p.proto = proto;
+        p.addr = addr;
+        p.flags = flags;
+        p.fdNote = fdNote;
+        p.handlerSubscription = sub;
         Ipc::JoinSharedListen(p, callback);
         return; // wait for the call back
     }
 
+    StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(callback->getDialer());
+    Must(cbd);
+
     enter_suid();
-    const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags,
-                                        FdNote(p.fdNote));
-    const int errNo = (sock >= 0) ? 0 : errno;
+    if (sock_type == SOCK_STREAM) {
+        // TCP: setup a job to handle accept() with subscribed handler
+        Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(cbd->fd, addr, flags, FdNote(fdNote), sub);
+        cbd->fd = tmp->fd;
+        AsyncJob::Start(tmp);
+    } else if (sock_type == SOCK_DGRAM) {
+        // UDP: setup the listener socket, but do not set a subscriber
+        // TODO: create a UDP sbscription so packet event calls get scheduled and queued Async.
+        cbd->fd = comm_open_listener(sock_type, proto, addr, flags, FdNote(fdNote));
+    } else {
+        fatalf("Invalid Socket Type (%d)",sock_type);
+    }
+    cbd->errNo = cbd->fd >= 0 ? 0 : errno;
     leave_suid();
 
-    debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr);
-
-    StartListeningCb *cbd =
-        dynamic_cast<StartListeningCb*>(callback->getDialer());
-    Must(cbd);
-    cbd->fd = sock;
-    cbd->errNo = errNo;
+    debugs(54, 3, HERE << "opened listen FD " << cbd->fd << " on " << addr);
     ScheduleCallHere(callback);
 }

=== modified file 'src/ipc/StartListening.h'
--- src/ipc/StartListening.h	2010-11-21 04:40:05 +0000
+++ src/ipc/StartListening.h	2011-01-09 00:30:35 +0000
@@ -1,45 +1,49 @@
 /*
  * $Id$
  *
  * DEBUG: section 54    Interprocess Communication
  *
  */
 
 #ifndef SQUID_IPC_START_LISTENING_H
 #define SQUID_IPC_START_LISTENING_H
 
 #include "ip/forward.h"
 #include "ipc/FdNotes.h"
 #include "base/AsyncCall.h"
+#include "base/Subscription.h"
 
 #if HAVE_IOSFWD
 #include <iosfwd>
 #endif
 
 namespace Ipc
 {
 
 /// common API for all StartListening() callbacks
 class StartListeningCb
 {
 public:
     StartListeningCb();
     virtual ~StartListeningCb();
 
     /// starts printing arguments, return os
     std::ostream &startPrint(std::ostream &os) const;
 
 public:
     int fd; ///< opened listening socket or -1
     int errNo; ///< errno value from the comm_open_listener() call
+
+    /// The subscription we will pass on to the Comm::TcpAcceptor
+    Subscription::Pointer handlerSubscription;
 };
 
 /// Depending on whether SMP is on, either ask Coordinator to send us
 /// the listening FD or call comm_open_listener() directly.
-extern void StartListening(int sock_type, int proto, Ip::Address &addr,
-                           int flags, FdNoteId fdNote, AsyncCall::Pointer &callback);
+extern void StartListening(int sock_type, int proto, Ip::Address &addr, int flags,
+                           FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub);
 
 } // namespace Ipc;
 
 
 #endif /* SQUID_IPC_START_LISTENING_H */


