注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

gmd20的个人空间

// 编程和生活

 
 
 

日志

 
 

asio网络的async_connect异步连接用的是额外的select线程不是ConnectEx+IOCP, asio里面封装ConnectEx的例子  

2013-10-27 22:54:25|  分类: 程序设计 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
根据asio的文档的说明。asio的 socket的async_connect的实现是用额外的select线程来做的,不是 overlapped I/O + 

IOCP来实现的。参见:http://think-async.com/Asio/asio-1.5.3/doc/asio/overview/implementation.html
很奇怪其他send recv操作都是用的IOCP的,不知道这里的connect为什么用select呢,select在大数量的socket的情况下

应该性能会差点吧。实际测试程序发起大量连接(有可能connect返回错误,不断重连加剧了负担),set-fd里面的开销

确实很大。看来这个connect函数有点特殊,用asio写代码,需要发起大量连接的时候要考虑一下这个问题????



connect
http://msdn.microsoft.com/en-us/library/ms737625(v=vs.85).aspx
函数是不支持 overlapped I/O的,所以肯定不能跟IOCP一起使用了。异步的connect的结果可以通过select 和 

WSAEventSelect来得到。asio选择了select,应该是在其他线程里面用select拿到结果,再把handler post回主线程进行

的吧。 不过在网上看到有人做libevent的IOCP的时候,用的WSAEventSelect。WSAEventSelect应该会比select好一
些吧。



在网上搜索了一下,其实是有另外一个ConnectEx函数是可以支持overlapped I/O,应该是可以和IOCP集成的。网上可以

找到很多人提到和一些例子。
IOCP
http://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
如果socket要用IOCP的话
服务端主要是AceeptEx WSASend WSASendto函数
客户端主要是ConnectEx WSARecv和WSARecvFrom函数
关键就是函数要支持OVERLAPPED 参数。


ConnectEx
http://msdn.microsoft.com/en-us/library/ms737606(v=vs.85).aspx
ConnectEx有点特殊的:
1. 这个函数调用,只能在运行时获取函数地址,通过函数指针来调用。这个MSDN有介绍了
2. 在调用ConnectEx之前,一定要先做bind操作,不然会返回错误。connect函数应该是会底下自己帮你做bind,但这个没有。不知道微软为什么这样子搞。这个bind就多了一次系统调用了啊。
3. ConnectEx支持在建立连接的同时发送网络数据出去。这个发送合并在一起,应该是可以省去一次send的系统调用了。在http短连接的情况下应该很合适啊。但前面bind也多了一次了不知道有人测试过没有性能有没有什么变化。
    不过后面的封装asio的async_connectex 函数的例子,我觉得最好也把这个一同发送数据的功能实现了吧。




先看一下asio的 async_connect操作的实现代码:

-----------------asio-1.4.8\include\asio\basic_socket.hpp--------------------------
 template <typename ConnectHandler>
  void async_connect(const endpoint_type& peer_endpoint, ConnectHandler handler)
  {
    if (!is_open())
    {
      asio::error_code ec;
      if (this->service.open(this->implementation,
            peer_endpoint.protocol(), ec))
      {
        this->get_io_service().post(
            asio::detail::bind_handler(handler, ec));
        return;
      }
    }

    this->service.async_connect(this->implementation, peer_endpoint, handler);
  }
----------------asio-1.4.8\include\asio\stream_socket_service.hpp-------------------------------
 class stream_socket_service
#if defined(GENERATING_DOCUMENTATION)
  : public asio::io_service::service
#else
  : public asio::detail::service_base<stream_socket_service<Protocol> >
#endif

 /// Start an asynchronous connect.
  template <typename ConnectHandler>
  void async_connect(implementation_type& impl,
      const endpoint_type& peer_endpoint, ConnectHandler handler)
  {
    service_impl_.async_connect(impl, peer_endpoint, handler);
  }


#if defined(ASIO_HAS_IOCP)
  typedef detail::win_iocp_socket_service<Protocol> service_impl_type;  ////////////////////
#else
  typedef detail::reactive_socket_service<Protocol> service_impl_type;
#endif
 // The platform-specific implementation.
  service_impl_type service_impl_;   //////////////////
};

--------------asio-1.4.8\include\asio\detail\win_iocp_socket_service.hpp----------------
class win_iocp_socket_service : public win_iocp_socket_service_base

 // Start an asynchronous connect.
  template <typename Handler>
  void async_connect(implementation_type& impl,
      const endpoint_type& peer_endpoint, Handler handler)
  {
    // Allocate and construct an operation to wrap the handler.
    typedef reactive_socket_connect_op<Handler> op;
    typename op::ptr p = { boost::addressof(handler),
      asio_handler_alloc_helpers::allocate(
        sizeof(op), handler), 0 };
    p.p = new (p.v) op(impl.socket_, handler);

    start_connect_op(impl, p.p, peer_endpoint.data(),
        static_cast<int>(peer_endpoint.size()));
    p.v = p.p = 0;
  }

