diff --git a/README.txt b/README.txt index a0c91dc..34995e5 100644 --- a/README.txt +++ b/README.txt @@ -30,7 +30,24 @@ Then mongodb_collections will allow mapping collections to aliases: $conf['mongodb_collections'] = array( 'watchdog' => 'logginghost', -) +); + +If you are using the 1.3 drivers you can specify the following connection +options db_connection and read_preference. These options allow to control +collection level mongo read preferences and whether any tags should be used. + +$conf['mongodb_collections'] = array( + 'watchdog' => array( + db_connection => 'logginghost', + 'read_preference' => array( + 'preference' => 'secondaryPreferred', + 'tags' => array( + array('dc' => 'east', 'use' => 'reporting'), + array('dc' => 'west'), + ), + ), + ), +); If a collection has no alias specified in 'mongodb_collections', then the alias 'default' is used (as noted above, the 'default' connection alias diff --git a/mongodb.drush.inc b/mongodb.drush.inc index b53e5ff..0b83521 100644 --- a/mongodb.drush.inc +++ b/mongodb.drush.inc @@ -113,7 +113,7 @@ function drush_mongodb_query($alias = 'default', $query = '') { } /** - * Returns the basic shell commando string. + * Returns the basic shell command string. */ function _drush_mongodb_connect($alias) { $connections = variable_get('mongodb_connections', array()); diff --git a/mongodb.module b/mongodb.module index 4460e84..a11dc56 100644 --- a/mongodb.module +++ b/mongodb.module @@ -30,7 +30,20 @@ function mongodb($alias = 'default') { $options = $connection['connection_options'] + array('connect' => TRUE); $db = $connection['db']; if (!isset($mongo_objects[$host][$db])) { - $mongo = new Mongo($host, $options); + // Use the 1.3 client if available. + if (class_exists('MongoClient')) { + $mongo = new MongoClient($host, $options); + // Enable read preference and tags if provided. This can also be + // controlled on a per query basis at the cursor level if more control + // is required. + if (!empty($connection['read_preference'])) { + $tags = !empty($connection['read_preference']['tags']) ? $connection['read_preference']['tags'] : array(); + $mongo->setReadPreference($connection['read_preference']['preference'], $tags); + } + } + else { + $mongo = new Mongo($host, $options); + } $mongo_objects[$host][$db] = $mongo->selectDB($db); $mongo_objects[$host][$db]->connection = $mongo; } @@ -52,12 +65,27 @@ function mongodb_collection() { $prefixed = mongodb_collection_name($collection_name); } $collections = variable_get('mongodb_collections', array()); - $alias = isset($collections[$collection_name]) ? $collections[$collection_name] : 'default'; + + if (isset($collections[$collection_name])) { + // We might be dealing with an array or string because of need to preserve + // backwards comptability. + $alias = !empty($collections[$collection_name]['db_connection']) ? $collections[$collection_name]['db_connection'] : $collections[$collection_name]; + } + else { + $alias = 'default'; + } + // Prefix the collection name for simpletest. It will be in the same DB as the // non-prefixed version so it's enough to prefix after choosing the mongodb // object. $mongodb_object = mongodb($alias); $collection = $mongodb_object->selectCollection(mongodb_collection_name($collection_name)); + // Enable read preference and tags at a collection level if we have 1.3 + // client. + if (!empty($collections[$alias]['read_preference']) && get_class($mongodb_object->connection) == 'MongoClient') { + $tags = !empty($collections[$alias]['read_preference']['tags']) ? $collections[$alias]['read_preference']['tags'] : array(); + $collection->setReadPreference($collections[$alias]['read_preference']['preference'], $tags); + } $collection->connection = $mongodb_object->connection; return variable_get('mongodb_debug', FALSE) ? new mongoDebugCollection($collection) : $collection; } @@ -140,7 +168,12 @@ function mongodb_test_group_finished() { */ function mongodb_set_active_connection($alias, $connection_name = 'default') { // No need to check if the connection is valid as mongodb() does this. - $GLOBALS['conf']['mongodb_collections'][$alias] = $connection_name; + if (!empty($GLOBALS['conf']['mongodb_collections'][$alias]['db_connection'])) { + $GLOBALS['conf']['mongodb_collections'][$alias]['db_connection'] = $connection_name; + } + else { + $GLOBALS['conf']['mongodb_collections'][$alias] = $connection_name; + } } /** diff --git a/mongodb_block/mongodb_block.module b/mongodb_block/mongodb_block.module index b0dba3b..f8a586c 100644 --- a/mongodb_block/mongodb_block.module +++ b/mongodb_block/mongodb_block.module @@ -262,13 +262,13 @@ function _mongodb_block_rehash($theme) { mongodb_block_save_block($collection, $block); } } - $collection->remove(array('_id' => array('$nin' => $ids))); + $collection->remove(array('_id' => array('$nin' => $ids)), array('safe' => TRUE)); return $blocks; } function mongodb_block_save_block($collection, $block) { unset($block['module'], $block['delta'], $block['status']); - $collection->save($block); + $collection->save($block, array('safe' => TRUE)); } /** diff --git a/mongodb_block_ui/mongodb_block_ui.admin.inc b/mongodb_block_ui/mongodb_block_ui.admin.inc index ab38bd0..9826249 100644 --- a/mongodb_block_ui/mongodb_block_ui.admin.inc +++ b/mongodb_block_ui/mongodb_block_ui.admin.inc @@ -479,14 +479,14 @@ function mongodb_block_ui_custom_block_delete($form, &$form_state, $theme, $modu */ function mongodb_block_ui_custom_block_delete_submit($form, &$form_state) { // Remove the custom block. - mongodb_collection('block_custom')->remove(array('_id' => $form_state['values']['delta'])); + mongodb_collection('block_custom')->remove(array('_id' => $form_state['values']['delta']), array('safe' => TRUE)); // Also remove customization for every theme. $id = array( 'module' => $form_state['values']['module'], 'delta' => $form_state['values']['delta'], ); foreach (list_themes() as $theme) { - mongodb_collection('block_customized', $theme->name)->remove(array('_id' => $id)); + mongodb_collection('block_customized', $theme->name)->remove(array('_id' => $id), array('safe' => TRUE)); } drupal_set_message(t('The block %name has been removed.', array('%name' => $form_state['values']['info']))); cache_clear_all(); diff --git a/mongodb_block_ui/mongodb_block_ui.module b/mongodb_block_ui/mongodb_block_ui.module index 174def0..f47db82 100644 --- a/mongodb_block_ui/mongodb_block_ui.module +++ b/mongodb_block_ui/mongodb_block_ui.module @@ -450,10 +450,7 @@ function mongodb_block_ui_custom_block_form($edit = array()) { function mongodb_block_ui_menu_delete($menu) { $collection = mongodb_collection('block_custom'); - $collection->delete(array( - 'module' => 'menu', - 'delta' => $menu['menu_name'], - )); + $collection->delete(array('module' => 'menu', 'delta' => $menu['menu_name']), array('safe' => TRUE)); // db_delete('mongodb_block_ui_role') // ->condition('module', 'menu') // ->condition('delta', $menu['menu_name']) diff --git a/mongodb_cache/mongodb_cache.inc b/mongodb_cache/mongodb_cache.inc index a2b9d46..acbf7f6 100644 --- a/mongodb_cache/mongodb_cache.inc +++ b/mongodb_cache/mongodb_cache.inc @@ -60,7 +60,7 @@ class DrupalMongoDBCache implements DrupalCacheInterface { $find = array('expire' => array('$lte' => $cache_flush, '$ne' => CACHE_PERMANENT)); - $collection->remove($find); + $collection->remove($find, array('w' => 0)); } } @@ -119,7 +119,7 @@ class DrupalMongoDBCache implements DrupalCacheInterface { $collection = mongodb_collection($this->bin); - $collection->save($entry); + $collection->save($entry, array('w' => 0)); } function clear($cid = NULL, $wildcard = FALSE) { @@ -144,34 +144,34 @@ class DrupalMongoDBCache implements DrupalCacheInterface { // Clear the cache for everyone, cache_lifetime seconds have // passed since the first request to clear the cache. - $collection->remove(array('expire' => array('$ne' => CACHE_PERMANENT, '$lte' => REQUEST_TIME))); + $collection->remove(array('expire' => array('$ne' => CACHE_PERMANENT, '$lte' => REQUEST_TIME)), array('w' => 0)); variable_set('cache_flush_' . $this->bin, 0); } } else { // No minimum cache lifetime, flush all temporary cache entries now. - $collection->remove(array('expire' => array('$ne' => CACHE_PERMANENT, '$lte' => REQUEST_TIME))); + $collection->remove(array('expire' => array('$ne' => CACHE_PERMANENT, '$lte' => REQUEST_TIME)), array('w' => 0)); } } else { if ($wildcard) { if ($cid == '*') { - $collection->remove(); + $collection->remove(array(), array('w' => 0)); } else { - $collection->remove(array('cid' => new MongoRegex('/'. preg_quote($cid) .'.*/'))); + $collection->remove(array('cid' => new MongoRegex('/'. preg_quote($cid) .'.*/')), array('w' => 0)); } } elseif (is_array($cid)) { // Delete in chunks when a large array is passed. do { $find = array('cid' => array('$in' => array_map('strval', array_splice($cid, 0, 1000)))); - $collection->remove($find); + $collection->remove($find, array('w' => 0)); } while (count($cid)); } else { - $collection->remove(array('_id' => (string)$cid)); + $collection->remove(array('_id' => (string)$cid), array('w' => 0)); } } } diff --git a/mongodb_field_storage/mongodb_field_storage.module b/mongodb_field_storage/mongodb_field_storage.module index a9d274e..fac540b 100644 --- a/mongodb_field_storage/mongodb_field_storage.module +++ b/mongodb_field_storage/mongodb_field_storage.module @@ -199,7 +199,7 @@ function mongodb_field_storage_field_storage_write($entity_type, $entity, $op, $ } if (isset($revision_id)) { $new_entity->_id = (int) $revision_id; - mongodb_collection('fields_revision', $entity_type)->save($new_entity); + mongodb_collection('fields_revision', $entity_type)->save($new_entity, array('safe' => TRUE)); } } @@ -224,13 +224,9 @@ function _mongodb_field_storage_value($type, $value) { function mongodb_field_storage_field_storage_delete($entity_type, $entity, $fields) { list($entity_id, $revision_id, $bundle) = entity_extract_ids($entity_type, $entity); - mongodb_collection('fields_current', $entity_type)->remove(array( - '_id' => (int) $entity_id, - )); + mongodb_collection('fields_current', $entity_type)->remove(array('_id' => (int) $entity_id), array('safe' => TRUE)); $entity_info = entity_get_info($entity_type); - mongodb_collection('fields_revision', $entity_type)->remove(array( - $entity_info['entity keys']['id'] => (int) $entity_id, - )); + mongodb_collection('fields_revision', $entity_type)->remove(array($entity_info['entity keys']['id'] => (int) $entity_id), array('safe' => TRUE)); } /** @@ -240,9 +236,7 @@ function mongodb_field_storage_field_storage_delete($entity_type, $entity, $fiel */ function mongodb_field_storage_field_storage_delete_revision($entity_type, $entity, $fields) { list($entity_id, $revision_id, $bundle) = entity_extract_ids($entity_type, $entity); - mongodb_collection('fields_revision', $entity_type)->remove(array( - '_id' => (int) $revision_id, - )); + mongodb_collection('fields_revision', $entity_type)->remove(array('_id' => (int) $revision_id), array('safe' => TRUE)); } /** @@ -394,8 +388,8 @@ function _mongodb_field_storage_query_value($value, $operator) { * Implement hook_field_attach_rename_bundle(). */ function mongodb_field_storage_field_attach_rename_bundle($entity_type, $bundle_old, $bundle_new) { - mongodb_collection('fields_current', $entity_type)->update(array('_bundle' => $bundle_old), array('_bundle' => $bundle_new), array('multiple' => TRUE)); - mongodb_collection('fields_revision', $entity_type)->update(array('_bundle' => $bundle_old), array('_bundle' => $bundle_new), array('multiple' => TRUE)); + mongodb_collection('fields_current', $entity_type)->update(array('_bundle' => $bundle_old), array('_bundle' => $bundle_new), array('multiple' => TRUE), array('safe' => TRUE)); + mongodb_collection('fields_revision', $entity_type)->update(array('_bundle' => $bundle_old), array('_bundle' => $bundle_new), array('multiple' => TRUE), array('safe' => TRUE)); } /** @@ -417,8 +411,8 @@ function mongodb_field_storage_entity_update($entity, $entity_type) { */ function mongodb_field_storage_field_attach_delete($entity_type, $entity) { list($entity_id, $revision_id) = entity_extract_ids($entity_type, $entity); - mongodb_collection('fields_current', $entity_type)->remove(array('_id' => (int) $entity_id)); + mongodb_collection('fields_current', $entity_type)->remove(array('_id' => (int) $entity_id), array('safe' => TRUE)); if ($revision_id) { - mongodb_collection('fields_revision', $entity_type)->remove(array('_id' => (int) $revision_id)); + mongodb_collection('fields_revision', $entity_type)->remove(array('_id' => (int) $revision_id), array('safe' => TRUE)); } } diff --git a/mongodb_queue/mongodb_queue.inc b/mongodb_queue/mongodb_queue.inc index 7312ff0..6ec9d32 100644 --- a/mongodb_queue/mongodb_queue.inc +++ b/mongodb_queue/mongodb_queue.inc @@ -20,7 +20,7 @@ class MongoDBQueue implements DrupalQueueInterface { 'expire' => 0, ); return mongodb_collection($this->collection) - ->insert($item); + ->insert($item, array('safe' => TRUE)); } public function numberOfItems() { @@ -48,12 +48,12 @@ class MongoDBQueue implements DrupalQueueInterface { public function releaseItem($item) { return mongodb_collection($this->collection) - ->update(array('_id' => $item->_id), array('$set' => array('expire' => 0))); + ->update(array('_id' => $item->_id), array('$set' => array('expire' => 0)), array('safe' => TRUE)); } public function deleteItem($item) { mongodb_collection($this->collection) - ->remove(array('_id' => $item->_id)); + ->remove(array('_id' => $item->_id), array('safe' => TRUE)); } public function createQueue() { diff --git a/mongodb_queue/mongodb_queue.module b/mongodb_queue/mongodb_queue.module index 93224a9..fda79ed 100644 --- a/mongodb_queue/mongodb_queue.module +++ b/mongodb_queue/mongodb_queue.module @@ -27,7 +27,7 @@ function mongodb_queue_cron() { if (strpos($collection_name, '$') === FALSE && substr($collection_name, 0, $n) == "$db.") { // Split off the name of the database. $db->selectCollection(substr($collection_data['name'], $n)) - ->update($update_query, array('$set' => array('expire' => 0)), array('multiple' => TRUE)); + ->update($update_query, array('$set' => array('expire' => 0)), array('multiple' => TRUE, 'safe' => TRUE)); } } } diff --git a/mongodb_session/mongodb_session.inc b/mongodb_session/mongodb_session.inc index ca61492..8a400b2 100644 --- a/mongodb_session/mongodb_session.inc +++ b/mongodb_session/mongodb_session.inc @@ -117,7 +117,7 @@ function _drupal_session_read($sid) { elseif (!isset($user->roles)) { include_once dirname(__FILE__) . '/mongodb_session.module'; $user->roles = _mongodb_session_get_roles($user); - $collection->update(array('_id' => (int) $user->uid), array('$set' => array('roles' => $user->roles))); + $collection->update(array('_id' => (int) $user->uid), array('$set' => array('roles' => $user->roles)), array('w' => 0)); } $user->session = $session['session']; } @@ -156,8 +156,8 @@ function _drupal_session_write($sid, $value) { } $fields = array( - 'uid' => (int)$user->uid, - 'cache' => isset($user->cache) ? (int)$user->cache : 0, + 'uid' => (int) $user->uid, + 'cache' => isset($user->cache) ? (int) $user->cache : 0, 'hostname' => ip_address(), 'session' => $value, 'timestamp' => REQUEST_TIME, @@ -179,7 +179,7 @@ function _drupal_session_write($sid, $value) { $collection = mongodb_collection('session'); $collection - ->update($key, $key + $fields, array('upsert' => TRUE)); + ->update($key, $key + $fields, array('upsert' => TRUE), array('w' => 0)); // Last access time is updated no more frequently than once every 180 seconds. // This reduces contention in the users table. @@ -193,7 +193,7 @@ function _drupal_session_write($sid, $value) { ->condition('uid', $user->uid) ->execute(); // Update MongoDB because we prefer that. - mongodb_collection('fields_current', 'user')->update(array('_id' => (int) $user->uid), array('$set' => $fields)); + mongodb_collection('fields_current', 'user')->update(array('_id' => (int) $user->uid), array('$set' => $fields), array('w' => 0)); // Throw a bone to entitycache, too. if (variable_get('entitycache_enabled', TRUE)) { cache_clear_all($user->uid, 'cache_entity_user'); @@ -324,7 +324,7 @@ function drupal_session_regenerate() { if (isset($old_session_id)) { $field = $is_https ? 'ssid' : 'sid'; mongodb_collection('session') - ->update(array('sid' => $old_session_id), array('$set' => array($field => session_id()))); + ->update(array('sid' => $old_session_id), array('$set' => array($field => session_id())), array('w' => 0)); } } @@ -341,7 +341,7 @@ function _drupal_session_destroy($sid) { $field = $is_https ? 'ssid' : 'sid'; mongodb_collection('session') - ->remove(array($field => $sid)); + ->remove(array($field => $sid), array('w' => 0)); // Reset $_SESSION and $user to prevent a new session from being started // in drupal_session_commit(). @@ -379,7 +379,7 @@ function _drupal_session_delete_cookie($name, $force_insecure = FALSE) { */ function drupal_session_destroy_uid($uid) { mongodb_collection('session') - ->remove(array('uid' => $uid)); + ->remove(array('uid' => $uid), array('w' => 0)); } /** @@ -398,7 +398,7 @@ function _drupal_session_garbage_collection($lifetime) { // to '1814400'. At that value, only after a user doesn't log in after // three weeks (1814400 seconds) will his/her session be removed. mongodb_collection('session') - ->remove(array('timestamp' => array('$lt' => REQUEST_TIME - $lifetime))); + ->remove(array('timestamp' => array('$lt' => REQUEST_TIME - $lifetime)), array('w' => 0)); return TRUE; } diff --git a/mongodb_session/mongodb_session.module b/mongodb_session/mongodb_session.module index b13fe3a..efe84af 100644 --- a/mongodb_session/mongodb_session.module +++ b/mongodb_session/mongodb_session.module @@ -26,7 +26,7 @@ function mongodb_session_user_update($edit, $account) { foreach (array('uid', 'created', 'access', 'login', 'status', 'picture') as $key) { $save[$key] = (int) $save[$key]; } - mongodb_collection('fields_current', 'user')->save($save); + mongodb_collection('fields_current', 'user')->save($save, array('safe' => TRUE)); } return $roles; } diff --git a/mongodb_watchdog/mongodb_watchdog.module b/mongodb_watchdog/mongodb_watchdog.module index a559f50..ecfb8c0 100644 --- a/mongodb_watchdog/mongodb_watchdog.module +++ b/mongodb_watchdog/mongodb_watchdog.module @@ -61,7 +61,7 @@ function mongodb_watchdog_watchdog(array $log_entry) { ); $collection = mongodb_collection('watchdog'); $id = md5($log_entry['function'] . ':' . $log_entry['line'] . ':' . $log_entry['severity'] . ':' . $log_entry['type'] . ':' . $log_entry['message']); - $collection->update(array('_id' => $id), $newobj, array('upsert' => TRUE)); + $collection->update(array('_id' => $id), $newobj, array('upsert' => TRUE, 'safe' => TRUE)); $result = $collection->db->command(array('getlasterror' => 1)); $collection = $collection->db->selectCollection('watchdog_event_' . $id); if (empty($result['updatedExisting'])) { @@ -69,7 +69,7 @@ function mongodb_watchdog_watchdog(array $log_entry) { $command = array('create' => $collection->getName(), 'capped' => TRUE, 'size' => $max * 1000, "max" => $max); $collection->db->command($command); } - $collection->insert($event); + $collection->insert($event, array('safe' => TRUE)); } /**