Subject: asynManager changes
From: Mark Rivers <[email protected]>
To: 'Eric Norum' <[email protected]>, "'Johnson, Andrew N.'" <[email protected]>
Cc: "'[email protected]'" <[email protected]>
Date: Tue, 12 Sep 2017 21:47:43 +0000

Hi Eric, Andrew and others,

I ran into a problem with asyn when working on the Modbus driver the other day. The problem is with pasynManager->queueLockPort(), which is called from the asynXXXSyncIO() functions.

Here is a scenario where the problem occurs when using the Modbus driver.

- A drvModbusIPPort is created to talk TCP to the Modbus device.
- The asynOption "disconnectOnReadTimeout" is set to "Y" for this TCP port. This is needed in order to have a speedy recovery if the network is briefly disconnected. If this is not used then it takes the OS a very long time to decide that the socket is dead and to report that it is disconnected so that asyn will reconnect it.
- There are several Modbus input drivers, each of which is polling a range of registers. They use the asynOctetSyncIO functions to read in their polling loop.
- If the network cable is disconnected, one of the Modbus drivers will wait for the timeout period before getting a timeout. Meanwhile the other Modbus drivers have called pasynOctetSyncIO->writeRead, which calls pasynManager->queueLockPort, which in turn calls pasynManager->queueRequest with no timeout.
- When the read timeout occurs the TCP port will be disconnected. While it is disconnected, the drivers that call queueLockPort will be hung, because the queueRequest will not complete until the port reconnects. In other words, pasynOctetSyncIO->writeRead() is hung for those other drivers until the TCP port reconnects.
- This is not what we want. We want pasynOctetSyncIO->writeRead() to return an error after a certain period of time. This allows the Modbus driver to do callbacks to device support, putting I/O Intr scanned input records into invalid alarm.
- The problem is that the call to pasynManager->queueRequest to lock the port should have a timeout, with a timeout handler routine that allows pasynOctetSyncIO->writeRead() to complete and return an error status.
- I have modified asynManager.c to do this. I had to decide what value to use for the queueRequest timeout. One possibility is to use the pasynUser->timeout for the actual I/O request. But this is not really appropriate: each transaction might be relatively fast, but there could be many transactions in the queue, and we don't want the queueRequest to time out when things are "normal".

I decided on the following:

- The port structure has a new queueLockPortTimeout value. This is set to a default of 2.0 seconds when the port is created.
- This can be changed with a new iocsh command asynSetQueueLockPortTimeout(portName, timeout).
- If the pasynUser->timeout that is passed for a particular I/O operation is larger than the port timeout value, this larger value is used instead.

I made the major mistake of not working on a branch for this, so I can't make a formal pull request. But I have attached the diffs from R4-31 that show what I have done.

The changes are in the master branch on GitHub, and the release notes can be viewed as HTML here:
http://cars9.uchicago.edu/software/epics/RELEASE_NOTES.html

I'd appreciate any comments.

Thanks,
Mark

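The timeout rule described in the message above can be illustrated with a small standalone sketch. This is not code from asynManager.c: the helper name effectiveQueueLockPortTimeout is hypothetical and exists only to show the "use the larger of the two values" rule for the queueRequest timeout.

```c
/* Hypothetical helper, not part of asyn: returns the timeout that would be
 * passed to queueRequest when locking the port, following the rule in the
 * message. portTimeout models the per-port queueLockPortTimeout (default
 * 2.0 s); userTimeout models pasynUser->timeout for the I/O operation. */
static double effectiveQueueLockPortTimeout(double portTimeout, double userTimeout)
{
    double timeout = portTimeout;

    /* A longer per-operation timeout overrides the port value. */
    if (userTimeout > timeout) timeout = userTimeout;
    return timeout;
}
```

With a port value of 2.0 s, an operation whose timeout is 0.5 s would wait up to 2.0 s for the lock, while an operation whose timeout is 5.0 s would wait up to 5.0 s.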
diff --git a/asyn/asynDriver/asynDriver.h b/asyn/asynDriver/asynDriver.h
index 153e158..41f53cb 100644
--- a/asyn/asynDriver/asynDriver.h
+++ b/asyn/asynDriver/asynDriver.h
@@ -125,6 +125,7 @@ typedef struct asynManager {
     asynStatus (*unlockPort)(asynUser *pasynUser);
     asynStatus (*queueLockPort)(asynUser *pasynUser);
     asynStatus (*queueUnlockPort)(asynUser *pasynUser);
+    asynStatus (*setQueueLockPortTimeout)(asynUser *pasynUser, double timeout);
     asynStatus (*canBlock)(asynUser *pasynUser,int *yesNo);
     asynStatus (*getAddr)(asynUser *pasynUser,int *addr);
     asynStatus (*getPortName)(asynUser *pasynUser,const char **pportName);
diff --git a/asyn/asynDriver/asynManager.c b/asyn/asynDriver/asynManager.c
index 31d0cd4..b638965 100644
--- a/asyn/asynDriver/asynManager.c
+++ b/asyn/asynDriver/asynManager.c
@@ -46,6 +46,7 @@
 #define DEFAULT_TRACE_BUFFER_SIZE 80
 #define DEFAULT_SECONDS_BETWEEN_PORT_CONNECT 20
 #define DEFAULT_AUTOCONNECT_TIMEOUT 0.5
+#define DEFAULT_QUEUE_LOCK_PORT_TIMEOUT 2.0
 
 /* This is taken from dbDefs.h, which we don't want to include */
 /* Subtract member byte offset, returning pointer to parent object */
@@ -210,11 +211,11 @@ struct port {
     asynInterface *pcommonInterface;
     epicsTimerId connectTimer;
     epicsThreadPrivateId queueLockPortId;
+    double queueLockPortTimeout;
     /* The following are for timestamp support */
     epicsTimeStamp timeStamp;
     timeStampCallback timeStampSource;
     void *timeStampPvt;
-    double secondsBetweenPortConnect;
 };
@@ -285,6 +286,7 @@ static asynStatus lockPort(asynUser *pasynUser);
 static asynStatus unlockPort(asynUser *pasynUser);
 static asynStatus queueLockPort(asynUser *pasynUser);
 static asynStatus queueUnlockPort(asynUser *pasynUser);
+static asynStatus setQueueLockPortTimeout(asynUser *pasynUser, double timeout);
 static asynStatus canBlock(asynUser *pasynUser,int *yesNo);
 static asynStatus getAddr(asynUser *pasynUser,int *addr);
 static asynStatus getPortName(asynUser *pasynUser,const char **pportName);
@@ -345,6 +347,7 @@ static asynManager manager = {
     unlockPort,
     queueLockPort,
     queueUnlockPort,
+    setQueueLockPortTimeout,
     canBlock,
     getAddr,
     getPortName,
@@ -925,6 +928,27 @@ static void queueLockPortCallback(asynUser *pasynUser)
     epicsEventSignal(plockPortPvt->queueLockPortEvent);
 }
 
+static void queueLockPortTimeoutCallback(asynUser *pasynUser)
+{
+    userPvt *puserPvt = asynUserToUserPvt(pasynUser);
+    port *pport = puserPvt->pport;
+    queueLockPortPvt *plockPortPvt = pasynUser->userPvt;
+
+    asynPrint(pasynUser, ASYN_TRACE_WARNING,
+        "%s asynManager::queueLockPortTimeoutCallback WARNING: queueLockPort timeout\n",
+        pport->portName);
+
+    /* Set the pasynUser->auxStatus to asynTimeout to signal error to caller */
+    pasynUser->auxStatus = asynTimeout;
+
+    /* Signal the epicsEvent to let the waiting thread we have been called */
+    asynPrint(pasynUser, ASYN_TRACE_FLOW,
+        "%s asynManager::queueLockPortTimeoutCallback signaling begin event\n",
+        pport->portName);
+    epicsEventSignal(plockPortPvt->queueLockPortEvent);
+
+    /* queueUnlockPort is not going to be called because queueLockPort will have returned an error */
+};
 
 /* asynManager methods */
 static void reportPrintInterfaceList(FILE *fp,ELLLIST *plist,const char *title)
@@ -1730,6 +1754,7 @@ static asynStatus queueLockPort(asynUser *pasynUser)
     queueLockPortPvt *plockPortPvt;
     asynUser *pasynUserCopy;
     asynStatus status = asynSuccess;
+    double timeout;
 
     asynPrint(pasynUser,ASYN_TRACE_FLOW, "%s asynManager::queueLockPort locking port\n", pport->portName);
     if(!pport) {
@@ -1757,20 +1782,24 @@ static asynStatus queueLockPort(asynUser *pasynUser)
         plockPortPvt->queueLockPortCount++;
         return status;
     }
-    pasynUserCopy = pasynManager->duplicateAsynUser(pasynUser, queueLockPortCallback, 0);
+    pasynUserCopy = pasynManager->duplicateAsynUser(pasynUser, queueLockPortCallback, queueLockPortTimeoutCallback);
     if (!pasynUserCopy){
         epicsSnprintf(pasynUser->errorMessage,pasynUser->errorMessageSize,
             "asynManager::queueLockPort duplicateAsynUser failed");
         return asynError;
     }
     pasynUserCopy->userPvt = plockPortPvt;
+    /* Set the auxStatus field to asynSuccess. If it is not asynSuccess later we know queueRequest timed out */
+    pasynUserCopy->auxStatus = asynSuccess;
     /* Take the mutex which will block the callback port thread */
     /* Wait until the queued request executes the callback */
     asynPrint(pasynUser,ASYN_TRACE_FLOW, "%s asynManager::queueLockPort taking mutex %p\n", pport->portName, plockPortPvt->queueLockPortMutex);
     epicsMutexMustLock(plockPortPvt->queueLockPortMutex);
     asynPrint(pasynUser,ASYN_TRACE_FLOW, "%s asynManager::queueLockPort queueing request\n", pport->portName);
-    status = pasynManager->queueRequest(pasynUserCopy, asynQueuePriorityLow, 0.0);
+    timeout = pport->queueLockPortTimeout;
+    if (pasynUserCopy->timeout > timeout) timeout = pasynUserCopy->timeout;
+    status = pasynManager->queueRequest(pasynUserCopy, asynQueuePriorityLow, timeout);
     if (status) {
         epicsSnprintf(pasynUser->errorMessage,pasynUser->errorMessageSize,
             "asynManager::queueLockPort queueRequest failed: %s",
@@ -1782,6 +1811,14 @@ static asynStatus queueLockPort(asynUser *pasynUser)
     /* Wait for event from the port thread in the queueLockPortCallback function */
     asynPrint(pasynUser,ASYN_TRACE_FLOW, "%s asynManager::queueLockPort waiting for event\n", pport->portName);
     epicsEventMustWait(plockPortPvt->queueLockPortEvent);
+    /* Check the auxStatus value */
+    if (pasynUserCopy->auxStatus != asynSuccess) {
+        epicsSnprintf(pasynUser->errorMessage,pasynUser->errorMessageSize,
+            "asynManager::queueLockPort queueRequest timed out");
+        epicsMutexUnlock(plockPortPvt->queueLockPortMutex);
+        pasynManager->freeAsynUser(pasynUserCopy);
+        return asynError;
+    }
     pasynManager->freeAsynUser(pasynUserCopy);
     asynPrint(pasynUser,ASYN_TRACE_FLOW, "%s asynManager::queueLockPort got event from callback\n", pport->portName);
     /* Remember that the port is locked by this thread. */
@@ -1917,6 +1954,7 @@ static asynStatus registerPort(const char *portName,
     pport->timeStampSource = defaultTimeStampSource;
     dpCommonInit(pport,0,autoConnect);
     pport->pasynUser = createAsynUser(0,0);
+    pport->queueLockPortTimeout = DEFAULT_QUEUE_LOCK_PORT_TIMEOUT;
     ellInit(&pport->deviceList);
     ellInit(&pport->interfaceList);
     if((attributes&ASYN_CANBLOCK)) {
@@ -2149,6 +2187,22 @@ static asynStatus setAutoConnectTimeout(double timeout)
     return asynSuccess;
 }
 
+static asynStatus setQueueLockPortTimeout(asynUser *pasynUser, double timeout)
+{
+    userPvt *puserPvt = asynUserToUserPvt(pasynUser);
+    port *pport = puserPvt->pport;
+
+    if(!pport) {
+        epicsSnprintf(pasynUser->errorMessage,pasynUser->errorMessageSize,
+            "asynManager:setQueueLockPortTimeout not connected to device");
+        return asynError;
+    }
+    epicsMutexMustLock(pport->asynManagerLock);
+    pport->queueLockPortTimeout = timeout;
+    epicsMutexUnlock(pport->asynManagerLock);
+    return asynSuccess;
+}
+
 static asynStatus registerInterruptSource(const char *portName,
     asynInterface *pasynInterface, void **pasynPvt)
 {
@@ -2908,6 +2962,14 @@ static int traceVprintIOSource(asynUser *pasynUser,int reason,
             nout += errlogPrintf("\n");
         }
     }
+    /* If the traceIOMask is 0 or traceTruncateSize <=0 we need to output a newline */
+    if((traceIOMask == 0) || (traceTruncateSize <=0)) {
+        if(fp) {
+            nout += fprintf(fp,"\n");
+        } else {
+            nout += errlogPrintf("\n");
+        }
+    }
     fflush(fp);
     epicsMutexUnlock(pasynBase->lockTrace);
     return nout;
diff --git a/asyn/miscellaneous/asynShellCommands.c b/asyn/miscellaneous/asynShellCommands.c
index 7262087..b3fc656 100644
--- a/asyn/miscellaneous/asynShellCommands.c
+++ b/asyn/miscellaneous/asynShellCommands.c
@@ -1143,6 +1143,37 @@ static void asynSetMinTimerPeriodCall(const iocshArgBuf *args)
     asynSetMinTimerPeriod(args[0].dval);
 }
 
+static const iocshArg asynSetQueueLockPortTimeoutArg0 = {"portName", iocshArgString};
+static const iocshArg asynSetQueueLockPortTimeoutArg1 = {"timeout", iocshArgDouble};
+static const iocshArg *const asynSetQueueLockPortTimeoutArgs[] = {
+    &asynSetQueueLockPortTimeoutArg0,&asynSetQueueLockPortTimeoutArg1};
+static const iocshFuncDef asynSetQueueLockPortTimeoutDef =
+    {"asynSetQueueLockPortTimeout", 2, asynSetQueueLockPortTimeoutArgs};
+epicsShareFunc int
+    asynSetQueueLockPortTimeout(const char *portName, double timeout)
+{
+    asynUser *pasynUser;
+    asynStatus status;
+
+    pasynUser = pasynManager->createAsynUser(0,0);
+    status = pasynManager->connectDevice(pasynUser,portName,0);
+    if(status!=asynSuccess) {
+        printf("%s\n",pasynUser->errorMessage);
+        pasynManager->freeAsynUser(pasynUser);
+        return -1;
+    }
+    status = pasynManager->setQueueLockPortTimeout(pasynUser,timeout);
+    if(status!=asynSuccess) {
+        printf("%s\n",pasynUser->errorMessage);
+    }
+    pasynManager->freeAsynUser(pasynUser);
+    return 0;
+}
+static void asynSetQueueLockPortTimeoutCall(const iocshArgBuf * args) {
+    const char *portName = args[0].sval;
+    double timeout = args[1].dval;
+    asynSetQueueLockPortTimeout(portName,timeout);
+}
 
 static void asynRegister(void)
 {
@@ -1159,6 +1190,7 @@ static void asynRegister(void)
     iocshRegister(&asynSetTraceIOTruncateSizeDef,asynSetTraceIOTruncateSizeCall);
     iocshRegister(&asynEnableDef,asynEnableCall);
     iocshRegister(&asynAutoConnectDef,asynAutoConnectCall);
+    iocshRegister(&asynSetQueueLockPortTimeoutDef,asynSetQueueLockPortTimeoutCall);
     iocshRegister(&asynOctetConnectDef,asynOctetConnectCall);
     iocshRegister(&asynOctetDisconnectDef,asynOctetDisconnectCall);
     iocshRegister(&asynOctetReadDef,asynOctetReadCall);
diff --git a/asyn/miscellaneous/asynShellCommands.h b/asyn/miscellaneous/asynShellCommands.h
index b0bc0c4..97aaf4c 100644
--- a/asyn/miscellaneous/asynShellCommands.h
+++ b/asyn/miscellaneous/asynShellCommands.h
@@ -63,6 +63,8 @@ epicsShareFunc int
     asynUnregisterTimeStampSource(const char *portName);
 epicsShareFunc int
     asynSetMinTimerPeriod(double period);
+epicsShareFunc int
+    asynSetQueueLockPortTimeout(const char *portName, double timeout);
 
 #ifdef __cplusplus
 }
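
The timeout handling in the diff above follows a simple pattern: the request carries a status field that starts out as success, both the normal callback and the timeout callback signal the same event, and the waiter checks the status field after waking to learn which one ran. The sketch below is a single-threaded model of that pattern only, under loud assumptions: every name in it (lockStatus, lockRequest, requestLock, and the two callbacks) is hypothetical, a plain counter stands in for the epicsEvent, and the callback is invoked directly rather than from a port or timer thread as in asyn.

```c
#include <assert.h>

/* Hypothetical stand-in for asynStatus. */
typedef enum { lockSuccess = 0, lockTimeout = 1, lockError = 2 } lockStatus;

/* Hypothetical stand-in for the duplicated asynUser plus its private data. */
typedef struct {
    lockStatus auxStatus;  /* set to lockTimeout only by the timeout callback */
    int eventCount;        /* models an event: incremented on signal */
} lockRequest;

typedef void (*lockCallback)(lockRequest *req);

/* Normal path: the request reached the head of the queue. */
static void lockGrantedCallback(lockRequest *req)
{
    req->eventCount++;     /* signal the waiter */
}

/* Timeout path: record the failure first, then wake the waiter. */
static void lockTimeoutCallback(lockRequest *req)
{
    req->auxStatus = lockTimeout;
    req->eventCount++;     /* signal the waiter */
}

/* Models the waiter. 'deliver' is whichever callback ends up being called;
 * in this sketch it runs immediately instead of on another thread. */
static lockStatus requestLock(lockCallback deliver)
{
    lockRequest req = { lockSuccess, 0 };  /* status preset to success */

    deliver(&req);

    /* A real waiter would block here until the event is signaled. */
    assert(req.eventCount == 1);
    req.eventCount--;

    /* Any status other than success means the timeout callback ran. */
    if (req.auxStatus != lockSuccess) return lockError;
    return lockSuccess;
}
```

In this model requestLock(lockGrantedCallback) yields lockSuccess and requestLock(lockTimeoutCallback) yields lockError, matching the diff's choice to return a generic error to the caller rather than the timeout status itself.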