MySQL使用了称之为psi/pfs的一系列文件和结构来进行performance监控。Psi全称为performance schema interface,pfs全称为performance storage。
该机制使用pthead来进行操作,其首先定义了pthread的线程存储变量(pfs.cc):
thread_local_key_t THR_PFS;
thread_local_key_t THR_PFS_VG; // global_variables
thread_local_key_t THR_PFS_SV; // session_variables
thread_local_key_t THR_PFS_VBT; // variables_by_thread
thread_local_key_t THR_PFS_SG; // global_status
thread_local_key_t THR_PFS_SS; // session_status
thread_local_key_t THR_PFS_SBT; // status_by_thread
thread_local_key_t THR_PFS_SBU; // status_by_user
thread_local_key_t THR_PFS_SBH; // status_by_host
thread_local_key_t THR_PFS_SBA; // status_by_account
bool THR_PFS_initialized= false;
这里的thread_local_key_t
实际上是pthread_key_t
,即pthread
线程存储变量。pthread_key_t
的使用就像一个全局变量,哪个线程都可以用,但是实际上对应了线程内部的变量值,可以参见该例:http://www.jianshu.com/p/d52c…。pthread规定,线程存储变量thread_local_key_t
必须要先初始化。MySQL在pfs_server.cc
中对这些变量统一初始化:
void pre_initialize_performance_schema()
{
pfs_initialized= false;
init_all_builtin_memory_class();
PFS_table_stat::g_reset_template.reset();
global_idle_stat.reset();
global_table_io_stat.reset();
global_table_lock_stat.reset();
if (my_create_thread_local_key(&THR_PFS, destroy_pfs_thread))
return;
if (my_create_thread_local_key(&THR_PFS_VG, NULL)) // global_variables
return;
if (my_create_thread_local_key(&THR_PFS_SV, NULL)) // session_variables
return;
if (my_create_thread_local_key(&THR_PFS_VBT, NULL)) // variables_by_thread
return;
if (my_create_thread_local_key(&THR_PFS_SG, NULL)) // global_status
return;
if (my_create_thread_local_key(&THR_PFS_SS, NULL)) // session_status
return;
if (my_create_thread_local_key(&THR_PFS_SBT, NULL)) // status_by_thread
return;
if (my_create_thread_local_key(&THR_PFS_SBU, NULL)) // status_by_user
return;
if (my_create_thread_local_key(&THR_PFS_SBH, NULL)) // status_by_host
return;
if (my_create_thread_local_key(&THR_PFS_SBA, NULL)) // status_by_account
return;
THR_PFS_initialized= true;
}
注意,这个初始化只做一次,以后创建线程时直接使用即可。上的第一个变量THR_PFS就是我们要使用的。
如何使用
使用上的方式初始化,首先要set相应的value:
/**
@brief Execute the JOIN generated by parallel
@param join [in] JOIN structure
*/
void execute_join(parallel_execution_thread_arg* parallel_arg) {
/*
* Get the parameter:
* 1. JOIN
* 2. pfs
*/
/// TODO: do we need to handle error?
std::cout << "****************I am in worker thread*****************" << std::endl;
/// Get join
JOIN* join= parallel_arg->join;
/// Get and Set pfs
PSI_thread* pfs= parallel_arg->pfs;
pfs_set_thread_v1(pfs);
/// Delete
delete parallel_arg;
/// Set the new thread context
my_thread_set_THR_THD(join->thd);
/// Execute
join->exec();
}
上面的函数是我在MySQL中新加入的代码,其中使用pfs_set_thread_v1进行set操作,即把当前THR_PFS对应的值设置为pfs。
get操作。由于我们加入了boost
线程库,所以当启动一个线程时需要把JOIN
结构和pfs
结构传入。思路是首先通过THR_PFS
获得pfs
线程句柄,作为参数传入到新的线程中。再新线程执行函数中,把pfs
线程句柄set进去。具体在sql_select.cc中,我们加入了如下代码:
/**
Parallel execution.
@details When a JOIN is parallel, its JOINs will execute parallelly.
Put all JOINs into thread pool to execute.
*/
void JOIN::parallel_exec_joins() {
for (uint i= 0; i < m_parallel_joins.size(); i ++) {
/// Delete it in join->exec
parallel_execution_thread_arg* parallel_arg= new parallel_execution_thread_arg();
/// Set join
JOIN* join= m_parallel_joins[i];
parallel_arg->join= join;
/// Set pfs
PSI_thread* pfs= pfs_get_thread_v1();
parallel_arg->pfs= pfs;
/// Thread pool
generic_thread_pool.SubmitTask(execute_join, (parallel_execution_thread_arg* &&)parallel_arg);
}
}
可以看到,我们通过MySQL的pfs_get_thread_v1
获得pfs线程句柄传入到新的线程。
上面的例子是针对pfs的线程。对于MySQL普通线程的例子在上面的execute_join也能找到。注意里面有一行code:
/// Set the new thread context
my_thread_set_THR_THD(join->thd);
这里就是把当前的thd设置到pthread中。所以我们看到,在MySQL中的很多地方都用到了这个东西,用法也已经明确了。