注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

gmd20的个人空间

// 编程和生活

 
 
 

日志

 
 

试用一下ZeroMQ,把log数据发送过去直接写文件保存  

2012-11-22 17:38:49|  分类: 程序设计 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
api确实很简单,使用很方便。  比较麻烦的就是 发送模式的选择,REQ -REP 要求一应一答的模式,不能连续收。换了 PUSH 和 PULL模式终于像自己想要的那样工作了。 其实就是一端接收,一端发送而已。

#include <zmq.hpp>
#include <string>
#include <iostream>
#include <fstream>


std::ofstream file_stream;
unsigned int totoal_written;
const unsigned int kFileSize = 1024 * 1024 * 300;

int current_file;
const int kFileListSize =4;
const char * file_list [] = { {"log1.txt"},{"log2.txt"},{"log3.txt"} , {"log4.txt"}};

LARGE_INTEGER freq, t0, t1;

void OpenNewFile()
{
if (file_stream.is_open() ) {
file_stream.close();
}
current_file = (current_file +1) % kFileListSize;

const char * new_file = file_list[current_file];

file_stream.clear();
remove(new_file);

file_stream.open(new_file, std::ios::out | std::ios::trunc | std::ios::binary);
if (!file_stream.fail() && !file_stream.bad()) { // open successful
// std::cout << "good" <<std::endl;
return;
}

}

void Init()
{
totoal_written = 0;
current_file = 3;
OpenNewFile();

QueryPerformanceFrequency(&freq);
QueryPerformanceCounter(&t0);
}

void CaculateFileSize(int len)
{
totoal_written += len;
if (totoal_written >kFileSize) {
QueryPerformanceCounter(&t1);
LONGLONG time = //秒
(t1.QuadPart-t0.QuadPart)/freq.QuadPart;
double rate = totoal_written/(1024 *1024);
rate = rate / time;

printf("平均速度:%.3f M 每秒\n", rate);

t0 = t1;
totoal_written = 0;
OpenNewFile();
}
}

int main ()
{

Init();

// Prepare our context and socket
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_PULL);
socket.bind ("tcp://127.0.0.1:5555");

while (true) {
zmq::message_t msg;

// Wait for next request from client
try {
socket.recv (&msg);
} catch (zmq::error_t &e) {
std::cout << "接收错误!" << std::endl;
continue;
}

CaculateFileSize(msg.size());

// write data to disk
if(file_stream.is_open()
&& !file_stream.fail()) {

//std::cout << msg.size() << std::endl;
file_stream.write((char *)msg.data(),msg.size());
}
}
return 0;
}


 发送log的一端

#include "ZeroMQBackend.h"

#include <zmq.hpp>

#include "LogStream.h"

zmq::context_t * ZeroMQBackend::context = NULL;

ZeroMQBackend::ZeroMQBackend()
{
if (context == NULL) {
context = new zmq::context_t(1); // 创建一个1 IO thread。 这个context只能一个进程一个。
}
socket = new zmq::socket_t (*context, ZMQ_PUSH);
socket->connect ("tcp://127.0.0.1:5555");
}
ZeroMQBackend::~ZeroMQBackend ()
{
if (socket) {
socket->close();
delete socket;
}
}

void ZeroMQBackend::write(LogStream * stream)
{
zmq::message_t msg((void *)stream->buffer().data(),stream->buffer().length(),
ZeroMQBackend::writeComplete,stream);
try {
socket->send(msg,ZMQ_NOBLOCK);
} catch (zmq::error_t &e) {

}
}

// This function should be thread-safe. it is called in ZeroMQ's IO thread
void ZeroMQBackend::writeComplete(void * data, void * hint)
{
LogStream * stream = (LogStream *) hint;
stream->Release();
}


  评论这张
 
阅读(1208)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017