我在多线程环境中使用
MQSeries Perl模块时遇到问题.在这里我尝试过:
>使用$mqMgr = MQSeries :: QueueManager-> new()在不同的线程中创建两个句柄.我认为这会给我两个与MQ的不同连接,但是我在第二次调用MQOPEN()时得到了返回代码2219,这可能意味着我从两次单独调用new()方法获得了与mq相同的underling连接.
>只声明一个$mqMgr作为全局共享变量.但是我无法将对MQSeries :: QueueManager对象的引用分配给$mqMgr.原因是“arg 1到threads :: shared :: share的类型必须是[$@%]之一(不是子例程条目)”
>只声明一个$mqMgr作为全局变量.得到相同的2219代码.
>尝试将MQCNO_HANDLE_SHARE_NO_BLOCK传递到MQSeries :: QueueManager-> new(),以便可以跨线程共享单个连接.但我找不到通过它的方法.
我的问题是,使用Perl模块MQSeries
>我如何/可以从不同的线程获得与MQ队列管理器的单独连接?
>我如何/可以跨不同的线程共享与MQ队列管理器的连接?
我环顾四周,但运气不好,任何信息都会受到赞赏.
相关问题:
更新1:添加一个示例,两个线程中的两个本地MQSeries :: QueueManager对象导致MQ错误代码2219.
use threads;
use Thread::Queue;
use MQSeries;
use MQSeries::QueueManager;
use MQSeries::Queue;
# globals
our $jobQ = Thread::Queue->new();
our $resultQ = Thread::Queue->new();
# ----------------------------------------------------------------------------
# sub routines
# ----------------------------------------------------------------------------
sub worker {
# fetch work from $jobQ and put result to $resultQ
# ...
}
sub monitor {
# fetch result from $resultQ and put it onto another MQ queue
my $mqQMgr = MQSeries::QueueManager->new( ... );
# different queue from the one in main
# this would cause error with MQ code 2219
my $mqQ = MQSeries::Queue->new( ... );
while (defined(my $result = $resultQ->dequeue())) {
# create an mq message and put it into $mqQ
my $mqMsg = MQSeries::Message->new();
$mqQ->put($mqMsg);
}
}
# main
unless (caller()) {
# create connection to MQ
my $mqQMgr = MQSeries::QueueManager->new( ... );
my $mqQ = MQSeries::Queue->new( ... );
# create worker and monitor thread
my @workers;
for (1 .. $nThreads) {
push(@workers, threads->create('worker'));
}
my $monitor = threads->create('monitor');
while (True) {
my $mqMsg = MQSeries::Message->new ();
my $retCode = $mqQ->get(
Message => $mqMsg,
GetMsgOptions => $someOption,
Wait => $sometime
);
die("error") if ($retCode == 0);
next if ($retCode == -1); # no message
# not we have some job to do
$jobQ->enqueue($mqMsg->Data);
}
}
最佳答案 尝试使用模块进行多线程处理时模块不是线程安全的,这是非常危险的.由于线程的工作方式,你可以克服当前的进程状态,包括文件句柄,套接字等等,这些东西可能会乱七八糟.
但是如果你尝试以异步/线程方式使用它们,它们的行为会非常奇怪,因为这些操作不一定是(必然的)原子的.
所以虽然我不能直接回答你的问题,因为我没有特定模块的经验:
除非您另有所知,否则假设您无法在线程之间共享.它可能是线程安全的,也可能不是.如果不是,它可能看起来还不错,直到有一天你因并发条件下的竞争条件而难以找到错误.
在threads :: shared中明确描述了共享标量/列表,基本上是安全的(即使这样,如果你没有锁定,你仍然会遇到非原子性问题).
因此,我建议您需要做的是:
>有一个’comms’线程,它完成与模块相关的所有工作,并使其他线程使用IPC与之通信. Thread :: Queue可以很好地为此工作.
>为了模块的目的,将每个线程视为完全独立.这包括加载它(带有需求和导入 – 不使用,因为它更早地行动)和实例化. (您可能会在线程启动之前“加载”模块,但实例化会创建描述符,套接字等.)
>当存在中断原子操作的危险时锁定东西.
上面的大部分也适用于fork并行性 – 但不是以完全相同的方式,因为fork使得“共享”的东西相当困难,所以你不太可能绊倒它.
编辑:
查看您发布的代码以及针对MQSeries源的交叉引用:
>有一个BEGIN块,它在你使用它的时候用MQSeries设置一些东西.
虽然我不能确定这是你的问题,但它让我非常谨慎 – 因为请记住,当它这样做时,它会设置一些东西 – 然后当你的线程开始时,它们会继承非共享的副本在“BEGIN”区块中“不管它做了什么”.
所以根据我之前的建议 – 我建议你尝试(因为我不能肯定地说,因为我没有参考实现):
require MQSeries;
MQSeries->import;
在线程启动后将其放入代码中 – 代替使用.例如.在你创建和在线程子程序之后.