Bi-directional I/O using Data Distribution Services requires a separate reader and writer.

This commit is contained in:
Erik Hofman 2021-04-15 09:45:17 +02:00
parent 9b3265c46c
commit bcfff2a967
2 changed files with 29 additions and 17 deletions

View File

@ -24,6 +24,7 @@
# include <simgear_config.h>
#endif
#include <cstring>
#include <algorithm>
#include <simgear/compiler.h>
@ -62,6 +63,7 @@ SG_DDS_Topic::setup(const char *topic, const dds_topic_descriptor_t *desc, size_
// If specified as a server (in direction) create a subscriber.
// If specified as a client (out direction), create a publisher.
// If sepcified as bi-directional create a publisher and a subscriber.
bool
SG_DDS_Topic::open(SGProtocolDir direction)
{
@ -117,10 +119,10 @@ SG_DDS_Topic::open(dds_entity_t p, dds_guid_t& g, SGProtocolDir direction)
dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS(1));
dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 1);
entry = dds_create_reader(participant, topic, qos, NULL);
if (entry < 0) {
reader = dds_create_reader(participant, topic, qos, NULL);
if (reader < 0) {
SG_LOG(SG_IO, SG_ALERT, "dds_create_reader: "
<< dds_strretcode(-entry));
<< dds_strretcode(-reader));
return false;
}
@ -131,14 +133,14 @@ SG_DDS_Topic::open(dds_entity_t p, dds_guid_t& g, SGProtocolDir direction)
{
dds_return_t rc;
entry = dds_create_writer(participant, topic, NULL, NULL);
if (entry < 0) {
writer = dds_create_writer(participant, topic, NULL, NULL);
if (writer < 0) {
SG_LOG(SG_IO, SG_ALERT, "dds_create_writer: "
<< dds_strretcode(-entry));
<< dds_strretcode(-writer));
return false;
}
rc = dds_set_status_mask(entry, DDS_PUBLICATION_MATCHED_STATUS);
rc = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
if (rc != DDS_RETCODE_OK) {
SG_LOG(SG_IO, SG_ALERT, "dds_set_status_mask: "
<< dds_strretcode(-rc));
@ -157,12 +159,14 @@ SG_DDS_Topic::read(char *buf, int length)
{
int result;
if (entry < 0 || length < (int)packet_size) {
if (reader < 0 || length < (int)packet_size) {
SG_LOG(SG_IO, SG_ALERT, "SG_DDS_Topic: data reader not properly set up.");
return 0;
}
dds_sample_info_t info[1];
result = dds_take(entry, (void**)&buf, info, 1, 1);
memset(buf, 0, length);
result = dds_take(reader, (void**)&buf, info, 1, 1);
if(result < 0)
{
errno = -result;
@ -184,13 +188,14 @@ SG_DDS_Topic::write(const char *buf, const int length)
{
int result;
if (entry < 0 || length < (int)packet_size) {
if (writer < 0 || length < (int)packet_size) {
SG_LOG(SG_IO, SG_ALERT, "SG_DDS_Topic: data writer not properly set up.");
return 0;
}
if (!status)
{
dds_return_t rc = dds_get_status_changes(entry, &status);
dds_return_t rc = dds_get_status_changes(writer, &status);
if (rc != DDS_RETCODE_OK) {
SG_LOG(SG_IO, SG_ALERT, "dds_get_status_changes: "
<< dds_strretcode(-rc));
@ -198,12 +203,12 @@ SG_DDS_Topic::write(const char *buf, const int length)
}
if (!(status & DDS_PUBLICATION_MATCHED_STATUS)) {
SG_LOG(SG_IO, SG_INFO, "DDS skipping write: no readers.");
SG_LOG(SG_IO, SG_INFO, "SG_DDS_Topic: skipping write: no readers.");
return length; // no readers yet.
}
}
result = dds_write(entry, buf);
result = dds_write(writer, buf);
if(result != DDS_RETCODE_OK)
{
errno = -result;
@ -229,7 +234,8 @@ SG_DDS_Topic::close()
shared_participant = true;
participant = -1;
topic = -1;
entry = -1;
reader = -1;
writer = -1;
return true;
}
@ -292,7 +298,7 @@ SG_DDS::add(SG_DDS_Topic *topic, const SGProtocolDir d)
{
readers.push_back(topic);
dds_entity_t entry = topic->get();
dds_entity_t entry = topic->get_reader();
dds_entity_t rdcond = dds_create_readcondition(entry,
DDS_NOT_READ_SAMPLE_STATE);
int status = dds_waitset_attach(waitset, rdcond, entry);

View File

@ -55,7 +55,8 @@ private:
bool shared_participant = true;
dds_entity_t participant = -1;
dds_entity_t topic = -1;
dds_entity_t entry = -1;
dds_entity_t reader = -1;
dds_entity_t writer = -1;
dds_guid_t guid;
@ -144,7 +145,12 @@ public:
// close the participant.
bool close();
dds_entity_t get() { return entry; }
dds_entity_t get_reader() { return reader; }
dds_entity_t get_writer() { return writer; }
dds_entity_t get(const SGProtocolDir d = SG_IO_IN) {
return (d == SG_IO_OUT) ? writer : reader;
}
const dds_guid_t& get_guid() { return guid; }
};