EPICS Controls Argonne National Laboratory

Experimental Physics and
Industrial Control System

1994  1995  1996  1997  1998  1999  2000  2001  2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  2012  2013  2014  2015  2016  2017  2018  2019  <20202021  2022  2023  2024  Index 1994  1995  1996  1997  1998  1999  2000  2001  2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  2012  2013  2014  2015  2016  2017  2018  2019  <20202021  2022  2023  2024 
<== Date ==> <== Thread ==>

Subject: FW: EPICS on windows10 Debian Shell
From: Alfio Rizzo via Tech-talk <tech-talk at aps.anl.gov>
To: "tech-talk at aps.anl.gov" <tech-talk at aps.anl.gov>
Date: Tue, 3 Mar 2020 09:45:56 +0000
Hi,
When I compile epics base on a WSL Debian/Ubunt shell
So far I manually commented out (when needed) the _WIN32 macro in the  blockingUDPTransport.cpp file


But a possible clean solution is to create a macro called WSL , checking the kernel version (my kernel is for instance 4.4.0-17763-Microsoft )

#---------------------------------------------------------------

ifeq ($(findstring Microsoft,$(shell uname -r)),Microsoft) 

CPPFLAGS += -D_WSL_ 

Endif


Those lines can be put in the RULES_BUILD file of EPICS Base for instance,

Then this macro can be used in the  blockingUDPTransport.cpp file (see attachment)


It could be possible to apply that in the next EPICS base release patch ?
Thanks
Best
Alfio 

-----Original Message-----
From: Tech-talk <tech-talk-bounces at aps.anl.gov> On Behalf Of Alfio Rizzo via Tech-talk
Sent: Wednesday, October 2, 2019 6:36 AM
To: Michael Davidsaver <mdavidsaver at gmail.com>
Cc: tech-talk at aps.anl.gov
Subject: RE: EPICS on windows10 Debian Shell

Hi
The macros are defined the same as any GCC Linux/Unix compiler, I have already checked this before.
See file in attachment
Best
Alfio 

-----Original Message-----
From: Michael Davidsaver <mdavidsaver at gmail.com>
Sent: Wednesday, October 2, 2019 2:22 AM
To: Alfio Rizzo <Alfio.Rizzo at esss.se>
Cc: tech-talk at aps.anl.gov
Subject: Re: EPICS on windows10 Debian Shell

On 10/1/19 8:55 AM, Alfio Rizzo via Tech-talk wrote:
> As it should be, the OS macro is defined as __linux__ not as _WIN32,
> 
> since the compiler is NOT MinGW or CYGWIN, but GNU as I said before.
> 
> So, then it comes issue I had was this UDP socket biding issue,
> 
> since binding is still handled a la Windows.
> 
> So the problem is in the source file blockingUDPTransport.cpp
> 
> where the macro _WIN32 is not used consistently in my case.

Can you check what macros this compiler pre-defines?
My usual reference doesn't cover this new variation.

https://sourceforge.net/p/predef/wiki/Home/

> cpp -dM /dev/null > wsl_predefined.h
/**
 * Copyright - See the COPYRIGHT that is included with this distribution.
 * pvAccessCPP is distributed subject to a Software License Agreement found
 * in file LICENSE that is included with this distribution.
 */

#ifdef _WIN32
// needed for ip_mreq
#include <ws2tcpip.h>
#endif

#include <sstream>

#include <sys/types.h>
#include <cstdio>

#include <epicsThread.h>
#include <osiSock.h>
#include <epicsAtomic.h>

#include <pv/lock.h>
#include <pv/byteBuffer.h>
#include <pv/reftrack.h>

#define epicsExportSharedSymbols
#include <pv/blockingUDP.h>
#include <pv/pvaConstants.h>
#include <pv/inetAddressUtil.h>
#include <pv/logger.h>
#include <pv/likely.h>
#include <pv/hexDump.h>

using namespace epics::pvData;
using namespace std;
using std::tr1::static_pointer_cast;

