Removed mtmaps OpenMP

Implement mtmaps based on ACE
For stable work MapUpdate.Threads=2 in config

--HG--
branch : trunk
This commit is contained in:
n0n4m3
2009-12-17 07:34:00 +01:00
parent 8507b7da99
commit 2028f0658b
16 changed files with 371 additions and 94 deletions
+2 -1
View File
@@ -150,6 +150,8 @@ SET(game_STAT_SRCS
MapInstanced.h
MapManager.cpp
MapManager.h
MapUpdater.cpp
MapUpdater.h
MapReference.h
MapRefManager.h
MiscHandler.cpp
@@ -294,6 +296,5 @@ SET(game_STAT_SRCS
OutdoorPvPImpl.h
ZoneScript.h
)
add_definitions(-fopenmp)
add_library(game STATIC ${game_STAT_SRCS})
ADD_DEPENDENCIES(game revision.h)
+6 -4
View File
@@ -706,10 +706,12 @@ void Map::RemoveUnitFromNotify(Unit *unit)
void Map::Update(const uint32 &t_diff)
{
/// update players at tick
for (m_mapRefIter = m_mapRefManager.begin(); m_mapRefIter != m_mapRefManager.end(); ++m_mapRefIter)
if (Player* plr = m_mapRefIter->getSource())
if (plr && plr->IsInWorld())
plr->Update(t_diff);
for(m_mapRefIter = m_mapRefManager.begin(); m_mapRefIter != m_mapRefManager.end(); ++m_mapRefIter)
{
Player* plr = m_mapRefIter->getSource();
if(plr && plr->IsInWorld())
plr->Update(t_diff);
}
m_notifyTimer.Update(t_diff);
if (m_notifyTimer.Passed())
+25 -17
View File
@@ -18,9 +18,6 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifdef MULTI_THREAD_MAP
#include <omp.h>
#endif
#include "MapManager.h"
#include "InstanceSaveMgr.h"
#include "Policies/SingletonImp.h"
@@ -32,6 +29,7 @@
#include "MapInstanced.h"
#include "InstanceData.h"
#include "DestinationHolderImp.h"
#include "Config/ConfigEnv.h"
#include "World.h"
#include "CellImpl.h"
#include "Corpse.h"
@@ -72,6 +70,12 @@ MapManager::Initialize()
i_GridStateErrorCount = 0;
}
#ifdef MULTI_THREAD_MAP
int num_threads(sWorld.getConfig(CONFIG_NUMTHREADS));
// Start mtmaps if needed.
if(num_threads > 0 && m_updater.activate(num_threads) == -1)
abort();
#endif
InitMaxInstanceId();
}
@@ -299,22 +303,17 @@ MapManager::Update(uint32 diff)
MapMapType::iterator iter = i_maps.begin();
#ifdef MULTI_THREAD_MAP
std::vector<Map*> update_queue(i_maps.size());
int omp_set_num_threads(sWorld.getConfig(CONFIG_NUMTHREADS));
for (uint32 i = 0; iter != i_maps.end(); ++iter, ++i)
update_queue[i] = iter->second;
/*
gomp in gcc <4.4 version cannot parallelise loops using random access iterators
so until gcc 4.4 isnt standard, we need the update_queue workaround
*/
#pragma omp parallel for schedule(dynamic) private(i) shared(update_queue)
for (uint32 i = 0; i < i_maps.size(); ++i)
for(; iter != i_maps.end(); ++iter)
{
checkAndCorrectGridStatesArray(); // debugging code, should be deleted some day
update_queue[i]->Update(i_timer.GetCurrent());
sWorld.RecordTimeDiff("UpdateMap %u", update_queue[i]->GetId());
//sLog.outError("This is thread %d out of %d threads,updating map %u",omp_get_thread_num(),omp_get_num_threads(),iter->second->GetId());
if (m_updater.activated())
m_updater.schedule_update(*iter->second, i_timer.GetCurrent());
else
{
iter->second->Update(i_timer.GetCurrent());
}
}
if (m_updater.activated())
m_updater.wait();
#else
for (; iter != i_maps.end(); ++iter)
{
@@ -377,6 +376,11 @@ void MapManager::UnloadAll()
delete i_maps.begin()->second;
i_maps.erase(i_maps.begin());
}
#ifdef MULTI_THREAD_MAP
if (m_updater.activated())
m_updater.deactivate();
#endif
}
void MapManager::InitMaxInstanceId()
@@ -393,6 +397,8 @@ void MapManager::InitMaxInstanceId()
uint32 MapManager::GetNumInstances()
{
Guard guard(*this);
uint32 ret = 0;
for (MapMapType::iterator itr = i_maps.begin(); itr != i_maps.end(); ++itr)
{
@@ -407,6 +413,8 @@ uint32 MapManager::GetNumInstances()
uint32 MapManager::GetNumPlayersInInstances()
{
Guard guard(*this);
uint32 ret = 0;
for (MapMapType::iterator itr = i_maps.begin(); itr != i_maps.end(); ++itr)
{
+2
View File
@@ -27,6 +27,7 @@
#include "Common.h"
#include "Map.h"
#include "GridStates.h"
#include "MapUpdater.h"
class Transport;
@@ -152,6 +153,7 @@ class MANGOS_DLL_DECL MapManager : public MaNGOS::Singleton<MapManager, MaNGOS::
IntervalTimer i_timer;
uint32 i_MaxInstanceId;
MapUpdater m_updater;
};
#endif
+126
View File
@@ -0,0 +1,126 @@
#include "MapUpdater.h"
#include "DelayExecutor.h"
#include "Map.h"
#include "Database/DatabaseEnv.h"
#include <ace/Guard_T.h>
#include <ace/Method_Request.h>
//the reason this things are here is that i want to make
//the netcode patch and the multithreaded maps independant
//once they are merged 1 class should be used
class WDBThreadStartReq1 : public ACE_Method_Request
{
public:
WDBThreadStartReq1(){}
virtual int
call (void)
{
WorldDatabase.ThreadStart();
return 0;
}
};
class WDBThreadEndReq1 : public ACE_Method_Request
{
public:
WDBThreadEndReq1(){}
virtual int
call (void)
{
WorldDatabase.ThreadEnd();
return 0;
}
};
class MapUpdateRequest : public ACE_Method_Request
{
public:
Map& m_map;
MapUpdater& m_updater;
ACE_UINT32 m_diff;
MapUpdateRequest(Map& m, MapUpdater& u, ACE_UINT32 d) : m_map(m), m_updater(u), m_diff(d){}
virtual int
call (void)
{
m_map.Update (m_diff);
m_updater.update_finished ();
return 0;
}
};
MapUpdater::MapUpdater() :
m_mutex(),
m_condition(m_mutex),
m_executor(),
pedning_requests(0)
{
return;
}
MapUpdater::~MapUpdater()
{
this->deactivate();
}
int MapUpdater::activate(size_t num_threads)
{
return this->m_executor.activate(static_cast<int> (num_threads), new WDBThreadStartReq1, new WDBThreadEndReq1);
}
int MapUpdater::deactivate(void)
{
this->wait();
return this->m_executor.deactivate();
}
int MapUpdater::wait()
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->m_mutex, -1);
while(this->pedning_requests > 0)
this->m_condition.wait();
return 0;
}
int MapUpdater::schedule_update(Map& map, ACE_UINT32 diff)
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->m_mutex, -1);
++this->pedning_requests;
if (this->m_executor.execute(new MapUpdateRequest(map, *this, diff)) == -1)
{
ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) \n"), ACE_TEXT("Failed to schedule Map Update")));
--this->pedning_requests;
return -1;
}
return 0;
}
bool MapUpdater::activated()
{
return m_executor.activated();
}
void MapUpdater::update_finished()
{
ACE_GUARD(ACE_Thread_Mutex, guard, this->m_mutex);
if (this->pedning_requests == 0)
{
ACE_ERROR((LM_ERROR,ACE_TEXT("(%t)\n"), ACE_TEXT("MapUpdater::update_finished BUG, report to devs")));
return;
}
--this->pedning_requests;
this->m_condition.broadcast();
}
+36
View File
@@ -0,0 +1,36 @@
#ifndef _MAP_UPDATER_H_INCLUDED
#define _MAP_UPDATER_H_INCLUDED
#include <ace/Thread_Mutex.h>
#include <ace/Condition_Thread_Mutex.h>
#include "DelayExecutor.h"
class Map;
class MapUpdater
{
public:
MapUpdater();
virtual ~MapUpdater();
friend class MapUpdateRequest;
int schedule_update(Map& map, ACE_UINT32 diff);
int wait();
int activate(size_t num_threads);
int deactivate(void);
bool activated();
private:
void update_finished();
DelayExecutor m_executor;
ACE_Condition_Thread_Mutex m_condition;
ACE_Thread_Mutex m_mutex;
size_t pedning_requests;
};
#endif //_MAP_UPDATER_H_INCLUDED
+1
View File
@@ -162,6 +162,7 @@ Player*
ObjectAccessor::FindPlayerByName(const char *name)
{
//TODO: Player Guard
Guard guard(*HashMapHolder<Player>::GetLock());
HashMapHolder<Player>::MapType& m = HashMapHolder<Player>::GetContainer();
HashMapHolder<Player>::MapType::iterator iter = m.begin();
for (; iter != m.end(); ++iter)
+6 -1
View File
@@ -52,7 +52,11 @@ class HashMapHolder
typedef ACE_Thread_Mutex LockType;
typedef MaNGOS::GeneralLock<LockType > Guard;
static void Insert(T* o) { m_objectMap[o->GetGUID()] = o; }
static void Insert(T* o)
{
Guard guard(i_lock);
m_objectMap[o->GetGUID()] = o;
}
static void Remove(T* o)
{
@@ -62,6 +66,7 @@ class HashMapHolder
static T* Find(uint64 guid)
{
Guard guard(i_lock);
typename MapType::iterator itr = m_objectMap.find(guid);
return (itr != m_objectMap.end()) ? itr->second : NULL;
}
-9
View File
@@ -1121,12 +1121,6 @@ void Player::SetDrunkValue(uint16 newDrunkenValue, uint32 itemId)
void Player::Update( uint32 p_time )
{
// Until Trinity is thread safe, anything that could result in a
// map, zone, or area change in this Update should be preceded by:
// #pragma omp critical(UpdateThreadSafety)
// This will only allow one thread at a time to process a "UpdateThreadSafety" block.
// NOTE: I'm only certain about the map change part. The zone and area #pragma is just a precaution.
if (!IsInWorld())
return;
@@ -1294,7 +1288,6 @@ void Player::Update( uint32 p_time )
m_weaponChangeTimer -= p_time;
}
#pragma omp critical(UpdateThreadSafety)
if (m_zoneUpdateTimer > 0)
{
if (p_time >= m_zoneUpdateTimer)
@@ -1374,7 +1367,6 @@ void Player::Update( uint32 p_time )
// not auto-free ghost from body in instances
if(m_deathTimer > 0 && !GetBaseMap()->Instanceable())
{
#pragma omp critical(UpdateThreadSafety)
if(p_time >= m_deathTimer)
{
m_deathTimer = 0;
@@ -1398,7 +1390,6 @@ void Player::Update( uint32 p_time )
//we should execute delayed teleports only for alive(!) players
//because we don't want player's ghost teleported from graveyard
#pragma omp critical(UpdateThreadSafety)
if (IsHasDelayedTeleport() && isAlive())
TeleportTo(m_teleport_dest, m_teleport_options);
}
+2 -5
View File
@@ -225,11 +225,8 @@ void Unit::Update(uint32 p_time)
// WARNING! Order of execution here is important, do not change.
// Spells must be processed with event system BEFORE they go to _UpdateSpells.
// Or else we may have some SPELL_STATE_FINISHED spells stalled in pointers, that is bad.
#pragma omp critical(UpdateThreadSafety)
{
m_Events.Update(p_time);
_UpdateSpells(p_time);
}
m_Events.Update(p_time);
_UpdateSpells(p_time);
// If this is set during update SetCantProc(false) call is missing somewhere in the code
// Having this would prevent spells from being proced, so let's crash
+2 -1
View File
@@ -9,6 +9,8 @@ SET(shared_STAT_SRCS
ByteBuffer.h
Common.cpp
Common.h
DelayExecutor.cpp
DelayExecutor.h
Errors.h
Log.cpp
Log.h
@@ -21,7 +23,6 @@ SET(shared_STAT_SRCS
WorldPacket.h
SystemConfig.h
)
add_definitions(-fopenmp)
add_library(shared STATIC ${shared_STAT_SRCS})
target_link_libraries(
shared
+114
View File
@@ -0,0 +1,114 @@
#include <ace/Singleton.h>
#include <ace/Thread_Mutex.h>
#include <ace/Log_Msg.h>
#include "DelayExecutor.h"
DelayExecutor*
DelayExecutor::instance()
{
return ACE_Singleton<DelayExecutor, ACE_Thread_Mutex>::instance();
}
DelayExecutor::DelayExecutor():
activated_ (false),
pre_svc_hook_ (0),
post_svc_hook_ (0) {}
DelayExecutor::~DelayExecutor()
{
if (pre_svc_hook_)
delete pre_svc_hook_;
if (post_svc_hook_)
delete post_svc_hook_;
this->deactivate ();
}
int DelayExecutor::deactivate()
{
if (!this->activated())
return -1;
this->activated(false);
this->queue_.queue()->deactivate();
this->wait();
return 0;
}
int DelayExecutor::svc (void)
{
if (pre_svc_hook_)
pre_svc_hook_->call();
for (;;)
{
ACE_Method_Request* rq = this->queue_.dequeue();
if (!rq)
break;
rq->call();
delete rq;
}
if (post_svc_hook_)
post_svc_hook_->call();
return 0;
}
int DelayExecutor::activate(int num_threads, ACE_Method_Request* pre_svc_hook, ACE_Method_Request* post_svc_hook)
{
if (this->activated())
return -1;
if (num_threads < 1)
return -1;
if (pre_svc_hook_)
delete pre_svc_hook_;
if (post_svc_hook_)
delete post_svc_hook_;
this->pre_svc_hook_ = pre_svc_hook;
this->post_svc_hook_ = post_svc_hook;
this->queue_.queue ()->activate ();
if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, num_threads) == -1)
return -1;
this->activated(true);
return true;
}
int DelayExecutor::execute(ACE_Method_Request* new_req)
{
if (new_req == NULL)
return -1;
if (this->queue_.enqueue(new_req,(ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
{
delete new_req;
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%t) %p\n"), ACE_TEXT("DelayExecutor::execute enqueue")), -1);
}
return 0;
}
bool DelayExecutor::activated()
{
return this->activated_;
}
void DelayExecutor::activated(bool s)
{
this->activated_ = s;
}
+33
View File
@@ -0,0 +1,33 @@
#ifndef _M_DELAY_EXECUTOR_H
#define _M_DELAY_EXECUTOR_H
#include <ace/Task.h>
#include <ace/Activation_Queue.h>
#include <ace/Method_Request.h>
class DelayExecutor : protected ACE_Task_Base
{
public:
DelayExecutor();
virtual ~DelayExecutor();
static DelayExecutor* instance();
int execute(ACE_Method_Request* new_req);
int activate(int num_threads = 1, ACE_Method_Request* pre_svc_hook = 0, ACE_Method_Request* post_svc_hook = 0);
int deactivate();
bool activated();
virtual int svc(void);
private:
ACE_Activation_Queue queue_;
ACE_Method_Request* pre_svc_hook_;
ACE_Method_Request* post_svc_hook_;
void activated(bool s);
bool activated_;
};
#endif // _M_DELAY_EXECUTOR_H
-56
View File
@@ -28,60 +28,6 @@
typedef ACE_TSS<MTRand> MTRandTSS;
static MTRandTSS mtRand;
#ifdef MULTI_THREAD_MAP
int32 irand (int32 min, int32 max)
{
int32 result;
#pragma omp critical (mtrand)
{
result = int32 (mtRand->randInt (max - min)) + min;
}
return result;
}
uint32 urand (uint32 min, uint32 max)
{
uint32 result;
#pragma omp critical (mtrand)
{
result = mtRand->randInt (max - min) + min;
}
return result;
}
int32 rand32 ()
{
int32 result;
#pragma omp critical (mtrand)
{
result = mtRand->randInt ();
}
return result;
}
double rand_norm(void)
{
double result;
#pragma omp critical (mtrand)
{
result = mtRand->randExc ();
}
return result;
}
double rand_chance (void)
{
double result;
#pragma omp critical (mtrand)
{
result = mtRand->randExc (100.0);
}
return result;
}
#else
int32 irand (int32 min, int32 max)
{
return int32 (mtRand->randInt (max - min)) + min;
@@ -107,8 +53,6 @@ double rand_chance (void)
return mtRand->randExc (100.0);
}
#endif
Tokens StrSplit(const std::string &src, const std::string &sep)
{
Tokens r;
+8
View File
@@ -822,6 +822,14 @@
RelativePath="..\..\src\game\MapManager.h"
>
</File>
<File
RelativePath="..\..\src\game\MapUpdater.cpp"
>
</File>
<File
RelativePath="..\..\src\game\MapUpdater.h"
>
</File>
<File
RelativePath="..\..\src\game\MiscHandler.cpp"
>
+8
View File
@@ -859,6 +859,14 @@
RelativePath="..\..\src\shared\Common.h"
>
</File>
<File
RelativePath="..\..\src\shared\DelayExecutor.cpp"
>
</File>
<File
RelativePath="..\..\src\shared\DelayExecutor.h"
>
</File>
<File
RelativePath="..\..\src\shared\LockedQueue.h"
>