name = $name; $this->connection = $connection; } /** * {@inheritdoc} */ public function createItem($data) { $try_again = FALSE; try { $id = $this->doCreateItem($data); } catch (\Exception $e) { // If there was an exception, try to create the table. if (!$try_again = $this->ensureTableExists()) { // If the exception happened for other reason than the missing table, // propagate the exception. throw $e; } } // Now that the table has been created, try again if necessary. if ($try_again) { $id = $this->doCreateItem($data); } return $id; } /** * Adds a queue item and store it directly to the queue. * * @param $data * Arbitrary data to be associated with the new task in the queue. * * @return * A unique ID if the item was successfully created and was (best effort) * added to the queue, otherwise FALSE. We don't guarantee the item was * committed to disk etc, but as far as we know, the item is now in the * queue. */ protected function doCreateItem($data) { $query = $this->connection->insert(static::TABLE_NAME) ->fields([ 'name' => $this->name, 'data' => serialize($data), // We cannot rely on REQUEST_TIME because many items might be created // by a single request which takes longer than 1 second. 'created' => time(), ]); // Return the new serial ID, or FALSE on failure. return $query->execute(); } /** * {@inheritdoc} */ public function numberOfItems() { try { return $this->connection->query('SELECT COUNT(item_id) FROM {' . static::TABLE_NAME . '} WHERE name = :name', [':name' => $this->name]) ->fetchField(); } catch (\Exception $e) { $this->catchException($e); // If there is no table there cannot be any items. return 0; } } /** * {@inheritdoc} */ public function claimItem($lease_time = 30) { // Claim an item by updating its expire fields. If claim is not successful // another thread may have claimed the item in the meantime. Therefore loop // until an item is successfully claimed or we are reasonably sure there // are no unclaimed items left. while (TRUE) { try { $item = $this->connection->queryRange('SELECT data, created, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, [':name' => $this->name])->fetchObject(); } catch (\Exception $e) { $this->catchException($e); // If the table does not exist there are no items currently available to // claim. return FALSE; } if ($item) { // Try to update the item. Only one thread can succeed in UPDATEing the // same row. We cannot rely on REQUEST_TIME because items might be // claimed by a single consumer which runs longer than 1 second. If we // continue to use REQUEST_TIME instead of the current time(), we steal // time from the lease, and will tend to reset items before the lease // should really expire. $update = $this->connection->update(static::TABLE_NAME) ->fields([ 'expire' => time() + $lease_time, ]) ->condition('item_id', $item->item_id) ->condition('expire', 0); // If there are affected rows, this update succeeded. if ($update->execute()) { $item->data = unserialize($item->data); return $item; } } else { // No items currently available to claim. return FALSE; } } } /** * {@inheritdoc} */ public function releaseItem($item) { try { $update = $this->connection->update(static::TABLE_NAME) ->fields([ 'expire' => 0, ]) ->condition('item_id', $item->item_id); return $update->execute(); } catch (\Exception $e) { $this->catchException($e); // If the table doesn't exist we should consider the item released. return TRUE; } } /** * {@inheritdoc} */ public function deleteItem($item) { try { $this->connection->delete(static::TABLE_NAME) ->condition('item_id', $item->item_id) ->execute(); } catch (\Exception $e) { $this->catchException($e); } } /** * {@inheritdoc} */ public function createQueue() { // All tasks are stored in a single database table (which is created on // demand) so there is nothing we need to do to create a new queue. } /** * {@inheritdoc} */ public function deleteQueue() { try { $this->connection->delete(static::TABLE_NAME) ->condition('name', $this->name) ->execute(); } catch (\Exception $e) { $this->catchException($e); } } /** * {@inheritdoc} */ public function garbageCollection() { try { // Clean up the queue for failed batches. $this->connection->delete(static::TABLE_NAME) ->condition('created', REQUEST_TIME - 864000, '<') ->condition('name', 'drupal_batch:%', 'LIKE') ->execute(); // Reset expired items in the default queue implementation table. If that's // not used, this will simply be a no-op. $this->connection->update(static::TABLE_NAME) ->fields([ 'expire' => 0, ]) ->condition('expire', 0, '<>') ->condition('expire', REQUEST_TIME, '<') ->execute(); } catch (\Exception $e) { $this->catchException($e); } } /** * Check if the table exists and create it if not. */ protected function ensureTableExists() { try { $database_schema = $this->connection->schema(); if (!$database_schema->tableExists(static::TABLE_NAME)) { $schema_definition = $this->schemaDefinition(); $database_schema->createTable(static::TABLE_NAME, $schema_definition); return TRUE; } } // If another process has already created the queue table, attempting to // recreate it will throw an exception. In this case just catch the // exception and do nothing. catch (SchemaObjectExistsException $e) { return TRUE; } return FALSE; } /** * Act on an exception when queue might be stale. * * If the table does not yet exist, that's fine, but if the table exists and * yet the query failed, then the queue is stale and the exception needs to * propagate. * * @param $e * The exception. * * @throws \Exception * If the table exists the exception passed in is rethrown. */ protected function catchException(\Exception $e) { if ($this->connection->schema()->tableExists(static::TABLE_NAME)) { throw $e; } } /** * Defines the schema for the queue table. * * @internal */ public function schemaDefinition() { return [ 'description' => 'Stores items in queues.', 'fields' => [ 'item_id' => [ 'type' => 'serial', 'unsigned' => TRUE, 'not null' => TRUE, 'description' => 'Primary Key: Unique item ID.', ], 'name' => [ 'type' => 'varchar_ascii', 'length' => 255, 'not null' => TRUE, 'default' => '', 'description' => 'The queue name.', ], 'data' => [ 'type' => 'blob', 'not null' => FALSE, 'size' => 'big', 'serialize' => TRUE, 'description' => 'The arbitrary data for the item.', ], 'expire' => [ 'type' => 'int', 'not null' => TRUE, 'default' => 0, 'description' => 'Timestamp when the claim lease expires on the item.', ], 'created' => [ 'type' => 'int', 'not null' => TRUE, 'default' => 0, 'description' => 'Timestamp when the item was created.', ], ], 'primary key' => ['item_id'], 'indexes' => [ 'name_created' => ['name', 'created'], 'expire' => ['expire'], ], ]; } }