namespace epics {
namespace pvAccess {

#ifdef __vxworks
inline int sendto(int s, const char *buf, size_t len, int flags, const struct sockaddr *to, int tolen)
{
    return ::sendto(s, const_cast<char*>(buf), len, flags, const_cast<struct sockaddr *>(to), tolen);
}
#endif

// reserve some space for CMD_ORIGIN_TAG message
#define RECEIVE_BUFFER_PRE_RESERVE (PVA_MESSAGE_HEADER_SIZE + 16)

size_t BlockingUDPTransport::num_instances;

BlockingUDPTransport::BlockingUDPTransport(bool serverFlag,
        ResponseHandler::shared_pointer const & responseHandler, SOCKET channel,
        osiSockAddr& bindAddress,
        short /*remoteTransportRevision*/) :
    _closed(),
    _responseHandler(responseHandler),
    _channel(channel),
    _bindAddress(bindAddress),
    _sendAddresses(0),
    _ignoredAddresses(0),
    _tappedNIF(0),
    _sendToEnabled(false),
    _localMulticastAddressEnabled(false),
    _receiveBuffer(MAX_UDP_RECV+RECEIVE_BUFFER_PRE_RESERVE),
    _sendBuffer(MAX_UDP_RECV),
    _lastMessageStartPosition(0),
    _clientServerWithEndianFlag(
        (serverFlag ? 0x40 : 0x00) | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00))
{
    assert(_responseHandler.get());

    osiSocklen_t sockLen = sizeof(sockaddr);
    // read the actual socket info
    int retval = ::getsockname(_channel, &_remoteAddress.sa, &sockLen);
    if(retval<0) {
        // error obtaining remote address, fallback to bindAddress
        _remoteAddress = _bindAddress;

        char strBuffer[64];
        epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
        LOG(logLevelDebug, "getsockname error: %s.", strBuffer);
        _remoteName = "<unknown>:0";
    } else {
        char strBuffer[64];
        sockAddrToDottedIP(&_remoteAddress.sa, strBuffer, sizeof(strBuffer));
        _remoteName = strBuffer;
        LOG(logLevelDebug, "Creating datagram socket from: %s.",
            _remoteName.c_str());
    }

    REFTRACE_INCREMENT(num_instances);
}

BlockingUDPTransport::~BlockingUDPTransport() {
    REFTRACE_DECREMENT(num_instances);

    close(true); // close the socket and stop the thread.
}

void BlockingUDPTransport::start() {

    string threadName = "UDP-rx " + inetAddressToString(_bindAddress);

    if (IS_LOGGABLE(logLevelTrace))
    {
        LOG(logLevelTrace, "Starting thread: %s.", threadName.c_str());
    }

    _thread.reset(new epicsThread(*this, threadName.c_str(),
                                  epicsThreadGetStackSize(epicsThreadStackBig),
                                  epicsThreadPriorityMedium));
    _thread->start();
}

void BlockingUDPTransport::close() {
    close(true);
}

void BlockingUDPTransport::ensureData(std::size_t size) {
    if (_receiveBuffer.getRemaining() >= size)
        return;
    std::ostringstream msg;
    msg<<"no more data in UDP packet : "
       <<_receiveBuffer.getPosition()<<":"<<_receiveBuffer.getLimit()
       <<" for "<<size;
    throw std::underflow_error(msg.str());
}

void BlockingUDPTransport::close(bool waitForThreadToComplete) {
    {
        Lock guard(_mutex);
        if(_closed.get()) return;
        _closed.set();
    }

    if (IS_LOGGABLE(logLevelDebug))
    {
        LOG(logLevelDebug,
            "UDP socket %s closed.",
            inetAddressToString(_bindAddress).c_str());
    }

    epicsSocketSystemCallInterruptMechanismQueryInfo info  =
        epicsSocketSystemCallInterruptMechanismQuery ();
    switch ( info )
    {
    case esscimqi_socketCloseRequired:
        epicsSocketDestroy ( _channel );
        break;
    case esscimqi_socketBothShutdownRequired:
    {
        /*int status =*/ ::shutdown ( _channel, SHUT_RDWR );
        /*
        if ( status ) {
            char sockErrBuf[64];
            epicsSocketConvertErrnoToString (
                sockErrBuf, sizeof ( sockErrBuf ) );
        LOG(logLevelDebug,
            "UDP socket %s failed to shutdown: %s.",
            inetAddressToString(_bindAddress).c_str(), sockErrBuf);
        }
        */
        epicsSocketDestroy ( _channel );
    }
    break;
    case esscimqi_socketSigAlarmRequired:
    // not supported anymore anyway
    default:
        epicsSocketDestroy(_channel);
    }


    // wait for send thread to exit cleanly
    if (_thread.get() && waitForThreadToComplete)
    {
        if (!_thread->exitWait(5.0))
        {
            LOG(logLevelError,
                "Receive thread for UDP socket %s has not exited.",
                inetAddressToString(_bindAddress).c_str());
        }
    }
}

void BlockingUDPTransport::enqueueSendRequest(TransportSender::shared_pointer const & sender) {
    Lock lock(_sendMutex);

    _sendToEnabled = false;
    _sendBuffer.clear();
    {
        epicsGuard<TransportSender> G(*sender);
        sender->send(&_sendBuffer, this);
    }
    endMessage();
    if(!_sendToEnabled)
        send(&_sendBuffer);
    else
        send(&_sendBuffer, _sendTo);
}


void BlockingUDPTransport::flushSendQueue()
{
    // noop (note different sent addresses are possible)
}

void BlockingUDPTransport::startMessage(int8 command, size_t /*ensureCapacity*/, int32 payloadSize) {
    _lastMessageStartPosition = _sendBuffer.getPosition();
    _sendBuffer.putByte(PVA_MAGIC);
    _sendBuffer.putByte((_clientServerWithEndianFlag&0x40) ? PVA_SERVER_PROTOCOL_REVISION : PVA_CLIENT_PROTOCOL_REVISION);
    _sendBuffer.putByte(_clientServerWithEndianFlag);
    _sendBuffer.putByte(command); // command
    _sendBuffer.putInt(payloadSize);
}

void BlockingUDPTransport::endMessage() {
    _sendBuffer.putInt(
        _lastMessageStartPosition+(sizeof(int16)+2),
        _sendBuffer.getPosition()-_lastMessageStartPosition-PVA_MESSAGE_HEADER_SIZE);
}

void BlockingUDPTransport::run() {
    // This function is always called from only one thread - this
    // object's own thread.

    osiSockAddr fromAddress;
    osiSocklen_t addrStructSize = sizeof(sockaddr);
    Transport::shared_pointer thisTransport(internal_this);

    try {

        char* recvfrom_buffer_start = (char*)(_receiveBuffer.getBuffer()+RECEIVE_BUFFER_PRE_RESERVE);
        size_t recvfrom_buffer_len =_receiveBuffer.getSize()-RECEIVE_BUFFER_PRE_RESERVE;
        while(!_closed.get())
        {
            int bytesRead = recvfrom(_channel,
                                     recvfrom_buffer_start, recvfrom_buffer_len,
                                     0, (sockaddr*)&fromAddress,
                                     &addrStructSize);

            if(likely(bytesRead>=0)) {
                // successfully got datagram
                atomic::add(_totalBytesRecv, bytesRead);
                bool ignore = false;
                for(size_t i = 0; i <_ignoredAddresses.size(); i++)
                {
                    if(_ignoredAddresses[i].ia.sin_addr.s_addr==fromAddress.ia.sin_addr.s_addr)
                    {
                        ignore = true;
                        if(pvAccessIsLoggable(logLevelDebug)) {
                            char strBuffer[64];
                            sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer));
                            LOG(logLevelDebug, "UDP Ignore (%d) %s x- %s", bytesRead, _remoteName.c_str(), strBuffer);
                        }
                        break;
                    }
                }

                if(likely(!ignore)) {
                    if(pvAccessIsLoggable(logLevelDebug)) {
                        char strBuffer[64];
                        sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer));
                        LOG(logLevelDebug, "UDP %s Rx (%d) %s <- %s", (_clientServerWithEndianFlag&0x40)?"Server":"Client", bytesRead, _remoteName.c_str(), strBuffer);
                    }

                    _receiveBuffer.setPosition(RECEIVE_BUFFER_PRE_RESERVE);
                    _receiveBuffer.setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead);