---------asio-1.4.8\include\asio\detail\impl\win_iocp_socket_service_base.ipp------------

void win_iocp_socket_service_base::start_connect_op(
    win_iocp_socket_service_base::base_implementation_type& impl,
    reactor_op* op, const socket_addr_type* addr, std::size_t addrlen)
{
  reactor& r = get_reactor();
  update_cancellation_thread_id(impl);

  if ((impl.state_ & socket_ops::non_blocking) != 0
      || socket_ops::set_internal_non_blocking(
        impl.socket_, impl.state_, op->ec_))
  {
    if (socket_ops::connect(impl.socket_, addr, addrlen, op->ec_) != 0)
    {
      if (op->ec_ == asio::error::in_progress
          || op->ec_ == asio::error::would_block)
      {
        op->ec_ = asio::error_code();
        r.start_op(reactor::connect_op, impl.socket_,
            impl.reactor_data_, op, false);
        return;
      }
    }
  }

  r.post_immediate_completion(op);
}



reactor& win_iocp_socket_service_base::get_reactor()
{
  reactor* r = static_cast<reactor*>(
        interlocked_compare_exchange_pointer(
          reinterpret_cast<void**>(&reactor_), 0, 0));
  if (!r)
  {
    r = &(use_service<reactor>(io_service_));
    interlocked_exchange_pointer(reinterpret_cast<void**>(&reactor_), r);
  }
  return *r;
}
可以看到上面的connect函数的结果,是需要select_reactor辅助处理的。



作为一个比较,普通的send 和receive操作的不需要reactor(Windows下面应该是select_reactor)的参与。
WSASend发送之后,返回的结果就会在发送出去之后由IOCP处理了。

void win_iocp_socket_service_base::start_send_op(
    win_iocp_socket_service_base::base_implementation_type& impl,
    WSABUF* buffers, std::size_t buffer_count,
    socket_base::message_flags flags, bool noop, operation* op)
{
  update_cancellation_thread_id(impl);
  iocp_service_.work_started();

  if (noop)
    iocp_service_.on_completion(op);
  else if (!is_open(impl))
    iocp_service_.on_completion(op, asio::error::bad_descriptor);
  else
  {
    DWORD bytes_transferred = 0;
    int result = ::WSASend(impl.socket_, buffers,
        static_cast<DWORD>(buffer_count), &bytes_transferred, flags, op, 0);
    DWORD last_error = ::WSAGetLastError();
    if (last_error == ERROR_PORT_UNREACHABLE)
      last_error = WSAECONNREFUSED;
    if (result != 0 && last_error != WSA_IO_PENDING)
      iocp_service_.on_completion(op, last_error, bytes_transferred);
    else
      iocp_service_.on_pending(op);
  }
}



--------------------asio-1.4.8\include\asio\detail\win_iocp_socket_service_base.hpp-----------
class win_iocp_socket_service_base

  // The reactor used for performing connect operations. This object is created
  // only if needed.
  reactor* reactor_;

--------------------asio/detail/select_reactor.hpp----------------------------------------

void select_reactor::run(bool block, op_queue<operation>& ops)
{
  asio::detail::mutex::scoped_lock lock(mutex_);

#if defined(ASIO_HAS_IOCP)
  // Check if the thread is supposed to stop.
  if (stop_thread_)
    return;
#endif // defined(ASIO_HAS_IOCP)

  // Set up the descriptor sets.
  fd_set_adapter fds[max_select_ops];
  fds[read_op].set(interrupter_.read_descriptor());
  socket_type max_fd = 0;
  bool have_work_to_do = !timer_queues_.all_empty();
  for (int i = 0; i < max_select_ops; ++i)
  {
    have_work_to_do = have_work_to_do || !op_queue_[i].empty();
    op_queue_[i].get_descriptors(fds[i], ops);
    if (fds[i].max_descriptor() > max_fd)
      max_fd = fds[i].max_descriptor();
  }

#if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
  // Connection operations on Windows use both except and write fd_sets.
  have_work_to_do = have_work_to_do || !op_queue_[connect_op].empty();
  op_queue_[connect_op].get_descriptors(fds[write_op], ops);
  if (fds[write_op].max_descriptor() > max_fd)
    max_fd = fds[write_op].max_descriptor();
  op_queue_[connect_op].get_descriptors(fds[except_op], ops);
  if (fds[except_op].max_descriptor() > max_fd)
    max_fd = fds[except_op].max_descriptor();
#endif // defined(BOOST_WINDOWS) || defined(__CYGWIN__)

  // We can return immediately if there's no work to do and the reactor is
  // not supposed to block.
  if (!block && !have_work_to_do)
    return;

  // Determine how long to block while waiting for events.
  timeval tv_buf = { 0, 0 };
  timeval* tv = block ? get_timeout(tv_buf) : &tv_buf;

  lock.unlock();

  // Block on the select call until descriptors become ready.
  asio::error_code ec;
  int retval = socket_ops::select(static_cast<int>(max_fd + 1),
      fds[read_op], fds[write_op], fds[except_op], tv, ec);

  // Reset the interrupter.
  if (retval > 0 && fds[read_op].is_set(interrupter_.read_descriptor()))
    interrupter_.reset();

  lock.lock();

  // Dispatch all ready operations.
  if (retval > 0)
  {
#if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
    // Connection operations on Windows use both except and write fd_sets.
    op_queue_[connect_op].perform_operations_for_descriptors(
        fds[except_op], ops);
    op_queue_[connect_op].perform_operations_for_descriptors(
        fds[write_op], ops);
#endif // defined(BOOST_WINDOWS) || defined(__CYGWIN__)

    // Exception operations must be processed first to ensure that any
    // out-of-band data is read before normal data.
    for (int i = max_select_ops - 1; i >= 0; --i)
      op_queue_[i].perform_operations_for_descriptors(fds[i], ops);
  }
  timer_queues_.get_ready_timers(ops);
}








