pvaClientCPP 4.8.1
pvaClientRPC.cpp
Go to the documentation of this file.
1/* pvaClientRPC.cpp */
12#include <sstream>
13#include <pv/event.h>
14#include <pv/bitSetUtil.h>
15#include <pv/rpcService.h>
16
17#define epicsExportSharedSymbols
18
19#include <pv/pvaClient.h>
20
21using namespace epics::pvData;
22using namespace epics::pvAccess;
23using namespace std;
24
25namespace epics { namespace pvaClient {
26
27class RPCRequesterImpl : public ChannelRPCRequester
28{
29 PvaClientRPC::weak_pointer pvaClientRPC;
30 PvaClient::weak_pointer pvaClient;
31public:
32 RPCRequesterImpl(
33 PvaClientRPCPtr const & pvaClientRPC,
34 PvaClientPtr const &pvaClient)
35 : pvaClientRPC(pvaClientRPC),
36 pvaClient(pvaClient)
37 {}
38 virtual ~RPCRequesterImpl() {
39 if(PvaClient::getDebug()) std::cout << "~RPCRequesterImpl" << std::endl;
40 }
41
42 virtual std::string getRequesterName() {
43 PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
44 if(!clientRPC) return string("pvaClientRPC is null");
45 return clientRPC->getRequesterName();
46 }
47
48 virtual void message(std::string const & message, MessageType messageType) {
49 PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
50 if(!clientRPC) return;
51 clientRPC->message(message,messageType);
52 }
53
54 virtual void channelRPCConnect(
55 const Status& status,
56 ChannelRPC::shared_pointer const & channelRPC)
57 {
58 PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
59 if(!clientRPC) return;
60 clientRPC->rpcConnect(status,channelRPC);
61 }
62
63 virtual void requestDone(
64 const Status& status,
65 ChannelRPC::shared_pointer const & channelRPC,
66 PVStructure::shared_pointer const & pvResponse)
67 {
68 PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
69 if(!clientRPC) return;
70 clientRPC->requestDone(status,channelRPC,pvResponse);
71 }
72};
73
75 PvaClientPtr const &pvaClient,
76 Channel::shared_pointer const & channel)
77{
78 StructureConstPtr structure(getFieldCreate()->createStructure());
79 PVStructurePtr pvRequest(getPVDataCreate()->createPVStructure(structure));
80 return create(pvaClient,channel,pvRequest);
81}
83 PvaClientPtr const &pvaClient,
84 Channel::shared_pointer const & channel,
85 PVStructurePtr const &pvRequest)
86{
87 PvaClientRPCPtr epv(new PvaClientRPC(pvaClient,channel,pvRequest));
88 epv->rpcRequester = RPCRequesterImplPtr(
89 new RPCRequesterImpl(epv,pvaClient));
90 return epv;
91}
92
93PvaClientRPC::PvaClientRPC(
94 PvaClientPtr const &pvaClient,
95 Channel::shared_pointer const & channel,
96 PVStructurePtr const &pvRequest)
97:
98 connectState(connectIdle),
99 pvaClient(pvaClient),
100 channel(channel),
101 pvRequest(pvRequest),
102 rpcState(rpcIdle),
103 responseTimeout(0.0)
104{
105 if(PvaClient::getDebug()) {
106 cout<< "PvaClientRPC::PvaClientRPC()"
107 << " channelName " << channel->getChannelName()
108 << endl;
109 }
110}
111
113{
114 if(PvaClient::getDebug()) {
115 string channelName("disconnected");
116 Channel::shared_pointer chan(channel.lock());
117 if(chan) channelName = chan->getChannelName();
118 cout<< "PvaClientRPC::~PvaClientRPC"
119 << " channelName " << channelName
120 << endl;
121 }
122}
123
124void PvaClientRPC::checkRPCState()
125{
126 if(PvaClient::getDebug()) {
127 string channelName("disconnected");
128 Channel::shared_pointer chan(channel.lock());
129 if(chan) channelName = chan->getChannelName();
130 cout << "PvaClientRPC::checkRPCState"
131 << " channelName " << channelName
132 << " connectState " << connectState
133 << endl;
134 }
135 if(connectState==connectIdle) connect();
136}
137
138string PvaClientRPC::getRequesterName()
139{
140 PvaClientPtr yyy = pvaClient.lock();
141 if(!yyy) return string("PvaClientRPC::getRequesterName() PvaClient isDestroyed");
142 return yyy->getRequesterName();
143}
144
145void PvaClientRPC::message(string const & message,MessageType messageType)
146{
147 PvaClientPtr yyy = pvaClient.lock();
148 if(!yyy) return;
149 yyy->message(message, messageType);
150}
151
152void PvaClientRPC::rpcConnect(
153 const Status& status,
154 ChannelRPC::shared_pointer const & channelRPC)
155{
156 Channel::shared_pointer chan(channel.lock());
157 if(PvaClient::getDebug()) {
158 string channelName("disconnected");
159 Channel::shared_pointer chan(channel.lock());
160 if(chan) channelName = chan->getChannelName();
161 cout << "PvaClientRPC::rpcConnect"
162 << " channelName " << channelName
163 << " status.isOK " << (status.isOK() ? "true" : "false")
164 << endl;
165 }
166 if(!chan) return;
167 connectStatus = status;
168 connectState = connected;
169 if(PvaClient::getDebug()) {
170 cout << "PvaClientRPC::rpcConnect calling waitForConnect.signal\n";
171 }
172 waitForConnect.signal();
173
174}
175
176void PvaClientRPC::requestDone(
177 const Status& status,
178 ChannelRPC::shared_pointer const & channelRPC,
179 PVStructure::shared_pointer const & pvResponse)
180{
181 PvaClientRPCRequesterPtr req = pvaClientRPCRequester.lock();
182 {
183 Lock xx(mutex);
184 requestStatus = status;
185 if(PvaClient::getDebug()) {
186 string channelName("disconnected");
187 Channel::shared_pointer chan(channel.lock());
188 if(chan) channelName = chan->getChannelName();
189 cout << "PvaClientRPC::requestDone"
190 << " channelName " << channelName
191 << endl;
192 }
193 if(rpcState!=rpcActive) {
194 string channelName("disconnected");
195 Channel::shared_pointer chan(channel.lock());
196 if(chan) channelName = chan->getChannelName();
197 string message = "channel "
198 + channelName
199 +" PvaClientRPC::requestDone"
200 + " but not active";
201 throw std::runtime_error(message);
202 }
203 if(req && (responseTimeout<=0.0)) {
204 rpcState = rpcIdle;
205 } else {
206 rpcState = rpcComplete;
207 if(!req) this->pvResponse = pvResponse;
208 waitForDone.signal();
209 }
210 }
211 if(req) {
212 req->requestDone(status,shared_from_this(),pvResponse);
213 }
214}
215
217{
218 if(PvaClient::getDebug()) cout << "PvaClientRPC::connect\n";
219 issueConnect();
220 Status status = waitConnect();
221 if(status.isOK()) return;
222 Channel::shared_pointer chan(channel.lock());
223 string channelName("disconnected");
224 if(chan) channelName = chan->getChannelName();
225 string message = string("channel ")
226 + channelName
227 + " PvaClientRPC::connect "
228 + status.getMessage();
229 throw RPCRequestException(Status::STATUSTYPE_ERROR,message);
230}
231
233{
234 if(PvaClient::getDebug()) cout << "PvaClientRPC::issueConnect\n";
235 Channel::shared_pointer chan(channel.lock());
236 if(connectState!=connectIdle) {
237 string channelName("disconnected");
238 if(chan) channelName = chan->getChannelName();
239 string message = string("channel ")
240 + channelName
241 + " pvaClientRPC already connected ";
242 throw std::runtime_error(message);
243 }
244 if(chan) {
245 connectState = connectActive;
246 channelRPC = chan->createChannelRPC(rpcRequester,pvRequest);
247 return;
248 }
249 throw std::runtime_error("PvaClientRPC::issueConnect() but channel disconnected");
250}
251
253{
254 if(PvaClient::getDebug()) cout << "PvaClientRPC::waitConnect\n";
255 if(connectState==connected) {
256 if(!connectStatus.isOK()) connectState = connectIdle;
257 return connectStatus;
258 }
259 if(connectState!=connectActive) {
260 Channel::shared_pointer chan(channel.lock());
261 string channelName("disconnected");
262 if(chan) channelName = chan->getChannelName();
263 string message = string("channel ")
264 + channelName
265 + " PvaClientRPC::waitConnect illegal connect state ";
266 throw std::runtime_error(message);
267 }
268 if(PvaClient::getDebug()) {
269 cout << "PvaClientRPC::waitConnect calling waitForConnect.wait\n";
270 }
271 waitForConnect.wait();
272 connectState = connectStatus.isOK() ? connected : connectIdle;
273 if(PvaClient::getDebug()) {
274 cout << "PvaClientRPC::waitConnect"
275 << " connectStatus " << (connectStatus.isOK() ? "connected" : "not connected");
276 }
277 return connectStatus;
278}
279
280PVStructure::shared_pointer PvaClientRPC::request(PVStructure::shared_pointer const & pvArgument)
281{
282 checkRPCState();
283 {
284 Lock xx(mutex);
285 if(rpcState!=rpcIdle) {
286 Channel::shared_pointer chan(channel.lock());
287 string channelName("disconnected");
288 if(chan) channelName = chan->getChannelName();
289 string message = "channel "
290 + channelName
291 + " PvaClientRPC::request request aleady active ";
292 throw std::runtime_error(message);
293 }
294 rpcState = rpcActive;
295 }
296 channelRPC->request(pvArgument);
297 if(responseTimeout>0.0) {
298 waitForDone.wait(responseTimeout);
299 } else {
300 waitForDone.wait();
301 }
302 Lock xx(mutex);
303 if(rpcState!=rpcComplete) {
304 Channel::shared_pointer chan(channel.lock());
305 string channelName("disconnected");
306 if(chan) channelName = chan->getChannelName();
307 string message = "channel "
308 + channelName
309 + " PvaClientRPC::request request timeout ";
310 throw RPCRequestException(Status::STATUSTYPE_ERROR,message);
311 }
312 rpcState = rpcIdle;
313 if(requestStatus.isOK()) return pvResponse;
314 Channel::shared_pointer chan(channel.lock());
315 string channelName("disconnected");
316 if(chan) channelName = chan->getChannelName();
317 string message = "channel "
318 + channelName
319 + " PvaClientRPC::request status ";
320 message += requestStatus.getMessage();
321 throw RPCRequestException(Status::STATUSTYPE_ERROR,message);
322}
323
324
326 PVStructure::shared_pointer const & pvArgument,
327 PvaClientRPCRequesterPtr const & pvaClientRPCRequester)
328{
329 checkRPCState();
330 this->pvaClientRPCRequester = pvaClientRPCRequester;
331 if(responseTimeout<=0.0) {
332 {
333 Lock xx(mutex);
334 if(rpcState!=rpcIdle) {
335 Channel::shared_pointer chan(channel.lock());
336 string channelName("disconnected");
337 if(chan) channelName = chan->getChannelName();
338 string message = "channel "
339 + channelName
340 + " PvaClientRPC::request request aleady active ";
341 throw std::runtime_error(message);
342 }
343 rpcState = rpcActive;
344 }
345 channelRPC->request(pvArgument);
346 return;
347 }
348 request(pvArgument);
349}
350
351
352}}
static PvaClientRPCPtr create(PvaClientPtr const &pvaClient, epics::pvAccess::Channel::shared_pointer const &channel)
Create a PvaClientRPC.
epics::pvData::PVStructure::shared_pointer request(epics::pvData::PVStructure::shared_pointer const &pvArgument)
Issue a request and wait for response.
void connect()
Call issueConnect and then waitConnect.
epics::pvData::Status waitConnect()
Wait until the channelRPC connection to the channel is complete.
void issueConnect()
Issue the channelRPC connection to the channel.
static bool getDebug()
Is debug set?
Definition pvaClient.cpp:97
std::tr1::shared_ptr< PvaClientRPCRequester > PvaClientRPCRequesterPtr
Definition pvaClient.h:89
std::tr1::shared_ptr< PvaClientRPC > PvaClientRPCPtr
Definition pvaClient.h:87
std::tr1::shared_ptr< RPCRequesterImpl > RPCRequesterImplPtr
Definition pvaClient.h:1679
std::tr1::shared_ptr< PvaClient > PvaClientPtr
Definition pvaClient.h:47