                    try {
                        processBuffer(thisTransport, fromAddress, &_receiveBuffer);
                    } catch(std::exception& e) {
                        if(IS_LOGGABLE(logLevelError)) {
                            char strBuffer[64];
                            sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer));
                            size_t epos = _receiveBuffer.getPosition();

                            // of course _receiveBuffer _may_ have been modified during processing...
                            _receiveBuffer.setPosition(RECEIVE_BUFFER_PRE_RESERVE);
                            _receiveBuffer.setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead);

                            std::cerr<<"Error on UDP RX "<<strBuffer<<" -> "<<_remoteName<<" at "<<epos<<" : "<<e.what()<<"\n"
                                      <<HexDump(_receiveBuffer).limit(256u);
                        }
                    }
                }
            } else {

                int socketError = SOCKERRNO;

                // interrupted or timeout
                if (socketError == SOCK_EINTR ||
                        socketError == EAGAIN ||        // no alias in libCom
                        // windows times out with this
                        socketError == SOCK_ETIMEDOUT ||
                        socketError == SOCK_EWOULDBLOCK)
                    continue;

                if (socketError == SOCK_ECONNREFUSED || // avoid spurious ECONNREFUSED in Linux
                        socketError == SOCK_ECONNRESET)     // or ECONNRESET in Windows
                    continue;

                // log a 'recvfrom' error
                if(!_closed.get())
                {
                    char errStr[64];
                    epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
                    LOG(logLevelError, "Socket recvfrom error: %s.", errStr);
                }

                close(false);
                break;
            }

        }
    } catch(...) {
        // TODO: catch all exceptions, and act accordingly
        close(false);
    }

    if (IS_LOGGABLE(logLevelTrace))
    {
        string threadName = "UDP-rx "+inetAddressToString(_bindAddress);
        LOG(logLevelTrace, "Thread '%s' exiting.", threadName.c_str());
    }
}

