Demo2.cpp

// Demo2.cpp

#include "Ceda/cxLss/ILogStructuredStore.h"
#include "Ceda/cxLss/LssSettings.h"
#include "Ceda/cxLss/LssExceptions.h"
#include "Ceda/cxUtils/FileTools.h"
#include "Ceda/cxUtils/Environ.h"
#include "Ceda/cxUtils/xchar.h"
#include "Ceda/cxUtils/AutoCloser.h"
#include "Ceda/cxUtils/FileException.h"
#include "Ceda/cxUtils/Tracer.h"
#include <assert.h>
#include <time.h>
#include <iostream>
#include <vector>

/*
Perform the following steps

    1.  Open a store with the given path (provided on the command line)

    2.  Write 'num' serial elements, each 9k in size to the store

    3.  Read all the serial elements

    4.  Read then rewrite each serial element

    5.  Read then rewrite each serial element, now of size 10k

Note that this example is not realistic in that it doesn't bother to catch exceptions thrown
by the LSS.
*/

//const int DEFAULT_BUF_SIZE = 32 * 1024;
const int DEFAULT_BUF_SIZE = 32;

///////////////////////////////////////////////////////////////////////////////////////////////////
// PersistStore

class PersistStore
{
public:
    // Ctor opens the store
    PersistStore(ceda::ConstStringZ lssPath, ceda::ConstStringZ deltaFileDir, int numElements);
    
    // Closes the store
    ~PersistStore();

    // The four steps to be performed
    void Create();
    void Read();
    void ReadAndWrite();
    void ReadAndExtend();

private:
    // Write the given buffer to the serial element with the given Seid
    void WriteBuffer(ceda::Seid seid, const std::vector<ceda::octet_t>& buffer);

    // Read the serial element with the given Seid into the given buffer
    void ReadBuffer(ceda::Seid seid, std::vector<ceda::octet_t>& buffer);

private:
    ceda::ILogStructuredStore* lss_;
    int m_numElements;
    int m_bufferSize;
    ceda::SeidHigh m_seidHigh;
    std::vector<ceda::Seid> m_seids;
};

PersistStore::PersistStore(ceda::ConstStringZ lssPath, ceda::ConstStringZ deltaFileDir, int numElements) :
    m_numElements(numElements),
    m_bufferSize(DEFAULT_BUF_SIZE)
{
    bool createdNew;
    ceda::LssSettings settings;  // Use default settings
    lss_ = ceda::CreateOrOpenLSS(lssPath, deltaFileDir, createdNew, ceda::OM_OPEN_ALWAYS, settings);

    // Create a 32 bit Seid space for all Seid allocations
    m_seidHigh = lss_->CreateSeidSpace();
}

PersistStore::~PersistStore()
{
    lss_->Close();
    lss_ = NULL;
}

void PersistStore::WriteBuffer(ceda::Seid seid, const std::vector<ceda::octet_t>& buffer)
{
    // A serial element can only be written within a transaction
    ceda::close_ptr<ceda::ILssTransaction> txn(lss_->OpenTransaction());

    ceda::close_ptr<ceda::ICloseableOutputStream> os(txn->WriteSerialElement(seid));
    os->WriteStream(&buffer[0], buffer.size());
}

void PersistStore::ReadBuffer(ceda::Seid seid, std::vector<ceda::octet_t>& buffer)
{
    ceda::close_ptr<ceda::ICloseableInputStream> is(lss_->ReadSerialElement(seid));

    ceda::ssize_t numBytesRead = is->ReadStream(&buffer[0], buffer.size());
    assert(numBytesRead == buffer.size());
}

void PersistStore::Create()
{
    std::vector<ceda::octet_t> buffer(m_bufferSize);
    
    for (int i=0 ; i < m_numElements ; ++i)
    {
        ceda::Seid seid = lss_->AllocateSeid(m_seidHigh);
        WriteBuffer(seid, buffer);
        m_seids.push_back(seid);
    }    
}

void PersistStore::Read()
{
    std::vector<ceda::octet_t> buffer(m_bufferSize);
    for (int i=0 ; i < m_numElements ; ++i)
    {
        ReadBuffer(m_seids[i], buffer);
    }
}