==========================================
asio主页上就有提到怎么封装这个ConnectEx的例子。
http://think-async.com/Asio/Examples 

我摘录关键部分出来如下:

template <typename Socket>
boost::system::error_code bind_to_any(Socket& socket, 
    const typename Socket::endpoint_type::protocol_type& protocol)
{
  typedef typename Socket::endpoint_type endpoint_type;

  static const boost::system::error_code ignored(WSAEINVAL,
      boost::asio::error::get_system_category());

  boost::system::error_code error;
  socket.bind(endpoint_type(protocol, 0), error);

  if (ignored == error)
  {
    error.clear();
  }

  return error;
}



template <typename Socket, typename Handler>
void async_connect(Socket& socket,
    const typename Socket::endpoint_type& peer_endpoint,
    const Handler& handler)
{
  if (!socket.is_open())
  {
    // Open the socket before use ConnectEx.
    boost::system::error_code error;
    socket.open(peer_endpoint.protocol(), error);
    if (error)
    {
        socket.get_io_service().post(bind_handler(handler, error));
        return;
    }
  }

  SOCKET native_socket = socket.native_handle();
  GUID connect_ex_guid = WSAID_CONNECTEX;
  LPFN_CONNECTEX connect_ex_func = 0;
  DWORD result_bytes;

  // ConnectEx函数的调用只能运行时拿到地址再调用。
  int ctrl_result = ::WSAIoctl(native_socket,
      SIO_GET_EXTENSION_FUNCTION_POINTER,
      &connect_ex_guid, sizeof(connect_ex_guid),
      &connect_ex_func, sizeof(connect_ex_func),
      &result_bytes, NULL, NULL);

  // If ConnectEx wasn't located then fall to common Asio async_connect.
  if ((SOCKET_ERROR == ctrl_result) || !connect_ex_func)
  {
       // 如果没能得到ConnectEx函数地址,退化为使用asio默认的async_connect,这个是用select 和附加线程实现

       socket.async_connect(peer_endpoint, handler);
       return;
  }

  // ConnectEx函数调用之前必须先做过bind操作。否则就返回失败。connect函数默认会自己帮你做bind操作的。
  if (boost::system::error_code error = detail::bind_to_any(socket, 
      peer_endpoint.protocol()))
  {

    socket.get_io_service().post(bind_handler(handler, error));
  }


  // Construct an OVERLAPPED-derived object to contain the handler.
  boost::asio::windows::overlapped_ptr overlapped(socket.get_io_service(),
      detail::make_connect_ex_handler(handler));


  // Initiate the ConnectEx operation.
  BOOL ok = connect_ex_func(native_socket, peer_endpoint.data(),
      boost::numeric_cast<int>(peer_endpoint.size()), NULL, 0, NULL,
      overlapped.get());
  DWORD last_error = ::WSAGetLastError();

  // Check if the operation completed immediately.
  if ((FALSE == ok) && (ERROR_IO_PENDING != last_error))
  {
    // The operation completed immediately, so a completion notification needs
    // to be posted. When complete() is called, ownership of the OVERLAPPED-
    // derived object passes to the io_service.
    boost::system::error_code error(last_error,
        boost::asio::error::get_system_category());
    overlapped.complete(error, 0);
  }
  else
  {
    // The operation was successfully initiated, so ownership of the
    // OVERLAPPED-derived object has passed to the io_service.
    overlapped.release();
  }

}


===========================================================
asio里面提供了windows::overlapped_ptr 辅助类,
所以跟asio 的windows IOCP io_service集成起来还比较简单。
这里还有一个使用Windows的TransmitFile 函数的例子,也可以参考一下。
这个TransmitFile有点类似Linux的SendFile调用啊,应该可以由操作系统文件cache里面直接发送,让你少一次复制吧。
http://think-async.com/Asio/asio-1.5.3/doc/asio/reference/windows__overlapped_ptr.html
http://think-async.com/Asio/asio-1.5.3/src/examples/windows/transmit_file.cpp


  评论这张
 
阅读(1260)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017