diff --git a/CHANGELOG.md b/CHANGELOG.md index c7672b201..5870d0898 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,9 +4,11 @@ Yii Framework 2 mongodb extension Change Log 2.1.10 under development ------------------------ +- Enh #294: Add transactions support (ziaratban) - Bug #308: Fix `yii\mongodb\file\Upload::addFile()` error when uploading file with readonly permissions (sparchatus) + 2.1.9 November 19, 2019 ----------------------- diff --git a/src/ActiveRecord.php b/src/ActiveRecord.php index c720d94b2..8b5a8cdee 100644 --- a/src/ActiveRecord.php +++ b/src/ActiveRecord.php @@ -9,6 +9,7 @@ use MongoDB\BSON\Binary; use MongoDB\BSON\Type; +use MongoDB\BSON\ObjectId; use Yii; use yii\base\InvalidConfigException; use yii\db\BaseActiveRecord; @@ -25,6 +26,27 @@ */ abstract class ActiveRecord extends BaseActiveRecord { + /** + * The insert operation. This is mainly used when overriding [[transactions()]] to specify which operations are transactional. + */ + const OP_INSERT = 0x01; + + /** + * The update operation. This is mainly used when overriding [[transactions()]] to specify which operations are transactional. + */ + const OP_UPDATE = 0x02; + + /** + * The delete operation. This is mainly used when overriding [[transactions()]] to specify which operations are transactional. + */ + const OP_DELETE = 0x04; + + /** + * All three operations: insert, update, delete. + * This is a shortcut of the expression: OP_INSERT | OP_UPDATE | OP_DELETE. + */ + const OP_ALL = 0x07; + /** * Returns the Mongo connection used by this AR class. * By default, the "mongodb" application component is used as the Mongo connection. @@ -208,8 +230,15 @@ public function insert($runValidation = true, $attributes = null) if ($runValidation && !$this->validate($attributes)) { return false; } - $result = $this->insertInternal($attributes); + if (!$this->isTransactional(self::OP_INSERT)) { + return $this->insertInternal($attributes); + } + + $result = null; + static::getDb()->transaction(function() use ($attribute, &$result) { + $result = $this->insertInternal($attributes); + }); return $result; } @@ -243,6 +272,76 @@ protected function insertInternal($attributes = null) return true; } + /** + * Saves the changes to this active record into the associated database table. + * + * This method performs the following steps in order: + * + * 1. call [[beforeValidate()]] when `$runValidation` is `true`. If [[beforeValidate()]] + * returns `false`, the rest of the steps will be skipped; + * 2. call [[afterValidate()]] when `$runValidation` is `true`. If validation + * failed, the rest of the steps will be skipped; + * 3. call [[beforeSave()]]. If [[beforeSave()]] returns `false`, + * the rest of the steps will be skipped; + * 4. save the record into database. If this fails, it will skip the rest of the steps; + * 5. call [[afterSave()]]; + * + * In the above step 1, 2, 3 and 5, events [[EVENT_BEFORE_VALIDATE]], + * [[EVENT_AFTER_VALIDATE]], [[EVENT_BEFORE_UPDATE]], and [[EVENT_AFTER_UPDATE]] + * will be raised by the corresponding methods. + * + * Only the [[dirtyAttributes|changed attribute values]] will be saved into database. + * + * For example, to update a customer record: + * + * ```php + * $customer = Customer::findOne($id); + * $customer->name = $name; + * $customer->email = $email; + * $customer->update(); + * ``` + * + * Note that it is possible the update does not affect any row in the table. + * In this case, this method will return 0. For this reason, you should use the following + * code to check if update() is successful or not: + * + * ```php + * if ($customer->update() !== false) { + * // update successful + * } else { + * // update failed + * } + * ``` + * + * @param bool $runValidation whether to perform validation (calling [[validate()]]) + * before saving the record. Defaults to `true`. If the validation fails, the record + * will not be saved to the database and this method will return `false`. + * @param array $attributeNames list of attributes that need to be saved. Defaults to `null`, + * meaning all attributes that are loaded from DB will be saved. + * @return int|false the number of rows affected, or false if validation fails + * or [[beforeSave()]] stops the updating process. + * @throws StaleObjectException if [[optimisticLock|optimistic locking]] is enabled and the data + * being updated is outdated. + * @throws \Exception|\Throwable in case update failed. + */ + public function update($runValidation = true, $attributeNames = null) + { + if ($runValidation && !$this->validate($attributeNames)) { + Yii::info('Model not updated due to validation error.', __METHOD__); + return false; + } + + if (!$this->isTransactional(self::OP_UPDATE)) { + return $this->updateInternal($attributeNames); + } + + $result = null; + static::getDb()->transaction(function() use ($attributeNames, &$result) { + $result = $this->updateInternal($attributeNames); + }); + return $result; + } + /** * @see ActiveRecord::update() * @throws StaleObjectException @@ -308,12 +407,14 @@ protected function updateInternal($attributes = null) */ public function delete() { - $result = false; - if ($this->beforeDelete()) { - $result = $this->deleteInternal(); - $this->afterDelete(); + if (!$this->isTransactional(self::OP_DELETE)) { + return $this->deleteInternal(); } + $result = null; + static::getDb()->transaction(function() use (&$result) { + $result = $this->deleteInternal(); + }); return $result; } @@ -323,6 +424,9 @@ public function delete() */ protected function deleteInternal() { + if (!$this->beforeDelete()) { + return false; + } // we do not check the return value of deleteAll() because it's possible // the record is already deleted in the database and thus the method will return 0 $condition = $this->getOldPrimaryKey(true); @@ -335,6 +439,7 @@ protected function deleteInternal() throw new StaleObjectException('The object being deleted is outdated.'); } $this->setOldAttributes(null); + $this->afterDelete(); return $result; } @@ -411,4 +516,146 @@ private function dumpBsonObject(Type $object) } return ArrayHelper::toArray($object); } -} + + /** + * Locks a document of the collection in a transaction (like `select for update` feature in MySQL) + * @see https://www.mongodb.com/blog/post/how-to-select--for-update-inside-mongodb-transactions + * @param mixed $id a document id (primary key > _id) + * @param string $lockFieldName The name of the field you want to lock. + * @param array $modifyOptions list of the options in format: optionName => optionValue. + * @param Connection $db the Mongo connection uses it to execute the query. + * @return ActiveRecord|null the locked document. + * Returns instance of ActiveRecord. Null will be returned if the query does not have a result. + */ + public static function LockDocument($id, $lockFieldName, $modifyOptions = [], $db = null) + { + $db = $db ? $db : static::getDb(); + $db->transactionReady('lock document'); + $options['new'] = true; + return static::find() + ->where(['_id' => $id]) + ->modify( + [ + '$set' =>[$lockFieldName => new ObjectId] + ], + $modifyOptions, + $db + ) + ; + } + + /** + * Locking a document in stubborn mode on a transaction (like `select for update` feature in MySQL) + * @see https://www.mongodb.com/blog/post/how-to-select--for-update-inside-mongodb-transactions + * notice : you can not use stubborn mode if transaction is started in current session (or use your session with `mySession` parameter). + * @param mixed $id a document id (primary key > _id) + * @param array $options list of options in format: + * [ + * 'mySession' => false, # A custom session instance of ClientSession for start a transaction. + * 'transactionOptions' => [], # New transaction options. see $transactionOptions in Transaction::start() + * 'modifyOptions' => [], # See $options in ActiveQuery::modify() + * 'sleep' => 1000000, # A time parameter in microseconds to wait. the default is one second. + * 'try' => 0, # Maximum count of retry. throw write conflict error after reached this value. the zero default is unlimited. + * 'lockFieldName' => '_lock' # The name of the field you want to lock. default is '_lock' + * ] + * @param Connection $db the Mongo connection uses it to execute the query. + * @return ActiveRecord|null returns the locked document. + * Returns instance of ActiveRecord. Null will be returned if the query does not have a result. + * When the total number of attempts to lock the document passes `try`, conflict error will be thrown + */ + public static function LockDocumentStubbornly($id, $lockFieldName, $options = [], $db = null) + { + $db = $db ? $db : static::getDb(); + + $options = array_replace_recursive( + [ + 'mySession' => false, + 'transactionOptions' => [], + 'modifyOptions' => [], + 'sleep' => 1000000, + 'try' => 0, + ], + $options + ); + + $options['modifyOptions']['new'] = true; + + $session = $options['mySession'] ? $options['mySession'] : $db->startSessionOnce(); + + if ($session->getInTransaction()) { + throw new Exception('You can\'t use stubborn lock feature because current connection is in a transaction.'); + } + + // start stubborn + $tiredCounter = 0; + StartStubborn: + $session->transaction->start($options['transactionOptions']); + try { + $doc = static::find() + ->where(['_id' => $id]) + ->modify( + [ + '$set' => [ + $lockFieldName => new ObjectId + ] + ], + $options['modifyOptions'], + $db + ); + return $doc; + } catch(\Exception $e) { + $session->transaction->rollBack(); + $tiredCounter++; + if ($options['try'] !== 0 && $tiredCounter === $options['try']) { + throw $e; + } + usleep($options['sleep']); + goto StartStubborn; + } + } + + /** + * Declares which DB operations should be performed within a transaction in different scenarios. + * The supported DB operations are: [[OP_INSERT]], [[OP_UPDATE]] and [[OP_DELETE]], + * which correspond to the [[insert()]], [[update()]] and [[delete()]] methods, respectively. + * By default, these methods are NOT enclosed in a DB transaction. + * + * In some scenarios, to ensure data consistency, you may want to enclose some or all of them + * in transactions. You can do so by overriding this method and returning the operations + * that need to be transactional. For example, + * + * ```php + * return [ + * 'admin' => self::OP_INSERT, + * 'api' => self::OP_INSERT | self::OP_UPDATE | self::OP_DELETE, + * // the above is equivalent to the following: + * // 'api' => self::OP_ALL, + * + * ]; + * ``` + * + * The above declaration specifies that in the "admin" scenario, the insert operation ([[insert()]]) + * should be done in a transaction; and in the "api" scenario, all the operations should be done + * in a transaction. + * + * @return array the declarations of transactional operations. The array keys are scenarios names, + * and the array values are the corresponding transaction operations. + */ + public function transactions() + { + return []; + } + + /** + * Returns a value indicating whether the specified operation is transactional in the current [[$scenario]]. + * @param int $operation the operation to check. Possible values are [[OP_INSERT]], [[OP_UPDATE]] and [[OP_DELETE]]. + * @return bool whether the specified operation is transactional in the current [[scenario]]. + */ + public function isTransactional($operation) + { + $scenario = $this->getScenario(); + $transactions = $this->transactions(); + + return isset($transactions[$scenario]) && ($transactions[$scenario] & $operation); + } +} \ No newline at end of file diff --git a/src/ClientSession.php b/src/ClientSession.php new file mode 100644 index 000000000..738ec17fc --- /dev/null +++ b/src/ClientSession.php @@ -0,0 +1,173 @@ + + */ +class ClientSession extends \yii\base\BaseObject +{ + + /** + * @var Connection the database connection that this transaction is associated with. + */ + public $db; + + /** + * @var MongoDB\Driver\Session class represents a client session and Commands, + * queries, and write operations may then be associated the session. + * @see https://www.php.net/manual/en/class.mongodb-driver-session.php + */ + public $mongoSession; + + /** + * @var Transaction the current transaction in session. this transaction can only be created once. + */ + private $_transaction = null; + + /** + * Prepares options for some purposes + * @param array by reference + * convert string option to object + * [ + * 'defaultTransactionOptions' => [ + * 'readConcern' => 'snapshot', + * 'writeConcern' => 'majority', + * 'writeConcern' => ['majority',true], + * 'readPreference' => 'primary', + * ], + * ] + * convert to : + * [ + * 'defaultTransactionOptions' => [ + * 'readConcern' => new \MongoDB\Driver\ReadConcern('snapshot'), + * 'writeConcern' => new \MongoDB\Driver\WriteConcern('majority'), + * 'writeConcern' => new \MongoDB\Driver\WriteConcern('majority',true), + * 'readPreference' => new \MongoDB\Driver\ReadPreference('primary'), + * ], + * ] + */ + public static function prepareOptions(&$options) + { + if (array_key_exists('defaultTransactionOptions', $options)) { + + //convert readConcern + if ( + array_key_exists('readConcern', $options['defaultTransactionOptions']) && + is_string($options['defaultTransactionOptions']['readConcern']) + ) { + $options['defaultTransactionOptions']['readConcern'] = new ReadConcern($options['defaultTransactionOptions']['readConcern']); + } + + //convert writeConcern + if (array_key_exists('writeConcern',$options['defaultTransactionOptions'])) { + if ( + is_string($options['defaultTransactionOptions']['writeConcern']) || + is_int($options['defaultTransactionOptions']['writeConcern']) + ) { + $options['defaultTransactionOptions']['writeConcern'] = new WriteConcern($options['defaultTransactionOptions']['writeConcern']); + } elseif (is_array($options['defaultTransactionOptions']['writeConcern'])) { + $options['defaultTransactionOptions']['writeConcern'] = + (new \ReflectionClass('\MongoDB\Driver\WriteConcern')) + ->newInstanceArgs( + $options['defaultTransactionOptions']['writeConcern'] + ) + ; + } + } + + //Convert readPreference + if (array_key_exists('readPreference',$options['defaultTransactionOptions'])) { + if (is_string($options['defaultTransactionOptions']['readPreference'])) { + $options['defaultTransactionOptions']['readPreference'] = new ReadPreference($options['defaultTransactionOptions']['readPreference']); + } else if(is_array($options['defaultTransactionOptions']['readPreference'])) { + $options['defaultTransactionOptions']['readPreference'] = + (new \ReflectionClass('\MongoDB\Driver\ReadPreference')) + ->newInstanceArgs( + $options['defaultTransactionOptions']['readPreference'] + ) + ; + } + } + } + } + + /** + * Returns the logical session ID as string for this session, which may be used to identify this session's operations on the server. + * @see https://www.php.net/manual/en/mongodb-driver-session.getlogicalsessionid.php + * @return string + */ + public function getId() + { + return $this->mongoSession->getLogicalSessionId()->id->jsonSerialize()['$binary']; + } + + /** + * Starts a new mongodb session in a connection. + * @param Connection $db + * @param Array $sessionOptions Creates a ClientSession for the given options {@see https://www.php.net/manual/en/mongodb-driver-manager.startsession.php#refsect1-mongodb-driver-manager.startsession-parameters} + * @return ClientSession returns new session base on a session options for the given connection + */ + public static function start($db, $sessionOptions = []) + { + self::prepareOptions($sessionOptions); + if ($db->enableProfiling) { + Yii::debug('Starting mongodb session ...', __METHOD__); + } + $db->trigger(Connection::EVENT_START_SESSION); + $newSession = new self([ + 'db' => $db, + 'mongoSession' => $db->manager->startSession($sessionOptions), + ]); + if ($db->enableProfiling) { + Yii::debug('MongoDB session started.', __METHOD__); + } + return $newSession; + } + + /** + * Gets a current transaction of session or creates a new transaction once + * @return Transaction returns current transaction + */ + public function getTransaction() + { + if ($this->_transaction === null) { + return $this->_transaction = new Transaction(['clientSession' => $this]); + } + return $this->_transaction; + } + + /** + * Returns true if the transaction is in progress + * @return bool + */ + public function getInTransaction() + { + return $this->mongoSession->isInTransaction(); + } + + /** + * Ends the current session. + */ + public function end() + { + $this->mongoSession->endSession(); + $this->db->trigger(Connection::EVENT_END_SESSION); + } +} \ No newline at end of file diff --git a/src/Collection.php b/src/Collection.php index c897fd4cb..281edc718 100644 --- a/src/Collection.php +++ b/src/Collection.php @@ -56,23 +56,25 @@ public function getFullName() /** * Drops this collection. + * @param array $execOptions {@see Command::dropCollection()} * @throws Exception on failure. * @return bool whether the operation successful. */ - public function drop() + public function drop($execOptions = []) { - return $this->database->dropCollection($this->name); + return $this->database->dropCollection($this->name, $execOptions); } /** * Returns the list of defined indexes. * @return array list of indexes info. * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see Command::listIndexes()} * @since 2.1 */ - public function listIndexes($options = []) + public function listIndexes($options = [], $execOptions = []) { - return $this->database->createCommand()->listIndexes($this->name, $options); + return $this->database->createCommand()->listIndexes($this->name, $options, $execOptions); } /** @@ -107,23 +109,25 @@ public function listIndexes($options = []) * * See [[https://docs.mongodb.com/manual/reference/method/db.collection.createIndex/#options-for-all-index-types]] * for the full list of options. + * @param array $execOptions {@see Command::createIndexes()} * @return bool whether operation was successful. * @since 2.1 */ - public function createIndexes($indexes) + public function createIndexes($indexes, $execOptions = []) { - return $this->database->createCommand()->createIndexes($this->name, $indexes); + return $this->database->createCommand()->createIndexes($this->name, $indexes, $execOptions); } /** * Drops collection indexes by name. * @param string $indexes wildcard for name of the indexes to be dropped. * You can use `*` to drop all indexes. + * @param array $execOptions {@see Command::dropIndexes()} * @return int count of dropped indexes. */ - public function dropIndexes($indexes) + public function dropIndexes($indexes, $execOptions = []) { - $result = $this->database->createCommand()->dropIndexes($this->name, $indexes); + $result = $this->database->createCommand()->dropIndexes($this->name, $indexes, $execOptions); return $result['nIndexesWas']; } @@ -144,13 +148,14 @@ public function dropIndexes($indexes) * ``` * * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see Command::createIndexes()} * @throws Exception on failure. * @return bool whether the operation successful. */ - public function createIndex($columns, $options = []) + public function createIndex($columns, $options = [], $execOptions = []) { $index = array_merge(['key' => $columns], $options); - return $this->database->createCommand()->createIndexes($this->name, [$index]); + return $this->database->createCommand()->createIndexes($this->name, [$index], $execOptions); } /** @@ -171,17 +176,18 @@ public function createIndex($columns, $options = []) * ] * ``` * + * @param array $execOptions {@see Command::dropIndexes()} * @throws Exception on failure. * @return bool whether the operation successful. */ - public function dropIndex($columns) + public function dropIndex($columns, $execOptions = []) { $existingIndexes = $this->listIndexes(); $indexKey = $this->database->connection->getQueryBuilder()->buildSortFields($columns); foreach ($existingIndexes as $index) { if ($index['key'] == $indexKey) { - $this->database->createCommand()->dropIndexes($this->name, $index['name']); + $this->database->createCommand()->dropIndexes($this->name, $index['name'], $execOptions); return true; } } @@ -190,7 +196,7 @@ public function dropIndex($columns) $indexName = $this->database->connection->getQueryBuilder()->generateIndexName($indexKey); foreach ($existingIndexes as $index) { if ($index['name'] === $indexName) { - $this->database->createCommand()->dropIndexes($this->name, $index['name']); + $this->database->createCommand()->dropIndexes($this->name, $index['name'], $execOptions); return true; } } @@ -200,12 +206,13 @@ public function dropIndex($columns) /** * Drops all indexes for this collection. + * @param array $execOptions {@see Command::dropIndexes()} * @throws Exception on failure. * @return int count of dropped indexes. */ - public function dropAllIndexes() + public function dropAllIndexes($execOptions = []) { - $result = $this->database->createCommand()->dropIndexes($this->name, '*'); + $result = $this->database->createCommand()->dropIndexes($this->name, '*', $execOptions); return isset($result['nIndexesWas']) ? $result['nIndexesWas'] : 0; } @@ -215,15 +222,16 @@ public function dropAllIndexes() * @param array $condition query condition * @param array $fields fields to be selected * @param array $options query options (available since 2.1). + * @param array $execOptions {@see Command::find()} * @return \MongoDB\Driver\Cursor cursor for the search results * @see Query */ - public function find($condition = [], $fields = [], $options = []) + public function find($condition = [], $fields = [], $options = [], $execOptions = []) { if (!empty($fields)) { $options['projection'] = $fields; } - return $this->database->createCommand()->find($this->name, $condition, $options); + return $this->database->createCommand()->find($this->name, $condition, $options, $execOptions); } /** @@ -231,12 +239,13 @@ public function find($condition = [], $fields = [], $options = []) * @param array $condition query condition * @param array $fields fields to be selected * @param array $options query options (available since 2.1). + * @param array $execOptions {@see find()} * @return array|null the single document. Null is returned if the query results in nothing. */ - public function findOne($condition = [], $fields = [], $options = []) + public function findOne($condition = [], $fields = [], $options = [], $execOptions = []) { $options['limit'] = 1; - $cursor = $this->find($condition, $fields, $options); + $cursor = $this->find($condition, $fields, $options, $execOptions); $rows = $cursor->toArray(); return empty($rows) ? null : current($rows); } @@ -246,12 +255,13 @@ public function findOne($condition = [], $fields = [], $options = []) * @param array $condition query condition * @param array $update update criteria * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see Command::findAndModify()} * @return array|null the original document, or the modified document when $options['new'] is set. * @throws Exception on failure. */ - public function findAndModify($condition, $update, $options = []) + public function findAndModify($condition, $update, $options = [], $execOptions = []) { - return $this->database->createCommand()->findAndModify($this->name, $condition, $update, $options); + return $this->database->createCommand()->findAndModify($this->name, $condition, $update, $options, $execOptions); } /** @@ -259,23 +269,25 @@ public function findAndModify($condition, $update, $options = []) * @param array|object $data data to be inserted. * @param array $options list of options in format: optionName => optionValue. * @return \MongoDB\BSON\ObjectID new record ID instance. + * @param array $execOptions {@see Command::insert()} * @throws Exception on failure. */ - public function insert($data, $options = []) + public function insert($data, $options = [], $execOptions = []) { - return $this->database->createCommand()->insert($this->name, $data, $options); + return $this->database->createCommand()->insert($this->name, $data, $options, $execOptions); } /** * Inserts several new rows into collection. * @param array $rows array of arrays or objects to be inserted. * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see Command::batchInsert()} * @return array inserted data, each row will have "_id" key assigned to it. * @throws Exception on failure. */ - public function batchInsert($rows, $options = []) + public function batchInsert($rows, $options = [], $execOptions = []) { - $insertedIds = $this->database->createCommand()->batchInsert($this->name, $rows, $options); + $insertedIds = $this->database->createCommand()->batchInsert($this->name, $rows, $options, $execOptions); foreach ($rows as $key => $row) { $rows[$key]['_id'] = $insertedIds[$key]; } @@ -289,12 +301,13 @@ public function batchInsert($rows, $options = []) * @param array $condition description of the objects to update. * @param array $newData the object with which to update the matching records. * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see Command::update()} * @return int|bool number of updated documents or whether operation was successful. * @throws Exception on failure. */ - public function update($condition, $newData, $options = []) + public function update($condition, $newData, $options = [], $execOptions = []) { - $writeResult = $this->database->createCommand()->update($this->name, $condition, $newData, $options); + $writeResult = $this->database->createCommand()->update($this->name, $condition, $newData, $options, $execOptions); return $writeResult->getModifiedCount() + $writeResult->getUpsertedCount(); } @@ -303,16 +316,17 @@ public function update($condition, $newData, $options = []) * @param array|object $data data to be updated/inserted. * @param array $options list of options in format: optionName => optionValue. * @return \MongoDB\BSON\ObjectID updated/new record id instance. + * @param array $execOptions {@see Command::insert()} * @throws Exception on failure. */ - public function save($data, $options = []) + public function save($data, $options = [], $execOptions = []) { if (empty($data['_id'])) { - return $this->insert($data, $options); + return $this->insert($data, $options, $execOptions); } $id = $data['_id']; unset($data['_id']); - $this->update(['_id' => $id], ['$set' => $data], ['upsert' => true]); + $this->update(['_id' => $id], ['$set' => $data], ['upsert' => true], $execOptions); return is_object($id) ? $id : new ObjectID($id); } @@ -321,13 +335,14 @@ public function save($data, $options = []) * Removes data from the collection. * @param array $condition description of records to remove. * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see Command::delete()} * @return int|bool number of updated documents or whether operation was successful. * @throws Exception on failure. */ - public function remove($condition = [], $options = []) + public function remove($condition = [], $options = [], $execOptions = []) { $options = array_merge(['limit' => 0], $options); - $writeResult = $this->database->createCommand()->delete($this->name, $condition, $options); + $writeResult = $this->database->createCommand()->delete($this->name, $condition, $options, $execOptions); return $writeResult->getDeletedCount(); } @@ -335,12 +350,13 @@ public function remove($condition = [], $options = []) * Counts records in this collection. * @param array $condition query condition * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see Command::count()} * @return int records count. * @since 2.1 */ - public function count($condition = [], $options = []) + public function count($condition = [], $options = [], $execOptions = []) { - return $this->database->createCommand()->count($this->name, $condition, $options); + return $this->database->createCommand()->count($this->name, $condition, $options, $execOptions); } /** @@ -348,12 +364,13 @@ public function count($condition = [], $options = []) * @param string $column column to use. * @param array $condition query parameters. * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see Command::distinct()} * @return array|bool array of distinct values, or "false" on failure. * @throws Exception on failure. */ - public function distinct($column, $condition = [], $options = []) + public function distinct($column, $condition = [], $options = [], $execOptions = []) { - return $this->database->createCommand()->distinct($this->name, $column, $condition, $options); + return $this->database->createCommand()->distinct($this->name, $column, $condition, $options, $execOptions); } /** @@ -362,12 +379,13 @@ public function distinct($column, $condition = [], $options = []) * otherwise - an array of aggregation results. * @param array $pipelines list of pipeline operators. * @param array $options optional parameters. + * @param array $execOptions {@see Command::aggregate()} * @return array|\MongoDB\Driver\Cursor the result of the aggregation. * @throws Exception on failure. */ - public function aggregate($pipelines, $options = []) + public function aggregate($pipelines, $options = [], $execOptions = []) { - return $this->database->createCommand()->aggregate($this->name, $pipelines, $options); + return $this->database->createCommand()->aggregate($this->name, $pipelines, $options, $execOptions); } /** @@ -382,12 +400,13 @@ public function aggregate($pipelines, $options = []) * @param array $options optional parameters to the group command. Valid options include: * - condition - criteria for including a document in the aggregation. * - finalize - function called once per unique key that takes the final output of the reduce function. + * @param array $execOptions {@see Command::group()} * @return array the result of the aggregation. * @throws Exception on failure. */ - public function group($keys, $initial, $reduce, $options = []) + public function group($keys, $initial, $reduce, $options = [], $execOptions = []) { - return $this->database->createCommand()->group($this->name, $keys, $initial, $reduce, $options); + return $this->database->createCommand()->group($this->name, $keys, $initial, $reduce, $options, $execOptions); } /** @@ -426,11 +445,12 @@ public function group($keys, $initial, $reduce, $options = []) * - jsMode: bool, specifies whether to convert intermediate data into BSON format between the execution of the map and reduce functions. * - verbose: bool, specifies whether to include the timing information in the result information. * + * @param array $execOptions {@see Command::mapReduce()} * @return string|array the map reduce output collection name or output results. * @throws Exception on failure. */ - public function mapReduce($map, $reduce, $out, $condition = [], $options = []) + public function mapReduce($map, $reduce, $out, $condition = [], $options = [], $execOptions = []) { - return $this->database->createCommand()->mapReduce($this->name, $map, $reduce, $out, $condition, $options); + return $this->database->createCommand()->mapReduce($this->name, $map, $reduce, $out, $condition, $options, $execOptions); } } diff --git a/src/Command.php b/src/Command.php index e56b0b51c..f8c621174 100644 --- a/src/Command.php +++ b/src/Command.php @@ -10,10 +10,10 @@ use MongoDB\BSON\ObjectID; use MongoDB\Driver\BulkWrite; use MongoDB\Driver\Exception\RuntimeException; +use MongoDB\Driver\WriteResult; use MongoDB\Driver\ReadConcern; -use MongoDB\Driver\ReadPreference; use MongoDB\Driver\WriteConcern; -use MongoDB\Driver\WriteResult; +use MongoDB\Driver\ReadPreference; use Yii; use yii\base\InvalidConfigException; use yii\base\BaseObject; @@ -74,106 +74,102 @@ class Command extends BaseObject public $document = []; /** - * @var ReadPreference|int|string|null command read preference. - */ - private $_readPreference; - /** - * @var WriteConcern|int|string|null write concern to be used by this command. - */ - private $_writeConcern; - /** - * @var ReadConcern|string read concern to be used by this command - */ - private $_readConcern; - + * @var array default options for `executeCommand` method of MongoDB\Driver\Manager. + */ + public $globalExecOptions = []; /** - * Returns read preference for this command. - * @return ReadPreference read preference. - */ - public function getReadPreference() + * prepares execOptions for some purposes + * @param array|object|null $execOptions {@see prepareManagerOptions()} + */ + private function prepareExecCommandOptions(&$execOptions) { - if (!is_object($this->_readPreference)) { - if ($this->_readPreference === null) { - $this->_readPreference = $this->db->manager->getReadPreference(); - } elseif (is_scalar($this->_readPreference)) { - $this->_readPreference = new ReadPreference($this->_readPreference); - } + if (empty($execOptions)) { + $execOptions = array_merge($this->globalExecOptions['command'],$this->globalExecOptions['share']); } - return $this->_readPreference; + self::prepareManagerOptions($execOptions); } /** - * Sets read preference for this command. - * @param ReadPreference|int|string|null $readPreference read reference, it can be specified as - * instance of [[ReadPreference]] or scalar mode value, for example: `ReadPreference::RP_PRIMARY`. - * @return $this self reference. - */ - public function setReadPreference($readPreference) - { - $this->_readPreference = $readPreference; - return $this; - } - - /** - * Returns write concern for this command. - * @return WriteConcern|null write concern to be used in this command. - */ - public function getWriteConcern() + * prepares execOptions for some purposes + * @param array|object|null $execOptions {@see prepareManagerOptions()} + */ + private function prepareExecBulkWriteOptions(&$execOptions) { - if ($this->_writeConcern !== null) { - if (is_scalar($this->_writeConcern)) { - $this->_writeConcern = new WriteConcern($this->_writeConcern); - } + if (empty($execOptions)) { + $execOptions = array_merge($this->globalExecOptions['bulkWrite'],$this->globalExecOptions['share']); } - return $this->_writeConcern; + self::prepareManagerOptions($execOptions); } /** - * Sets write concern for this command. - * @param WriteConcern|int|string|null $writeConcern write concern, it can be an instance of [[WriteConcern]] - * or its scalar mode value, for example: `majority`. - * @return $this self reference - */ - public function setWriteConcern($writeConcern) + * prepares execOptions for some purposes + * @param array|object|null $execOptions {@see prepareManagerOptions()} + */ + private function prepareExecQueryOptions(&$execOptions) { - $this->_writeConcern = $writeConcern; - return $this; - } + if (empty($execOptions)) { + $execOptions = array_merge($this->globalExecOptions['query'],$this->globalExecOptions['share']); + } + self::prepareManagerOptions($execOptions); + } + + /** + * preapare Concern and Preference options for easy use + * @param array|object $options by reference + * convert string option to object + * ['readConcern' => 'snapshot'] > ['readConcern' => new \MongoDB\Driver\ReadConcern('snapshot')] + * ['writeConcern' => 'majority'] > ['writeConcern' => new \MongoDB\Driver\WriteConcern('majority')] + * ['writeConcern' => ['majority',true]] > ['writeConcern' => new \MongoDB\Driver\WriteConcern('majority',true)] + * ['readPreference' => 'snapshot'] > ['readPreference' => new \MongoDB\Driver\ReadPreference('primary')] + * {@see https://www.php.net/manual/en/mongodb-driver-manager.executecommand.php#refsect1-mongodb-driver-manager.executecommand-parameters} + * {@see https://www.php.net/manual/en/mongodb-driver-manager.executebulkwrite.php#refsect1-mongodb-driver-manager.executebulkwrite-parameters} + * {@see https://www.php.net/manual/en/mongodb-driver-server.executequery.php#refsect1-mongodb-driver-server.executequery-parameters} + */ + public static function prepareManagerOptions(&$options) + { + //Convert readConcern option + if (array_key_exists('readConcern', $options) && is_string($options['readConcern'])) { + $options['readConcern'] = new ReadConcern($options['readConcern']); + } - /** - * Retuns read concern for this command. - * @return ReadConcern|string read concern to be used in this command. - */ - public function getReadConcern() - { - if ($this->_readConcern !== null) { - if (is_scalar($this->_readConcern)) { - $this->_readConcern = new ReadConcern($this->_readConcern); + //Convert writeConcern option + if (array_key_exists('writeConcern', $options)) { + if (is_string($options['writeConcern']) || is_int($options['writeConcern'])) { + $options['writeConcern'] = new WriteConcern($options['writeConcern']); + } elseif (is_array($options['writeConcern'])) { + $options['writeConcern'] = (new \ReflectionClass('\MongoDB\Driver\WriteConcern'))->newInstanceArgs($options['writeConcern']); } } - return $this->_readConcern; - } - /** - * Sets read concern for this command. - * @param ReadConcern|string $readConcern read concern, it can be an instance of [[ReadConcern]] or - * scalar level value, for example: 'local'. - * @return $this self reference - */ - public function setReadConcern($readConcern) - { - $this->_readConcern = $readConcern; - return $this; - } + //Convert readPreference option + if (array_key_exists('readPreference', $options)) { + if (is_string($options['readPreference'])) { + $options['readPreference'] = new ReadPreference($options['readPreference']); + } elseif (is_array($options['readPreference'])) { + $options['readPreference'] = (new \ReflectionClass('\MongoDB\Driver\ReadPreference'))->newInstanceArgs($options['readPreference']); + } + } + + //Convert session option + if (array_key_exists('session', $options) && $options['session'] instanceof ClientSession) { + $options['session'] = $options['session']->mongoSession; + } + } /** * Executes this command. + * @param array $execOptions (@see prepareExecCommandOptions()) + * Note: "readConcern" and "writeConcern" options will not default to corresponding values from the MongoDB + * Connection URI nor will the MongoDB server version be taken into account + * @see https://www.php.net/manual/en/mongodb-driver-server.executebulkwrite.php#refsect1-mongodb-driver-server.executebulkwrite-parameters * @return \MongoDB\Driver\Cursor result cursor. * @throws Exception on failure. */ - public function execute() + public function execute($execOptions = []) { + $this->prepareExecCommandOptions($execOptions); + $databaseName = $this->databaseName === null ? $this->db->defaultDatabaseName : $this->databaseName; $token = $this->log([$databaseName, 'command'], $this->document, __METHOD__); @@ -183,7 +179,7 @@ public function execute() $this->db->open(); $mongoCommand = new \MongoDB\Driver\Command($this->document); - $cursor = $this->db->manager->executeCommand($databaseName, $mongoCommand, $this->getReadPreference()); + $cursor = $this->db->manager->executeCommand($databaseName, $mongoCommand, $execOptions); $cursor->setTypeMap($this->db->typeMap); $this->endProfile($token, __METHOD__); @@ -199,16 +195,20 @@ public function execute() * Execute commands batch (bulk). * @param string $collectionName collection name. * @param array $options batch options. + * @param array $execOptions (@see prepareExecBulkWriteOptions()) * @return array array of 2 elements: * * - 'insertedIds' - contains inserted IDs. * - 'result' - [[\MongoDB\Driver\WriteResult]] instance. * + * @see https://www.php.net/manual/en/mongodb-driver-server.executebulkwrite.php#refsect1-mongodb-driver-server.executebulkwrite-parameters * @throws Exception on failure. * @throws InvalidConfigException on invalid [[document]] format. */ - public function executeBatch($collectionName, $options = []) + public function executeBatch($collectionName, $options = [], $execOptions = []) { + $this->prepareExecBulkWriteOptions($execOptions); + $databaseName = $this->databaseName === null ? $this->db->defaultDatabaseName : $this->databaseName; $token = $this->log([$databaseName, $collectionName, 'bulkWrite'], $this->document, __METHOD__); @@ -236,7 +236,7 @@ public function executeBatch($collectionName, $options = []) } $this->db->open(); - $writeResult = $this->db->manager->executeBulkWrite($databaseName . '.' . $collectionName, $batch, $this->getWriteConcern()); + $writeResult = $this->db->manager->executeBulkWrite($databaseName . '.' . $collectionName, $batch, $execOptions); $this->endProfile($token, __METHOD__); } catch (RuntimeException $e) { @@ -254,11 +254,14 @@ public function executeBatch($collectionName, $options = []) * Executes this command as a mongo query * @param string $collectionName collection name * @param array $options query options. + * @param array $execOptions (@see prepareExecQueryOptions()) * @return \MongoDB\Driver\Cursor result cursor. * @throws Exception on failure */ - public function query($collectionName, $options = []) + public function query($collectionName, $options = [], $execOptions = []) { + $this->prepareExecQueryOptions($execOptions); + $databaseName = $this->databaseName === null ? $this->db->defaultDatabaseName : $this->databaseName; $token = $this->log( @@ -273,17 +276,12 @@ public function query($collectionName, $options = []) __METHOD__ ); - $readConcern = $this->getReadConcern(); - if ($readConcern !== null) { - $options['readConcern'] = $readConcern; - } - try { $this->beginProfile($token, __METHOD__); $query = new \MongoDB\Driver\Query($this->document, $options); $this->db->open(); - $cursor = $this->db->manager->executeQuery($databaseName . '.' . $collectionName, $query, $this->getReadPreference()); + $cursor = $this->db->manager->executeQuery($databaseName . '.' . $collectionName, $query, $execOptions); $cursor->setTypeMap($this->db->typeMap); $this->endProfile($token, __METHOD__); @@ -297,13 +295,13 @@ public function query($collectionName, $options = []) /** * Drops database associated with this command. + * @param array $execOptions {@see execute()} * @return bool whether operation was successful. */ - public function dropDatabase() + public function dropDatabase($execOptions = []) { $this->document = $this->db->getQueryBuilder()->dropDatabase(); - - $result = current($this->execute()->toArray()); + $result = current($this->execute($execOptions)->toArray()); return $result['ok'] > 0; } @@ -311,26 +309,26 @@ public function dropDatabase() * Creates new collection in database associated with this command.s * @param string $collectionName collection name * @param array $options collection options in format: "name" => "value" + * @param array $execOptions {@see execute()} * @return bool whether operation was successful. */ - public function createCollection($collectionName, array $options = []) + public function createCollection($collectionName, array $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->createCollection($collectionName, $options); - - $result = current($this->execute()->toArray()); + $result = current($this->execute($execOptions)->toArray()); return $result['ok'] > 0; } /** * Drops specified collection. * @param string $collectionName name of the collection to be dropped. + * @param array $execOptions {@see execute()} * @return bool whether operation was successful. */ - public function dropCollection($collectionName) + public function dropCollection($collectionName, $execOptions = []) { $this->document = $this->db->getQueryBuilder()->dropCollection($collectionName); - - $result = current($this->execute()->toArray()); + $result = current($this->execute($execOptions)->toArray()); return $result['ok'] > 0; } @@ -348,13 +346,13 @@ public function dropCollection($collectionName) * * See [[https://docs.mongodb.com/manual/reference/method/db.collection.createIndex/#options-for-all-index-types]] * for the full list of options. + * @param array $execOptions {@see execute()} * @return bool whether operation was successful. */ - public function createIndexes($collectionName, $indexes) + public function createIndexes($collectionName, $indexes, $execOptions = []) { $this->document = $this->db->getQueryBuilder()->createIndexes($this->databaseName, $collectionName, $indexes); - - $result = current($this->execute()->toArray()); + $result = current($this->execute($execOptions)->toArray()); return $result['ok'] > 0; } @@ -362,28 +360,29 @@ public function createIndexes($collectionName, $indexes) * Drops collection indexes by name. * @param string $collectionName collection name. * @param string $indexes wildcard for name of the indexes to be dropped. + * @param array $execOptions {@see execute()} * @return array result data. */ - public function dropIndexes($collectionName, $indexes) + public function dropIndexes($collectionName, $indexes, $execOptions = []) { $this->document = $this->db->getQueryBuilder()->dropIndexes($collectionName, $indexes); - - return current($this->execute()->toArray()); + return current($this->execute($execOptions)->toArray()); } /** * Returns information about current collection indexes. * @param string $collectionName collection name * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see execute()} * @return array list of indexes info. * @throws Exception on failure. */ - public function listIndexes($collectionName, $options = []) + public function listIndexes($collectionName, $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->listIndexes($collectionName, $options); try { - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); } catch (Exception $e) { // The server may return an error if the collection does not exist. $notFoundCodes = [ @@ -405,13 +404,13 @@ public function listIndexes($collectionName, $options = []) * @param string $collectionName collection name * @param array $condition filter condition * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see execute()} * @return int records count */ - public function count($collectionName, $condition = [], $options = []) + public function count($collectionName, $condition = [], $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->count($collectionName, $condition, $options); - - $result = current($this->execute()->toArray()); + $result = current($this->execute($execOptions)->toArray()); return $result['n']; } @@ -486,13 +485,14 @@ public function addDelete($condition, $options = []) * @param string $collectionName collection name * @param array $document document content * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see executeBatch()} * @return ObjectID|bool inserted record ID, `false` - on failure. */ - public function insert($collectionName, $document, $options = []) + public function insert($collectionName, $document, $options = [], $execOptions = []) { $this->document = []; $this->addInsert($document); - $result = $this->executeBatch($collectionName, $options); + $result = $this->executeBatch($collectionName, $options, $execOptions); if ($result['result']->getInsertedCount() < 1) { return false; @@ -506,9 +506,10 @@ public function insert($collectionName, $document, $options = []) * @param string $collectionName collection name * @param array[] $documents documents list * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see executeBatch()} * @return array|false list of inserted IDs, `false` on failure. */ - public function batchInsert($collectionName, $documents, $options = []) + public function batchInsert($collectionName, $documents, $options = [], $execOptions = []) { $this->document = []; foreach ($documents as $key => $document) { @@ -518,7 +519,7 @@ public function batchInsert($collectionName, $documents, $options = []) ]; } - $result = $this->executeBatch($collectionName, $options); + $result = $this->executeBatch($collectionName, $options, $execOptions); if ($result['result']->getInsertedCount() < 1) { return false; @@ -533,9 +534,10 @@ public function batchInsert($collectionName, $documents, $options = []) * @param array $condition filter condition * @param array $document data to be updated. * @param array $options update options. + * @param array $execOptions {@see executeBatch()} * @return WriteResult write result. */ - public function update($collectionName, $condition, $document, $options = []) + public function update($collectionName, $condition, $document, $options = [], $execOptions = []) { $batchOptions = []; foreach (['bypassDocumentValidation'] as $name) { @@ -547,7 +549,7 @@ public function update($collectionName, $condition, $document, $options = []) $this->document = []; $this->addUpdate($condition, $document, $options); - $result = $this->executeBatch($collectionName, $batchOptions); + $result = $this->executeBatch($collectionName, $batchOptions, $execOptions); return $result['result']; } @@ -557,9 +559,10 @@ public function update($collectionName, $condition, $document, $options = []) * @param string $collectionName collection name. * @param array $condition filter condition. * @param array $options delete options. + * @param array $execOptions {@see executeBatch()} * @return WriteResult write result. */ - public function delete($collectionName, $condition, $options = []) + public function delete($collectionName, $condition, $options = [], $execOptions = []) { $batchOptions = []; foreach (['bypassDocumentValidation'] as $name) { @@ -571,7 +574,7 @@ public function delete($collectionName, $condition, $options = []) $this->document = []; $this->addDelete($condition, $options); - $result = $this->executeBatch($collectionName, $batchOptions); + $result = $this->executeBatch($collectionName, $batchOptions, $execOptions); return $result['result']; } @@ -581,9 +584,10 @@ public function delete($collectionName, $condition, $options = []) * @param string $collectionName collection name * @param array $condition filter condition * @param array $options query options. + * @param array $execOptions {@see query()} * @return \MongoDB\Driver\Cursor result cursor. */ - public function find($collectionName, $condition, $options = []) + public function find($collectionName, $condition, $options = [], $execOptions = []) { $queryBuilder = $this->db->getQueryBuilder(); @@ -612,7 +616,7 @@ public function find($collectionName, $condition, $options = []) } } - return $this->query($collectionName, $options); + return $this->query($collectionName, $options, $execOptions); } /** @@ -621,12 +625,13 @@ public function find($collectionName, $condition, $options = []) * @param array $condition query condition * @param array $update update criteria * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see execute()} * @return array|null the original document, or the modified document when $options['new'] is set. */ - public function findAndModify($collectionName, $condition = [], $update = [], $options = []) + public function findAndModify($collectionName, $condition = [], $update = [], $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->findAndModify($collectionName, $condition, $update, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); $result = current($cursor->toArray()); @@ -643,12 +648,13 @@ public function findAndModify($collectionName, $condition = [], $update = [], $o * @param string $fieldName field name to use. * @param array $condition query parameters. * @param array $options list of options in format: optionName => optionValue. + * @param array $execOptions {@see execute()} * @return array array of distinct values, or "false" on failure. */ - public function distinct($collectionName, $fieldName, $condition = [], $options = []) + public function distinct($collectionName, $fieldName, $condition = [], $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->distinct($collectionName, $fieldName, $condition, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); $result = current($cursor->toArray()); @@ -672,12 +678,13 @@ public function distinct($collectionName, $fieldName, $condition = [], $options * @param array $options optional parameters to the group command. Valid options include: * - condition - criteria for including a document in the aggregation. * - finalize - function called once per unique key that takes the final output of the reduce function. + * @param array $execOptions {@see execute()} * @return array the result of the aggregation. */ - public function group($collectionName, $keys, $initial, $reduce, $options = []) + public function group($collectionName, $keys, $initial, $reduce, $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->group($collectionName, $keys, $initial, $reduce, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); $result = current($cursor->toArray()); @@ -705,12 +712,13 @@ public function group($collectionName, $keys, $initial, $reduce, $options = []) * - jsMode: bool, specifies whether to convert intermediate data into BSON format between the execution of the map and reduce functions. * - verbose: bool, specifies whether to include the timing information in the result information. * + * @param array $execOptions {@see execute()} * @return string|array the map reduce output collection name or output results. */ - public function mapReduce($collectionName, $map, $reduce, $out, $condition = [], $options = []) + public function mapReduce($collectionName, $map, $reduce, $out, $condition = [], $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->mapReduce($collectionName, $map, $reduce, $out, $condition, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); $result = current($cursor->toArray()); @@ -724,9 +732,10 @@ public function mapReduce($collectionName, $map, $reduce, $out, $condition = [], * @param string $collectionName collection name * @param array $pipelines list of pipeline operators. * @param array $options optional parameters. + * @param array $execOptions {@see execute()} * @return array|\MongoDB\Driver\Cursor aggregation result. */ - public function aggregate($collectionName, $pipelines, $options = []) + public function aggregate($collectionName, $pipelines, $options = [], $execOptions = []) { if (empty($options['cursor'])) { $returnCursor = false; @@ -736,7 +745,7 @@ public function aggregate($collectionName, $pipelines, $options = []) } $this->document = $this->db->getQueryBuilder()->aggregate($collectionName, $pipelines, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); if ($returnCursor) { return $cursor; @@ -749,12 +758,13 @@ public function aggregate($collectionName, $pipelines, $options = []) * Return an explanation of the query, often useful for optimization and debugging. * @param string $collectionName collection name * @param array $query query document. + * @param array $execOptions {@see execute()} * @return array explanation of the query. */ - public function explain($collectionName, $query) + public function explain($collectionName, $query, $execOptions = []) { $this->document = $this->db->getQueryBuilder()->explain($collectionName, $query); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); return current($cursor->toArray()); } @@ -763,16 +773,17 @@ public function explain($collectionName, $query) * Returns the list of available databases. * @param array $condition filter condition. * @param array $options options list. + * @param array $execOptions {@see execute()} * @return array database information */ - public function listDatabases($condition = [], $options = []) + public function listDatabases($condition = [], $options = [], $execOptions = []) { if ($this->databaseName === null) { $this->databaseName = 'admin'; } $this->document = $this->db->getQueryBuilder()->listDatabases($condition, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); $result = current($cursor->toArray()); if (empty($result['databases'])) { @@ -785,12 +796,13 @@ public function listDatabases($condition = [], $options = []) * Returns the list of available collections. * @param array $condition filter condition. * @param array $options options list. + * @param array $execOptions {@see execute()} * @return array collections information. */ - public function listCollections($condition = [], $options = []) + public function listCollections($condition = [], $options = [], $execOptions = []) { $this->document = $this->db->getQueryBuilder()->listCollections($condition, $options); - $cursor = $this->execute(); + $cursor = $this->execute($execOptions); return $cursor->toArray(); } @@ -839,4 +851,4 @@ protected function endProfile($token, $category) Yii::endProfile($token, $category); } } -} +} \ No newline at end of file diff --git a/src/Connection.php b/src/Connection.php index 590f29d82..c6e5cb657 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -83,6 +83,26 @@ class Connection extends Component * @event Event an event that is triggered after a DB connection is established */ const EVENT_AFTER_OPEN = 'afterOpen'; + /** + * @event yii\base\Event an event that is triggered right before a mongo client session is started + */ + const EVENT_START_SESSION = 'startSession'; + /** + * @event yii\base\Event an event that is triggered right after a mongo client session is ended + */ + const EVENT_END_SESSION = 'endSession'; + /** + * @event yii\base\Event an event that is triggered right before a transaction is started + */ + const EVENT_START_TRANSACTION = 'startTransaction'; + /** + * @event yii\base\Event an event that is triggered right after a transaction is committed + */ + const EVENT_COMMIT_TRANSACTION = 'commitTransaction'; + /** + * @event yii\base\Event an event that is triggered right after a transaction is rolled back + */ + const EVENT_ROLLBACK_TRANSACTION = 'rollbackTransaction'; /** * @var string host:port @@ -155,6 +175,21 @@ class Connection extends Component */ public $fileStreamWrapperClass = 'yii\mongodb\file\StreamWrapper'; + /** + * @var array default options for `executeCommand` , executeBulkWrite and executeQuery method of MongoDB\Driver\Manager in `Command` class. + */ + public $globalExecOptions = [ + /** + * Shared between some(or all) methods(executeCommand|executeBulkWrite|executeQuery). + * This options are : + * - session + */ + 'share' => [], + 'command' => [], + 'bulkWrite' => [], + 'query' => [], + ]; + /** * @var string name of the MongoDB database to use by default. * If this field left blank, connection instance will attempt to determine it from @@ -349,7 +384,7 @@ public function open() } $token = 'Opening MongoDB connection: ' . $this->dsn; try { - Yii::trace($token, __METHOD__); + Yii::debug($token, __METHOD__); Yii::beginProfile($token, __METHOD__); $options = $this->options; @@ -380,7 +415,7 @@ public function open() public function close() { if ($this->manager !== null) { - Yii::trace('Closing MongoDB connection: ' . $this->dsn, __METHOD__); + Yii::debug('Closing MongoDB connection: ' . $this->dsn, __METHOD__); $this->manager = null; foreach ($this->_databases as $database) { $database->clearCollections(); @@ -412,6 +447,7 @@ public function createCommand($document = [], $databaseName = null) 'db' => $this, 'databaseName' => $databaseName, 'document' => $document, + 'globalExecOptions' => $this->globalExecOptions ]); } @@ -432,4 +468,229 @@ public function registerFileStreamWrapper($force = false) return $this->fileStreamProtocol; } -} + + /** + * Recursive replacement on $this->globalExecOptions with new options. {@see $this->globalExecOptions} + * @param array $newExecOptions {@see $this->globalExecOptions} + * @return $this + */ + public function execOptions($newExecOptions) + { + if (empty($newExecOptions)) { + $this->globalExecOptions = []; + } + else { + $this->globalExecOptions = array_replace_recursive($this->globalExecOptions, $newExecOptions); + } + return $this; + } + + /** + * Ends the previous session and starts the new session. + * @param array $sessionOptions see doc of ClientSession::start() + * return ClientSession + */ + public function startSession($sessionOptions = []) + { + + if ($this->getInSession()) { + $this->getSession()->end(); + } + + $newSession = $this->newSession($sessionOptions); + $this->setSession($newSession); + return $newSession; + } + + /** + * Starts a new session if the session has not started, otherwise returns previous session. + * @param array $sessionOptions see doc of ClientSession::start() + * return ClientSession + */ + public function startSessionOnce($sessionOptions = []) + { + if ($this->getInSession()) { + return $this->getSession(); + } + return $this->startSession($sessionOptions); + } + + /** + * Only starts the new session for current connection but this session does not set for current connection. + * @param array $sessionOptions see doc of ClientSession::start() + * return ClientSession + */ + public function newSession($sessionOptions = []) + { + return ClientSession::start($this, $sessionOptions); + } + + /** + * Checks whether the current connection is in session. + * return bool + */ + public function getInSession() + { + return array_key_exists('session',$this->globalExecOptions['share']); + } + + /** + * Checks that the current connection is in session and transaction + * return bool + */ + public function getInTransaction() + { + return $this->getInSession() && $this->getSession()->getInTransaction(); + } + + /** + * Throws custom error if transaction is not ready in connection + * @param string $operation a custom message to be shown + */ + public function transactionReady($operation) + { + if (!$this->getInSession()) { + throw new Exception('You can\'t ' . $operation . ' because current connection is\'t in a session.'); + } + if (!$this->getSession()->getInTransaction()) { + throw new Exception('You can\'t ' . $operation . ' because transaction not started in current session.'); + } + } + + /** + * Returns current session + * return ClientSession|null + */ + public function getSession() + { + return $this->getInSession() ? $this->globalExecOptions['share']['session'] : null; + } + + /** + * Starts a transaction with three steps : + * - starts new session if has not started + * - starts the transaction in new session + * - sets new session to current connection + * @param array $transactionOptions see doc of Transaction::start() + * @param array $sessionOptions see doc of ClientSession::start() + * return ClientSession + */ + public function startTransaction($transactionOptions = [], $sessionOptions = []) + { + $session = $this->startSession($sessionOptions,true); + $session->getTransaction()->start($transactionOptions); + return $session; + } + + /** + * Starts a transaction in current session if the previous transaction was not started in current session. + * @param array $transactionOptions see doc of Transaction::start() + * @param array $sessionOptions see doc of ClientSession::start() + * return ClientSession + */ + public function startTransactionOnce($transactionOptions = [], $sessionOptions = []) + { + if ($this->getInTransaction()) { + return $this->getSession(); + } + return $this->startTransaction($transactionOptions,$sessionOptions); + } + + /** + * Commits transaction in current session + */ + public function commitTransaction() + { + $this->transactionReady('commit transaction'); + $this->getSession()->transaction->commit(); + } + + /** + * Rollbacks transaction in current session + */ + public function rollBackTransaction() + { + $this->transactionReady('roll back transaction'); + $this->getSession()->transaction->rollBack(); + } + + /** + * Changes the current session of connection to execute commands (or drop session) + * @param ClientSession|null $clientSession new instance of ClientSession to replace + * return $this + */ + public function setSession($clientSession) + { + #drop session + if (empty($clientSession)) { + unset($this->globalExecOptions['share']['session']); + } + else { + $this->globalExecOptions['share']['session'] = $clientSession; + } + return $this; + } + + /** + * Starts and commits a transaction in easy mode. + * @param callable $actions your block of code must be runned after transaction started and before commit + * if the $actions returns false then transaction rolls back. + * @param array $transactionOptions see doc of Transaction::start() + * @param array $sessionOptions see doc of ClientSession::start() + */ + public function transaction(callable $actions, $transactionOptions = [], $sessionOptions = []) + { + $session = $this->startTransaction($transactionOptions, $sessionOptions); + $success = false; + try { + $result = call_user_func($actions, $session); + if ($session->getTransaction()->getIsActive()) { + if ($result === false) { + $session->getTransaction()->rollBack(); + } + else { + $session->getTransaction()->commit(); + } + } + $success = true; + } finally { + if (!$success && $session->getTransaction()->getIsActive()) { + $session->getTransaction()->rollBack(); + } + } + } + + /** + * Starts and commits transaction in easy mode if the previous transaction was not executed, + * otherwise only runs your actions in previous transaction. + * @param callable $actions your block of code must be runned after transaction started and before commit + * @param array $transactionOptions see doc of Transaction::start() + * @param array $sessionOptions see doc of ClientSession::start() + */ + public function transactionOnce(callable $actions, $transactionOptions = [], $sessionOptions = []) + { + if ($this->getInTransaction()) { + $actions(); + } + else { + $this->transaction($actions,$transactionOptions,$sessionOptions); + } + } + + /** + * Runs your mongodb command out of session and transaction. + * @param callable $actions your block of code must be runned out of session and transaction + * @return mixed returns a result of $actions() + */ + public function noTransaction(callable $actions) + { + $lastSession = $this->getSession(); + $this->setSession(null); + try { + $result = $actions(); + } finally { + $this->setSession($lastSession); + } + return $result; + } +} \ No newline at end of file diff --git a/src/Database.php b/src/Database.php index 395aac79e..0eba51b5a 100644 --- a/src/Database.php +++ b/src/Database.php @@ -115,35 +115,38 @@ public function createCommand($document = []) * you need to create collection with the specific options. * @param string $name name of the collection * @param array $options collection options in format: "name" => "value" + * @param array $execOptions -> goto Command::execute() * @return bool whether operation was successful. * @throws Exception on failure. */ - public function createCollection($name, $options = []) + public function createCollection($name, $options = [], $execOptions = []) { - return $this->createCommand()->createCollection($name, $options); + return $this->createCommand()->createCollection($name, $options, $execOptions); } /** * Drops specified collection. * @param string $name name of the collection + * @param array $execOptions -> goto Command::execute() * @return bool whether operation was successful. * @since 2.1 */ - public function dropCollection($name) + public function dropCollection($name, $execOptions = []) { - return $this->createCommand()->dropCollection($name); + return $this->createCommand()->dropCollection($name, $execOptions); } /** * Returns the list of available collections in this database. * @param array $condition filter condition. * @param array $options options list. + * @param array $execOptions -> goto Command::execute() * @return array collections information. * @since 2.1.1 */ - public function listCollections($condition = [], $options = []) + public function listCollections($condition = [], $options = [], $execOptions = []) { - return $this->createCommand()->listCollections($condition, $options); + return $this->createCommand()->listCollections($condition, $options, $execOptions); } /** diff --git a/src/Transaction.php b/src/Transaction.php new file mode 100644 index 000000000..5c40faeb8 --- /dev/null +++ b/src/Transaction.php @@ -0,0 +1,142 @@ + + */ +class Transaction extends \yii\base\BaseObject +{ + + const STATE_NONE = 'none'; + const STATE_STARTING = 'starting'; + const STATE_ABORTED = 'aborted'; + const STATE_COMMITTED = 'committed'; + + /** + * @var MongoDB\Driver\Session class represents a client session and Commands, queries, and write operations may then be associated the session. + * @see https://www.php.net/manual/en/class.mongodb-driver-session.php + */ + public $clientSession; + + /** + * Set debug message if `enableLogging` property is enable in yii\mongodb\Connection + * @var string $message please see $this->yiiDebug() + * @var string $category please see $this->yiiDebug() + */ + protected function yiiDebug($message, $category = 'mongodb') + { + if ($this->clientSession->db->enableLogging) { + Yii::debug($message,$category); + } + } + + /** + * Begin profile if `enableProfiling` property is enable in yii\mongodb\Connection + * @var string $token please see $this->yiiBeginProfile() + * @var string $category please see $this->yiiBeginProfile() + */ + protected function yiiBeginProfile($token, $category = 'mongodb') + { + if ($this->clientSession->db->enableProfiling) { + Yii::beginProfile($token,$category); + } + } + + /** + * End profile if `enableProfiling` property is enable in yii\mongodb\Connection + * @var string $token please see $this->yiiEndProfile() + * @var string $category please see $this->yiiEndProfile() + */ + protected function yiiEndProfile($token, $category = 'mongodb') + { + if ($this->clientSession->db->enableProfiling) { + Yii::endProfile($token,$category); + } + } + + /** + * Returns the transaction state. + */ + public function getState() + { + return $this->clientSession->mongoSession->getTransactionState(); + } + + /** + * Returns a value indicating whether this transaction is active. + * @return bool whether this transaction is active. Only an active transaction + * can [[commit()]] or [[rollBack()]]. + */ + public function getIsActive() + { + return $this->clientSession->db->getIsActive() && $this->clientSession->getInTransaction(); + } + + /** + * Start a transaction if session is not in transaction process. + * @see https://www.php.net/manual/en/mongodb-driver-session.starttransaction.php + * @param array $transactionOptions Options can be passed as argument to this method. + * Each element in this options array overrides the corresponding option from the "sessionOptions" option, + * if set when starting the session with ClientSession::start(). + * @see https://www.php.net/manual/en/mongodb-driver-session.starttransaction.php#refsect1-mongodb-driver-session.starttransaction-parameters + */ + public function start($transactionOptions = []) + { + Command::prepareManagerOptions($transactionOptions); + $this->yiiDebug('Starting mongodb transaction ...', __METHOD__); + if ($this->clientSession->getInTransaction()) { + throw new Exception('Nested transaction not supported'); + } + $this->clientSession->db->trigger(Connection::EVENT_START_TRANSACTION); + $this->yiiBeginProfile('mongodb > start transaction(session id => ' . $this->clientSession->getId() . ')'); + $this->clientSession->mongoSession->startTransaction($transactionOptions); + $this->yiiDebug('MongoDB transaction started.', __METHOD__); + } + + /** + * Commit a transaction. + * @see https://www.php.net/manual/en/mongodb-driver-session.committransaction.php + */ + public function commit() + { + $this->yiiDebug('Committing mongodb transaction ...', __METHOD__); + $this->clientSession->mongoSession->commitTransaction(); + $this->yiiEndProfile('mongodb > start transaction(session id => ' . $this->clientSession->getId() . ')'); + $this->yiiDebug('Commit mongodb transaction.', __METHOD__); + $this->clientSession->db->trigger(Connection::EVENT_COMMIT_TRANSACTION); + } + + /** + * Rolls back a transaction. + * @see https://www.php.net/manual/en/mongodb-driver-session.aborttransaction.php + */ + public function rollBack() + { + $this->yiiDebug('Rolling back mongodb transaction ...', __METHOD__); + $this->clientSession->mongoSession->abortTransaction(); + $this->yiiEndProfile('mongodb > start transaction(session id => ' . $this->clientSession->getId() . ')'); + $this->yiiDebug('Roll back mongodb transaction.', __METHOD__); + $this->clientSession->db->trigger(Connection::EVENT_ROLLBACK_TRANSACTION); + } +} \ No newline at end of file diff --git a/src/file/Collection.php b/src/file/Collection.php index 860867c96..1ede82dad 100644 --- a/src/file/Collection.php +++ b/src/file/Collection.php @@ -128,31 +128,30 @@ public function getFileCollection($refresh = false) 'name' => $this->name ]); } - return $this->_fileCollection; } /** * {@inheritdoc} */ - public function drop() + public function drop($execOptions = []) { - return parent::drop() && $this->database->dropCollection($this->getChunkCollection()->name); + return parent::drop($execOptions) && $this->database->dropCollection($this->getChunkCollection()->name,$execOptions); } /** * {@inheritdoc} * @return Cursor cursor for the search results */ - public function find($condition = [], $fields = [], $options = []) + public function find($condition = [], $fields = [], $options = [], $execOptions = []) { - return new Cursor($this, parent::find($condition, $fields, $options)); + return new Cursor($this, parent::find($condition, $fields, $options, $execOptions)); } /** * {@inheritdoc} */ - public function remove($condition = [], $options = []) + public function remove($condition = [], $options = [], $execOptions = []) { $fileCollection = $this->getFileCollection(); $chunkCollection = $this->getChunkCollection(); @@ -166,7 +165,7 @@ public function remove($condition = [], $options = []) $batchSize = 200; $options['batchSize'] = $batchSize; - $cursor = $fileCollection->find($condition, ['_id'], $options); + $cursor = $fileCollection->find($condition, ['_id'], $options, $execOptions); unset($options['limit']); $deleteCount = 0; $deleteCallback = function ($ids) use ($fileCollection, $chunkCollection, $options) { diff --git a/src/rbac/MongoDbManager.php b/src/rbac/MongoDbManager.php index 78f73113d..97b96f9ac 100644 --- a/src/rbac/MongoDbManager.php +++ b/src/rbac/MongoDbManager.php @@ -133,7 +133,7 @@ protected function checkAccessFromCache($user, $itemName, $params, $assignments) $item = $this->items[$itemName]; - Yii::trace($item instanceof Role ? "Checking role: $itemName" : "Checking permission: $itemName", __METHOD__); + Yii::debug($item instanceof Role ? "Checking role: $itemName" : "Checking permission: $itemName", __METHOD__); if (!$this->executeRule($user, $item, $params)) { return false; @@ -172,7 +172,7 @@ protected function checkAccessRecursive($user, $itemName, $params, $assignments) return false; } - Yii::trace($item instanceof Role ? "Checking role: $itemName" : "Checking permission: $itemName", __METHOD__); + Yii::debug($item instanceof Role ? "Checking role: $itemName" : "Checking permission: $itemName", __METHOD__); if (!$this->executeRule($user, $item, $params)) { return false;