void PersistStore::ReadAndWrite()
{
    std::vector<ceda::octet_t> buffer(m_bufferSize);
    for (int i=0 ; i < m_numElements ; ++i)
    {
        ReadBuffer(m_seids[i], buffer);
        WriteBuffer(m_seids[i], buffer);
    }
}

void PersistStore::ReadAndExtend()
{
    std::vector<ceda::octet_t> bufferR(m_bufferSize);
    m_bufferSize += 1024;
    std::vector<ceda::octet_t> bufferW(m_bufferSize);
    for (int i=0 ; i < m_numElements ; ++i)
    {
        ReadBuffer(m_seids[i], bufferR);
        WriteBuffer(m_seids[i], bufferW);
    }
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// Timer

class Timer
{
public:
    Timer()
    {
        m_startTime = clock();
    }

    double GetElapsedTimeInSec()
    {
        return (double) (clock() - m_startTime) / CLOCKS_PER_SEC;
    }

private:
    clock_t m_startTime;
};

///////////////////////////////////////////////////////////////////////////////////////////////////
void WriteValue(ceda::xostream& os, ceda::int64 value)
{
    const ceda::int64 k = 1024;
    const ceda::int64 M = k*k;
    const ceda::int64 G = k*k*k;
    const ceda::int64 T = k*k*k*k;
    
    if (value < k)
    {
        os << (int) value;    
    }
    else if (value < M)
    {
        os << ((double)value / k) << "k";
    }
    else if (value < G)
    {
        os << ((double)value / M) << "M";
    }
    else if (value < T)
    {
        os << ((double)value / G) << "G";
    }
    else
    {
        os << ((double)value / T) << "T";
    }
}

void RunTest(int numElements, PersistStore& store)
{
	for( int i = 0; i < 1; ++i )
	{
		Timer timer;
		store.Create();
		double f = timer.GetElapsedTimeInSec();
		Tracer() 
		    << "  Create " << numElements << " objects of size " << DEFAULT_BUF_SIZE 
            << " : " << f << " sec" 
            << " --> " << (numElements*DEFAULT_BUF_SIZE)/(f*1024*1024) << " MB/sec "
            << numElements/f << " objects/sec"
            << ceda::endl;
	}
}

bool Demo2()
{
    try
    {
        ceda::xstring path = ceda::GetCedaTestPath("exlss_demo2.lss");

	    // example for writing directly to a volume...
        //ConstStringZ path = "\\\\.\\Z:";

        //int numElements = 10000000;
        int numElements = 100000;

        if (ceda::DeleteFile_u8(path.c_str()))
        {
            Tracer() << "Deleting file " << path << ceda::endl;
        }

        Tracer() 
                  << "Path = " << path << '\n'
                  << "Num serial elements = " << numElements << '\n'
                  << "Serial element size = " << DEFAULT_BUF_SIZE << '\n'
                  << "Total size = "; 
        WriteValue(Tracer(), (ceda::int64)numElements * DEFAULT_BUF_SIZE); 
        Tracer() << ceda::endl;
    
        PersistStore store(path.c_str(), NULL, numElements);
        RunTest(numElements,store);

        /*
        {
            Timer timer;
            store.Read();
            Tracer() << "  Read " << numElements << " objects :  " << timer.GetElapsedTimeInSec() << " sec" << ceda::endl;
        }

        {
            Timer timer;
            store.ReadAndWrite();
            Tracer() << "  Read and write  :  " << timer.GetElapsedTimeInSec() << " sec" << ceda::endl;
        }

        {
            Timer timer;
            store.ReadAndExtend();
            Tracer() << "  Read and extend  :  " << timer.GetElapsedTimeInSec() << " sec" << ceda::endl;
        }
        */

        Tracer() << "Test completed" << ceda::endl;
    	return true;
    }
    catch(ceda::ProgrammingErrorException& e)
    {
        Tracer() << "Programming error exception : " << e << ceda::endl;
        return false;
    }
    catch(ceda::FileException& e)
    {
        Tracer() << "File Exception : " << e << ceda::endl;
        return false;
    }
    catch(ceda::CorruptLSSException& e)
    {
        Tracer() << "Corruption : " << e << ceda::endl;
        return false;
    }
}