Client.cpp


// **********************************************************************
//
// Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved.
//
// **********************************************************************

#include <Ice/Ice.h>
#include <Throughput.h>

#include <iomanip>

using namespace std;
using namespace Demo;

class ThroughputClient : public Ice::Application
{
public:
    ThroughputClient();
    void MeasureOneWayRate(ThroughputPrx proxyBatchOneWay, ThroughputPrx proxyTwoWay, int n, int numRepetitions, bool deliver, const char* name, std::function<void()> fn);
    void TimeOneWayCalls(ThroughputPrx proxyBatchOneWay, ThroughputPrx proxyTwoWay, int n, int numRepetitions, bool flush);
    virtual int run(int, char*[]);
};

int
main(int argc, char* argv[])
{
#ifdef ICE_STATIC_LIBS
    Ice::registerIceSSL();
#endif
    ThroughputClient app;
    return app.main(argc, argv, "config.client");
}

ThroughputClient::ThroughputClient() :
    //
    // Since this is an interactive demo we don't want any signal
    // handling.
    //
    Ice::Application(Ice::NoSignalHandling)
{
}

void MeasureRatek(const char* name, int n, int numRepetitions, std::function<void()> fn)
{
    double timeMicroSec = 1E30;
    for (int i=0 ; i < n ; ++i)
    {
        IceUtil::Time start = IceUtil::Time::now(IceUtil::Time::Monotonic);
        fn();
        IceUtil::Time end = IceUtil::Time::now(IceUtil::Time::Monotonic);
        double t = (double) ((end-start).toMicroSeconds());
        if (t < timeMicroSec) timeMicroSec = t;
    }
    cout << name << "  count: " << numRepetitions << "  time: " << timeMicroSec / 1000 << "ms" << "  rate: " << fixed << setprecision(1) << 1000.0 * numRepetitions / timeMicroSec << " k calls/sec" << endl;
}

void MeasureRate(const char* name, int n, int numRepetitions, std::function<void()> fn)
{
    double timeMicroSec = 1E30;
    for (int i=0 ; i < n ; ++i)
    {
        IceUtil::Time start = IceUtil::Time::now(IceUtil::Time::Monotonic);
        fn();
        IceUtil::Time end = IceUtil::Time::now(IceUtil::Time::Monotonic);
        double t = (double) ((end-start).toMicroSeconds());
        if (t < timeMicroSec) timeMicroSec = t;
    }
    cout << name << "  count: " << numRepetitions << "  time: " << timeMicroSec / 1000 << "ms" << "  rate: " << fixed << setprecision(1) << numRepetitions / timeMicroSec << " M calls/sec" << endl;
}

void MeasureBitRate(const char* name, int n, int numBytes, std::function<void()> fn)
{
    double timeMicroSec = 1E30;
    for (int i=0 ; i < n ; ++i)
    {
        IceUtil::Time start = IceUtil::Time::now(IceUtil::Time::Monotonic);
        fn();
        IceUtil::Time end = IceUtil::Time::now(IceUtil::Time::Monotonic);
        double t = (double) ((end-start).toMicroSeconds());
        if (t < timeMicroSec) timeMicroSec = t;
    }
    cout << name << "  Num bytes: " << numBytes/1000000 << "MB  time: " << timeMicroSec / 1000 << "ms" << "  rate: " << fixed << setprecision(0) << 8.0 * numBytes / timeMicroSec << " Mbit/sec" << endl;
}

void ThroughputClient::MeasureOneWayRate(ThroughputPrx proxyBatchOneWay, ThroughputPrx proxyTwoWay, int n, int numRepetitions, bool deliver, const char* name, std::function<void()> fn)
{
    double timeMicroSec = 1E30;
    for (int i=0 ; i < n ; ++i)
    {
        IceUtil::Time start = IceUtil::Time::now(IceUtil::Time::Monotonic);
        fn();
        IceUtil::Time end = IceUtil::Time::now(IceUtil::Time::Monotonic);
        communicator()->flushBatchRequests();
        proxyTwoWay->inc(0);
        if (deliver) 
        {
            end = IceUtil::Time::now(IceUtil::Time::Monotonic);
        }
        double t = (double) ((end-start).toMicroSeconds());
        if (t < timeMicroSec) timeMicroSec = t;
    }
    cout << name << "  count: " << numRepetitions << "  time: " << timeMicroSec / 1000 << "ms" << "  rate: " << fixed << setprecision(1) << numRepetitions / timeMicroSec << " M calls/sec" << endl;
}

