注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

gmd20的个人空间

// 编程和生活

 
 
 

日志

 
 

freeDiameter源码阅读之消息队列和消息处理流程  

2013-04-03 13:23:33|  分类: 程序设计 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |


fifo消息队列的定义和基本操作
=============
include/freeDiameter/libfdproto.h
-----------
/*
* FUNCTION: fd_fifo_post
*
* PARAMETERS:
* queue : The queue in which the element must be posted.
* item : The element that is put in the queue.
*
* DESCRIPTION:
* An element is added in a queue. Elements are retrieved from the queue in FIFO order
* with the fd_fifo_get, fd_fifo_tryget, or fd_fifo_timedget functions.
*
* RETURN VALUE:
* 0 : The element is queued.
* EINVAL : A parameter is invalid.
* ENOMEM : Not enough memory to complete the operation.
*/
int fd_fifo_post_int ( struct fifo * queue, void ** item );
#define fd_fifo_post(queue, item) \
fd_fifo_post_int((queue), (void *)(item))

/*
* FUNCTION: fd_fifo_get
*
* PARAMETERS:
* queue : The queue from which the first element must be retrieved.
* item : On return, the first element of the queue is stored here.
*
* DESCRIPTION:
* This function retrieves the first element from a queue. If the queue is empty, the function will block the
* thread until a new element is posted to the queue, or until the thread is canceled (in which case the
* function does not return).
*
* RETURN VALUE:
* 0 : A new element has been retrieved.
* EINVAL : A parameter is invalid.
*/
int fd_fifo_get_int ( struct fifo * queue, void ** item );
#define fd_fifo_get(queue, item) \
fd_fifo_get_int((queue), (void *)(item))
------------------------------------------------------------


fifo.c
--------------
struct fifo {
int eyec; /* An eye catcher, also used to check a queue is valid. FIFO_EYEC */

pthread_mutex_t mtx; /* Mutex protecting this queue */
pthread_cond_t cond_pull; /* condition variable for pulling threads */
pthread_cond_t cond_push; /* condition variable for pushing threads */

struct fd_list list; /* sentinel for the list of elements */
int count; /* number of objects in the list */
int thrs; /* number of threads waiting for a new element (when count is 0) */

int max; /* maximum number of items to accept if not 0 */
int thrs_push; /* number of threads waitnig to push an item */

uint16_t high; /* High level threshold (see libfreeDiameter.h for details) */
uint16_t low; /* Low level threshhold */
void *data; /* Opaque pointer for threshold callbacks */
void (*h_cb)(struct fifo *, void **); /* The callbacks */
void (*l_cb)(struct fifo *, void **);
int highest;/* The highest count value for which h_cb has been called */
int highest_ever; /* The max count value this queue has reached (for tweaking) */
};

-----------------
fd_fifo_post_int
pthread_mutex_lock( &queue->mtx 这个队列是由锁保护的链表。
ret = pthread_cond_wait( &queue->cond_push, &queue->mtx ); //循环等待条件变量,等到可以插入
fd_list_insert_before
pthread_cond_signal(&queue->cond_pull // 通知等待的读取线程
pthread_cond_signal(&queue->cond_push
pthread_mutex_unlock( &queue->mtx )
----------------
fd_fifo_get_int
fifo_tget
fifo_tget
pthread_mutex_lock( &queue->mtx
*item = mq_pop(queue); // 如有有就从链表取出元素
pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime ); //有超时的等待
pthread_cond_wait( &queue->cond_pull, &queue->mtx );
pthread_mutex_unlock( &queue->mtx )

=========================================
使用的几个例子
=========

evens.c
---------
fd_event_send
fd_fifo_post
fd_event_get
fd_fifo_get

====================
message.c
----------
fd_msg_send
fd_msg_send_timeout
fd_fifo_post //把消息加到队列
===================
peers.c
----------
fd_peer_alloc
fd_fifo_new(&p->p_tosend
=======================





上一篇文章说到peer的状态机处理进程会把消息放到全局的消息队列里面去。
p_psm.c
p_psm_th
fd_fifo_post(fd_g_incoming, &msg), goto psm_end );


下面看看这个消息队列是怎么使用和工作的


---------------
queue.c 全局的i消息队列的定义在这里

/* The global message queues */
struct fifo * fd_g_incoming = NULL; 接收到的新消息
struct fifo * fd_g_outgoing = NULL;
struct fifo * fd_g_local = NULL;

/* Initialize the message queues. */
int fd_queues_init(void)
{
TRACE_ENTRY();
CHECK_FCT( fd_fifo_new ( &fd_g_incoming, 20 ) );
CHECK_FCT( fd_fifo_new ( &fd_g_outgoing, 30 ) );
CHECK_FCT( fd_fifo_new ( &fd_g_local, 25 ) );
return 0;


==============================
routing_dispatch.c
------------
/* The dispatch thread */
static void * dispatch_thr(void * arg)
{
return process_thr(arg, msg_dispatch, fd_g_local, "Dispatch");
}
/* The (routing-in) thread -- see description in freeDiameter.h */
static void * routing_in_thr(void * arg)
{
return process_thr(arg, msg_rt_in, fd_g_incoming, "Routing-IN");
}
/* The (routing-out) thread -- see description in freeDiameter.h */
static void * routing_out_thr(void * arg)
{
return process_thr(arg, msg_rt_out, fd_g_outgoing, "Routing-OUT");
}

这几个线程在程序启动时由 main () ->fd_core_parseconf -> fd_rtdisp_init () 函数里面创建
---------
process_thr
do { 循环从消息队列中取出消息
ret = fd_fifo_timedget ( queue, &msg, &ts );
//调用msg_rt_in 函数 处理消息
(*action_cb)(&msg), goto fatal_error)
} while (1)
-------------------
* The ROUTING-IN message processing */
static int msg_rt_in(struct msg ** pmsg)

fd_app_check(&fd_g_config->cnf_apps, hdr->msg_appl, &app) //判断是不是bending能处理的app
fd_fifo_post(fd_g_local, pmsg) ); //本地的转交local 线程处理
fd_fifo_post(fd_g_incoming // 有时需要转换,然后重新入队再走一下处理流程

/* Call all registered callbacks for this message */
pthread_rwlock_rdlock( &rt_fwd_lock )
for {
ret = (*rh->rt_fwd_cb)(rh->cbdata, pmsg),
]
pthread_rwlock_unlock( &rt_fwd_lock )

------------------
本地的fd_g_local, 队列对应的处理函数
/* The DISPATCH message processing */
static int msg_dispatch(struct msg ** pmsg
(1) 对于 answer消息,看看是不是对应request发送的时候注册了回调函数,
fd_msg_answ_getq
fd_msg_anscb_get
(*anscb)(data, pmsg); // 调完回调处理就完成了。
(2)
fd_msg_sess_get 获取对应的session
fd_msg_dispatch //调用看看谁注册要这个消息的,调用扩展应用的回调
fd_fifo_post(fd_g_outgoing, pmsg) 根据扩展的返回状态,决定是不是往外发消息


  评论这张
 
阅读(1081)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017