Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
342 changes: 341 additions & 1 deletion src/ActiveRecord.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,50 @@
*/
abstract class ActiveRecord extends BaseActiveRecord
{
/*
* @var bool register_shutdown_function is set?
*/
public static $batchShutdownFunction = false;

/*
* @var Command instance of Command class for batch insert.
*/
private static $batchInsertCommand = [];
/*
* @var integer count of insert operation in queue
*/
private static $batchInsertQueue = [];
/*
* @var int size of batch for insert operations
*/
public static $batchInsertSize = 500;

/*
* @var Command instance of Command class for batch update.
*/
private static $batchUpdateCommand = [];
/*
* @var integer count of update operation in queue
*/
private static $batchUpdateQueue = [];
/*
* @var int size of batch for update operations
*/
public static $batchUpdateSize = 500;

/*
* @var Command instance of Command class for batch delete.
*/
private static $batchDeleteCommand = [];
/*
* @var integer count of delete operation in queue
*/
private static $batchDeleteQueue = [];
/*
* @var int size of batch for delete operations
*/
public static $batchDeleteSize = 500;

/**
* Returns the Mongo connection used by this AR class.
* By default, the "mongodb" application component is used as the Mongo connection.
Expand Down Expand Up @@ -411,4 +455,300 @@ private function dumpBsonObject(Type $object)
}
return ArrayHelper::toArray($object);
}
}

/**
* invoke batchInsert() or batchUpdate() base on getIsNewRecord()
* @param array $attributes list of attributes that need to be inserted or updated. Defaults to null,
* meaning all attributes that are loaded will be inserted or updated.
* @return see ActiveRecord::batchInsert()
*/
public function batchSave($attributes = null, $scope = ''){
if($this->getIsNewRecord())
return $this->batchInsert($attributes,$scope);
return $this->batchUpdate($attributes,$scope);
}

private static function batchInit(){

if(self::$batchShutdownFunction)
return;

self::$batchShutdownFunction = true;

register_shutdown_function(function(){

$className = static::className();

foreach(self::$batchInsertQueue as $scope => $_)
if(self::hasBatchInsert($scope))
if(self::$batchInsertCommand[$className][$scope]->db->enableLogging)
yii::warning($className.' : batch insert mode not completed!');

foreach(self::$batchUpdateQueue as $scope => $_)
if(self::hasBatchUpdate($scope))
if(self::$batchUpdateCommand[$className][$scope]->db->enableLogging)
yii::warning($className.' : batch update mode not completed!');

foreach(self::$batchDeleteQueue as $scope => $_)
if(self::hasBatchDelete($scope))
if(self::$batchDeleteCommand[$className][$scope]->db->enableLogging)
yii::warning($className.' : batch delete mode not completed!');
});
}

/**
* checking if current ActiveRecord class has documents in queue for insert
* @return bool
*/
public static function hasBatchInsert($scope = ''){
$className = static::className();
return @self::$batchInsertQueue[$className][$scope] ? true : false;
}

/**
* this method is invoked in first call of batchInsert() method for once
*/
private static function batchInsertInit($scope = ''){

self::batchInit();

$className = static::className();

if(@self::$batchInsertCommand[$className][$scope])
return;

if(!@self::$batchInsertQueue[$className])
self::$batchInsertQueue[$className] = [];

self::$batchInsertQueue[$className][$scope] = 0;
self::$batchInsertCommand[$className][$scope] = static::getDb()->createCommand();
}

/**
* adding insert operation to queue base on current instance data
* @param array $attributes list of attributes that need to be inserted. Defaults to null,
* meaning all attributes that are loaded will be inserted.
* @return null|array return null if no execute command or return result of Command::executeBatch()
*/
public function batchInsert($attributes = null, $scope = ''){
self::batchInsertInit($scope);
$values = $this->getDirtyAttributes($attributes);
if (empty($values)) {
$currentAttributes = $this->getAttributes();
foreach ($this->primaryKey() as $key) {
if (isset($currentAttributes[$key])) {
$values[$key] = $currentAttributes[$key];
}
}
}
$className = static::className();
self::$batchInsertCommand[$className][$scope]->AddInsert($values);
self::$batchInsertQueue[$className][$scope]++;
if(self::$batchInsertQueue[$className][$scope] >= static::$batchInsertSize)
return self::flushBatchInsert($scope);
}

/**
* resetting batch insert
*/
public static function resetBatchInsert($scope = ''){
$className = static::className();
if(!@self::$batchInsertCommand[$className][$scope])
return;
unset(
self::$batchInsertQueue[$className][$scope],
self::$batchInsertCommand[$className][$scope]
);
}

/**
* execute batch insert operations in queue and reset anything
* this method is not continue when not exists any insert operations in queue
* @return see docs of Command::executeBatch()
*/
public static function flushBatchInsert($scope = ''){
$className = static::className();
if(!@self::$batchInsertQueue[$className][$scope])
return;
$result = self::$batchInsertCommand[$className][$scope]->executeBatch(static::collectionName());
self::resetBatchInsert($scope);
return $result;
}

/**
* checking if current ActiveRecord class has documents in queue for update
* @return bool
*/
public static function hasBatchUpdate($scope = ''){
$className = static::className();
return @self::$batchUpdateQueue[$className][$scope] ? true : false;
}

/**
* this method is invoked in first call of batchUpdate() method for once
*/
private static function batchUpdateInit($scope = ''){

self::batchInit();

$className = static::className();

if(@self::$batchUpdateCommand[$className][$scope])
return;

if(!@self::$batchUpdateQueue[$className])
self::$batchUpdateQueue[$className] = [];

self::$batchUpdateQueue[$className][$scope] = 0;
self::$batchUpdateCommand[$className][$scope] = static::getDb()->createCommand();
}

/**
* adding update operation to queue base on current instance data
* @param array $attributes list of attribute names that need to be updated. Defaults to null,
* meaning all attributes that are loaded from DB will be updated.
* @return null|array return null if no execute command or return result of Command::executeBatch()
*/
public function batchUpdate($attributes = null, $scope = ''){
self::batchUpdateInit($scope);
$values = $this->getDirtyAttributes($attributes);
if (empty($values))
return;
$condition = $this->getOldPrimaryKey(true);
$className = static::className();
self::$batchUpdateCommand[$className][$scope]->addUpdate($condition, $values);
self::$batchUpdateQueue[$className][$scope]++;
if(self::$batchUpdateQueue[$className][$scope] >= static::$batchUpdateSize)
return self::flushBatchUpdate($scope);
}

/**
* adding update operation to queue
* @param array $attributes list of attribute names that need to be updated.
* @param array $condition Description of the objects to update.
* Please refer to Query::where() on how to specify this parameter.
* @param array $options List of options in format: optionName => optionValue.
* Please refer to Command::addUpdate() on how to specify this parameter.
* @return null|array return null if no execute command or return result of Command::executeBatch()
*/
public static function batchUpdateAll($attributes, $condition = [], $options = [], $scope = ''){
self::batchUpdateInit($scope);
$className = static::className();
self::$batchUpdateCommand[$className][$scope]->addUpdate($condition, $attributes, $options);
self::$batchUpdateQueue[$className][$scope]++;
if(self::$batchUpdateQueue[$className][$scope] >= static::$batchUpdateSize)
return self::flushBatchUpdate();
}

/**
* resetting batch update
*/
public static function resetBatchUpdate($scope = ''){
$className = static::className();
if(!@self::$batchUpdateCommand[$className][$scope])
return;
unset(
self::$batchUpdateQueue[$className][$scope],
self::$batchUpdateCommand[$className][$scope]
);
}

/**
* execute batch update operations in queue and reset anything
* this method is not continue when not exists any update operations in queue
* @return see docs of Command::executeBatch()
*/
public static function flushBatchUpdate($scope = ''){
$className = static::className();
if(!@self::$batchUpdateQueue[$className][$scope])
return;
$result = self::$batchUpdateCommand[$className][$scope]->executeBatch(static::collectionName());
self::resetBatchUpdate();
return $result;
}

/**
* checking if current ActiveRecord class has documents in queue for delete
* @return bool
*/
public static function hasBatchDelete($scope = ''){
$className = static::className();
return @self::$batchDeleteQueue[$className][$scope] ? true : false;
}

/**
* this method is invoked in first call of batchDelete() method for once
*/
private static function batchDeleteInit($scope = ''){

self::batchInit();

$className = static::className();

if(@self::$batchDeleteCommand[$className][$scope])
return;

if(!@self::$batchDeleteQueue[$className])
self::$batchDeleteQueue[$className] = [];

self::$batchDeleteQueue[$className][$scope] = 0;
self::$batchDeleteCommand[$className][$scope] = static::getDb()->createCommand();
}

/**
* adding delete operation to queue base on current instance data
* @return null|array return null if no execute command or return result of Command::executeBatch()
*/
public function batchDelete($scope = ''){
self::batchDeleteInit($scope);
$className = static::className();
self::$batchDeleteCommand[$className][$scope]->AddDelete($this->getOldPrimaryKey(true));
self::$batchDeleteQueue[$className][$scope]++;
if(self::$batchDeleteQueue[$className][$scope] >= static::$batchDeleteSize)
return self::flushBatchDelete($scope);
}

/**
* adding delete operation to queue
* @param array $condition Description of the objects to delete.
* Please refer to Query::where() on how to specify this parameter.
* @param array $options List of options in format: optionName => optionValue.
* Please refer to Command::AddDelete() on how to specify this parameter.
* @return null|array return null if no execute command or return result of Command::executeBatch()
*/
public static function batchDeleteAll($condition = [], $options = [], $scope = ''){
self::batchDeleteInit($scope);
$className = static::className();
self::$batchDeleteCommand[$className][$scope]->AddDelete($condition, $options);
self::$batchDeleteQueue[$className][$scope]++;
if(self::$batchDeleteQueue[$className][$scope] >= static::$batchDeleteSize)
return self::flushBatchDelete($scope);
}

/**
* resetting batch delete
*/
public static function resetBatchDelete($scope = ''){
$className = static::className();
if(!@self::$batchDeleteCommand[$className][$scope])
return;
unset(
self::$batchDeleteQueue[$className][$scope],
self::$batchDeleteCommand[$className][$scope]
);
}

/**
* execute batch delete operations in queue and reset anything
* this method is not continue when not exists any delete operations in queue
* @return see docs of Command::executeBatch()
*/
public static function flushBatchDelete($scope = ''){
$className = static::className();
if(!@self::$batchDeleteQueue[$className][$scope])
return;
$result = self::$batchDeleteCommand[$className][$scope]->executeBatch(static::collectionName());
self::resetBatchDelete($scope);
return $result;
}
}