Read out the response from the socket.
// Body of the routine described above as "Read out the response from the
// socket". Its signature is not part of this listing.
//
// The body loops over a switch on `readstage`: every case performs one read
// step and then either returns a status to the caller or selects the next
// stage and `continue`s. Only ReadDone leaves the switch without `continue`,
// which reaches the `break` after the switch and ends the loop; the routine
// then returns a default-constructed XRootDStatus.
//
// NOTE(review): the embedded listing numbers skip in several places (84,
// 115-116, 126, 199, 224, 227-228, 230, 239), so some statements and
// conditions are not visible here. Confirm against the original file before
// relying on the control flow exactly as shown.
83 {
// NOTE(review): `log` is used below but its declaration is not visible in this
// listing (numbering jumps from 83 to 85) - TODO confirm.
85
86 while( true )
87 {
88 switch( readstage )
89 {
90
91
92
93
// ReadStart: allocate a fresh Message for the incoming data, then move on to
// reading the header.
94 case ReadStart:
95 {
96 inmsg = std::make_shared<Message>();
97
98
99
100 readstage = ReadHeader;
101 continue;
102 }
103
104
105
// ReadHeader: ask the transport to read the header into *inmsg from the
// socket.
106 case ReadHeader:
107 {
108 XRootDStatus st = xrdTransport.
GetHeader( *inmsg, &socket );
// Hand the status back to the caller when the call failed or when its code
// is suRetry.
109 if( !st.IsOK() || st.code ==
suRetry )
110 return st;
111
112 log->Dump(
AsyncSockMsg,
"[%s] Received message header for 0x%x size: %d",
113 strmname.c_str(), inmsg.get(), inmsg->GetCursor() );
114
// NOTE(review): the condition guarding the block below is not visible
// (listing lines 115-116 are missing). Inside it the message is passed to
// ReAllocate( 16 ) and the stage switches to ReadAttn.
117 {
118 log->Dump(
AsyncSockMsg,
"[%s] Will readout the attn action code "
119 "of message 0x%x", strmname.c_str(), inmsg.get() );
120 inmsg->ReAllocate( 16 );
121 readstage = ReadAttn;
122 continue;
123 }
124
// Record the message cursor position as the size received so far.
125 inmsgsize = inmsg->GetCursor();
127
// When `inhandler` is set, the body is read through it (ReadRawData stage);
// otherwise the transport reads the body (ReadMsgBody stage).
128 if( inhandler )
129 {
130 log->Dump(
AsyncSockMsg,
"[%s] Will use the raw handler to read body "
131 "of message 0x%x", strmname.c_str(), inmsg.get() );
132
133
134
135 readstage = ReadRawData;
136 continue;
137 }
138
139
140
141
142 readstage = ReadMsgBody;
143 continue;
144 }
145
146
147
// ReadAttn: run ReadAttnActnum() and return its status on failure or when its
// code is suRetry.
148 case ReadAttn:
149 {
150 XRootDStatus st = ReadAttnActnum();
151 if( !st.IsOK() || st.code ==
suRetry )
152 return st;
153
154
155
156
// If HasEmbeddedRsp() reports true, free the current message buffer and go
// back to reading a header.
157 if( HasEmbeddedRsp() )
158 {
159 inmsg->Free();
160 readstage = ReadHeader;
161 continue;
162 }
163
164
165
166
// Otherwise record the cursor position as the size and read the message body.
167 inmsgsize = inmsg->GetCursor();
168 readstage = ReadMsgBody;
169 continue;
170 }
171
172
173
174
// ReadMore: let the transport read additional data via GetMore, refresh the
// recorded size from the cursor, then finish with ReadDone.
175 case ReadMore:
176 {
177 XRootDStatus st = xrdTransport.
GetMore( *inmsg, &socket );
178 if( !st.IsOK() || st.code ==
suRetry )
179 return st;
180 inmsgsize = inmsg->GetCursor();
181
182
183
184
185 readstage = ReadDone;
186 continue;
187 }
188
189
190
191
// ReadRawData: let `inhandler` read the message body from the socket;
// the call reports the number of bytes it read through `bytesRead`.
192 case ReadRawData:
193 {
194 uint32_t bytesRead = 0;
195 XRootDStatus st = inhandler->
ReadMessageBody( inmsg.get(), &socket, bytesRead );
196 if( !st.IsOK() )
197 return st;
// Accumulate the bytes read by the handler into the running message size.
198 inmsgsize += bytesRead;
// NOTE(review): as shown, this return is unconditional, which would make the
// two statements after it unreachable. Listing line 199 is missing, so a
// guarding condition is presumably absent from this view - TODO confirm.
200 return st;
201
202
203
204 readstage = ReadDone;
205 continue;
206 }
207
208
209
// ReadMsgBody: let the transport read the message body, returning the status
// on failure or when its code is suRetry, then refresh the recorded size.
210 case ReadMsgBody:
211 {
212 XRootDStatus st = xrdTransport.
GetBody( *inmsg, &socket );
213 if( !st.IsOK() || st.code ==
suRetry )
214 return st;
215 inmsgsize = inmsg->GetCursor();
216
217
218
219
220
221
222 if( IsStatusRsp() )
223 {
// NOTE(review): the line below is the tail of a call whose beginning
// (listing line 224) is not visible; the conditions guarding the two
// blocks that follow (listing lines 230 and 239) are missing as well.
// The first block continues with ReadRawData, the second with ReadMore.
225 inhandler );
226
229
231 {
232
233
234
235 readstage = ReadRawData;
236 continue;
237 }
238
240 {
241
242
243
244
245
246 readstage = ReadMore;
247 continue;
248 }
249 }
250
251
252
253
// Reached when none of the branches above continued: the message is complete.
254 readstage = ReadDone;
255 continue;
256 }
257
// ReadDone: log the completed message and pass it to the stream through
// OnIncoming together with the sub-stream number and the recorded size.
// `inmsg` is moved from here. There is no `continue`, so control leaves the
// switch and reaches the `break` below.
258 case ReadDone:
259 {
260
261
262
263 log->Dump(
AsyncSockMsg,
"[%s] Received message 0x%x of %d bytes",
264 strmname.c_str(), inmsg.get(), inmsgsize );
265
266 strm.
OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
267 }
268 }
269
// Leaves the `while( true )` loop; only reached after the ReadDone case.
270 break;
271 }
272
273
274
275
// Return a default-constructed status to the caller.
276 return XRootDStatus();
277 }
static Log * GetLog()
Get default log.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
@ More
there are more (non-raw) data to be read
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
virtual XRootDStatus GetBody(Message &message, Socket *socket)=0
virtual XRootDStatus GetHeader(Message &message, Socket *socket)=0
virtual XRootDStatus GetMore(Message &message, Socket *socket)=0
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t AsyncSockMsg
const uint16_t errCorruptedHeader