diff --git a/src/ActiveRecord.php b/src/ActiveRecord.php index c720d94b2..8bee14b44 100644 --- a/src/ActiveRecord.php +++ b/src/ActiveRecord.php @@ -25,6 +25,50 @@ */ abstract class ActiveRecord extends BaseActiveRecord { + /* + * @var bool register_shutdown_function is set? + */ + public static $batchShutdownFunction = false; + + /* + * @var Command instance of Command class for batch insert. + */ + private static $batchInsertCommand = []; + /* + * @var integer count of insert operation in queue + */ + private static $batchInsertQueue = []; + /* + * @var int size of batch for insert operations + */ + public static $batchInsertSize = 500; + + /* + * @var Command instance of Command class for batch update. + */ + private static $batchUpdateCommand = []; + /* + * @var integer count of update operation in queue + */ + private static $batchUpdateQueue = []; + /* + * @var int size of batch for update operations + */ + public static $batchUpdateSize = 500; + + /* + * @var Command instance of Command class for batch delete. + */ + private static $batchDeleteCommand = []; + /* + * @var integer count of delete operation in queue + */ + private static $batchDeleteQueue = []; + /* + * @var int size of batch for delete operations + */ + public static $batchDeleteSize = 500; + /** * Returns the Mongo connection used by this AR class. * By default, the "mongodb" application component is used as the Mongo connection. @@ -411,4 +455,300 @@ private function dumpBsonObject(Type $object) } return ArrayHelper::toArray($object); } -} + + /** + * invoke batchInsert() or batchUpdate() base on getIsNewRecord() + * @param array $attributes list of attributes that need to be inserted or updated. Defaults to null, + * meaning all attributes that are loaded will be inserted or updated. + * @return see ActiveRecord::batchInsert() + */ + public function batchSave($attributes = null, $scope = ''){ + if($this->getIsNewRecord()) + return $this->batchInsert($attributes,$scope); + return $this->batchUpdate($attributes,$scope); + } + + private static function batchInit(){ + + if(self::$batchShutdownFunction) + return; + + self::$batchShutdownFunction = true; + + register_shutdown_function(function(){ + + $className = static::className(); + + foreach(self::$batchInsertQueue as $scope => $_) + if(self::hasBatchInsert($scope)) + if(self::$batchInsertCommand[$className][$scope]->db->enableLogging) + yii::warning($className.' : batch insert mode not completed!'); + + foreach(self::$batchUpdateQueue as $scope => $_) + if(self::hasBatchUpdate($scope)) + if(self::$batchUpdateCommand[$className][$scope]->db->enableLogging) + yii::warning($className.' : batch update mode not completed!'); + + foreach(self::$batchDeleteQueue as $scope => $_) + if(self::hasBatchDelete($scope)) + if(self::$batchDeleteCommand[$className][$scope]->db->enableLogging) + yii::warning($className.' : batch delete mode not completed!'); + }); + } + + /** + * checking if current ActiveRecord class has documents in queue for insert + * @return bool + */ + public static function hasBatchInsert($scope = ''){ + $className = static::className(); + return @self::$batchInsertQueue[$className][$scope] ? true : false; + } + + /** + * this method is invoked in first call of batchInsert() method for once + */ + private static function batchInsertInit($scope = ''){ + + self::batchInit(); + + $className = static::className(); + + if(@self::$batchInsertCommand[$className][$scope]) + return; + + if(!@self::$batchInsertQueue[$className]) + self::$batchInsertQueue[$className] = []; + + self::$batchInsertQueue[$className][$scope] = 0; + self::$batchInsertCommand[$className][$scope] = static::getDb()->createCommand(); + } + + /** + * adding insert operation to queue base on current instance data + * @param array $attributes list of attributes that need to be inserted. Defaults to null, + * meaning all attributes that are loaded will be inserted. + * @return null|array return null if no execute command or return result of Command::executeBatch() + */ + public function batchInsert($attributes = null, $scope = ''){ + self::batchInsertInit($scope); + $values = $this->getDirtyAttributes($attributes); + if (empty($values)) { + $currentAttributes = $this->getAttributes(); + foreach ($this->primaryKey() as $key) { + if (isset($currentAttributes[$key])) { + $values[$key] = $currentAttributes[$key]; + } + } + } + $className = static::className(); + self::$batchInsertCommand[$className][$scope]->AddInsert($values); + self::$batchInsertQueue[$className][$scope]++; + if(self::$batchInsertQueue[$className][$scope] >= static::$batchInsertSize) + return self::flushBatchInsert($scope); + } + + /** + * resetting batch insert + */ + public static function resetBatchInsert($scope = ''){ + $className = static::className(); + if(!@self::$batchInsertCommand[$className][$scope]) + return; + unset( + self::$batchInsertQueue[$className][$scope], + self::$batchInsertCommand[$className][$scope] + ); + } + + /** + * execute batch insert operations in queue and reset anything + * this method is not continue when not exists any insert operations in queue + * @return see docs of Command::executeBatch() + */ + public static function flushBatchInsert($scope = ''){ + $className = static::className(); + if(!@self::$batchInsertQueue[$className][$scope]) + return; + $result = self::$batchInsertCommand[$className][$scope]->executeBatch(static::collectionName()); + self::resetBatchInsert($scope); + return $result; + } + + /** + * checking if current ActiveRecord class has documents in queue for update + * @return bool + */ + public static function hasBatchUpdate($scope = ''){ + $className = static::className(); + return @self::$batchUpdateQueue[$className][$scope] ? true : false; + } + + /** + * this method is invoked in first call of batchUpdate() method for once + */ + private static function batchUpdateInit($scope = ''){ + + self::batchInit(); + + $className = static::className(); + + if(@self::$batchUpdateCommand[$className][$scope]) + return; + + if(!@self::$batchUpdateQueue[$className]) + self::$batchUpdateQueue[$className] = []; + + self::$batchUpdateQueue[$className][$scope] = 0; + self::$batchUpdateCommand[$className][$scope] = static::getDb()->createCommand(); + } + + /** + * adding update operation to queue base on current instance data + * @param array $attributes list of attribute names that need to be updated. Defaults to null, + * meaning all attributes that are loaded from DB will be updated. + * @return null|array return null if no execute command or return result of Command::executeBatch() + */ + public function batchUpdate($attributes = null, $scope = ''){ + self::batchUpdateInit($scope); + $values = $this->getDirtyAttributes($attributes); + if (empty($values)) + return; + $condition = $this->getOldPrimaryKey(true); + $className = static::className(); + self::$batchUpdateCommand[$className][$scope]->addUpdate($condition, $values); + self::$batchUpdateQueue[$className][$scope]++; + if(self::$batchUpdateQueue[$className][$scope] >= static::$batchUpdateSize) + return self::flushBatchUpdate($scope); + } + + /** + * adding update operation to queue + * @param array $attributes list of attribute names that need to be updated. + * @param array $condition Description of the objects to update. + * Please refer to Query::where() on how to specify this parameter. + * @param array $options List of options in format: optionName => optionValue. + * Please refer to Command::addUpdate() on how to specify this parameter. + * @return null|array return null if no execute command or return result of Command::executeBatch() + */ + public static function batchUpdateAll($attributes, $condition = [], $options = [], $scope = ''){ + self::batchUpdateInit($scope); + $className = static::className(); + self::$batchUpdateCommand[$className][$scope]->addUpdate($condition, $attributes, $options); + self::$batchUpdateQueue[$className][$scope]++; + if(self::$batchUpdateQueue[$className][$scope] >= static::$batchUpdateSize) + return self::flushBatchUpdate(); + } + + /** + * resetting batch update + */ + public static function resetBatchUpdate($scope = ''){ + $className = static::className(); + if(!@self::$batchUpdateCommand[$className][$scope]) + return; + unset( + self::$batchUpdateQueue[$className][$scope], + self::$batchUpdateCommand[$className][$scope] + ); + } + + /** + * execute batch update operations in queue and reset anything + * this method is not continue when not exists any update operations in queue + * @return see docs of Command::executeBatch() + */ + public static function flushBatchUpdate($scope = ''){ + $className = static::className(); + if(!@self::$batchUpdateQueue[$className][$scope]) + return; + $result = self::$batchUpdateCommand[$className][$scope]->executeBatch(static::collectionName()); + self::resetBatchUpdate(); + return $result; + } + + /** + * checking if current ActiveRecord class has documents in queue for delete + * @return bool + */ + public static function hasBatchDelete($scope = ''){ + $className = static::className(); + return @self::$batchDeleteQueue[$className][$scope] ? true : false; + } + + /** + * this method is invoked in first call of batchDelete() method for once + */ + private static function batchDeleteInit($scope = ''){ + + self::batchInit(); + + $className = static::className(); + + if(@self::$batchDeleteCommand[$className][$scope]) + return; + + if(!@self::$batchDeleteQueue[$className]) + self::$batchDeleteQueue[$className] = []; + + self::$batchDeleteQueue[$className][$scope] = 0; + self::$batchDeleteCommand[$className][$scope] = static::getDb()->createCommand(); + } + + /** + * adding delete operation to queue base on current instance data + * @return null|array return null if no execute command or return result of Command::executeBatch() + */ + public function batchDelete($scope = ''){ + self::batchDeleteInit($scope); + $className = static::className(); + self::$batchDeleteCommand[$className][$scope]->AddDelete($this->getOldPrimaryKey(true)); + self::$batchDeleteQueue[$className][$scope]++; + if(self::$batchDeleteQueue[$className][$scope] >= static::$batchDeleteSize) + return self::flushBatchDelete($scope); + } + + /** + * adding delete operation to queue + * @param array $condition Description of the objects to delete. + * Please refer to Query::where() on how to specify this parameter. + * @param array $options List of options in format: optionName => optionValue. + * Please refer to Command::AddDelete() on how to specify this parameter. + * @return null|array return null if no execute command or return result of Command::executeBatch() + */ + public static function batchDeleteAll($condition = [], $options = [], $scope = ''){ + self::batchDeleteInit($scope); + $className = static::className(); + self::$batchDeleteCommand[$className][$scope]->AddDelete($condition, $options); + self::$batchDeleteQueue[$className][$scope]++; + if(self::$batchDeleteQueue[$className][$scope] >= static::$batchDeleteSize) + return self::flushBatchDelete($scope); + } + + /** + * resetting batch delete + */ + public static function resetBatchDelete($scope = ''){ + $className = static::className(); + if(!@self::$batchDeleteCommand[$className][$scope]) + return; + unset( + self::$batchDeleteQueue[$className][$scope], + self::$batchDeleteCommand[$className][$scope] + ); + } + + /** + * execute batch delete operations in queue and reset anything + * this method is not continue when not exists any delete operations in queue + * @return see docs of Command::executeBatch() + */ + public static function flushBatchDelete($scope = ''){ + $className = static::className(); + if(!@self::$batchDeleteQueue[$className][$scope]) + return; + $result = self::$batchDeleteCommand[$className][$scope]->executeBatch(static::collectionName()); + self::resetBatchDelete($scope); + return $result; + } +} \ No newline at end of file