bool BlockingUDPTransport::processBuffer(Transport::shared_pointer const & transport,
        osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) {


    // handle response(s)
    while(likely((int)receiveBuffer->getRemaining()>=PVA_MESSAGE_HEADER_SIZE)) {

        //
        // read header
        //

        // first byte is PVA_MAGIC
        int8 magic = receiveBuffer->getByte();
        if(unlikely(magic != PVA_MAGIC))
            return false;

        // second byte version
        int8 version = receiveBuffer->getByte();
        if(version==0) {
            // 0 -> 1 included incompatible changes
            return false;
        }

        int8 flags = receiveBuffer->getByte();
        if (flags & 0x80)
        {
            // 7th bit set
            receiveBuffer->setEndianess(EPICS_ENDIAN_BIG);
        }
        else
        {
            receiveBuffer->setEndianess(EPICS_ENDIAN_LITTLE);
        }

        // command ID and paylaod
        int8 command = receiveBuffer->getByte();
        // TODO check this cast (size_t must be 32-bit)
        size_t payloadSize = receiveBuffer->getInt();

        // control message check (skip message)
        if (flags & 0x01)
            continue;

        size_t nextRequestPosition = receiveBuffer->getPosition() + payloadSize;

        // payload size check
        if(unlikely(nextRequestPosition>receiveBuffer->getLimit())) return false;

        // CMD_ORIGIN_TAG filtering
        // NOTE: from design point of view this is not a right place to process application message here
        if (unlikely(command == CMD_ORIGIN_TAG))
        {
            // enabled?
            if (!_tappedNIF.empty())
            {
                // 128-bit IPv6 address
                osiSockAddr originNIFAddress;
                memset(&originNIFAddress, 0, sizeof(originNIFAddress));

                if (decodeAsIPv6Address(receiveBuffer, &originNIFAddress))
                {
                    originNIFAddress.ia.sin_family = AF_INET;

                    /*
                    LOG(logLevelDebug, "Got CMD_ORIGIN_TAG message form %s tagged as %s.",
                        inetAddressToString(fromAddress, true).c_str(),
                        inetAddressToString(originNIFAddress, false).c_str());
                    */

                    // filter
                    if (originNIFAddress.ia.sin_addr.s_addr != htonl(INADDR_ANY))
                    {
                        bool accept = false;
                        for(size_t i = 0; i < _tappedNIF.size(); i++)
                        {
                            if(_tappedNIF[i].ia.sin_addr.s_addr == originNIFAddress.ia.sin_addr.s_addr)
                            {
                                accept = true;
                                break;
                            }
                        }

                        // ignore messages from non-tapped NIFs
                        if (!accept)
                            return false;
                    }
                }
            }
        }
        else
        {
            // handle
            _responseHandler->handleResponse(&fromAddress, transport,
                                             version, command, payloadSize,
                                             &_receiveBuffer);
        }

        // set position (e.g. in case handler did not read all)
        receiveBuffer->setPosition(nextRequestPosition);
    }

    // all ok
    return true;
}

bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSockAddr& address)
{
    if (IS_LOGGABLE(logLevelDebug))
    {
        LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.",
            length, _remoteName.c_str(), inetAddressToString(address).c_str());
    }

    int retval = sendto(_channel, buffer,
                        length, 0, &(address.sa), sizeof(sockaddr));
    if(unlikely(retval<0))
    {
        char errStr[64];
        epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
        LOG(logLevelDebug, "Socket sendto to %s error: %s.",
            inetAddressToString(address).c_str(), errStr);
        return false;
    }
    atomic::add(_totalBytesSent, length);

    return true;
}

bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address) {

    buffer->flip();

    if (IS_LOGGABLE(logLevelDebug))
    {
        LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
            buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(address).c_str());
    }

    int retval = sendto(_channel, buffer->getBuffer(),
                        buffer->getLimit(), 0, &(address.sa), sizeof(sockaddr));
    if(unlikely(retval<0))
    {
        char errStr[64];
        epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
        LOG(logLevelDebug, "Socket sendto to %s error: %s.",
            inetAddressToString(address).c_str(), errStr);
        return false;
    }
    atomic::add(_totalBytesSent, buffer->getLimit());

    // all sent
    buffer->setPosition(buffer->getLimit());

    return true;
}

bool BlockingUDPTransport::send(ByteBuffer* buffer, InetAddressType target) {
    if(_sendAddresses.empty()) return false;

    buffer->flip();

    bool allOK = true;
    for(size_t i = 0; i<_sendAddresses.size(); i++) {

        // filter
        if (target != inetAddressType_all)
            if ((target == inetAddressType_unicast && !_isSendAddressUnicast[i]) ||
                    (target == inetAddressType_broadcast_multicast && _isSendAddressUnicast[i]))
                continue;

        if (IS_LOGGABLE(logLevelDebug))
        {
            LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
                buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
        }

        int retval = sendto(_channel, buffer->getBuffer(),
                            buffer->getLimit(), 0, &(_sendAddresses[i].sa),
                            sizeof(sockaddr));
        if(unlikely(retval<0))
        {
            char errStr[64];
            epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
            LOG(logLevelDebug, "Socket sendto to %s error: %s.",
                inetAddressToString(_sendAddresses[i]).c_str(), errStr);
            allOK = false;
        }
        atomic::add(_totalBytesSent, buffer->getLimit());
    }

    // all sent
    buffer->setPosition(buffer->getLimit());

    return allOK;
}


void BlockingUDPTransport::join(const osiSockAddr & mcastAddr, const osiSockAddr & nifAddr)
{
    struct ip_mreq imreq;
    memset(&imreq, 0, sizeof(struct ip_mreq));

    imreq.imr_multiaddr.s_addr = mcastAddr.ia.sin_addr.s_addr;
    imreq.imr_interface.s_addr = nifAddr.ia.sin_addr.s_addr;

    // join multicast group on the given interface
    int status = ::setsockopt(_channel, IPPROTO_IP, IP_ADD_MEMBERSHIP,
                              (char*)&imreq, sizeof(struct ip_mreq));
    if (status)
    {
        char errStr[64];
        epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
        throw std::runtime_error(
            string("Failed to join to the multicast group '") +
            inetAddressToString(mcastAddr) + "' on network interface '" +
            inetAddressToString(nifAddr, false) + "': " + errStr);
    }
}

