package edu.hawaii.ics.yucheng;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
/**
* A {@link Runnable} that executes a list of SQL DDL statements, which are
* expected to be either CREATE or DROP statements, on every node of the
* distributed database system and records each change in the catalog.
*
* @author Cheng Jade
* @assignment ICS 421 Assignment 4
* @date Mar 22, 2010
* @bugs None
*/
public class CreateOrDrop implements Runnable {

    /* the create or drop configuration object. */
    public final CreateOrDropConfiguration configuration;

    /* the catalog node extracted from the overall configuration. */
    public final ConfigurationNode catalog;

    /* a list of SQL commands to be executed. */
    private final SQLList sqls;

    /*
     * Aggregate outcome of the current run: null until something has been
     * reported, TRUE while every report so far was a success, and FALSE as
     * soon as any node or catalog update has failed. Several Runner threads
     * report concurrently, so this field is only touched through the
     * synchronized helpers resetOutcome, recordOutcome and allSucceeded.
     */
    private Boolean success = null;

    /**
     * Initialize a new instance of this object.
     *
     * @param configuration
     *            The create or drop configuration; its catalog node and node
     *            list are used by {@link #run()}.
     *
     * @param sqls
     *            The SQL statements to execute on every node.
     *
     * @throws NullPointerException
     *             Thrown if either argument is null.
     */
    public CreateOrDrop(final CreateOrDropConfiguration configuration, final ArrayList<String> sqls) {
        if (null == configuration)
            throw new NullPointerException("CreateOrDrop");
        if (null == sqls)
            throw new NullPointerException("sqls");

        this.configuration = configuration;
        this.catalog = this.configuration.catalog;

        // NOTE(review): this downcast throws ClassCastException unless the
        // list handed in is actually an SQLList. SQLList is declared outside
        // this file -- confirm that every caller passes one.
        this.sqls = (SQLList) sqls;
    }

