Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or log in using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 919932 Details for
Bug 1089652
[RFE]: Configuration option for linear store to delete the used journal files instead of recycling them.
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly, please enable it.
[patch]
Patch proposal (incomplete)
bz1089652.patch (text/plain), 27.21 KB, created by
Pavel Moravec
on 2014-07-22 13:32:31 UTC
(
hide
)
Description:
Patch proposal (incomplete)
Filename:
MIME Type:
Creator:
Pavel Moravec
Created:
2014-07-22 13:32:31 UTC
Size:
27.21 KB
patch
obsolete
>Index: cpp/src/qpid/linearstore/JournalImpl.h >=================================================================== >--- cpp/src/qpid/linearstore/JournalImpl.h (revision 1612523) >+++ cpp/src/qpid/linearstore/JournalImpl.h (working copy) >@@ -111,17 +111,20 @@ > void initialize(::qpid::linearstore::journal::EmptyFilePool* efp, > const uint16_t wcache_num_pages, > const uint32_t wcache_pgsize_sblks, >- ::qpid::linearstore::journal::aio_callback* const cbp); >+ ::qpid::linearstore::journal::aio_callback* const cbp, >+ bool overwriteBeforeEfpReturn); > > inline void initialize(::qpid::linearstore::journal::EmptyFilePool* efpp, > const uint16_t wcache_num_pages, >- const uint32_t wcache_pgsize_sblks) { >- initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this); >+ const uint32_t wcache_pgsize_sblks, >+ bool overwriteBeforeEfpReturn) { >+ initialize(efpp, wcache_num_pages, wcache_pgsize_sblks, this, overwriteBeforeEfpReturn); > } > > void recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm, > const uint16_t wcache_num_pages, > const uint32_t wcache_pgsize_sblks, >+ bool overwriteBeforeEfpReturn, > ::qpid::linearstore::journal::aio_callback* const cbp, > boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, > uint64_t& highest_rid, >@@ -130,10 +133,11 @@ > inline void recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm, > const uint16_t wcache_num_pages, > const uint32_t wcache_pgsize_sblks, >+ bool overwriteBeforeEfpReturn, > boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, > uint64_t& highest_rid, > uint64_t queue_id) { >- recover(efpm, wcache_num_pages, wcache_pgsize_sblks, this, prep_tx_list_ptr, highest_rid, queue_id); >+ recover(efpm, wcache_num_pages, wcache_pgsize_sblks, overwriteBeforeEfpReturn, this, prep_tx_list_ptr, highest_rid, queue_id); > } > > void recover_complete(); >Index: cpp/src/qpid/linearstore/MessageStoreImpl.cpp 
>=================================================================== >--- cpp/src/qpid/linearstore/MessageStoreImpl.cpp (revision 1612523) >+++ cpp/src/qpid/linearstore/MessageStoreImpl.cpp (working copy) >@@ -172,9 +172,10 @@ > qpid::linearstore::journal::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size"); > uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"); > uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"); >+ bool overwriteBeforeEfpReturn = opts->overwriteBeforeEfpReturn; > > // Pass option values to init() >- return init(opts->storeDir, efpPartition, efpFilePoolSize_kib, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib); >+ return init(opts->storeDir, efpPartition, efpFilePoolSize_kib, opts->truncateFlag, jrnlWrCachePageSizeKib, tplJrnlWrCachePageSizeKib, overwriteBeforeEfpReturn); > } > > // These params, taken from options, are assumed to be correct and verified >@@ -183,7 +184,8 @@ > qpid::linearstore::journal::efpDataSize_kib_t efpFileSize_kib_, > const bool truncateFlag_, > uint32_t wCachePageSizeKib_, >- uint32_t tplWCachePageSizeKib_) >+ uint32_t tplWCachePageSizeKib_, >+ bool overwriteBeforeEfpReturn_) > { > if (isInit) return true; > >@@ -194,6 +196,7 @@ > wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib_); > tplWCachePgSizeSblks = tplWCachePageSizeKib_ / QLS_SBLK_SIZE_KIB; // convert from KiB to number sblks > tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib_); >+ overwriteBeforeEfpReturn = overwriteBeforeEfpReturn_; > if (storeDir_.size()>0) storeDir = storeDir_; > > if (truncateFlag_) >@@ -347,7 +350,7 @@ > qpid::sys::Mutex::ScopedLock sl(tplInitLock); > if (!tplStorePtr->is_ready()) { > qpid::linearstore::journal::jdir::create_dir(getTplBaseDir()); >- tplStorePtr->initialize(getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), 
tplWCacheNumPages, tplWCachePgSizeSblks); >+ tplStorePtr->initialize(getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks, false); > if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true); > } > } >@@ -415,8 +418,14 @@ > } > > queue_.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue)); >+ bool localOverwriteBeforeEfpReturn = false; >+ qpid::framing::FieldTable::ValuePtr value = args_.get("qpid.overwrite_before_efp_return"); //TODO: can be unified with QueueSettings.cpp ? >+ if (!value->empty() && value->convertsTo<std::string>()) { >+ std::string s = value->get<std::string>(); >+ localOverwriteBeforeEfpReturn = ((s=="0") || (strcmp(s.c_str(),"true") == 0)); >+ } > try { >- jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks); >+ jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks, localOverwriteBeforeEfpReturn); > } catch (const qpid::linearstore::journal::jexception& e) { > THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what()); > } >@@ -736,7 +745,7 @@ > long rcnt = 0L; // recovered msg count > long idcnt = 0L; // in-doubt msg count > uint64_t thisHighestRid = 0ULL; >- jQueue->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); >+ jQueue->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, overwriteBeforeEfpReturn, &prepared, thisHighestRid, key.id); > > // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting > // from recovery of a store that has had its size changed externally by the resize utility. 
>@@ -1048,7 +1057,7 @@ > { > if (qpid::linearstore::journal::jdir::exists(tplStorePtr->jrnl_dir())) { > uint64_t thisHighestRid = 0ULL; >- tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0); >+ tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, false, 0, thisHighestRid, 0); > if (highestRid == 0ULL) > highestRid = thisHighestRid; > else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit >@@ -1522,7 +1531,8 @@ > wCachePageSizeKib(defWCachePageSizeKib), > tplWCachePageSizeKib(defTplWCachePageSizeKib), > efpPartition(defEfpPartition), >- efpFileSizeKib(defEfpFileSizeKib) >+ efpFileSizeKib(defEfpFileSizeKib), >+ overwriteBeforeEfpReturn(defOverwriteBeforeEfpReturn) > { > addOptions() > ("store-dir", qpid::optValue(storeDir, "DIR"), >Index: cpp/src/qpid/linearstore/management-schema.xml >=================================================================== >--- cpp/src/qpid/linearstore/management-schema.xml (revision 1612523) >+++ cpp/src/qpid/linearstore/management-schema.xml (working copy) >@@ -53,7 +53,7 @@ > <!--property name="currentFileCount" type="uint16" access="RO" unit="file" desc="Number of files currently allocated to this journal"/--> > <!--property name="maxFileCount" type="uint16" access="RO" unit="file" desc="Max number of files allowed for this journal"/--> > <!--property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/--> >- >+ <property name="overwriteBeforeEfpReturn" type="bool" access="RO" desc="Overwrite files before returning back to EFP?"/> > <statistic name="recordDepth" type="hilo32" unit="record" desc="Number of currently enqueued records (durable messages)"/> > <statistic name="enqueues" type="count64" unit="record" desc="Total 
enqueued records on journal"/> > <statistic name="dequeues" type="count64" unit="record" desc="Total dequeued records on journal"/> >Index: cpp/src/qpid/linearstore/MessageStoreImpl.h >=================================================================== >--- cpp/src/qpid/linearstore/MessageStoreImpl.h (revision 1612523) >+++ cpp/src/qpid/linearstore/MessageStoreImpl.h (working copy) >@@ -82,6 +82,7 @@ > uint32_t tplWCachePageSizeKib; > uint16_t efpPartition; > uint64_t efpFileSizeKib; >+ bool overwriteBeforeEfpReturn; > }; > > protected: >@@ -101,6 +102,7 @@ > static const uint32_t defTplWCachePageSizeKib = defWCachePageSizeKib / 8; > static const uint16_t defEfpPartition = 1; > static const uint64_t defEfpFileSizeKib = 512 * QLS_SBLK_SIZE_KIB; >+ static const bool defOverwriteBeforeEfpReturn = false; > static const std::string storeTopLevelDir; > > static qpid::sys::Duration defJournalGetEventsTimeout; >@@ -135,6 +137,7 @@ > uint32_t tplWCachePgSizeSblks; > uint16_t tplWCacheNumPages; > uint64_t highestRid; >+ bool overwriteBeforeEfpReturn; > bool isInit; > const char* envPath; > qpid::broker::Broker* broker; >@@ -252,7 +255,8 @@ > qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib = defEfpFileSizeKib, > const bool truncateFlag = false, > uint32_t wCachePageSize = defWCachePageSizeKib, >- uint32_t tplWCachePageSize = defTplWCachePageSizeKib); >+ uint32_t tplWCachePageSize = defTplWCachePageSizeKib, >+ bool overwriteBeforeEfpReturn = defOverwriteBeforeEfpReturn); > > void truncateInit(); > >Index: cpp/src/qpid/linearstore/journal/RecoveryManager.h >=================================================================== >--- cpp/src/qpid/linearstore/journal/RecoveryManager.h (revision 1612523) >+++ cpp/src/qpid/linearstore/journal/RecoveryManager.h (working copy) >@@ -109,7 +109,8 @@ > > void analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr, > EmptyFilePoolManager* emptyFilePoolManager, >- EmptyFilePool** emptyFilePoolPtrPtr); >+ 
EmptyFilePool** emptyFilePoolPtrPtr, >+ bool overwriteBeforeEfpReturn); > std::streamoff getEndOffset() const; > uint64_t getHighestFileNumber() const; > uint64_t getHighestRecordId() const; >@@ -145,7 +146,7 @@ > void prepareRecordList(); > bool readFileHeader(); > void readJournalData(char* target, const std::streamsize size); >- void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr); >+ void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr, bool overwriteBeforeEfpReturn); > > static bool readJournalFileHeader(const std::string& journalFileName, > ::file_hdr_t& fileHeaderRef, >Index: cpp/src/qpid/linearstore/journal/LinearFileController.cpp >=================================================================== >--- cpp/src/qpid/linearstore/journal/LinearFileController.cpp (revision 1612523) >+++ cpp/src/qpid/linearstore/journal/LinearFileController.cpp (working copy) >@@ -42,10 +42,12 @@ > > void LinearFileController::initialize(const std::string& journalDirectory, > EmptyFilePool* emptyFilePoolPtr, >- uint64_t initialFileNumberVal) { >+ uint64_t initialFileNumberVal, >+ bool overwriteBeforeEfpReturn) { > journalDirectory_.assign(journalDirectory); > emptyFilePoolPtr_ = emptyFilePoolPtr; > fileSeqCounter_.set(initialFileNumberVal); >+ overwriteBeforeEfpReturn_ = overwriteBeforeEfpReturn; > } > > void LinearFileController::finalize() { >@@ -97,7 +99,7 @@ > > void LinearFileController::removeFileToEfp(const std::string& fileName) { > if (emptyFilePoolPtr_) { >- emptyFilePoolPtr_->returnEmptyFile(fileName); >+ emptyFilePoolPtr_->returnEmptyFile(fileName, overwriteBeforeEfpReturn_); > } > } > >@@ -109,7 +111,7 @@ > void LinearFileController::purgeEmptyFilesToEfp() { > slock l(journalFileListMutex_); > while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records >- emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName()); >+ 
emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName(), overwriteBeforeEfpReturn_); > delete journalFileList_.front(); > journalFileList_.pop_front(); > } >Index: cpp/src/qpid/linearstore/journal/jcntl.h >=================================================================== >--- cpp/src/qpid/linearstore/journal/jcntl.h (revision 1612523) >+++ cpp/src/qpid/linearstore/journal/jcntl.h (working copy) >@@ -165,7 +165,8 @@ > void initialize(EmptyFilePool* efpp, > const uint16_t wcache_num_pages, > const uint32_t wcache_pgsize_sblks, >- aio_callback* const cbp); >+ aio_callback* const cbp, >+ bool overwriteBeforeEfpReturn); > > /** > * /brief Initialize journal by recovering state from previously written journal. >@@ -205,7 +206,8 @@ > const uint32_t wcache_pgsize_sblks, > aio_callback* const cbp, > const std::vector<std::string>* prep_txn_list_ptr, >- uint64_t& highest_rid); >+ uint64_t& highest_rid, >+ bool overwriteBeforeEfpReturn); > > /** > * \brief Notification to the journal that recovery is complete and that normal operation >Index: cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp >=================================================================== >--- cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp (revision 1612523) >+++ cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp (working copy) >@@ -120,7 +120,7 @@ > return newFileName; > } > >-void EmptyFilePool::returnEmptyFile(const std::string& fqSrcFile) { >+void EmptyFilePool::returnEmptyFile(const std::string& fqSrcFile, bool overwriteBeforeEfpReturn) { > std::string emptyFileName(efpDirectory_ + fqSrcFile.substr(fqSrcFile.rfind('/'))); // NOTE: substr() includes leading '/' > if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) { > // Try again with new UUID for file name >@@ -131,8 +131,12 @@ > return; > } > } >- resetEmptyFileHeader(emptyFileName); >- pushEmptyFile(emptyFileName); >+ if (overwriteBeforeEfpReturn) >+ createEmptyFile(emptyFileName); >+ else { >+ 
resetEmptyFileHeader(emptyFileName); >+ pushEmptyFile(emptyFileName); >+ } > } > > //static >@@ -170,10 +174,11 @@ > > // --- protected functions --- > >-void EmptyFilePool::createEmptyFile() { >+void EmptyFilePool::createEmptyFile(std::string efpfn) { > ::file_hdr_t fh; > ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_); >- std::string efpfn = getEfpFileName(); >+ if (efpfn.size() == 0) >+ efpfn = getEfpFileName(); > std::ofstream ofs(efpfn.c_str(), std::ofstream::out | std::ofstream::binary); > if (ofs.good()) { > ofs.write((char*)&fh, sizeof(::file_hdr_t)); >Index: cpp/src/qpid/linearstore/journal/RecoveryManager.cpp >=================================================================== >--- cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (revision 1612523) >+++ cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (working copy) >@@ -95,7 +95,8 @@ > > void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr, > EmptyFilePoolManager* emptyFilePoolManager, >- EmptyFilePool** emptyFilePoolPtrPtr) { >+ EmptyFilePool** emptyFilePoolPtrPtr, >+ bool overwriteBeforeEfpReturn) { > // Analyze file headers of existing journal files > efpIdentity_t efpIdentity; > analyzeJournalFileHeaders(efpIdentity); >@@ -129,7 +130,7 @@ > lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024; > > // Remove leading files which have no enqueued records >- removeEmptyFiles(*emptyFilePoolPtrPtr); >+ removeEmptyFiles(*emptyFilePoolPtrPtr, overwriteBeforeEfpReturn); > > // Remove all txns from tmap that are not in the prepared list > if (preparedTransactionListPtr) { >@@ -933,10 +934,10 @@ > return true; > } > >-void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) { >+void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr, bool overwriteBeforeEfpReturn) { > while 
(fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) { > RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second; >- emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName()); >+ emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName(), overwriteBeforeEfpReturn); > delete rfdp->journalFilePtr_; > delete rfdp; > fileNumberMap_.erase(fileNumberMap_.begin()->first); >Index: cpp/src/qpid/linearstore/journal/jcntl.cpp >=================================================================== >--- cpp/src/qpid/linearstore/journal/jcntl.cpp (revision 1612523) >+++ cpp/src/qpid/linearstore/journal/jcntl.cpp (working copy) >@@ -79,7 +79,8 @@ > jcntl::initialize(EmptyFilePool* efpp, > const uint16_t wcache_num_pages, > const uint32_t wcache_pgsize_sblks, >- aio_callback* const cbp) >+ aio_callback* const cbp, >+ bool overwriteBeforeEfpReturn) > { > _init_flag = false; > _stop_flag = false; >@@ -90,7 +91,7 @@ > > _linearFileController.finalize(); > _jdir.clear_dir(); // Clear any existing journal files >- _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL); >+ _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL, overwriteBeforeEfpReturn); > _linearFileController.getNextJournalFile(); > _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS, 0); > _init_flag = true; >@@ -102,7 +103,8 @@ > const uint32_t wcache_pgsize_sblks, > aio_callback* const cbp, > const std::vector<std::string>* prep_txn_list_ptr, >- uint64_t& highest_rid) >+ uint64_t& highest_rid, >+ bool overwriteBeforeEfpReturn) > { > _init_flag = false; > _stop_flag = false; >@@ -115,12 +117,12 @@ > > // Verify journal dir and journal files > _jdir.verify_dir(); >- _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr); >+ _recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr, overwriteBeforeEfpReturn); > 
assert(_emptyFilePoolPtr != 0); > > highest_rid = _recoveryManager.getHighestRecordId(); > _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5)); >- _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber()); >+ _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber(), overwriteBeforeEfpReturn); > _recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController); > if (_recoveryManager.isLastFileFull()) { > _linearFileController.getNextJournalFile(); >Index: cpp/src/qpid/linearstore/journal/LinearFileController.h >=================================================================== >--- cpp/src/qpid/linearstore/journal/LinearFileController.h (revision 1612523) >+++ cpp/src/qpid/linearstore/journal/LinearFileController.h (working copy) >@@ -47,6 +47,7 @@ > AtomicCounter<uint64_t> fileSeqCounter_; > AtomicCounter<uint64_t> recordIdCounter_; > AtomicCounter<uint64_t> decrCounter_; >+ bool overwriteBeforeEfpReturn_; > > JournalFileList_t journalFileList_; > JournalFile* currentJournalFilePtr_; >@@ -58,7 +59,8 @@ > > void initialize(const std::string& journalDirectory, > EmptyFilePool* emptyFilePoolPtr, >- uint64_t initialFileNumberVal); >+ uint64_t initialFileNumberVal, >+ bool overwriteBeforeEfpReturn); > void finalize(); > > void addJournalFile(JournalFile* journalFilePtr, >Index: cpp/src/qpid/linearstore/journal/EmptyFilePool.h >=================================================================== >--- cpp/src/qpid/linearstore/journal/EmptyFilePool.h (revision 1612523) >+++ cpp/src/qpid/linearstore/journal/EmptyFilePool.h (working copy) >@@ -73,14 +73,14 @@ > const efpIdentity_t getIdentity() const; > > std::string takeEmptyFile(const std::string& destDirectory); >- void returnEmptyFile(const std::string& srcFile); >+ void returnEmptyFile(const std::string& 
srcFile, bool overwriteBeforeEfpReturn = false); > > static std::string dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib); > static efpDataSize_kib_t dataSizeFromDirName_kib(const std::string& dirName, > const efpPartitionNumber_t partitionNumber); > > protected: >- void createEmptyFile(); >+ void createEmptyFile(std::string efpfn = ""); > std::string getEfpFileName(); > std::string popEmptyFile(); > void pushEmptyFile(const std::string fqFileName); >Index: cpp/src/qpid/linearstore/JournalImpl.cpp >=================================================================== >--- cpp/src/qpid/linearstore/JournalImpl.cpp (revision 1612523) >+++ cpp/src/qpid/linearstore/JournalImpl.cpp (working copy) >@@ -133,7 +133,8 @@ > JournalImpl::initialize(::qpid::linearstore::journal::EmptyFilePool* efpp_, > const uint16_t wcache_num_pages, > const uint32_t wcache_pgsize_sblks, >- ::qpid::linearstore::journal::aio_callback* const cbp) >+ ::qpid::linearstore::journal::aio_callback* const cbp, >+ bool overwriteBeforeEfpReturn) > { > // efpp->createJournal(_jdir); > // QLS_LOG2(notice, _jid, "Initialized"); >@@ -144,16 +145,17 @@ > // oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; > // oss << " wcache_num_pages=" << wcache_num_pages; > // QLS_LOG2(debug, _jid, oss.str()); >- jcntl::initialize(efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp); >+ jcntl::initialize(efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp, overwriteBeforeEfpReturn); > // QLS_LOG2(debug, _jid, "Initialization complete"); > // TODO: replace for linearstore: _lpmgr >-/* >- if (_mgmtObject.get() != 0) >+ >+/* if (_mgmtObject.get() != 0) > { > _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles()); > _mgmtObject->set_autoExpand(_lpmgr.is_ae()); > _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles()); > _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles()); >+ _mgmtObject->set_overwriteBeforeEfpReturn(_lpmgr.getOverwriteBeforeEfpReturn()); // TODO PavelM > _mgmtObject->set_dataFileSize(_jfsize_sblks 
* JRNL_SBLK_SIZE); > _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE); > _mgmtObject->set_writePages(wcache_num_pages); >@@ -168,6 +170,7 @@ > JournalImpl::recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm, > const uint16_t wcache_num_pages, > const uint32_t wcache_pgsize_sblks, >+ bool overwriteBeforeEfpReturn, > ::qpid::linearstore::journal::aio_callback* const cbp, > boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, > uint64_t& highest_rid, >@@ -187,6 +190,7 @@ > _mgmtObject->set_autoExpand(_lpmgr.is_ae()); > _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles()); > _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles()); >+ _mgmtObject->set_overwriteBeforeEfpReturn(_lpmgr.getOverwriteBeforeEfpReturn()); // TODO PavelM > _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE); > _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE); > _mgmtObject->set_writePages(wcache_num_pages); >@@ -201,9 +205,9 @@ > prep_xid_list.push_back(i->xid); > } > >- jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, &prep_xid_list, highest_rid); >+ jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, &prep_xid_list, highest_rid, overwriteBeforeEfpReturn); > } else { >- jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, 0, highest_rid); >+ jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, 0, highest_rid, overwriteBeforeEfpReturn); > } > > // Populate PreparedTransaction lists from _tmap >Index: cpp/src/qpid/broker/QueueSettings.h >=================================================================== >--- cpp/src/qpid/broker/QueueSettings.h (revision 1612523) >+++ cpp/src/qpid/broker/QueueSettings.h (working copy) >@@ -96,6 +96,9 @@ > uint64_t maxFileSize; > uint64_t maxFileCount; > >+ //rewrite journal file before returning it to EFP? 
(relevant to linearstore) >+ bool overwriteBeforeEfpReturn; >+ > std::string sequenceKey; > // store bool to avoid testing string value > bool sequencing; >Index: cpp/src/qpid/broker/QueueSettings.cpp >=================================================================== >--- cpp/src/qpid/broker/QueueSettings.cpp (revision 1612523) >+++ cpp/src/qpid/broker/QueueSettings.cpp (working copy) >@@ -70,6 +70,8 @@ > > const std::string SEQUENCING("qpid.queue_msg_sequence"); > >+const std::string OVERWRITE_BEFORE_EFP_RETURN("qpid.overwrite_before_efp_return"); >+ > bool handleFairshareSetting(const std::string& basename, const std::string& key, const qpid::types::Variant& value, QueueSettings& settings) > { > if (key.find(basename) == 0) { >@@ -108,6 +110,7 @@ > alertRepeatInterval(60), > maxFileSize(0), > maxFileCount(0), >+ overwriteBeforeEfpReturn(false), > sequencing(false) > {} > >@@ -209,6 +212,9 @@ > } else if (key == MAX_FILE_SIZE && value.asUint64() > 0) { > maxFileSize = value.asUint64(); > return false; // 'handle' here and also pass to store >+ } else if (key == OVERWRITE_BEFORE_EFP_RETURN) { >+ overwriteBeforeEfpReturn = value; >+ return false; // 'handle' here and also pass to store > } else if (key == PAGING) { > paging = value; > return true;
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 1089652
: 919932