void BlockingUDPTransport::setMutlicastNIF(const osiSockAddr & nifAddr, bool loopback)
{
    // set the multicast outgoing interface
    int status = ::setsockopt(_channel, IPPROTO_IP, IP_MULTICAST_IF,
                              (char*)&nifAddr.ia.sin_addr, sizeof(struct in_addr));
    if (status)
    {
        char errStr[64];
        epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
        throw std::runtime_error(
            string("Failed to set multicast network interface '") +
            inetAddressToString(nifAddr, false) + "': " + errStr);
    }

    // send multicast traffic to myself too
    unsigned char mcast_loop = (loopback ? 1 : 0);
    status = ::setsockopt(_channel, IPPROTO_IP, IP_MULTICAST_LOOP,
                          (char*)&mcast_loop, sizeof(unsigned char));
    if (status)
    {
        char errStr[64];
        epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
        throw std::runtime_error(
            string("Failed to enable multicast loopback on network interface '") +
            inetAddressToString(nifAddr, false) + "': " + errStr);
    }

}

void initializeUDPTransports(bool serverFlag,
                             BlockingUDPTransportVector& udpTransports,
                             const IfaceNodeVector& ifaceList,
                             const ResponseHandler::shared_pointer& responseHandler,
                             BlockingUDPTransport::shared_pointer& sendTransport,
                             int32& listenPort,
                             bool autoAddressList,
                             const std::string& addressList,
                             const std::string& ignoreAddressList)
{
    BlockingUDPConnector connector(serverFlag);

    const int8_t protoVer = serverFlag ? PVA_SERVER_PROTOCOL_REVISION : PVA_CLIENT_PROTOCOL_REVISION;

    //
    // Create UDP transport for sending (to all network interfaces)
    //

    osiSockAddr anyAddress;
    memset(&anyAddress, 0, sizeof(anyAddress));
    anyAddress.ia.sin_family = AF_INET;
    anyAddress.ia.sin_port = htons(0);
    anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);

    sendTransport = connector.connect(responseHandler, anyAddress, protoVer);
    if (!sendTransport)
    {
        THROW_BASE_EXCEPTION("Failed to initialize UDP transport.");
    }

    // to allow automatic assignment of listen port (for testing)
    if (listenPort == 0)
    {
        listenPort = ntohs(sendTransport->getRemoteAddress().ia.sin_port);
        LOG(logLevelDebug, "Dynamic listen UDP port set to %u.", (unsigned)listenPort);
    }

    // TODO current implementation shares the port (aka beacon and search port)
    int32 sendPort = listenPort;

    //
    // compile auto address list - where to send packets
    //

    InetAddrVector autoBCastAddr;
    for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++)
    {
        ifaceNode node = *iter;

        // in practice, interface will have either destination (PPP)
        // or broadcast, but never both.
        if (node.validP2P && node.peer.ia.sin_family != AF_UNSPEC)
        {
            node.peer.ia.sin_port = htons(sendPort);
            autoBCastAddr.push_back(node.peer);
        }
        if (node.validBcast && node.bcast.ia.sin_family != AF_UNSPEC)
        {
            node.bcast.ia.sin_port = htons(sendPort);
            autoBCastAddr.push_back(node.bcast);
        }
    }

    //
    // set send address list
    //
    {
        InetAddrVector list;
        getSocketAddressList(list, addressList, sendPort, autoAddressList ? &autoBCastAddr : NULL);

        // avoid duplicates in address list
        {
            InetAddrVector dedup;

            for (InetAddrVector::const_iterator iter = list.begin(); iter != list.end(); iter++)
            {
                bool match = false;

                for(InetAddrVector::const_iterator inner = dedup.begin(); !match && inner != dedup.end(); inner++)
                {
                    match = iter->ia.sin_family==inner->ia.sin_family && iter->ia.sin_addr.s_addr==inner->ia.sin_addr.s_addr;
                }

                if(!match)
                    dedup.push_back(*iter);
            }
            list.swap(dedup);
        }

        std::vector<bool> isunicast(list.size());

        if (list.empty()) {
            LOG(logLevelError,
                "No %s broadcast addresses found or specified - empty address list!", serverFlag ? "server" : "client");
        }

        for (size_t i = 0; i < list.size(); i++) {

            isunicast[i] = !isMulticastAddress(&list[i]);

            for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); isunicast[i] && iter != ifaceList.end(); iter++)
            {
                ifaceNode node = *iter;
                // compare with all iface bcasts
                if(node.validBcast && list[i].ia.sin_family==iter->bcast.ia.sin_family
                        && list[i].ia.sin_addr.s_addr==iter->bcast.ia.sin_addr.s_addr) {
                    isunicast[i] = false;
                }
            }
            LOG(logLevelDebug,
                "Broadcast address #%zu: %s. (%sunicast)", i, inetAddressToString(list[i]).c_str(),
                isunicast[i]?"":"not ");
        }

        sendTransport->setSendAddresses(list, isunicast);
    }

    sendTransport->start();
    udpTransports.push_back(sendTransport);

    // TODO configurable local NIF, address
    osiSockAddr loAddr;
    memset(&loAddr, 0, sizeof(loAddr));
    loAddr.ia.sin_family = AF_INET;
    loAddr.ia.sin_port = ntohs(0);
    loAddr.ia.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    // TODO configurable local multicast address
    std::string mcastAddress("224.0.0.128");

    osiSockAddr group;
    aToIPAddr(mcastAddress.c_str(), listenPort, &group.ia);

    //
    // set ignore address list
    //
    InetAddrVector ignoreAddressVector;
    getSocketAddressList(ignoreAddressVector, ignoreAddressList, 0, 0);

    //
    // Setup UDP trasport(s) (per interface)
    //

    InetAddrVector tappedNIF;

    for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++)
    {
        ifaceNode node = *iter;

        LOG(logLevelDebug, "Setting up UDP for interface %s/%s, broadcast %s, dest %s.",
            inetAddressToString(node.addr, false).c_str(),
            node.validBcast ? inetAddressToString(node.mask, false).c_str() : "<none>",
            node.validBcast ? inetAddressToString(node.bcast, false).c_str() : "<none>",
            node.validP2P ? inetAddressToString(node.peer, false).c_str() : "<none>");
        try
        {
            // where to bind (listen) address
            osiSockAddr listenLocalAddress;
            memset(&listenLocalAddress, 0, sizeof(listenLocalAddress));
            listenLocalAddress.ia.sin_family = AF_INET;
            listenLocalAddress.ia.sin_port = htons(listenPort);
            listenLocalAddress.ia.sin_addr.s_addr = node.addr.ia.sin_addr.s_addr;

            BlockingUDPTransport::shared_pointer transport = connector.connect(
                        responseHandler, listenLocalAddress, protoVer);
            if (!transport)
                continue;
            listenLocalAddress = transport->getRemoteAddress();

            transport->setIgnoredAddresses(ignoreAddressVector);

            tappedNIF.push_back(listenLocalAddress);


            BlockingUDPTransport::shared_pointer transport2;

            if(!node.validBcast || node.bcast.sa.sa_family != AF_INET ||
                    node.bcast.ia.sin_addr.s_addr == listenLocalAddress.ia.sin_addr.s_addr) {
                // warning if not point-to-point
                LOG(node.bcast.sa.sa_family != AF_INET ? logLevelDebug : logLevelWarn,
                    "Unable to find broadcast address of interface %s.", inetAddressToString(node.addr, false).c_str());
            }
#if !defined(_WIN32) && !defined(_WSL_)
            else
            {
                /* An oddness of BSD sockets (not winsock) is that binding to
                 * INADDR_ANY will receive unicast and broadcast, but binding to
                 * a specific interface address receives only unicast.  The trick
                 * is to bind a second socket to the interface broadcast address,
                 * which will then receive only broadcasts.
                 */

                osiSockAddr bcastAddress;
                memset(&bcastAddress, 0, sizeof(bcastAddress));
                bcastAddress.ia.sin_family = AF_INET;
                bcastAddress.ia.sin_port = htons(listenPort);
                bcastAddress.ia.sin_addr.s_addr = node.bcast.ia.sin_addr.s_addr;

                transport2 = connector.connect(responseHandler, bcastAddress, protoVer);
                if (transport2)
                {
                    /* The other wrinkle is that nothing should be sent from this second
                     * socket. So replies are made through the unicast socket.
                     *
                    transport2->setReplyTransport(transport);
                    */
                    // NOTE: search responses all always send from sendTransport

                    transport2->setIgnoredAddresses(ignoreAddressVector);

                    tappedNIF.push_back(bcastAddress);
                }
            }
#endif

            transport->setMutlicastNIF(loAddr, true);
            transport->setLocalMulticastAddress(group);

            transport->start();
            udpTransports.push_back(transport);

            if (transport2)
            {
                transport2->start();
                udpTransports.push_back(transport2);
            }
        }
        catch (std::exception& e)
        {
            THROW_BASE_EXCEPTION_CAUSE("Failed to initialize UDP transport.", e);
        }
        catch (...)
        {
            THROW_BASE_EXCEPTION("Failed to initialize UDP transport.");
        }
    }


    //
    // Setup local multicasting
    //

    // WIN32 do not allow binding to multicast address, use any address w/ port