    /**
     * The main routine that takes a list of commands and execute on all the
     * nodes in the configuration object. One thread is started per node; once
     * all of them have been joined, a summary line is printed to standard
     * output. If the catalog cannot be prepared or a thread cannot be joined,
     * the error is printed and the JVM exits with status 1.
     */
    public void run() {
        try {
            // Forget the outcome of any previous invocation; failures are
            // sticky within a run and must not leak into the next one.
            resetOutcome();

            // Prepare the 'DTABLES' in the catalog database. If the table does
            // not exist, a new one is created.
            prepareCatalog(this.catalog);

            // Start threads for each configuration node.
            final int count = configuration.nodes.size();
            final Thread[] threads = new Thread[count];
            for (int i = 0; i < count; i++) {
                final ConfigurationNode node = configuration.nodes.get(i);
                threads[i] = new Thread(new Runner(node, this.sqls));
                threads[i].start();
            }

            // wait for all the threads to finish.
            for (int i = 0; i < count; i++)
                DistributedDB.join(threads[i], configuration.nodes.get(i));

            // Print message indicating if all commands completed successfully.
            // No report at all (for example, no nodes) counts as not successful.
            if (allSucceeded())
                System.out.println("All DDL commands executed successfully.");
            else
                System.out.println("Not all DDL commands executed successfully.");

        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
    }

    /**
     * Clears the aggregate outcome so a new run starts without any report.
     */
    private synchronized void resetOutcome() {
        this.success = null;
    }

    /**
     * Merges one report into the aggregate outcome. A failure is sticky: once
     * recorded, later success reports from other threads cannot overwrite it.
     *
     * @param ok
     *            true if the reporting unit of work succeeded, false otherwise.
     */
    private synchronized void recordOutcome(final boolean ok) {
        if (!ok)
            this.success = Boolean.FALSE;
        else if (null == this.success)
            this.success = Boolean.TRUE;
    }

    /**
     * Indicates whether at least one success was reported and no failure.
     *
     * @return true only if the aggregate outcome is TRUE.
     */
    private synchronized boolean allSucceeded() {
        return Boolean.TRUE.equals(this.success);
    }

    /**
     * A private runner object that executes all commands on a single node.
     */
    private class Runner implements Runnable {

        /** The configuration node associated with a single thread. */
        private final ConfigurationNode node;

        /** The SQL statements to be executed. */
        private final SQLList commands;

        /**
         * Initializes a new instance of the CreateOrDrop Runner.
         *
         * @param node
         *            The cluster node associated with this instance.
         *
         * @param commands
         *            The commands to execute.
         */
        public Runner(final ConfigurationNode node, final SQLList commands) {
            assert null != node;
            assert null != commands;

            this.node = node;
            this.commands = commands;
        }

        /**
         * Executes the DDL commands for the node associated with this instance
         * and reports the result to the enclosing object.
         */
        public void run() {
            // Stays false unless every command ran to completion, so that an
            // unchecked exception is also reported as a failure in 'finally'.
            boolean ok = false;
            try {
                this.node.runStatement(new StatementRunner() {
                    public void run(Statement statement) throws ProgramException, SQLException {
                        for (final String command : commands) {
                            node.log(System.out, "Executing: " + command);
                            statement.executeUpdate(command);
                            updateCatalog(command, node);
                            node.log(System.out, "Statement executed successfully.");
                        }
                    }
                });
                ok = true;
            } catch (final ProgramException e) {
                this.node.log(System.err, e.getMessage());
            } finally {
                recordOutcome(ok);
            }
        }
    }

    /**
     * Ensures the catalog 'DTABLES' table exists.
     *
     * @param catalog
     *            The catalog node on which the table is checked and created.
     *
     * @throws ProgramException
     *             Thrown if there is a problem verifying the table exists or if
     *             the table cannot be created.
     */
    private static void prepareCatalog(final ConfigurationNode catalog) throws ProgramException {
        assert null != catalog;

        // Open the connection, check if the table exists, and create the table
        // if it does not exist.
        catalog.runStatement(new StatementRunner() {
            public void run(Statement statement) throws ProgramException, SQLException {
                if (!DistributedDB.tableExists(statement.getConnection(), "DTABLES")) {
                    catalog.log(System.out, "Creating table CATALOG.DTABLES.");
                    statement.executeUpdate(newCreateTableCommand());
                }
            }
        });
    }

    /**
     * Updates the catalog database based on a successfully executed DDL command
     * by one of the threads. A failure to update the catalog is logged and
     * recorded as a failed outcome; it is not propagated to the caller.
     *
     * @param command
     *            The DDL command that just completed successfully.
     *
     * @param node
     *            The node on which the command was executed.
     *
     * @throws ProgramException
     *             Declared for callers; failures inside this method are
     *             currently caught and logged instead of being rethrown.
     */
    private void updateCatalog(final String command, final ConfigurationNode node)
            throws ProgramException {
        assert null != command;

        // Get the table name. If there is none, return immediately.
        final String tableName = DistributedDB.tableName(command);
        if (null == tableName)
            return;

        try {
            this.catalog.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    // Check for CREATE and DROP commands; in these cases,
                    // add or remove a row from the DTABLES table, respectively.
                    final String catalogCommand;
                    if (DistributedDB.isCommandType(command, "CREATE"))
                        catalogCommand = newInsertRowCommand(tableName, node);
                    else if (DistributedDB.isCommandType(command, "DROP"))
                        catalogCommand = newDeleteRowCommand(tableName, node);
                    else {
                        catalog.log(System.err, "Unsupported command: " + command);
                        return;
                    }
                    catalog.log(System.out, "Updating catalog: " + catalogCommand);
                    statement.executeUpdate(catalogCommand);
                }
            });
        } catch (final Exception e) {
            // Sticky: the success report the calling Runner makes afterwards
            // cannot overwrite this failure.
            recordOutcome(false);
            this.catalog.log(System.err, "Failed to update catalog dtables. " + e.getMessage());
        }
    }

    /**
     * Creates and returns a new DDL command to create the 'DTABLES' catalog
     * table.
     *
     * @return The new DDL command to create the 'DTABLES' catalog table.
     */
    private static String newCreateTableCommand() {
        final StringBuilder builder = new StringBuilder();
        builder.append("CREATE TABLE DTABLES (");
        builder.append("TNAME CHAR(32), ");
        builder.append("NODEDRIVER CHAR(64), ");
        builder.append("NODEURL CHAR(128), ");
        builder.append("NODEUSER CHAR(16), ");
        builder.append("NODEPASSWD CHAR(16), ");
        builder.append("PARTMTD INT, ");
        builder.append("NODEID INT, ");
        builder.append("PARTCOL CHAR(32), ");
        builder.append("PARTPARAM1 CHAR(32), ");
        builder.append("PARTPARAM2 CHAR(32))");
        return builder.toString();
    }

    /**
     * Returns the text written to the NODEID column for the given node, which
     * is the node's name with its first four characters removed.
     *
     * NOTE(review): presumably node names look like "node" followed by a
     * number (e.g. "node1"); the result is placed unquoted into SQL, so this
     * should be confirmed against the configuration parser.
     *
     * @param node
     *            The node whose identifier is required.
     *
     * @return The node name without its first four characters.
     */
    private static String nodeId(final ConfigurationNode node) {
        return node.name.substring(4);
    }

    /**
     * Creates and returns a new SQL command to delete a row from the 'DTABLES'
     * catalog table.
     *
     * @param tableName
     *            The name of the table whose row is deleted.
     *
     * @param node
     *            The node whose row for the table is deleted.
     *
     * @return A new SQL command to delete a row from the 'DTABLES' catalog
     *         table.
     */
    private static String newDeleteRowCommand(final String tableName, final ConfigurationNode node) {
        assert null != tableName;

        final StringBuilder builder = new StringBuilder();
        builder.append("DELETE FROM DTABLES WHERE TNAME = ");
        builder.append(DistributedDB.quote(tableName));
        builder.append(" AND NODEID = ");
        builder.append(nodeId(node));
        return builder.toString();
    }

    /**
     * Creates and returns a new SQL command to insert a row into the 'DTABLES'
     * catalog table. The partition columns are all written as NULL.
     *
     * @param tableName
     *            The name of the table to insert.
     *
     * @param node
     *            The node on which the table was created; its driver,
     *            hostname, username and password are stored in the row.
     *
     * @return A new SQL command to insert a row into the 'DTABLES' catalog
     *         table.
     */
    private static String newInsertRowCommand(final String tableName, final ConfigurationNode node) {
        assert null != tableName;

        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO DTABLES VALUES (");
        builder.append(DistributedDB.quote(tableName));
        builder.append(", ");
        builder.append(DistributedDB.quote(node.driver));
        builder.append(", ");
        builder.append(DistributedDB.quote(node.hostname));
        builder.append(", ");
        builder.append(DistributedDB.quote(node.username));
        builder.append(", ");
        builder.append(DistributedDB.quote(node.password));
        builder.append(", NULL, ");
        builder.append(nodeId(node));
        builder.append(", NULL");
        builder.append(", NULL");
        builder.append(", NULL)");
        return builder.toString();
    }

    /**
     * Returns the string form of the underlying configuration object.
     */
    @Override
    public String toString() {
        return this.configuration.toString();
    }
}