/* * Copyright (C) 2008-2010 Trinity * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef _DATABASEWORKERPOOL_H #define _DATABASEWORKERPOOL_H #include #include #include "Common.h" #include "Callback.h" #include "MySQLConnection.h" #include "Transaction.h" #include "DatabaseWorker.h" #include "PreparedStatement.h" #include "Log.h" #include "QueryResult.h" #include "QueryHolder.h" #include "AdhocStatement.h" enum MySQLThreadBundle { MYSQL_BUNDLE_NONE = 0x00, //- Each task will run their own MySQL connection MYSQL_BUNDLE_UNUSED = 0x01, //- Temp unused MYSQL_BUNDLE_RA = 0x02, //- Remote admin thread MYSQL_BUNDLE_RAR = 0x04, //- Reactor runnable thread MYSQL_BUNDLE_WORLD = 0x08, //- WorldRunnable MYSQL_BUNDLE_ALL = MYSQL_BUNDLE_RA | MYSQL_BUNDLE_RAR | MYSQL_BUNDLE_WORLD, }; class PingOperation : public SQLOperation { /// Operation for idle delaythreads bool Execute() { m_conn->Ping(); return true; } }; template class DatabaseWorkerPool { private: typedef UNORDERED_MAP ConnectionMap; typedef ACE_Atomic_Op AtomicUInt; public: DatabaseWorkerPool() : m_queue(new ACE_Activation_Queue(new ACE_Message_Queue)), m_connections(0) { m_infoString = ""; mysql_library_init(-1, NULL, NULL); WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe."); } ~DatabaseWorkerPool() { mysql_library_end(); } bool Open(const std::string& infoString, uint8 num_threads, MySQLThreadBundle mask) { //- Only created bundled connection if configured m_bundle_conn = NULL; if (mask != MYSQL_BUNDLE_NONE) { sLog.outSQLDriver("Creating bundled/master MySQL connection."); m_bundle_conn = new T(); m_bundle_conn->Open(infoString); ++m_connections; } m_async_connections.resize(num_threads); /// Open the Async pool for (uint8 i = 0; i < num_threads; i++) { m_async_connections[i] = new T(m_queue); m_async_connections[i]->Open(infoString); ++m_connections; sLog.outSQLDriver("Async database thread pool opened. Worker thread count: %u", num_threads); } m_infoString = infoString; return true; } void Close() { sLog.outSQLDriver("Closing down %u connections on this DatabaseWorkerPool", (uint32)m_connections.value()); /// Shuts down worker threads for this connection pool. m_queue->queue()->deactivate(); for (uint8 i = 0; i < m_async_connections.size(); i++) { m_async_connections[i]->m_worker->wait(); --m_connections; } if (m_bundleMask != MYSQL_BUNDLE_NONE) { delete m_bundle_conn; m_bundle_conn = NULL; --m_connections; sLog.outSQLDriver("Closed bundled connection."); } //- MySQL::Thread_End() should be called manually from the aborting calling threads } void Init_MySQL_Connection() { T* conn = new T(); conn->Open(m_infoString); // no idea why it doesn't accept sLog here { ACE_Guard guard(m_connectionMap_mtx); ConnectionMap::const_iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); #ifdef _DEBUG if (itr != m_sync_connections.end()) ACE_Singleton::instance()->outSQLDriver("Thread ["UI64FMTD"] already started a MySQL connection", (uint64)ACE_Based::Thread::currentId()); #endif m_sync_connections[ACE_Based::Thread::current()] = conn; } ACE_Singleton::instance()->outSQLDriver("Core thread with ID ["UI64FMTD"] initializing MySQL connection.", (uint64)ACE_Based::Thread::currentId()); ++m_connections; } void End_MySQL_Connection() { MySQLConnection* conn; { ACE_Guard guard(m_connectionMap_mtx); ConnectionMap::iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); #ifdef _DEBUG if (itr == m_sync_connections.end()) sLog.outSQLDriver("Thread ["UI64FMTD"] already shut down their MySQL connection.", (uint64)ACE_Based::Thread::currentId()); #endif conn = itr->second; m_sync_connections.erase(itr); } delete conn; conn = NULL; --m_connections; } void Execute(const char* sql) { if (!sql) return; BasicStatementTask* task = new BasicStatementTask(sql); Enqueue(task); } void PExecute(const char* sql, ...) { if (!sql) return; va_list ap; char szQuery[MAX_QUERY_LEN]; va_start(ap, sql); vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); va_end(ap); Execute(szQuery); } void DirectExecute(const char* sql) { if (sql) GetConnection()->Execute(sql); } void DirectPExecute(const char* sql, ...) { if (!sql) return; va_list ap; char szQuery[MAX_QUERY_LEN]; va_start(ap, sql); vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); va_end(ap); return DirectExecute(szQuery); } QueryResult Query(const char* sql) { ResultSet* result = GetConnection()->Query(sql); if (!result || !result->GetRowCount()) return QueryResult(NULL); result->NextRow(); return QueryResult(result); } QueryResult PQuery(const char* sql, ...) { if (!sql) return QueryResult(NULL); va_list ap; char szQuery[MAX_QUERY_LEN]; va_start(ap, sql); vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); va_end(ap); return Query(szQuery); } ACE_Future AsyncQuery(const char* sql) { QueryResultFuture res; BasicStatementTask* task = new BasicStatementTask(sql, res); Enqueue(task); return res; //! Actual return value has no use yet } ACE_Future AsyncPQuery(const char* sql, ...) { va_list ap; char szQuery[MAX_QUERY_LEN]; va_start(ap, sql); vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); va_end(ap); return AsyncQuery(szQuery); } QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder) { QueryResultHolderFuture res; SQLQueryHolderTask* task = new SQLQueryHolderTask(holder, res); Enqueue(task); return res; //! Fool compiler, has no use yet } SQLTransaction BeginTransaction() { return SQLTransaction(new Transaction); } void CommitTransaction(SQLTransaction transaction) { #ifdef SQLQUERY_LOG if (transaction->GetSize() == 0) { sLog.outSQLDriver("Transaction contains 0 queries. Not executing."); return; } if (transaction->GetSize() == 1) { sLog.outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code."); } #endif Enqueue(new TransactionTask(transaction)); } PreparedStatement* GetPreparedStatement(uint32 index) { return new PreparedStatement(index); } void Execute(PreparedStatement* stmt) { PreparedStatementTask* task = new PreparedStatementTask(stmt); Enqueue(task); } void escape_string(std::string& str) { if (str.empty()) return; char* buf = new char[str.size()*2+1]; escape_string(buf,str.c_str(),str.size()); str = buf; delete[] buf; } MySQLThreadBundle GetBundleMask() { return m_bundleMask; } PreparedQueryResult Query(PreparedStatement* stmt) { PreparedResultSet* ret = GetConnection()->Query(stmt); if (!ret || !ret->GetRowCount()) return PreparedQueryResult(NULL); return PreparedQueryResult(ret); } void KeepAlive() { ConnectionMap::const_iterator itr; { /*! MapUpdate + unbundled threads */ ACE_Guard guard(m_connectionMap_mtx); itr = m_sync_connections.find(ACE_Based::Thread::current()); if (itr != m_sync_connections.end()) itr->second->Ping(); } if (m_bundle_conn) m_bundle_conn->Ping(); /// Assuming all worker threads are free, every worker thread will receive 1 ping operation request /// If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter /// as the sole purpose is to prevent connections from idling. for (size_t i = 0; i < m_async_connections.size(); ++i) Enqueue(new PingOperation); } private: unsigned long escape_string(char *to, const char *from, unsigned long length) { if (!to || !from || !length) return 0; return (mysql_real_escape_string(GetConnection()->GetHandle(), to, from, length)); } void Enqueue(SQLOperation* op) { m_queue->enqueue(op); } MySQLConnection* GetConnection() { MySQLConnection* conn; ConnectionMap::const_iterator itr; { /*! MapUpdate + unbundled threads */ ACE_Guard guard(m_connectionMap_mtx); itr = m_sync_connections.find(ACE_Based::Thread::current()); if (itr != m_sync_connections.end()) return itr->second; } /*! Bundled threads */ conn = m_bundle_conn; ASSERT (conn); return conn; } private: ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads. std::vector m_async_connections; ConnectionMap m_sync_connections; //! Holds a mysql connection+thread per mapUpdate thread and unbundled runnnables. ACE_Thread_Mutex m_connectionMap_mtx; //! For thread safe access to the synchroneous connection map T* m_bundle_conn; //! Bundled connection (see Database.ThreadBundleMask config) AtomicUInt m_connections; //! Counter of MySQL connections; std::string m_infoString; //! Infostring that is passed on to child connections. MySQLThreadBundle m_bundleMask; //! Our configured bundle mask (see enum) }; #endif