#if defined(_WIN32) || defined (_WSL_)
    anyAddress.ia.sin_port = htons(listenPort);
#endif

    BlockingUDPTransport::shared_pointer localMulticastTransport;
    try
    {
        // NOTE: multicast receiver socket must be "bound" to INADDR_ANY or multicast address
        localMulticastTransport = connector.connect(
                                      responseHandler,
#if !defined(_WIN32) && !defined(_WSL_)
                                      group,
#else
                                      anyAddress,
#endif
                                      protoVer);
        if (!localMulticastTransport)
            throw std::runtime_error("Failed to bind UDP socket.");

        localMulticastTransport->setTappedNIF(tappedNIF);
        localMulticastTransport->join(group, loAddr);
        localMulticastTransport->start();
        udpTransports.push_back(localMulticastTransport);

        LOG(logLevelDebug, "Local multicast enabled on %s/%s.",
            inetAddressToString(loAddr, false).c_str(),
            inetAddressToString(group).c_str());
    }
    catch (std::exception& ex)
    {
        LOG(logLevelDebug, "Failed to initialize local multicast, functionality disabled. Reason: %s.", ex.what());
    }
}


}
}

Replies:
RE: EPICS on windows10 Debian Shell Freddie Akeroyd - UKRI STFC via Tech-talk

Navigate by Date:
Prev: Re: pco Camera USB and IEEE interface Bradnick, Ben (Tessella, RAL, TEC) via Tech-talk
Next: NORD on a char waveform record Thomas Willemsen - UKRI STFC via Tech-talk
Index: 1994  1995  1996  1997  1998  1999  2000  2001  2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  2012  2013  2014  2015  2016  2017  2018  2019  <20202021  2022  2023  2024 
Navigate by Thread:
Prev: quadEM R9-3 now available Mark Rivers via Tech-talk
Next: RE: EPICS on windows10 Debian Shell Freddie Akeroyd - UKRI STFC via Tech-talk
Index: 1994  1995  1996  1997  1998  1999  2000  2001  2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  2012  2013  2014  2015  2016  2017  2018  2019  <20202021  2022  2023  2024 
ANJ, 03 Mar 2020 Valid HTML 4.01! · Home · News · About · Base · Modules · Extensions · Distributions · Download ·
· Search · EPICS V4 · IRMIS · Talk · Bugs · Documents · Links · Licensing ·