void ThroughputClient::TimeOneWayCalls(ThroughputPrx proxyBatchOneWay, ThroughputPrx proxyTwoWay, int n, int numRepetitions, bool deliver)
{
    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "v", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->v();
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "b1", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->b1(0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "b2", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->b2(0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "b4", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->b4(0,0,0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "b8", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->b8(0,0,0,0,0,0,0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "s1", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->s1(0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "s2", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->s2(0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "s4", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->s4(0,0,0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "s8", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->s8(0,0,0,0,0,0,0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "i1", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->i1(0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "i2", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->i2(0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "i4", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->i4(0,0,0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "i8", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->i8(0,0,0,0,0,0,0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "l1", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->l1(0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "l2", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->l2(0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "l4", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->l4(0,0,0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "l8", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->l8(0,0,0,0,0,0,0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "f1", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->f1(0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "f2", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->f2(0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "f4", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->f4(0,0,0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "f8", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->f8(0,0,0,0,0,0,0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "d1", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->d1(0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "d2", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->d2(0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "d4", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->d4(0,0,0,0);
            }
        });

    MeasureOneWayRate(proxyBatchOneWay, proxyTwoWay, n, numRepetitions, deliver, "d8", 
        [proxyBatchOneWay,numRepetitions] 
        {
            for(int i = 0; i < numRepetitions; ++i)
            {
                proxyBatchOneWay->d8(0,0,0,0,0,0,0,0);
            }
        });
}

int
ThroughputClient::run(int argc, char* argv[])
{
    if(argc > 1)
    {
        cerr << appName() << ": too many arguments" << endl;
        return EXIT_FAILURE;
    }

    ThroughputPrx proxyTwoWay = ThroughputPrx::checkedCast(communicator()->propertyToProxy("Throughput.Proxy"));
    if(!proxyTwoWay)
    {
        cerr << argv[0] << ": invalid proxy" << endl;
        return EXIT_FAILURE;
    }

    ThroughputPrx proxyOneWay = ThroughputPrx::uncheckedCast(proxyTwoWay->ice_oneway());
    ThroughputPrx proxyBatchOneWay = ThroughputPrx::uncheckedCast(proxyTwoWay->ice_batchOneway());

    ByteSeq byteSeq(ByteSeqSize);
    pair<const Ice::Byte*, const Ice::Byte*> byteArr;
    byteArr.first = &byteSeq[0];
    byteArr.second = byteArr.first + byteSeq.size();

    StringSeq stringSeq(StringSeqSize, "hello");
    vector<Util::string_view> stringViewSeq(StringSeqSize, "hello");

    StringDoubleSeq structSeq(StringDoubleSeqSize);
    for(int i = 0; i < StringDoubleSeqSize; ++i)
    {
        structSeq[i].s = "hello";
        structSeq[i].d = 3.14;
    }

    FixedSeq fixedSeq(FixedSeqSize);
    for(int i = 0; i < FixedSeqSize; ++i)
    {
        fixedSeq[i].i = 0;
        fixedSeq[i].j = 0;
        fixedSeq[i].d = 0;
    }

    //
    // To allow cross-language tests we may need to "warm up" the
    // server. The warm up is to ensure that any JIT compiler will
    // have converted any hotspots to native code. This ensures an
    // accurate throughput measurement.
    //
    if(proxyTwoWay->needsWarmup())
    {
        proxyTwoWay->startWarmup();

        ByteSeq emptyBytesBuf(1);
        emptyBytesBuf.resize(1);
        pair<const Ice::Byte*, const Ice::Byte*> emptyBytes;
        emptyBytes.first = &emptyBytesBuf[0];
        emptyBytes.second = emptyBytes.first + emptyBytesBuf.size();

        StringSeq emptyStrings(1);
        emptyStrings.resize(1);

        vector<Util::string_view> emptyStringViews(1);
        emptyStringViews.resize(1);

        StringDoubleSeq emptyStructs(1);
        emptyStructs.resize(1);

        FixedSeq emptyFixed(1);
        emptyFixed.resize(1);

        cout << "warming up the server... " << flush;
        for(int i = 0; i < 10000; i++)
        {
            proxyTwoWay->sendByteSeq(emptyBytes);
            proxyTwoWay->sendStringSeq(emptyStringViews);
            proxyTwoWay->sendStructSeq(emptyStructs);
            proxyTwoWay->sendFixedSeq(emptyFixed);

            proxyTwoWay->recvByteSeq();
            proxyTwoWay->recvStringSeq();
            proxyTwoWay->recvStructSeq();
            proxyTwoWay->recvFixedSeq();

            proxyTwoWay->echoByteSeq(emptyBytes);
            proxyTwoWay->echoStringSeq(emptyStringViews);
            proxyTwoWay->echoStructSeq(emptyStructs);
            proxyTwoWay->echoFixedSeq(emptyFixed);
        }

        proxyTwoWay->endWarmup();

        cout << " ok" << endl;
    }
    else
    {
        proxyTwoWay->ice_ping(); // Initial ping to setup the connection.
    }

    try
    {
        int n = 10;
        int nr;
        
        nr = 1000000;

        cout << "Batched one way messages" << endl;
        TimeOneWayCalls(proxyBatchOneWay, proxyTwoWay, n, nr, false);

        cout << "\nBatched and delivered one way messages" << endl;
        TimeOneWayCalls(proxyBatchOneWay, proxyTwoWay, n, nr, true);

        nr = 1000;

        cout << "\nSynchronous 2-way messages" << endl;
        MeasureRatek("inc (2-way)", n, nr, 
            [proxyTwoWay,nr] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->inc(0);
                }
            });

        //////////////////////////// 500000 bytes

        cout << "\n500000 byte sequence" << endl;

        int numBytes = nr*ByteSeqSize;

        MeasureBitRate("sendByteSeq (1-way)", n, numBytes, 
            [proxyOneWay,nr,byteArr] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyOneWay->sendByteSeq(byteArr);
                }
            });

        MeasureBitRate("sendByteSeq (2-way)", n, numBytes, 
            [proxyTwoWay,nr,byteArr] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->sendByteSeq(byteArr);
                }
            });

        MeasureBitRate("recvByteSeq (2-way)", n, numBytes, 
            [proxyTwoWay,nr] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->recvByteSeq();
                }
            });

        MeasureBitRate("echoByteSeq (2-way)", n, numBytes*2, 
            [proxyTwoWay,nr,byteArr] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->echoByteSeq(byteArr);
                }
            });

        //////////////////////////// 50000 structs

        cout << "\n50000 structs with two ints and a double" << endl;

        numBytes = nr*FixedSeqSize*16;

        MeasureBitRate("sendFixedSeq (1-way)", n, numBytes, 
            [proxyOneWay,nr,&fixedSeq] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyOneWay->sendFixedSeq(fixedSeq);
                }
            });

        MeasureBitRate("sendFixedSeq (2-way)", n, numBytes, 
            [proxyTwoWay,nr,&fixedSeq] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->sendFixedSeq(fixedSeq);
                }
            });

        MeasureBitRate("recvFixedSeq (2-way)", n, numBytes, 
            [proxyTwoWay,nr] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->recvFixedSeq();
                }
            });

        MeasureBitRate("echoFixedSeq (2-way)", n, numBytes*2, 
            [proxyTwoWay,nr,&fixedSeq] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->echoFixedSeq(fixedSeq);
                }
            });

        //////////////////////////// 50000 strings

        cout << "\n50000 strings" << endl;

        numBytes =  nr*StringSeqSize*static_cast<int>(stringSeq[0].size());

        MeasureBitRate("sendStringSeq (1-way)", n, numBytes, 
            [proxyOneWay,nr,&stringViewSeq] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyOneWay->sendStringSeq(stringViewSeq);
                }
            });

        MeasureBitRate("sendStringSeq (2-way)", n, numBytes, 
            [proxyTwoWay,nr,&stringViewSeq] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->sendStringSeq(stringViewSeq);
                }
            });

        MeasureBitRate("recvStringSeq (2-way)", n, numBytes, 
            [proxyTwoWay,nr] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->recvStringSeq();
                }
            });

        MeasureBitRate("echoStringSeq (2-way)", n, numBytes*2, 
            [proxyTwoWay,nr,&stringViewSeq] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->echoStringSeq(stringViewSeq);
                }
            });

        //////////////////////////// 50000 string + double structs

        cout << "\n50000 structs with string + double" << endl;

        numBytes =  nr*StringDoubleSeqSize*(static_cast<int>(structSeq[0].s.size()) + 8);

        MeasureBitRate("sendStructSeq (1-way)", n, numBytes, 
            [proxyOneWay,nr,&structSeq] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyOneWay->sendStructSeq(structSeq);
                }
            });

        MeasureBitRate("sendStructSeq (2-way)", n, numBytes, 
            [proxyTwoWay,nr,&structSeq] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->sendStructSeq(structSeq);
                }
            });

        MeasureBitRate("recvStructSeq (2-way)", n, numBytes, 
            [proxyTwoWay,nr] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->recvStructSeq();
                }
            });

        MeasureBitRate("echoStructSeq (2-way)", n, numBytes*2, 
            [proxyTwoWay,nr,&structSeq] 
            {
                for(int i = 0; i < nr; ++i)
                {
                    proxyTwoWay->echoStructSeq(structSeq);
                }
            });
    }
    catch(const Ice::Exception& ex)
    {
        cerr << ex << endl;
    }

    return EXIT_SUCCESS;
}