Fast DDS  Version 3.0.0
Fast DDS
Loading...
Searching...
No Matches
CacheChange.hpp
1// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
19#ifndef FASTDDS_RTPS_COMMON__CACHECHANGE_HPP
20#define FASTDDS_RTPS_COMMON__CACHECHANGE_HPP
21
22#include <atomic>
23#include <cassert>
24
25#include <fastdds/rtps/common/ChangeKind_t.hpp>
26#include <fastdds/rtps/common/FragmentNumber.hpp>
27#include <fastdds/rtps/common/InstanceHandle.hpp>
28#include <fastdds/rtps/common/SerializedPayload.hpp>
29#include <fastdds/rtps/common/Time_t.hpp>
30#include <fastdds/rtps/common/Types.hpp>
31#include <fastdds/rtps/common/VendorId_t.hpp>
32#include <fastdds/rtps/common/WriteParams.hpp>
33#include <fastdds/rtps/history/IPayloadPool.hpp>
34
35namespace eprosima {
36namespace fastdds {
37namespace rtps {
38
39struct CacheChange_t;
40
45{
// NOTE(review): the declaration line of this struct and the num_sent_submessages
// member listed for it in the member index are not visible in this listing;
// confirm both against the original header.
//! Used to link with previous node in a list.
50 CacheChange_t* volatile previous = nullptr;
//! Used to link with next node in a list.
53 CacheChange_t* volatile next = nullptr;
//! Used to know if the object is already in a list.
55 std::atomic_bool is_linked {false};
56};
57
72
77struct FASTDDS_EXPORTED_API CacheChange_t
78{
//! GUID_t of the writer that generated this change.
82 GUID_t writerGUID{};
//! Handle of the data associated with this change.
84 InstanceHandle_t instanceHandle{};
//! SequenceNumber of the change.
86 SequenceNumber_t sequenceNumber{};
//! Serialized Payload associated with the change.
88 SerializedPayload_t serializedPayload{};
//! Presumably the payload for inline QoS; the default constructor sets its encapsulation to PL_CDR_LE or PL_CDR_BE.
90 SerializedPayload_t inline_qos{};
//! Indicates if the cache has been read (only used in READERS).
92 bool isRead = false;
//! Source TimeStamp.
94 Time_t sourceTimestamp{};
97
// NOTE(review): the members of this union are not visible in this listing. Other
// code in this file accesses reader_info and writer_info, and also the kind and
// vendor_id members - confirm their declarations against the original header.
98 union
99 {
102 };
103
//! Additional information of the CacheChange (see WriteParams).
104 WriteParams write_params{};
//! Defaults to true; copy() passes its negation as the second argument of SerializedPayload_t::copy.
105 bool is_untyped_ = true;
106
112 : writer_info()
113 {
114 inline_qos.encapsulation = DEFAULT_ENDIAN == LITTLEEND ? PL_CDR_LE : PL_CDR_BE;
115 }
116
118 const CacheChange_t&) = delete;
119 const CacheChange_t& operator =(
120 const CacheChange_t&) = delete;
121
128 uint32_t payload_size,
129 bool is_untyped = false)
130 : serializedPayload(payload_size)
131 , is_untyped_(is_untyped)
132 {
133 }
134
140 bool copy(
141 const CacheChange_t* ch_ptr)
142 {
143 kind = ch_ptr->kind;
144 writerGUID = ch_ptr->writerGUID;
145 instanceHandle = ch_ptr->instanceHandle;
146 sequenceNumber = ch_ptr->sequenceNumber;
147 sourceTimestamp = ch_ptr->sourceTimestamp;
148 reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
149 write_params = ch_ptr->write_params;
150 isRead = ch_ptr->isRead;
151 vendor_id = ch_ptr->vendor_id;
152 fragment_size_ = ch_ptr->fragment_size_;
153 fragment_count_ = ch_ptr->fragment_count_;
154 first_missing_fragment_ = ch_ptr->first_missing_fragment_;
155
156 return serializedPayload.copy(&ch_ptr->serializedPayload, !ch_ptr->is_untyped_);
157 }
158
165 const CacheChange_t* ch_ptr)
166 {
167 kind = ch_ptr->kind;
168 writerGUID = ch_ptr->writerGUID;
169 instanceHandle = ch_ptr->instanceHandle;
170 sequenceNumber = ch_ptr->sequenceNumber;
171 sourceTimestamp = ch_ptr->sourceTimestamp;
172 reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
173 write_params = ch_ptr->write_params;
174 isRead = ch_ptr->isRead;
175 vendor_id = ch_ptr->vendor_id;
176
177 // Copy certain values from serializedPayload
178 serializedPayload.encapsulation = ch_ptr->serializedPayload.encapsulation;
179
180 // Copy fragment size and calculate fragment count
181 setFragmentSize(ch_ptr->fragment_size_, false);
182 }
183
//! Virtual, defaulted destructor.
184 virtual ~CacheChange_t() = default;
185
190 uint32_t getFragmentCount() const
191 {
192 return fragment_count_;
193 }
194
199 uint16_t getFragmentSize() const
200 {
201 return fragment_size_;
202 }
203
209 {
210 return first_missing_fragment_ >= fragment_count_;
211 }
212
217 {
218 return 0 < first_missing_fragment_;
219 }
220
226 FragmentNumberSet_t& frag_sns)
227 {
228 // Note: Fragment numbers are 1-based but we keep them 0 based.
229 frag_sns.base(first_missing_fragment_ + 1);
230
231 // Traverse list of missing fragments, adding them to frag_sns
232 uint32_t current_frag = first_missing_fragment_;
233 while (current_frag < fragment_count_)
234 {
235 frag_sns.add(current_frag + 1);
236 current_frag = get_next_missing_fragment(current_frag);
237 }
238 }
239
250 uint16_t fragment_size,
251 bool create_fragment_list = false)
252 {
253 fragment_size_ = fragment_size;
254 fragment_count_ = 0;
255 first_missing_fragment_ = 0;
256
257 if (fragment_size > 0)
258 {
259 // This follows RTPS 8.3.7.3.5
260 fragment_count_ = (serializedPayload.length + fragment_size - 1) / fragment_size;
261
262 if (create_fragment_list)
263 {
264 // Keep index of next fragment on the payload portion at the beginning of each fragment. Last
265 // fragment will have fragment_count_ as 'next fragment index'
266 size_t offset = 0;
267 for (uint32_t i = 1; i <= fragment_count_; i++, offset += fragment_size_)
268 {
269 set_next_missing_fragment(i - 1, i); // index to next fragment in missing list
270 }
271 }
272 else
273 {
274 // List not created. This means we are going to send this change fragmented, so it is already
275 // assembled, and the missing list is empty (i.e. first missing points to fragment count)
276 first_missing_fragment_ = fragment_count_;
277 }
278 }
279 }
280
282 const SerializedPayload_t& incoming_data,
283 uint32_t fragment_starting_num,
284 uint32_t fragments_in_submessage)
285 {
286 uint32_t original_offset = (fragment_starting_num - 1) * fragment_size_;
287 uint32_t incoming_length = fragment_size_ * fragments_in_submessage;
288 uint32_t last_fragment_index = fragment_starting_num + fragments_in_submessage - 1;
289
290 // Validate fragment indexes
291 if (last_fragment_index > fragment_count_)
292 {
293 return false;
294 }
295
296 // validate lengths
297 if (last_fragment_index < fragment_count_)
298 {
299 if (incoming_data.length < incoming_length)
300 {
301 return false;
302 }
303 }
304 else
305 {
306 incoming_length = serializedPayload.length - original_offset;
307 }
308
309 if (original_offset + incoming_length > serializedPayload.length)
310 {
311 return false;
312 }
313
314 if (received_fragments(fragment_starting_num - 1, fragments_in_submessage))
315 {
316 memcpy(
317 &serializedPayload.data[original_offset],
318 incoming_data.data, incoming_length);
319 }
320
321 return is_fully_assembled();
322 }
323
324private:
325
326 // Fragment size; used as the byte stride between fragments inside serializedPayload.data
327 uint16_t fragment_size_ = 0;
328
329 // Number of fragments; computed by setFragmentSize() from the payload length
330 uint32_t fragment_count_ = 0;
331
332 // First fragment in missing list (0-based); equals fragment_count_ when nothing is missing
333 uint32_t first_missing_fragment_ = 0;
334
335 uint32_t get_next_missing_fragment(
336 uint32_t fragment_index)
337 {
338 uint32_t* ptr = next_fragment_pointer(fragment_index);
339 return *ptr;
340 }
341
342 void set_next_missing_fragment(
343 uint32_t fragment_index,
344 uint32_t next_fragment_index)
345 {
346 uint32_t* ptr = next_fragment_pointer(fragment_index);
347 *ptr = next_fragment_index;
348 }
349
350 uint32_t* next_fragment_pointer(
351 uint32_t fragment_index)
352 {
353 size_t offset = fragment_size_;
354 offset *= fragment_index;
355 offset = (offset + 3u) & ~3u;
356 return reinterpret_cast<uint32_t*>(&serializedPayload.data[offset]);
357 }
358
368 bool received_fragments(
369 uint32_t initial_fragment,
370 uint32_t num_of_fragments)
371 {
372 bool at_least_one_changed = false;
373
374 if ((fragment_size_ > 0) && (initial_fragment < fragment_count_))
375 {
376 uint32_t last_fragment = initial_fragment + num_of_fragments;
377 if (last_fragment > fragment_count_)
378 {
379 last_fragment = fragment_count_;
380 }
381
382 if (initial_fragment <= first_missing_fragment_)
383 {
384 // Perform first = *first until first >= last_received
385 while (first_missing_fragment_ < last_fragment)
386 {
387 first_missing_fragment_ = get_next_missing_fragment(first_missing_fragment_);
388 at_least_one_changed = true;
389 }
390 }
391 else
392 {
393 // Find prev in missing list
394 uint32_t current_frag = first_missing_fragment_;
395 while (current_frag < initial_fragment)
396 {
397 uint32_t next_frag = get_next_missing_fragment(current_frag);
398 if (next_frag >= initial_fragment)
399 {
400 // This is the fragment previous to initial_fragment.
401 // Find future value for next by repeating next = *next until next >= last_fragment.
402 uint32_t next_missing_fragment = next_frag;
403 while (next_missing_fragment < last_fragment)
404 {
405 next_missing_fragment = get_next_missing_fragment(next_missing_fragment);
406 at_least_one_changed = true;
407 }
408
409 // Update next and finish loop
410 if (at_least_one_changed)
411 {
412 set_next_missing_fragment(current_frag, next_missing_fragment);
413 }
414 break;
415 }
416 current_frag = next_frag;
417 }
418 }
419 }
420
421 return at_least_one_changed;
422 }
423
424};
425
426} // namespace rtps
427} // namespace fastdds
428} // namespace eprosima
429
430#endif // FASTDDS_RTPS_COMMON__CACHECHANGE_HPP
Template class to hold a range of items using a custom bitmap.
Definition fixed_size_bitmap.hpp:76
T base() const noexcept
Get base of the range.
Definition fixed_size_bitmap.hpp:131
bool add(const T &item) noexcept
Adds an element to the range.
Definition fixed_size_bitmap.hpp:295
Structure Time_t, used to describe times at RTPS protocol.
Definition Time_t.hpp:38
This class contains additional information of a CacheChange.
Definition WriteParams.hpp:34
@ LITTLEEND
Little endianness.
Definition Types.hpp:44
eprosima::fastdds::rtps::VendorId_t VendorId_t
Structure VendorId_t, specifying the vendor Id of the implementation.
Definition Types.hpp:163
const VendorId_t c_VendorId_Unknown
Definition VendorId_t.hpp:34
constexpr Endianness_t DEFAULT_ENDIAN
Definition Types.hpp:80
ChangeKind_t
Enumerates the different types of CacheChange_t.
Definition ChangeKind_t.hpp:38
@ ALIVE
ALIVE.
Definition ChangeKind_t.hpp:39
eProsima namespace.
Structure CacheChange_t, contains information on a specific CacheChange.
Definition CacheChange.hpp:78
ChangeKind_t kind
Kind of change, default value ALIVE.
Definition CacheChange.hpp:80
bool is_untyped_
Definition CacheChange.hpp:105
bool contains_first_fragment()
Checks if the first fragment is present.
Definition CacheChange.hpp:216
InstanceHandle_t instanceHandle
Handle of the data associated with this change.
Definition CacheChange.hpp:84
bool isRead
Indicates if the cache has been read (only used in READERS)
Definition CacheChange.hpp:92
fastdds::rtps::VendorId_t vendor_id
Vendor Id of the writer that generated this change.
Definition CacheChange.hpp:96
Time_t sourceTimestamp
Source TimeStamp.
Definition CacheChange.hpp:94
CacheChange_t(const CacheChange_t &)=delete
void setFragmentSize(uint16_t fragment_size, bool create_fragment_list=false)
Set fragment size for this change.
Definition CacheChange.hpp:249
CacheChangeReaderInfo_t reader_info
Definition CacheChange.hpp:100
void copy_not_memcpy(const CacheChange_t *ch_ptr)
Copy information from a different change into this one.
Definition CacheChange.hpp:164
uint16_t getFragmentSize() const
Get the size of each fragment this change is split into.
Definition CacheChange.hpp:199
uint32_t getFragmentCount() const
Get the number of fragments this change is split into.
Definition CacheChange.hpp:190
bool copy(const CacheChange_t *ch_ptr)
Copy a different change into this one.
Definition CacheChange.hpp:140
bool add_fragments(const SerializedPayload_t &incoming_data, uint32_t fragment_starting_num, uint32_t fragments_in_submessage)
Definition CacheChange.hpp:281
CacheChangeWriterInfo_t writer_info
Definition CacheChange.hpp:101
GUID_t writerGUID
GUID_t of the writer that generated this change.
Definition CacheChange.hpp:82
CacheChange_t(uint32_t payload_size, bool is_untyped=false)
Constructor with payload size.
Definition CacheChange.hpp:127
SerializedPayload_t serializedPayload
Serialized Payload associated with the change.
Definition CacheChange.hpp:88
void get_missing_fragments(FragmentNumberSet_t &frag_sns)
Fills a FragmentNumberSet_t with the list of missing fragments.
Definition CacheChange.hpp:225
SequenceNumber_t sequenceNumber
SequenceNumber of the change.
Definition CacheChange.hpp:86
CacheChange_t()
Default constructor.
Definition CacheChange.hpp:111
WriteParams write_params
Definition CacheChange.hpp:104
bool is_fully_assembled()
Checks if all fragments have been received.
Definition CacheChange.hpp:208
Specific information for a reader.
Definition CacheChange.hpp:62
Time_t receptionTimestamp
Reception TimeStamp (only used in Readers)
Definition CacheChange.hpp:64
int32_t no_writers_generation_count
No-writers generation of the instance when this entry was added to it.
Definition CacheChange.hpp:68
uint32_t writer_ownership_strength
Ownership strength of its writer when the sample was received.
Definition CacheChange.hpp:70
int32_t disposed_generation_count
Disposed generation of the instance when this entry was added to it.
Definition CacheChange.hpp:66
Specific information for a writer.
Definition CacheChange.hpp:45
std::atomic_bool is_linked
Used to know if the object is already in a list.
Definition CacheChange.hpp:55
size_t num_sent_submessages
Number of DATA / DATA_FRAG submessages sent to the transport (only used in Writers)
Definition CacheChange.hpp:47
CacheChange_t *volatile next
Used to link with next node in a list.
Definition CacheChange.hpp:53
CacheChange_t *volatile previous
Used to link with previous node in a list.
Definition CacheChange.hpp:50
Structure GUID_t, entity identifier, unique in DDS-RTPS Domain.
Definition Guid.hpp:40
Struct InstanceHandle_t, used to contain the key for WITH_KEY topics.
Definition InstanceHandle.hpp:154
Structure SequenceNumber_t, different for each change in the same writer.
Definition SequenceNumber.hpp:38
Structure SerializedPayload_t.
Definition SerializedPayload.hpp:59
octet * data
Pointer to the data.
Definition SerializedPayload.hpp:68
uint16_t encapsulation
Encapsulation of the data as suggested in the RTPS 2.1 specification chapter 10.
Definition SerializedPayload.hpp:64
uint32_t length
Actual length of the data.
Definition SerializedPayload.hpp:66