Get the GUID from DDS and store it for future use. Which coul be handy for point-to-point connections over DDS.

This commit is contained in:
Erik Hofman 2021-04-07 11:51:32 +02:00
parent 89271e85a0
commit 9ae26d7f80
3 changed files with 79 additions and 23 deletions

View File

@ -84,7 +84,21 @@ SG_DDS_Topic::open(SGProtocolDir direction)
bool bool
SG_DDS_Topic::open(dds_entity_t p, SGProtocolDir direction) SG_DDS_Topic::open(dds_entity_t p, SGProtocolDir direction)
{ {
dds_guid_t g;
int status = dds_get_guid(p, &g);
if (status < 0) {
SG_LOG(SG_IO, SG_ALERT, "dds_get_guid: "
<< dds_strretcode(-status));
}
return open(p, g, direction);
}
bool
SG_DDS_Topic::open(dds_entity_t p, dds_guid_t& g, SGProtocolDir direction)
{
guid = g;
participant = p; participant = p;
if (topic < 0) if (topic < 0)
{ {
topic = dds_create_topic(participant, descriptor, topic = dds_create_topic(participant, descriptor,
@ -96,13 +110,13 @@ SG_DDS_Topic::open(dds_entity_t p, SGProtocolDir direction)
return false; return false;
} }
} }
if (direction == SG_IO_IN || direction == SG_IO_BI) if (direction == SG_IO_IN || direction == SG_IO_BI)
{ {
dds_qos_t *qos = dds_create_qos(); dds_qos_t *qos = dds_create_qos();
dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS(1)); dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS(1));
dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 1); dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 1);
entry = dds_create_reader(participant, topic, qos, NULL); entry = dds_create_reader(participant, topic, qos, NULL);
if (entry < 0) { if (entry < 0) {
SG_LOG(SG_IO, SG_ALERT, "dds_create_reader: " SG_LOG(SG_IO, SG_ALERT, "dds_create_reader: "
@ -115,7 +129,7 @@ SG_DDS_Topic::open(dds_entity_t p, SGProtocolDir direction)
if (direction == SG_IO_OUT || direction == SG_IO_BI) if (direction == SG_IO_OUT || direction == SG_IO_BI)
{ {
dds_return_t rc; dds_return_t rc;
entry = dds_create_writer(participant, topic, NULL, NULL); entry = dds_create_writer(participant, topic, NULL, NULL);
if (entry < 0) { if (entry < 0) {
@ -127,7 +141,7 @@ SG_DDS_Topic::open(dds_entity_t p, SGProtocolDir direction)
rc = dds_set_status_mask(entry, DDS_PUBLICATION_MATCHED_STATUS); rc = dds_set_status_mask(entry, DDS_PUBLICATION_MATCHED_STATUS);
if (rc != DDS_RETCODE_OK) { if (rc != DDS_RETCODE_OK) {
SG_LOG(SG_IO, SG_ALERT, "dds_set_status_mask: " SG_LOG(SG_IO, SG_ALERT, "dds_set_status_mask: "
<< dds_strretcode(-rc)); << dds_strretcode(-rc));
return false; return false;
} }
} }
@ -142,7 +156,7 @@ int
SG_DDS_Topic::read(char *buf, int length) SG_DDS_Topic::read(char *buf, int length)
{ {
int result; int result;
if (entry < 0 || length < (int)packet_size) { if (entry < 0 || length < (int)packet_size) {
return 0; return 0;
} }
@ -190,7 +204,7 @@ SG_DDS_Topic::write(const char *buf, const int length)
} }
result = dds_write(entry, buf); result = dds_write(entry, buf);
if(result != DDS_RETCODE_OK) if(result != DDS_RETCODE_OK)
{ {
errno = -result; errno = -result;
result = 0; result = 0;
@ -243,6 +257,12 @@ SG_DDS::SG_DDS(dds_domainid_t domain_id, const char *config)
<< dds_strretcode(-participant)); << dds_strretcode(-participant));
return; return;
} }
int status = dds_get_guid(participant, &guid);
if (status < 0) {
SG_LOG(SG_IO, SG_ALERT, "dds_get_guid: "
<< dds_strretcode(-status));
}
} }
waitset = dds_create_waitset(participant); waitset = dds_create_waitset(participant);
@ -263,7 +283,7 @@ SG_DDS::~SG_DDS()
bool bool
SG_DDS::add(SG_DDS_Topic *topic, const SGProtocolDir d) SG_DDS::add(SG_DDS_Topic *topic, const SGProtocolDir d)
{ {
bool result = topic->open(participant, d); bool result = topic->open(participant, guid, d);
if (result) if (result)
{ {
if (d == SG_IO_OUT || d == SG_IO_BI) if (d == SG_IO_OUT || d == SG_IO_BI)

View File

@ -57,6 +57,8 @@ private:
dds_entity_t topic = -1; dds_entity_t topic = -1;
dds_entity_t entry = -1; dds_entity_t entry = -1;
dds_guid_t guid;
public: public:
/** Create an instance of SG_DDS_Topic. */ /** Create an instance of SG_DDS_Topic. */
@ -64,9 +66,14 @@ public:
SG_DDS_Topic(const char* topic, const dds_topic_descriptor_t *desc, size_t size); SG_DDS_Topic(const char* topic, const dds_topic_descriptor_t *desc, size_t size);
// Store the pointer to the buffer to be able to call read() without
// any paramaters. Make sure the buffer is always available and of
// sufficient size to store a type T otherwise a segmentation fault
// will occur.
template<typename T> template<typename T>
SG_DDS_Topic(T& type, const dds_topic_descriptor_t *desc) : SG_DDS_Topic() { SG_DDS_Topic(T& buf, const dds_topic_descriptor_t *desc) : SG_DDS_Topic()
buffer = (char*)(&type); {
buffer = reinterpret_cast<char*>(&buf);
setup<T>(desc); setup<T>(desc);
} }
@ -77,6 +84,17 @@ public:
// a custom topic name. // a custom topic name.
void setup(const char* topic, const dds_topic_descriptor_t *desc, size_t size); void setup(const char* topic, const dds_topic_descriptor_t *desc, size_t size);
// Store the pointer to the buffer to be able to call read() without
// any paramaters. Make sure the buffer is always available and of
// sufficient size to store a type T otherwise a segmentation fault
// will occur.
template<typename T>
void setup(T& buf, const dds_topic_descriptor_t *desc) {
buffer = reinterpret_cast<char*>(&buf);
std::string type = simgear::getTypeName<T>();
setup(type.c_str(), desc, sizeof(T));
}
template<typename T> template<typename T>
void setup(const dds_topic_descriptor_t *desc) { void setup(const dds_topic_descriptor_t *desc) {
std::string type = simgear::getTypeName<T>(); std::string type = simgear::getTypeName<T>();
@ -85,22 +103,28 @@ public:
// If specified as a server start a publishing participant. // If specified as a server start a publishing participant.
// If specified as a client start a subscribing participant. // If specified as a client start a subscribing participant.
// Create a new partipant and unique identifier and establish a connection.
bool open(const SGProtocolDir d); bool open(const SGProtocolDir d);
// If specified as a server start publishing to a participant. // Establish the connection using a shared participant.
// If specified as a client start subscribing to a participant. // Create the unique identifier (GUID) from the shared participant.
bool open(dds_entity_t p, const SGProtocolDir d); bool open(dds_entity_t p, const SGProtocolDir d);
// Establish the connection using a shared participant and shared GUID.
bool open(dds_entity_t p, dds_guid_t& g, const SGProtocolDir d);
// read data from the topic. // read data from the topic.
int read(char *buf, int length); int read(char *buf, int length);
// Calling read without any parameters requires to use de constructor
// with a buffer type.
int read() { int read() {
return buffer ? read(buffer, packet_size) : 0; return buffer ? read(buffer, packet_size) : 0;
} }
template<typename T> template<typename T>
bool read(T& sample) { bool read(T& sample) {
return (read((char*)&sample, sizeof(T)) == sizeof(T)) ? true : false; return (read(reinterpret_cast<char*>(&sample), sizeof(T)) == sizeof(T)) ? true : false;
} }
// write data to the topic. // write data to the topic.
@ -108,13 +132,20 @@ public:
template<typename T> template<typename T>
bool write(const T& sample) { bool write(const T& sample) {
return (write((char*)&sample, sizeof(T)) == sizeof(T)) ? true : false; return (write(reinterpret_cast<char*>(&sample), sizeof(T)) == sizeof(T)) ? true : false;
}
// Calling write without any parameters requires to use de constructor
// with a buffer type.
int write() {
return buffer ? write(buffer, packet_size) : 0;
} }
// close the participant. // close the participant.
bool close(); bool close();
dds_entity_t get() { return entry; } dds_entity_t get() { return entry; }
const dds_guid_t& get_guid() { return guid; }
}; };
// a class to manage multiple DDS topics // a class to manage multiple DDS topics
@ -130,6 +161,8 @@ private:
std::vector<SG_DDS_Topic*> readers; std::vector<SG_DDS_Topic*> readers;
std::vector<SG_DDS_Topic*> writers; std::vector<SG_DDS_Topic*> writers;
dds_guid_t guid;
public: public:
SG_DDS(dds_domainid_t d = FG_DDS_DOMAIN, const char *c = ""); SG_DDS(dds_domainid_t d = FG_DDS_DOMAIN, const char *c = "");
@ -144,6 +177,8 @@ public:
const std::vector<SG_DDS_Topic*>& get_writers() { return writers; } const std::vector<SG_DDS_Topic*>& get_writers() { return writers; }
bool wait(float dt = std::numeric_limits<float>::max()); bool wait(float dt = std::numeric_limits<float>::max());
const dds_guid_t& get_guid() { return guid; }
}; };

View File

@ -63,24 +63,25 @@ const dds_topic_descriptor_t DDSMessageData_Msg_desc =
void testSendReceive() void testSendReceive()
{ {
DDSMessageData_Msg out = { 0, (char*)"testSendReceive" };
DDSMessageData_Msg in;
SG_DDS participant; SG_DDS participant;
SG_DDS_Topic *writer = new SG_DDS_Topic(); // writer and reader store the pointer to the parsed buffers, so it knows
SG_DDS_Topic *reader = new SG_DDS_Topic(); // the buffer location and buffer size making it possible to call read()
// and write() without any parameters if the buffers remains available
writer->setup("DDS", &DDSMessageData_Msg_desc, sizeof(DDSMessageData_Msg)); // during it's lifetime.
reader->setup("DDS", &DDSMessageData_Msg_desc, sizeof(DDSMessageData_Msg)); SG_DDS_Topic *writer = new SG_DDS_Topic(out, &DDSMessageData_Msg_desc);
SG_DDS_Topic *reader = new SG_DDS_Topic(in, &DDSMessageData_Msg_desc);
participant.add(writer, SG_IO_OUT); participant.add(writer, SG_IO_OUT);
participant.add(reader, SG_IO_IN); participant.add(reader, SG_IO_IN);
DDSMessageData_Msg out = { 0, (char*)"testSendReceive" }; writer->write();
DDSMessageData_Msg in;
writer->write((char*)&out, sizeof(out));
participant.wait(); participant.wait();
reader->read((char*)&in, sizeof(in)); reader->read();
SG_VERIFY(in.userID == out.userID); SG_VERIFY(in.userID == out.userID);
SG_VERIFY(!strcmp(in.message, out.message)); SG_VERIFY(!strcmp(in.message, out.message));