Index: includes/database/mysql/query.inc =================================================================== RCS file: /cvs/drupal/drupal/includes/database/mysql/query.inc,v retrieving revision 1.18 diff -u -p -r1.18 query.inc --- includes/database/mysql/query.inc 26 Jun 2010 01:40:05 -0000 1.18 +++ includes/database/mysql/query.inc 13 Sep 2010 14:07:49 -0000 @@ -103,6 +103,97 @@ class TruncateQuery_mysql extends Trunca } } +class MergeQuery_mysql extends MergeQuery { + /** + * Run the MERGE query against the database. + * + * @return + * A status indicating the executed operation: + * - MergeQuery::STATUS_INSERT for an INSERT operation. + * - MergeQuery::STATUS_UPDATE for an UPDATE operation. + */ + public function execute() { + // If validation fails, simply return NULL. + // Note that validation routines in preExecute() may throw exceptions instead. + if (!$this->preExecute()) { + return NULL; + } + + // In the degenerate case of this query type, we have to run multiple + // queries as there is no universal single-query mechanism that will work. + + // Wrap multiple queries in a transaction, if the database supports it. + $transaction = $this->connection->startTransaction(); + + try { + // Manually check if the record already exists. + // We build a 'SELECT 1 FROM table WHERE conditions LOCK IN SHARE MODE' + // query, that will write lock the rows that matches our set of conditions + // as well as return the information that there are such rows. SELECT + // queries matching the rows will not be locked. + $select = $this->connection->select($this->table); + $select->addExpression('1'); + foreach ($this->keyFields as $field => $value) { + $select->condition($field, $value); + } + + // Using SELECT FOR UPDATE syntax will lock the rows we want to attempt to update. + $sql = ((string) $select) . ' LOCK IN SHARE MODE'; + $arguments = $select->getArguments(); + + // If there are no existing records, run an insert query. + if (!$this->connection->query($sql, $arguments)->fetchField()) { + // If there is no existing record, run an insert query. + $insert_fields = $this->insertFields + $this->keyFields; + try { + $this->connection->insert($this->table, $this->queryOptions)->fields($insert_fields)->execute(); + return MergeQuery::STATUS_INSERT; + } + catch (Exception $e) { + // The insert query failed, maybe it's because a racing insert query + // beat us in inserting the same row. Retry the select query, if it + // returns a row, ignore the error and continue with the update + // query below. + if (!$this->connection->query($sql, $arguments)->fetchField()) { + throw $e; + } + } + } + + // Proceed with an update query if a row was found. + if ($this->updateFields) { + $update_fields = $this->updateFields; + } + else { + $update_fields = $this->insertFields; + // If there are no exclude fields, this is a no-op. + foreach ($this->excludeFields as $exclude_field) { + unset($update_fields[$exclude_field]); + } + } + if ($update_fields || $this->expressionFields) { + // Only run the update if there are fields or expressions to update. + $update = $this->connection->update($this->table, $this->queryOptions)->fields($update_fields); + foreach ($this->keyFields as $field => $value) { + $update->condition($field, $value); + } + foreach ($this->expressionFields as $field => $expression) { + $update->expression($field, $expression['expression'], $expression['arguments']); + } + $update->execute(); + return MergeQuery::STATUS_UPDATE; + } + } + catch (Exception $e) { + // Something really wrong happened here, bubble up the exception to the + // caller. + $transaction->rollback(); + throw $e; + } + // Transaction commits here where $transaction looses scope. + } +} + /** * @} End of "ingroup database". */