Experimental Physics and Industrial Control System
Ralph Lange has proposed merging lp:~epics-core/epics-base/parallel-cbthreads-2 into lp:epics-base with lp:~epics-core/epics-base/ioc-shutdown as a prerequisite.
Requested reviews:
mdavidsaver (mdavidsaver)
EPICS Core Developers (epics-core)
For more details, see:
https://code.launchpad.net/~epics-core/epics-base/parallel-cbthreads-2/+merge/192255
Implements parallel callback threads on multi-core (SMP) systems.
This new branch is based on the original parallel-cbthreads branch.
It also includes Michael's update-callback branch, and a new dbScanIoTest,
which also depends on the ioc-shutdown branch to start/stop iocCore for multiple tests.
--
https://code.launchpad.net/~epics-core/epics-base/parallel-cbthreads-2/+merge/192255
Your team EPICS Core Developers is requested to review the proposed merge of lp:~epics-core/epics-base/parallel-cbthreads-2 into lp:epics-base.
=== modified file 'src/ioc/db/callback.c'
--- src/ioc/db/callback.c 2013-10-22 21:58:26 +0000
+++ src/ioc/db/callback.c 2013-10-22 21:58:27 +0000
@@ -3,8 +3,9 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
+* Copyright (c) 2013 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
-* in file LICENSE that is included with this distribution.
+* in file LICENSE that is included with this distribution.
\*************************************************************************/
/* callback.c */
@@ -25,6 +26,7 @@
#include "epicsThread.h"
#include "epicsExit.h"
#include "epicsInterrupt.h"
+#include "epicsString.h"
#include "epicsTimer.h"
#include "epicsRingPointer.h"
#include "errlog.h"
@@ -36,6 +38,7 @@
#include "taskwd.h"
#include "errMdef.h"
#include "dbCommon.h"
+#include "epicsExport.h"
#define epicsExportSharedSymbols
#include "dbAddr.h"
#include "dbAccessDefs.h"
@@ -49,6 +52,12 @@
static epicsRingPointerId callbackQ[NUM_CALLBACK_PRIORITIES];
static volatile int ringOverflow[NUM_CALLBACK_PRIORITIES];
+/* Parallel callback threads (configured and actual counts) */
+static int callbackThreadCount[NUM_CALLBACK_PRIORITIES] = { 1, 1, 1 };
+static int callbackThreadsRunning[NUM_CALLBACK_PRIORITIES];
+int callbackParallelThreadsDefault = 2;
+epicsExportAddress(int,callbackParallelThreadsDefault);
+
/* Timer for Delayed Requests */
static epicsTimerQueueId timerQueue;
@@ -59,7 +68,7 @@
static void *exitCallback;
/* Static data */
-static char *threadName[NUM_CALLBACK_PRIORITIES] = {
+static char *threadNamePrefix[NUM_CALLBACK_PRIORITIES] = {
"cbLow", "cbMedium", "cbHigh"
};
static unsigned int threadPriority[NUM_CALLBACK_PRIORITIES] = {
@@ -80,6 +89,54 @@
return 0;
}
+int callbackParallelThreads(int count, const char *prio)
+{
+ int i;
+ dbMenu *pdbMenu;
+ int gotMatch;
+
+ if (callbackOnceFlag != EPICS_THREAD_ONCE_INIT) {
+ errlogPrintf("Callback system already initialized\n");
+ return -1;
+ }
+
+ if (count < 0)
+ count = epicsThreadGetCPUs() + count;
+ else if (count == 0)
+ count = callbackParallelThreadsDefault;
+
+ if (!prio || strcmp(prio, "") == 0 || strcmp(prio, "*") == 0) {
+ for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
+ callbackThreadCount[i] = count;
+ }
+ } else {
+ if (!pdbbase) {
+ errlogPrintf("pdbbase not specified\n");
+ return -1;
+ }
+ /* Find prio in menuPriority */
+ pdbMenu = (dbMenu *)ellFirst(&pdbbase->menuList);
+ while (pdbMenu) {
+ gotMatch = (strcmp("menuPriority", pdbMenu->name)==0) ? TRUE : FALSE;
+ if (gotMatch) {
+ for (i = 0; i < pdbMenu->nChoice; i++) {
+ gotMatch = (epicsStrCaseCmp(prio, pdbMenu->papChoiceValue[i])==0) ? TRUE : FALSE;
+ if (gotMatch) break;
+ }
+ if (gotMatch) {
+ callbackThreadCount[i] = count;
+ break;
+ } else {
+ errlogPrintf("Unknown priority \"%s\"\n", prio);
+ return -1;
+ }
+ }
+ pdbMenu = (dbMenu *)ellNext(&pdbMenu->node);
+ }
+ }
+ return 0;
+}
+
static void callbackTask(void *arg)
{
int priority = *(int *)arg;
@@ -90,7 +147,12 @@
while(TRUE) {
void *ptr;
epicsEventMustWait(callbackSem[priority]);
- while((ptr = epicsRingPointerPop(callbackQ[priority]))) {
+ while ((ptr = epicsRingPointerPop(callbackQ[priority]))) {
+ /* Retrigger if there are more threads and more work */
+ if (callbackThreadsRunning[priority] > 1
+ && !epicsRingPointerIsEmpty(callbackQ[priority])) {
+ epicsEventTrigger(callbackSem[priority]);
+ }
CALLBACK *pcallback = (CALLBACK *)ptr;
if (ptr == &exitCallback) goto shutdown;
ringOverflow[priority] = FALSE;
@@ -111,11 +173,11 @@
cbCtl = ctlExit;
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
- int lockKey = epicsInterruptLock();
- int ok = epicsRingPointerPush(callbackQ[i], &exitCallback);
- epicsInterruptUnlock(lockKey);
- epicsEventSignal(callbackSem[i]);
- if (ok) epicsEventWait(startStopEvent);
+ while (callbackThreadsRunning[i]--) {
+ int ok = epicsRingPointerPush(callbackQ[i], &exitCallback);
+ epicsEventSignal(callbackSem[i]);
+ if (ok) epicsEventWait(startStopEvent);
+ }
}
epicsTimerQueueRelease(timerQueue);
callbackOnceFlag = EPICS_THREAD_ONCE_INIT;
@@ -129,26 +191,38 @@
static void callbackInitOnce(void *arg)
{
int i;
+ int j;
+ char threadName[32];
startStopEvent = epicsEventMustCreate(epicsEventEmpty);
cbCtl = ctlRun;
timerQueue = epicsTimerQueueAllocate(0,epicsThreadPriorityScanHigh);
+
for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
epicsThreadId tid;
callbackSem[i] = epicsEventMustCreate(epicsEventEmpty);
- callbackQ[i] = epicsRingPointerCreate(callbackQueueSize);
+ callbackQ[i] = epicsRingPointerLockedCreate(callbackQueueSize);
if (callbackQ[i] == 0)
- cantProceed("epicsRingPointerCreate failed for %s\n",
- threadName[i]);
+ cantProceed("epicsRingPointerLockedCreate failed for %s\n",
+ threadNamePrefix[i]);
ringOverflow[i] = FALSE;
- tid = epicsThreadCreate(threadName[i], threadPriority[i],
- epicsThreadGetStackSize(epicsThreadStackBig),
- (EPICSTHREADFUNC)callbackTask, &priorityValue[i]);
- if (tid == 0)
- cantProceed("Failed to spawn callback task %s\n", threadName[i]);
- else
- epicsEventWait(startStopEvent);
+
+ for (j = 0; j < callbackThreadCount[i]; j++) {
+ if (callbackThreadCount[i] > 1 )
+ sprintf(threadName, "%s-%d", threadNamePrefix[i], j);
+ else
+ strcpy(threadName, threadNamePrefix[i]);
+ tid = epicsThreadCreate(threadName, threadPriority[i],
+ epicsThreadGetStackSize(epicsThreadStackBig),
+ (EPICSTHREADFUNC)callbackTask, &priorityValue[i]);
+ if (tid == 0) {
+ cantProceed("Failed to spawn callback thread %s\n", threadName);
+ } else {
+ epicsEventWait(startStopEvent);
+ callbackThreadsRunning[i]++;
+ }
+ }
}
epicsAtExit(callbackExit, NULL);
}
@@ -159,36 +233,35 @@
}
/* This routine can be called from interrupt context */
-void callbackRequest(CALLBACK *pcallback)
+int callbackRequest(CALLBACK *pcallback)
{
int priority;
int pushOK;
- int lockKey;
if (!pcallback) {
epicsInterruptContextMessage("callbackRequest: pcallback was NULL\n");
- return;
+ return S_db_notInit;
}
priority = pcallback->priority;
if (priority < 0 || priority >= NUM_CALLBACK_PRIORITIES) {
epicsInterruptContextMessage("callbackRequest: Bad priority\n");
- return;
+ return S_db_badChoice;
}
- if (ringOverflow[priority]) return;
+ if (ringOverflow[priority]) return S_db_bufFull;
- lockKey = epicsInterruptLock();
pushOK = epicsRingPointerPush(callbackQ[priority], pcallback);
- epicsInterruptUnlock(lockKey);
if (!pushOK) {
char msg[48] = "callbackRequest: ";
- strcat(msg, threadName[priority]);
+ strcat(msg, threadNamePrefix[priority]);
strcat(msg, " ring buffer full\n");
epicsInterruptContextMessage(msg);
ringOverflow[priority] = TRUE;
+ return S_db_bufFull;
}
epicsEventSignal(callbackSem[priority]);
+ return 0;
}
static void ProcessCallback(CALLBACK *pcallback)
@@ -209,11 +282,11 @@
callbackSetUser(pRec, pcallback);
}
-void callbackRequestProcessCallback(CALLBACK *pcallback,
+int callbackRequestProcessCallback(CALLBACK *pcallback,
int Priority, void *pRec)
{
callbackSetProcess(pcallback, Priority, pRec);
- callbackRequest(pcallback);
+ return callbackRequest(pcallback);
}
static void notify(void *pPrivate)
=== modified file 'src/ioc/db/callback.h'
--- src/ioc/db/callback.h 2013-10-22 21:58:26 +0000
+++ src/ioc/db/callback.h 2013-10-22 21:58:27 +0000
@@ -3,6 +3,7 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
+* Copyright (c) 2013 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
@@ -56,11 +57,11 @@
( (USER) = (void *)((CALLBACK *)(PCALLBACK))->user )
epicsShareFunc void callbackInit(void);
-epicsShareFunc void callbackRequest(CALLBACK *pCallback);
epicsShareFunc void callbackShutdown(void);
+epicsShareFunc int callbackRequest(CALLBACK *pCallback);
epicsShareFunc void callbackSetProcess(
CALLBACK *pcallback, int Priority, void *pRec);
-epicsShareFunc void callbackRequestProcessCallback(
+epicsShareFunc int callbackRequestProcessCallback(
CALLBACK *pCallback,int Priority, void *pRec);
epicsShareFunc void callbackRequestDelayed(
CALLBACK *pCallback,double seconds);
@@ -68,6 +69,7 @@
epicsShareFunc void callbackRequestProcessCallbackDelayed(
CALLBACK *pCallback, int Priority, void *pRec, double seconds);
epicsShareFunc int callbackSetQueueSize(int size);
+epicsShareFunc int callbackParallelThreads(int count, const char *prio);
#ifdef __cplusplus
}
=== modified file 'src/ioc/db/dbAccessDefs.h'
--- src/ioc/db/dbAccessDefs.h 2012-07-15 21:08:50 +0000
+++ src/ioc/db/dbAccessDefs.h 2013-10-22 21:58:27 +0000
@@ -199,6 +199,8 @@
#define S_db_cntSpwn (M_dbAccess|63) /*Cannot spawn dbContTask*/
#define S_db_cntCont (M_dbAccess|65) /*Cannot resume dbContTask*/
#define S_db_noMemory (M_dbAccess|66) /*unable to allocate data structure from pool*/
+#define S_db_notInit (M_dbAccess|67) /*Not initialized*/
+#define S_db_bufFull (M_dbAccess|68) /*Buffer full*/
epicsShareFunc long dbPutSpecial(struct dbAddr *paddr,int pass);
epicsShareFunc struct rset * dbGetRset(const struct dbAddr *paddr);
=== modified file 'src/ioc/db/dbIocRegister.c'
--- src/ioc/db/dbIocRegister.c 2012-07-15 21:08:50 +0000
+++ src/ioc/db/dbIocRegister.c 2013-10-22 21:58:27 +0000
@@ -23,6 +23,8 @@
#include "dbIocRegister.h"
#include "dbState.h"
+epicsShareDef int callbackParallelThreadsDefault;
+
/* dbLoadDatabase */
static const iocshArg dbLoadDatabaseArg0 = { "file name",iocshArgString};
static const iocshArg dbLoadDatabaseArg1 = { "path",iocshArgString};
@@ -298,6 +300,18 @@
callbackSetQueueSize(args[0].ival);
}
+/* callbackParallelThreads */
+static const iocshArg callbackParallelThreadsArg0 = { "no of threads", iocshArgInt};
+static const iocshArg callbackParallelThreadsArg1 = { "priority", iocshArgString};
+static const iocshArg * const callbackParallelThreadsArgs[2] =
+ {&callbackParallelThreadsArg0,&callbackParallelThreadsArg1};
+static const iocshFuncDef callbackParallelThreadsFuncDef =
+ {"callbackParallelThreads",2,callbackParallelThreadsArgs};
+static void callbackParallelThreadsCallFunc(const iocshArgBuf *args)
+{
+ callbackParallelThreads(args[0].ival, args[1].sval);
+}
+
/* dbStateCreate */
static const iocshArg dbStateArgName = { "name", iocshArgString };
static const iocshArg * const dbStateCreateArgs[] = { &dbStateArgName };
@@ -394,6 +408,10 @@
iocshRegister(&scanpiolFuncDef,scanpiolCallFunc);
iocshRegister(&callbackSetQueueSizeFuncDef,callbackSetQueueSizeCallFunc);
+ iocshRegister(&callbackParallelThreadsFuncDef,callbackParallelThreadsCallFunc);
+
+ /* Needed before callback system is initialized */
+ callbackParallelThreadsDefault = epicsThreadGetCPUs();
iocshRegister(&dbStateCreateFuncDef, dbStateCreateCallFunc);
iocshRegister(&dbStateSetFuncDef, dbStateSetCallFunc);
=== modified file 'src/ioc/db/dbScan.c'
--- src/ioc/db/dbScan.c 2013-10-22 21:58:26 +0000
+++ src/ioc/db/dbScan.c 2013-10-22 21:58:27 +0000
@@ -119,6 +119,8 @@
CALLBACK callback;
scan_list scan_list;
struct io_scan_list *next;
+ io_scan_complete cb;
+ void * arg;
} io_scan_list;
static io_scan_list *iosl_head[NUM_CALLBACK_PRIORITIES] = {
@@ -510,19 +512,31 @@
}
}
-
-void scanIoRequest(IOSCANPVT pioscanpvt)
+/* return a bit mask indicating each prioity level
+ * in which a callback request was queued.
+ */
+unsigned int scanIoRequest(IOSCANPVT pioscanpvt)
{
int prio;
+ unsigned int queued = 0;
- if (scanCtl != ctlRun) return;
+ if (scanCtl != ctlRun) return 0;
for (prio = 0; prio < NUM_CALLBACK_PRIORITIES; prio++) {
io_scan_list *piosl = &pioscanpvt[prio];
if (ellCount(&piosl->scan_list.list) > 0)
- callbackRequest(&piosl->callback);
+ if(!callbackRequest(&piosl->callback))
+ queued |= 1<<prio;
}
-}
-
+ return queued;
+}
+
+/* May not be called while a scan request is queued or running */
+void scanIoSetComplete(IOSCANPVT pioscanpvt, io_scan_complete cb, void* arg)
+{
+ pioscanpvt->cb = cb;
+ pioscanpvt->arg = arg;
+}
+
void scanOnce(struct dbCommon *precord)
{
static int newOverflow = TRUE;
@@ -735,9 +749,15 @@
static void ioeventCallback(CALLBACK *pcallback)
{
io_scan_list *piosl;
+ io_scan_list *pioslLow;
callbackGetUser(piosl, pcallback);
scanList(&piosl->scan_list);
+ pioslLow = piosl - pcallback->priority;
+ if(pioslLow->cb)
+ (*pioslLow->cb)(pioslLow->arg,
+ pioslLow,
+ pcallback->priority);
}
static void printList(scan_list *psl, char *message)
=== modified file 'src/ioc/db/dbScan.h'
--- src/ioc/db/dbScan.h 2013-10-22 21:58:26 +0000
+++ src/ioc/db/dbScan.h 2013-10-22 21:58:27 +0000
@@ -39,6 +39,8 @@
typedef struct io_scan_list *IOSCANPVT;
typedef struct event_list *EVENTPVT;
+typedef void (*io_scan_complete)(void *, IOSCANPVT, int);
+
struct dbCommon;
epicsShareFunc long scanInit(void);
@@ -65,7 +67,8 @@
epicsShareFunc int scanpiol(void);
epicsShareFunc void scanIoInit(IOSCANPVT *);
-epicsShareFunc void scanIoRequest(IOSCANPVT);
+epicsShareFunc unsigned int scanIoRequest(IOSCANPVT);
+epicsShareFunc void scanIoSetComplete(IOSCANPVT, io_scan_complete, void*);
#ifdef __cplusplus
}
=== modified file 'src/ioc/db/test/Makefile'
--- src/ioc/db/test/Makefile 2013-10-22 21:58:26 +0000
+++ src/ioc/db/test/Makefile 2013-10-22 21:58:27 +0000
@@ -30,11 +30,28 @@
testHarness_SRCS += callbackTest.c
TESTS += callbackTest
+TESTPROD_HOST += callbackParallelTest
+callbackParallelTest_SRCS += callbackParallelTest.c
+testHarness_SRCS += callbackParallelTest.c
+TESTS += callbackParallelTest
+
TESTPROD_HOST += dbStateTest
dbStateTest_SRCS += dbStateTest.c
testHarness_SRCS += dbStateTest.c
TESTS += dbStateTest
+TARGETS += $(COMMON_DIR)/scanIoTest.dbd
+scanIoTest_DBD += menuGlobal.dbd
+scanIoTest_DBD += menuConvert.dbd
+scanIoTest_DBD += yRecord.dbd
+TESTPROD_HOST += scanIoTest
+scanIoTest_SRCS += scanIoTest.c
+scanIoTest_SRCS += scanIoTest_registerRecordDeviceDriver.cpp
+testHarness_SRCS += scanIoTest.c
+testHarness_SRCS += scanIoTest_registerRecordDeviceDriver.cpp
+TESTFILES += $(COMMON_DIR)/scanIoTest.dbd ../scanIoTest.db
+TESTS += scanIoTest
+
TARGETS += $(COMMON_DIR)/dbChannelTest.dbd
dbChannelTest_DBD += xRecord.dbd
TESTPROD_HOST += dbChannelTest
@@ -82,3 +99,4 @@
include $(TOP)/configure/RULES
xRecord$(DEP): $(COMMON_DIR)/xRecord.h
+scanIoTest$(DEP): $(COMMON_DIR)/yRecord.h
=== added file 'src/ioc/db/test/callbackParallelTest.c'
--- src/ioc/db/test/callbackParallelTest.c 1970-01-01 00:00:00 +0000
+++ src/ioc/db/test/callbackParallelTest.c 2013-10-22 21:58:27 +0000
@@ -0,0 +1,184 @@
+/*************************************************************************\
+* Copyright (c) 2008 UChicago Argonne LLC, as Operator of Argonne
+* National Laboratory.
+* Copyright (c) 2002 The Regents of the University of California, as
+* Operator of Los Alamos National Laboratory.
+* Copyright (c) 2013 ITER Organization.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+/* $Revision-Id$ */
+
+/* Author: Marty Kraimer Date: 26JAN2000 */
+
+#include <stddef.h>
+#include <stdlib.h>
+#include <stddef.h>
+#include <string.h>
+#include <stdio.h>
+#include <math.h>
+
+#include "callback.h"
+#include "cantProceed.h"
+#include "epicsThread.h"
+#include "epicsEvent.h"
+#include "epicsTime.h"
+#include "epicsUnitTest.h"
+#include "testMain.h"
+
+/*
+ * This test checks both immediate and delayed callbacks in two steps.
+ * In the first step (pass1) NCALLBACKS immediate callbacks are queued.
+ * As each is run it starts a second delayed callback (pass2).
+ * The last delayed callback which runs signals an epicsEvent
+ * to the main thread.
+ *
+ * Two time intervals are measured. The time to queue and run each of
+ * the immediate callbacks, and the actual delay of the delayed callback.
+ */
+
+#define NCALLBACKS 169
+#define DELAY_QUANTUM 0.25
+
+#define TEST_DELAY(i) ((i / NUM_CALLBACK_PRIORITIES) * DELAY_QUANTUM)
+
+typedef struct myPvt {
+ CALLBACK cb1;
+ CALLBACK cb2;
+ epicsTimeStamp pass1Time;
+ epicsTimeStamp pass2Time;
+ double delay;
+ int pass;
+ int resultFail;
+} myPvt;
+
+epicsEventId finished;
+
+static void myCallback(CALLBACK *pCallback)
+{
+ myPvt *pmyPvt;
+
+ callbackGetUser(pmyPvt, pCallback);
+
+ pmyPvt->pass++;
+
+ if (pmyPvt->pass == 1) {
+ epicsTimeGetCurrent(&pmyPvt->pass1Time);
+ callbackRequestDelayed(&pmyPvt->cb2, pmyPvt->delay);
+ } else if (pmyPvt->pass == 2) {
+ epicsTimeGetCurrent(&pmyPvt->pass2Time);
+ } else {
+ pmyPvt->resultFail = 1;
+ return;
+ }
+}
+
+static void finalCallback(CALLBACK *pCallback)
+{
+ myCallback(pCallback);
+ epicsEventSignal(finished);
+}
+
+static void updateStats(double *stats, double val)
+{
+ if (stats[0] > val) stats[0] = val;
+ if (stats[1] < val) stats[1] = val;
+ stats[2] += val;
+ stats[3] += pow(val, 2.0);
+ stats[4] += 1.;
+}
+
+static void printStats(double *stats, const char* tag) {
+ testDiag("Priority %4s min/avg/max/sigma = %f / %f / %f / %f",
+ tag, stats[0], stats[2]/stats[4], stats[1],
+ sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]);
+}
+
+MAIN(callbackParallelTest)
+{
+ myPvt *pcbt[NCALLBACKS];
+ epicsTimeStamp start;
+ int noCpus = epicsThreadGetCPUs();
+ int i, j;
+ /* Statistics: min/max/sum/sum^2/n for each priority */
+ double setupError[NUM_CALLBACK_PRIORITIES][5];
+ double timeError[NUM_CALLBACK_PRIORITIES][5];
+ double defaultError[5] = {1,-1,0,0,0};
+
+ for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++)
+ for (j = 0; j < 5; j++)
+ setupError[i][j] = timeError[i][j] = defaultError[j];
+
+ testPlan(NCALLBACKS * 2 + 1);
+
+ testDiag("Starting %d parallel callback threads", noCpus);
+
+ callbackParallelThreads(noCpus, "");
+ callbackInit();
+ epicsThreadSleep(1.0);
+
+ finished = epicsEventMustCreate(epicsEventEmpty);
+
+ for (i = 0; i < NCALLBACKS ; i++) {
+ pcbt[i] = callocMustSucceed(1, sizeof(myPvt), "pcbt");
+ callbackSetCallback(myCallback, &pcbt[i]->cb1);
+ callbackSetCallback(myCallback, &pcbt[i]->cb2);
+ callbackSetUser(pcbt[i], &pcbt[i]->cb1);
+ callbackSetUser(pcbt[i], &pcbt[i]->cb2);
+ callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb1);
+ callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb2);
+ pcbt[i]->delay = TEST_DELAY(i);
+ pcbt[i]->pass = 0;
+ }
+
+ /* Last callback is special */
+ callbackSetCallback(finalCallback, &pcbt[NCALLBACKS-1]->cb2);
+ callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb1);
+ callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb2);
+ pcbt[NCALLBACKS-1]->delay = TEST_DELAY(NCALLBACKS) + 1.0;
+ pcbt[NCALLBACKS-1]->pass = 0;
+
+ testOk1(epicsTimeGetCurrent(&start)==epicsTimeOK);
+
+ for (i = 0; i < NCALLBACKS ; i++) {
+ callbackRequest(&pcbt[i]->cb1);
+ }
+
+ testDiag("Waiting %.02f sec", pcbt[NCALLBACKS-1]->delay);
+
+ epicsEventWait(finished);
+
+ for (i = 0; i < NCALLBACKS ; i++) {
+ if(pcbt[i]->resultFail || pcbt[i]->pass!=2)
+ testFail("pass = %d for delay = %f", pcbt[i]->pass, pcbt[i]->delay);
+ else {
+ double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start);
+ testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05",
+ pcbt[i]->delay, delta);
+ updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta);
+ }
+ }
+
+ for (i = 0; i < NCALLBACKS ; i++) {
+ double delta, error;
+ if(pcbt[i]->resultFail || pcbt[i]->pass!=2)
+ continue;
+ delta = epicsTimeDiffInSeconds(&pcbt[i]->pass2Time, &pcbt[i]->pass1Time);
+ error = delta - pcbt[i]->delay;
+ testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05",
+ pcbt[i]->delay, error);
+ updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error);
+ }
+
+ testDiag("Setup time statistics");
+ printStats(setupError[0], "LOW");
+ printStats(setupError[1], "MID");
+ printStats(setupError[2], "HIGH");
+
+ testDiag("Delay time statistics");
+ printStats(timeError[0], "LOW");
+ printStats(timeError[1], "MID");
+ printStats(timeError[2], "HIGH");
+
+ return testDone();
+}
=== modified file 'src/ioc/db/test/callbackTest.c'
--- src/ioc/db/test/callbackTest.c 2012-07-06 17:35:26 +0000
+++ src/ioc/db/test/callbackTest.c 2013-10-22 21:58:27 +0000
@@ -3,6 +3,7 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
+* Copyright (c) 2013 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
@@ -79,11 +80,34 @@
epicsEventSignal(finished);
}
+static void updateStats(double *stats, double val)
+{
+ if (stats[0] > val) stats[0] = val;
+ if (stats[1] < val) stats[1] = val;
+ stats[2] += val;
+ stats[3] += pow(val, 2.0);
+ stats[4] += 1.;
+}
+
+static void printStats(double *stats, const char* tag) {
+ testDiag("Priority %4s min/avg/max/sigma = %f / %f / %f / %f",
+ tag, stats[0], stats[2]/stats[4], stats[1],
+ sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]);
+}
+
MAIN(callbackTest)
{
myPvt *pcbt[NCALLBACKS];
epicsTimeStamp start;
- int i;
+ int i, j;
+ /* Statistics: min/max/sum/sum^2/n for each priority */
+ double setupError[NUM_CALLBACK_PRIORITIES][5];
+ double timeError[NUM_CALLBACK_PRIORITIES][5];
+ double defaultError[5] = {1,-1,0,0,0};
+
+ for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++)
+ for (j = 0; j < 5; j++)
+ setupError[i][j] = timeError[i][j] = defaultError[j];
testPlan(NCALLBACKS * 2 + 1);
@@ -128,8 +152,8 @@
double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start);
testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05",
pcbt[i]->delay, delta);
+ updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta);
}
-
}
for (i = 0; i < NCALLBACKS ; i++) {
@@ -140,7 +164,18 @@
error = delta - pcbt[i]->delay;
testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05",
pcbt[i]->delay, error);
+ updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error);
}
+ testDiag("Setup time statistics");
+ printStats(setupError[0], "LOW");
+ printStats(setupError[1], "MID");
+ printStats(setupError[2], "HIGH");
+
+ testDiag("Delay time statistics");
+ printStats(timeError[0], "LOW");
+ printStats(timeError[1], "MID");
+ printStats(timeError[2], "HIGH");
+
return testDone();
}
=== modified file 'src/ioc/db/test/epicsRunDbTests.c'
--- src/ioc/db/test/epicsRunDbTests.c 2012-06-28 14:55:46 +0000
+++ src/ioc/db/test/epicsRunDbTests.c 2013-10-22 21:58:27 +0000
@@ -28,6 +28,7 @@
runTest(callbackTest);
runTest(dbStateTest);
+ runTest(scanIoTest);
runTest(testDbChannel);
runTest(chfPluginTest);
runTest(arrShorthandTest);
=== modified file 'src/ioc/misc/dbCore.dbd'
--- src/ioc/misc/dbCore.dbd 2011-02-26 05:56:51 +0000
+++ src/ioc/misc/dbCore.dbd 2013-10-22 21:58:27 +0000
@@ -12,3 +12,5 @@
variable(dbRecordsOnceOnly,int)
variable(dbBptNotMonotonic,int)
+# Default number of parallel callback threads
+variable(callbackParallelThreadsDefault,int)
=== modified file 'src/libCom/osi/epicsThread.h'
--- src/libCom/osi/epicsThread.h 2012-07-06 21:33:10 +0000
+++ src/libCom/osi/epicsThread.h 2013-10-22 21:58:27 +0000
@@ -3,7 +3,7 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
-* Copyright (c) 2012 ITER Organization
+* Copyright (c) 2013 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
@@ -85,6 +85,7 @@
epicsShareFunc double epicsShareAPI epicsThreadSleepQuantum(void);
epicsShareFunc epicsThreadId epicsShareAPI epicsThreadGetIdSelf(void);
epicsShareFunc epicsThreadId epicsShareAPI epicsThreadGetId(const char *name);
+epicsShareFunc int epicsThreadGetCPUs(void);
epicsShareFunc const char * epicsShareAPI epicsThreadGetNameSelf(void);
=== modified file 'src/libCom/osi/os/RTEMS/osdThread.c'
--- src/libCom/osi/os/RTEMS/osdThread.c 2012-07-31 19:04:38 +0000
+++ src/libCom/osi/os/RTEMS/osdThread.c 2013-10-22 21:58:27 +0000
@@ -718,3 +718,12 @@
return 1.0 / rtemsTicksPerSecond_double;
}
+
+epicsShareFunc int epicsThreadGetCPUs(void)
+{
+#if defined(RTEMS_SMP)
+ return rtems_smp_get_number_of_processors();
+#else
+ return 1;
+#endif
+}
=== modified file 'src/libCom/osi/os/WIN32/osdThread.c'
--- src/libCom/osi/os/WIN32/osdThread.c 2013-06-07 23:08:38 +0000
+++ src/libCom/osi/os/WIN32/osdThread.c 2013-10-22 21:58:27 +0000
@@ -1101,6 +1101,14 @@
return ( void * ) TlsGetValue ( pPvt->key );
}
+/*
+ * epicsThreadGetCPUs ()
+ */
+epicsShareFunc int epicsThreadGetCPUs ( void )
+{
+ return atoi(getenv("NUMBER_OF_PROCESSORS"));
+}
+
#ifdef TEST_CODES
void testPriorityMapping ()
{
=== modified file 'src/libCom/osi/os/posix/osdThread.c'
--- src/libCom/osi/os/posix/osdThread.c 2012-09-20 19:55:32 +0000
+++ src/libCom/osi/os/posix/osdThread.c 2013-10-22 21:58:27 +0000
@@ -3,7 +3,7 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
-* Copyright (c) 2012 ITER Organization
+* Copyright (c) 2013 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
@@ -873,3 +873,7 @@
return 1.0 / hz;
}
+epicsShareFunc int epicsThreadGetCPUs(void)
+{
+ return sysconf(_SC_NPROCESSORS_ONLN);
+}
=== modified file 'src/libCom/osi/os/vxWorks/osdThread.c'
--- src/libCom/osi/os/vxWorks/osdThread.c 2013-06-07 23:08:38 +0000
+++ src/libCom/osi/os/vxWorks/osdThread.c 2013-10-22 21:58:27 +0000
@@ -447,3 +447,8 @@
double HZ = sysClkRateGet ();
return 1.0 / HZ;
}
+
+epicsShareFunc int epicsThreadGetCPUs(void)
+{
+ return 1;
+}
=== modified file 'src/libCom/ring/epicsRingBytes.c'
--- src/libCom/ring/epicsRingBytes.c 2012-05-24 18:31:28 +0000
+++ src/libCom/ring/epicsRingBytes.c 2013-10-22 21:58:27 +0000
@@ -3,13 +3,16 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
-* EPICS BASE Versions 3.13.7
-* and higher are distributed subject to a Software License Agreement found
-* in file LICENSE that is included with this distribution.
+* Copyright (c) 2012 ITER Organization.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
\*************************************************************************/
-/* epicsRingBytes.cd */
-/* Author: Eric Norum & Marty Kraimer Date: 15JUL99 */
+/*
+ * Author: Marty Kraimer Date: 15JUL99
+ * Eric Norum
+ * Ralph Lange <[email protected]>
+ */
#include <stddef.h>
#include <string.h>
@@ -18,6 +21,7 @@
#include <stdio.h>
#define epicsExportSharedSymbols
+#include "epicsSpin.h"
#include "dbDefs.h"
#include "epicsRingBytes.h"
@@ -30,6 +34,7 @@
#define SLOP 16
typedef struct ringPvt {
+ epicsSpinId lock;
volatile int nextPut;
volatile int nextGet;
int size;
@@ -44,12 +49,23 @@
pring->size = size + SLOP;
pring->nextGet = 0;
pring->nextPut = 0;
+ pring->lock = 0;
+ return((void *)pring);
+}
+
+epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesLockedCreate(int size)
+{
+ ringPvt *pring = (ringPvt *)epicsRingBytesCreate(size);
+ if(!pring)
+ return NULL;
+ pring->lock = epicsSpinCreate();
return((void *)pring);
}
epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id)
{
ringPvt *pring = (ringPvt *)id;
+ if (pring->lock) epicsSpinDestroy(pring->lock);
free((void *)pring);
}
@@ -57,11 +73,14 @@
epicsRingBytesId id, char *value,int nbytes)
{
ringPvt *pring = (ringPvt *)id;
- int nextGet = pring->nextGet;
- int nextPut = pring->nextPut;
- int size = pring->size;
+ int nextGet, nextPut, size;
int count;
+ if (pring->lock) epicsSpinLock(pring->lock);
+ nextGet = pring->nextGet;
+ nextPut = pring->nextPut;
+ size = pring->size;
+
if (nextGet <= nextPut) {
count = nextPut - nextGet;
if (count < nbytes)
@@ -89,6 +108,8 @@
}
}
pring->nextGet = nextGet;
+
+ if (pring->lock) epicsSpinUnlock(pring->lock);
return nbytes;
}
@@ -96,23 +117,30 @@
epicsRingBytesId id, char *value,int nbytes)
{
ringPvt *pring = (ringPvt *)id;
- int nextGet = pring->nextGet;
- int nextPut = pring->nextPut;
- int size = pring->size;
+ int nextGet, nextPut, size;
int freeCount, copyCount, topCount;
+ if (pring->lock) epicsSpinLock(pring->lock);
+ nextGet = pring->nextGet;
+ nextPut = pring->nextPut;
+ size = pring->size;
+
if (nextPut < nextGet) {
freeCount = nextGet - nextPut - SLOP;
- if (nbytes > freeCount)
+ if (nbytes > freeCount) {
+ if (pring->lock) epicsSpinUnlock(pring->lock);
return 0;
+ }
if (nbytes)
memcpy ((void *)&pring->buffer[nextPut], value, nbytes);
nextPut += nbytes;
}
else {
freeCount = size - nextPut + nextGet - SLOP;
- if (nbytes > freeCount)
+ if (nbytes > freeCount) {
+ if (pring->lock) epicsSpinUnlock(pring->lock);
return 0;
+ }
topCount = size - nextPut;
copyCount = (nbytes > topCount) ? topCount : nbytes;
if (copyCount)
@@ -126,6 +154,8 @@
}
}
pring->nextPut = nextPut;
+
+ if (pring->lock) epicsSpinUnlock(pring->lock);
return nbytes;
}
@@ -133,14 +163,20 @@
{
ringPvt *pring = (ringPvt *)id;
+ if (pring->lock) epicsSpinLock(pring->lock);
pring->nextGet = pring->nextPut;
+ if (pring->lock) epicsSpinUnlock(pring->lock);
}
epicsShareFunc int epicsShareAPI epicsRingBytesFreeBytes(epicsRingBytesId id)
{
ringPvt *pring = (ringPvt *)id;
- int nextGet = pring->nextGet;
- int nextPut = pring->nextPut;
+ int nextGet, nextPut;
+
+ if (pring->lock) epicsSpinLock(pring->lock);
+ nextGet = pring->nextGet;
+ nextPut = pring->nextPut;
+ if (pring->lock) epicsSpinUnlock(pring->lock);
if (nextPut < nextGet)
return nextGet - nextPut - SLOP;
@@ -151,8 +187,18 @@
epicsShareFunc int epicsShareAPI epicsRingBytesUsedBytes(epicsRingBytesId id)
{
ringPvt *pring = (ringPvt *)id;
-
- return pring->size - epicsRingBytesFreeBytes(id) - SLOP;
+ int nextGet, nextPut;
+ int used;
+
+ if (pring->lock) epicsSpinLock(pring->lock);
+ nextGet = pring->nextGet;
+ nextPut = pring->nextPut;
+ if (pring->lock) epicsSpinUnlock(pring->lock);
+
+ used = nextPut - nextGet;
+ if (used < 0) used += pring->size;
+
+ return used;
}
epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id)
@@ -165,8 +211,13 @@
epicsShareFunc int epicsShareAPI epicsRingBytesIsEmpty(epicsRingBytesId id)
{
ringPvt *pring = (ringPvt *)id;
-
- return (pring->nextPut == pring->nextGet);
+ int isEmpty;
+
+ if (pring->lock) epicsSpinLock(pring->lock);
+ isEmpty = (pring->nextPut == pring->nextGet);
+ if (pring->lock) epicsSpinUnlock(pring->lock);
+
+ return isEmpty;
}
epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id)
=== modified file 'src/libCom/ring/epicsRingBytes.h'
--- src/libCom/ring/epicsRingBytes.h 2002-07-12 21:35:43 +0000
+++ src/libCom/ring/epicsRingBytes.h 2013-10-22 21:58:27 +0000
@@ -3,13 +3,16 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
-* EPICS BASE Versions 3.13.7
-* and higher are distributed subject to a Software License Agreement found
-* in file LICENSE that is included with this distribution.
+* Copyright (c) 2012 ITER Organization.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
\*************************************************************************/
-/*epicsRingBytes.h */
-/* Author: Eric Norum & Marty Kraimer Date: 15JUL99 */
+/*
+ * Author: Marty Kraimer Date: 15JUL99
+ * Eric Norum
+ * Ralph Lange <[email protected]>
+ */
#ifndef INCepicsRingBytesh
#define INCepicsRingBytesh
@@ -23,6 +26,8 @@
typedef void *epicsRingBytesId;
epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesCreate(int nbytes);
+/* Same, but secured by a spinlock */
+epicsShareFunc epicsRingBytesId epicsShareAPI epicsRingBytesLockedCreate(int nbytes);
epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id);
epicsShareFunc int epicsShareAPI epicsRingBytesGet(
epicsRingBytesId id, char *value,int nbytes);
@@ -42,6 +47,8 @@
/* NOTES
If there is only one writer it is not necessary to lock for put
If there is a single reader it is not necessary to lock for puts
+
+ epicsRingBytesLocked uses a spinlock.
*/
#endif /* INCepicsRingBytesh */
=== modified file 'src/libCom/ring/epicsRingPointer.cpp'
--- src/libCom/ring/epicsRingPointer.cpp 2013-01-30 00:02:51 +0000
+++ src/libCom/ring/epicsRingPointer.cpp 2013-10-22 21:58:27 +0000
@@ -3,11 +3,16 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
+* Copyright (c) 2012 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
-/*epicsRingPointer.cpp*/
-/* Author: Marty Kraimer Date: 13OCT2000 */
+
+/*
+ * Author: Marty Kraimer Date: 13OCT2000
+ * Ralph Lange <[email protected]>
+ */
+
#include <stddef.h>
#include <string.h>
@@ -22,7 +27,13 @@
epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size)
{
- voidPointer *pvoidPointer = new voidPointer(size);
+ voidPointer *pvoidPointer = new voidPointer(size, false);
+ return(reinterpret_cast<void *>(pvoidPointer));
+}
+
+epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerLockedCreate(int size)
+{
+ voidPointer *pvoidPointer = new voidPointer(size, true);
return(reinterpret_cast<void *>(pvoidPointer));
}
=== modified file 'src/libCom/ring/epicsRingPointer.h'
--- src/libCom/ring/epicsRingPointer.h 2013-01-30 00:02:51 +0000
+++ src/libCom/ring/epicsRingPointer.h 2013-10-22 21:58:27 +0000
@@ -3,12 +3,15 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
+* Copyright (c) 2012 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
-/*epicsRingPointer.h */
-/* Author: Marty Kraimer Date: 15JUL99 */
+/*
+ * Author: Marty Kraimer Date: 15JUL99
+ * Ralph Lange <[email protected]>
+ */
#ifndef INCepicsRingPointerh
#define INCepicsRingPointerh
@@ -16,15 +19,18 @@
/* NOTES
* If there is only one writer it is not necessary to lock push
* If there is a single reader it is not necessary to lock pop
+ *
+ * epicsRingPointerLocked uses a spinlock.
*/
+#include "epicsSpin.h"
#include "shareLib.h"
#ifdef __cplusplus
template <class T>
class epicsRingPointer {
public: /* Functions */
- epicsRingPointer(int size);
+ epicsRingPointer(int size, bool locked);
~epicsRingPointer();
bool push(T *p);
T* pop();
@@ -42,6 +48,7 @@
epicsRingPointer& operator=(const epicsRingPointer &);
private: /* Data */
+ epicsSpinId lock;
volatile int nextPush;
volatile int nextPop;
int size;
@@ -54,6 +61,8 @@
typedef void *epicsRingPointerId;
epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerCreate(int size);
+/* Same, but secured by a spinlock */
+epicsShareFunc epicsRingPointerId epicsShareAPI epicsRingPointerLockedCreate(int size);
epicsShareFunc void epicsShareAPI epicsRingPointerDelete(epicsRingPointerId id);
/*ringPointerPush returns (0,1) if p (was not, was) put on ring*/
epicsShareFunc int epicsShareAPI epicsRingPointerPush(epicsRingPointerId id,void *p);
@@ -85,72 +94,105 @@
#ifdef __cplusplus
template <class T>
-inline epicsRingPointer<T>::epicsRingPointer(int sz) :
- nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1]) {}
+inline epicsRingPointer<T>::epicsRingPointer(int sz, bool locked) :
+ lock(0), nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1])
+{
+ if (locked)
+ lock = epicsSpinCreate();
+}
template <class T>
inline epicsRingPointer<T>::~epicsRingPointer()
-{ delete [] buffer;}
+{
+ if (lock) epicsSpinDestroy(lock);
+ delete [] buffer;
+}
template <class T>
inline bool epicsRingPointer<T>::push(T *p)
{
+ if (lock) epicsSpinLock(lock);
int next = nextPush;
int newNext = next + 1;
if(newNext>=size) newNext=0;
- if(newNext==nextPop) return(false);
+ if (newNext == nextPop) {
+ if (lock) epicsSpinUnlock(lock);
+ return(false);
+ }
buffer[next] = p;
nextPush = newNext;
+ if (lock) epicsSpinUnlock(lock);
return(true);
}
template <class T>
inline T* epicsRingPointer<T>::pop()
{
+ if (lock) epicsSpinLock(lock);
int next = nextPop;
- if(next == nextPush) return(0);
+ if (next == nextPush) {
+ if (lock) epicsSpinUnlock(lock);
+ return(0);
+ }
T*p = buffer[next];
++next;
if(next >=size) next = 0;
nextPop = next;
+ if (lock) epicsSpinUnlock(lock);
return(p);
}
template <class T>
inline void epicsRingPointer<T>::flush()
{
+ if (lock) epicsSpinLock(lock);
nextPop = 0;
nextPush = 0;
+ if (lock) epicsSpinUnlock(lock);
}
template <class T>
inline int epicsRingPointer<T>::getFree() const
{
+ if (lock) epicsSpinLock(lock);
int n = nextPop - nextPush - 1;
if (n < 0) n += size;
+ if (lock) epicsSpinUnlock(lock);
return n;
}
template <class T>
inline int epicsRingPointer<T>::getUsed() const
{
+ if (lock) epicsSpinLock(lock);
int n = nextPush - nextPop;
if (n < 0) n += size;
+ if (lock) epicsSpinUnlock(lock);
return n;
}
template <class T>
inline int epicsRingPointer<T>::getSize() const
-{ return(size-1);}
+{
+ return(size-1);
+}
template <class T>
inline bool epicsRingPointer<T>::isEmpty() const
-{ return(nextPush==nextPop);}
+{
+ bool isEmpty;
+ if (lock) epicsSpinLock(lock);
+ isEmpty = (nextPush == nextPop);
+ if (lock) epicsSpinUnlock(lock);
+ return isEmpty;
+}
template <class T>
inline bool epicsRingPointer<T>::isFull() const
{
+ if (lock) epicsSpinLock(lock);
int count = nextPush - nextPop +1;
+ if (lock) epicsSpinUnlock(lock);
return((count == 0) || (count == size));
}
=== modified file 'src/libCom/test/ringPointerTest.c'
--- src/libCom/test/ringPointerTest.c 2009-04-23 22:28:44 +0000
+++ src/libCom/test/ringPointerTest.c 2013-10-22 21:58:27 +0000
@@ -3,6 +3,7 @@
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
+* Copyright (c) 2013 ITER Organization.
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
@@ -26,12 +27,17 @@
#include "testMain.h"
#define ringSize 10
+#define consumerCount 4
+#define producerCount 4
static volatile int testExit = 0;
+int value[ringSize*2];
typedef struct info {
epicsEventId consumerEvent;
epicsRingPointerId ring;
+ int checkOrder;
+ int value[ringSize*2];
}info;
static void consumer(void *arg)
@@ -39,16 +45,46 @@
info *pinfo = (info *)arg;
static int expectedValue=0;
int *newvalue;
+ char myname[20];
- testDiag("Consumer starting");
+ epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname));
+ testDiag("%s starting", myname);
while(1) {
epicsEventMustWait(pinfo->consumerEvent);
if (testExit) return;
- while((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) {
- testOk(expectedValue == *newvalue,
- "Consumer: %d == %d", expectedValue, *newvalue);
- expectedValue = *newvalue + 1;
- }
+ while ((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) {
+ if (pinfo->checkOrder) {
+ testOk(expectedValue == *newvalue,
+ "%s: (got) %d == %d (expected)", myname, *newvalue, expectedValue);
+ expectedValue = *newvalue + 1;
+ } else {
+ testOk(pinfo->value[*newvalue] <= producerCount, "%s: got a %d (%d times seen before)",
+ myname, *newvalue, pinfo->value[*newvalue]);
+ }
+ /* This must be atomic... */
+ pinfo->value[*newvalue]++;
+ epicsThreadSleep(0.05);
+ }
+ }
+}
+
+static void producer(void *arg)
+{
+ info *pinfo = (info *)arg;
+ char myname[20];
+ int i;
+
+ epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname));
+ testDiag("%s starting", myname);
+ for (i=0; i<ringSize*2; i++) {
+ while (epicsRingPointerIsFull(pinfo->ring)) {
+ epicsThreadSleep(0.2);
+ if (testExit) return;
+ }
+ testOk(epicsRingPointerPush(pinfo->ring, (void *)&value[i]),
+ "%s: Pushing %d, ring not full", myname, i);
+ epicsEventSignal(pinfo->consumerEvent);
+ if (testExit) return;
}
}
@@ -57,21 +93,27 @@
int i;
info *pinfo;
epicsEventId consumerEvent;
- int value[ringSize*2];
int *pgetValue;
epicsRingPointerId ring;
epicsThreadId tid;
+ char threadName[20];
- testPlan(54);
+ testPlan(256);
for (i=0; i<ringSize*2; i++) value[i] = i;
+
pinfo = calloc(1,sizeof(info));
if(!pinfo) testAbort("calloc failed");
+
pinfo->consumerEvent = consumerEvent = epicsEventMustCreate(epicsEventEmpty);
if (!consumerEvent) {
testAbort("epicsEventMustCreate failed");
}
+ testDiag("******************************************************");
+ testDiag("** Test 1: local ring pointer, check size and order **");
+ testDiag("******************************************************");
+
pinfo->ring = ring = epicsRingPointerCreate(ringSize);
if (!ring) {
testAbort("epicsRingPointerCreate failed");
@@ -86,22 +128,74 @@
}
testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
+ testDiag("**************************************************************");
+ testDiag("** Test 2: unlocked ring pointer, one consumer, check order **");
+ testDiag("**************************************************************");
+
+ pinfo->checkOrder = 1;
tid=epicsThreadCreate("consumer", 50,
epicsThreadGetStackSize(epicsThreadStackSmall), consumer, pinfo);
if(!tid) testAbort("epicsThreadCreate failed");
- epicsThreadSleep(0.1);
+ epicsThreadSleep(0.2);
for (i=0; i<ringSize*2; i++) {
if (epicsRingPointerIsFull(ring)) {
- epicsEventSignal(consumerEvent);
epicsThreadSleep(0.2);
}
- testOk(epicsRingPointerPush(ring, (void *)&value[i]), "Ring not full");
- }
+ testOk(epicsRingPointerPush(ring, (void *)&value[i]), "Pushing %d, ring not full", i);
+ epicsEventSignal(consumerEvent);
+ }
+ epicsThreadSleep(1.0);
+ testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
+
+ for (i=0; i<ringSize*2; i++) {
+ testOk(pinfo->value[i] == 1, "Value test: %d was processed", i);
+ }
+
+ testExit = 1;
epicsEventSignal(consumerEvent);
+ epicsThreadSleep(1.0);
+
+ epicsRingPointerDelete(pinfo->ring);
+
+ testDiag("*************************************************************************************");
+ testDiag("** Test 3: locked ring pointer, many consumers, many producers, check no of copies **");
+ testDiag("*************************************************************************************");
+
+ pinfo->ring = ring = epicsRingPointerLockedCreate(ringSize);
+ if (!ring) {
+ testAbort("epicsRingPointerLockedCreate failed");
+ }
+ testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
+
+ for (i=0; i<ringSize*2; i++) pinfo->value[i] = 0;
+ testExit = 0;
+ pinfo->checkOrder = 0;
+ for (i=0; i<consumerCount; i++) {
+ sprintf(threadName, "consumer%d", i);
+ tid=epicsThreadCreate(threadName, 50,
+ epicsThreadGetStackSize(epicsThreadStackSmall), consumer, pinfo);
+ if(!tid) testAbort("epicsThreadCreate failed");
+ }
epicsThreadSleep(0.2);
+
+ for (i=0; i<producerCount; i++) {
+ sprintf(threadName, "producer%d", i);
+ tid=epicsThreadCreate(threadName, 50,
+ epicsThreadGetStackSize(epicsThreadStackSmall), producer, pinfo);
+ if(!tid) testAbort("epicsThreadCreate failed");
+ }
+
+ epicsThreadSleep(0.5);
+ epicsEventSignal(consumerEvent);
+ epicsThreadSleep(1.0);
+
testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
+ for (i=0; i<ringSize*2; i++) {
+ testOk(pinfo->value[i] == producerCount, "Value test: %d was processed %d times", i, producerCount);
+ }
+
testExit = 1;
epicsEventSignal(consumerEvent);
epicsThreadSleep(1.0);
- Replies:
- Re: [Merge] lp:~epics-core/epics-base/parallel-cbthreads-2 into lp:epics-base Ralph Lange
- Re: [Merge] lp:~epics-core/epics-base/parallel-cbthreads-2 into lp:epics-base mdavidsaver
- Re: [Merge] lp:~epics-core/epics-base/parallel-cbthreads-2 into lp:epics-base mdavidsaver
- Navigate by Date:
- Prev:
Re: [Merge] lp:~epics-core/epics-base/ioc-shutdown into lp:epics-base Ralph Lange
- Next:
Re: [Merge] lp:~epics-core/epics-base/parallel-cbthreads-2 into lp:epics-base Ralph Lange
- Index:
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
<2013>
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
- Navigate by Thread:
- Prev:
EPICS Mutex exceptions with file & line Andrew Johnson
- Next:
Re: [Merge] lp:~epics-core/epics-base/parallel-cbthreads-2 into lp:epics-base Ralph Lange
- Index:
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
<2013>
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024