c – 不正确使用boost :: asio和boost :: thread

我正在使用boost :: asio和boost :: thread来实现一个接受消息的消息服务,如果没有消息被处理则异步发送消息,或者如果正在处理消息则将消息排队.

消息率在我看来很高,大约每秒2.000条消息.有这么多消息,我面对的是破败的消息,但很少见.在2.000中,大约4-8的消息已损坏.我认为问题是由于不正确使用boost :: asio和/或boost :: thread库.

我实现的代码主要基于this boost tutorial.我找不到错误,因为主要消息发现,我发现很难缩小问题范围.

也许其他人知道这里出了什么问题?

基本上这个类以下列方式使用:

(1)在我的程序开始时调用构造函数以启动线程,从而接受和传输消息的服务

(2)每当我想传输消息时,我都会调用MessageService :: transmitMessage(),它将使用async_write的任务委托给处理消息队列的线程.

using namespace google::protobuf::io;
using boost::asio::ip::tcp;

MessageService::MessageService(std::string ip, std::string port) :
    work(io_service), resolver(io_service), socket(io_service) {

    messageQueue = new std::deque<AgentMessage>;
    tcp::resolver::query query(ip, port);
    endpoint_iterator = resolver.resolve(query);

    tcp::endpoint endpoint = *endpoint_iterator;

    socket.async_connect(endpoint, boost::bind(&MessageService::handle_connect,
            this, boost::asio::placeholders::error, ++endpoint_iterator));

    boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
}

void MessageService::await() {

    while (!messageQueue->empty()) {

        signal(SIGINT, exit);

        int messagesLeft = messageQueue->size();
        sleep(3);
        std::cout << "Pending Profiler Agents Messages: "
                << messageQueue->size() << std::endl;
        if (messagesLeft == messageQueue->size()) {
            std::cout << "Connection Error" << std::endl;
            break;
        }
    }

    std::cout << i << std::endl;
}

void MessageService::write(AgentMessage agentMessage, long systemTime,
        int JVM_ID) {
    agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle());
    agentMessage.set_jvm_id(JVM_ID);
    agentMessage.set_systemtime(systemTime);
    io_service.post(boost::bind(&MessageService::do_write, this, agentMessage));
}

void MessageService::do_close() {
    socket.close();
}

void MessageService::transmitMessage(AgentMessage agentMessage) {

    ++i;

    boost::asio::streambuf b;
    std::ostream os(&b);

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output);

    coded_output->WriteVarint32(agentMessage.ByteSize());
    agentMessage.SerializeToCodedStream(coded_output);

    delete coded_output;
    delete raw_output;

    boost::system::error_code ignored_error;

    boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error));
}

void MessageService::do_write(AgentMessage agentMessage) {

    bool write_in_progress = !messageQueue->empty();
    messageQueue->push_back(agentMessage);

    if (!write_in_progress) {
        transmitMessage(agentMessage);
    }
}

void MessageService::handle_write(const boost::system::error_code &error) {

    if (!error) {
        messageQueue->pop_front();
        if (!messageQueue->empty()) {
            transmitMessage(messageQueue->front());
        }
    } else {
        std::cout << error << std::endl;
        do_close();
    }
}

void MessageService::handle_connect(const boost::system::error_code &error,
        tcp::resolver::iterator endpoint_iterator) {
    // can be used to receive commands from the Java profiler interface
}

MessageService::~MessageService() {
    // TODO Auto-generated destructor stub
}

头文件:

    using boost::asio::ip::tcp;

class MessageService {
public:
    MessageService(std::string ip, std::string port);
    virtual ~MessageService();
    void write(AgentMessage agentMessage, long systemTime, int JVM_ID);
    void await();

private:
    boost::asio::io_service io_service;
    boost::asio::io_service::work work;
    tcp::resolver resolver;
    tcp::resolver::iterator endpoint_iterator;
    tcp::socket socket;
    std::deque<AgentMessage> *messageQueue;

    void do_write(AgentMessage agentMessage);

    void do_close();

    void handle_write(const boost::system::error_code &error);

    void handle_connect(const boost::system::error_code &error,
            tcp::resolver::iterator endpoint_iterator);

    void transmitMessage(AgentMessage agentMessage);
};

最佳答案 这种方法对我来说似乎很可疑

void MessageService::transmitMessage(AgentMessage agentMessage) {
    ++i;

    boost::asio::streambuf b;
    std::ostream os(&b);

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output);

    coded_output->WriteVarint32(agentMessage.ByteSize());
    agentMessage.SerializeToCodedStream(coded_output);

    delete coded_output;
    delete raw_output;

    boost::system::error_code ignored_error;

    boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error));
}

您似乎将AgentMessage(应该通过const引用btw传递)序列化为streambuf.但是,在调用async_write完成处理程序之前,不保证此序列化数据存在,这在async_write documentation中有明确说明.

buffers

One or more buffers containing the data to be written.
Although the buffers object may be
copied as necessary, ownership of the
underlying memory blocks is retained
by the caller, which must guarantee
that they remain valid until the
handler is called.

要解决此问题,请确保缓冲区保持在作用域中,直到调用完成处理程序.一种方法是将缓冲区作为参数传递给有界完成处理程序:

boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error,
            coded_output
            // ^^^ buffer goes here
            ));

然后从完成处理程序中删除它.我建议你也看看使用shared_ptr而不是使用裸指针.

点赞