package edu.hawaii.ics.yucheng; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; /** * A class implements Runnable. The run method takes a list of SQL statements. * They are either create or drop queries. The queries will be executed * accordingly on the distributed database system. * * @author Cheng Jade * @assignment ICS 421 Project * @date Mar 22, 2010 * @bugs None */ public class CreateOrDrop implements Runnable { /* the create or drop configuration object. */ public final CreateOrDropConfiguration configuration; /* the catalog node extracted from the overall configuration. */ public final ConfigurationNode catalog; /* a list of SQL commands to be executed. */ private final SQLList sqls; /* a record of whether all SQL statements have executed successfully. */ private Boolean success = null; /** * Initialize a new instance of this object */ public CreateOrDrop(final CreateOrDropConfiguration configuration, final ArrayList<String> sqls) { if (null == configuration) throw new NullPointerException("CreateOrDrop"); this.configuration = configuration; this.catalog = this.configuration.catalog; this.sqls = (SQLList) sqls; } /** * The main routine that takes a list of commands and execute on all the * nodes in the configuration object. */ public void run() { try { // Prepare the 'DTABLES' in the catalog database. If the table does // not exist, a new one is created. prepareCatalog(this.catalog); // Start threads for each configuration node. final int count = this.configuration.nodes.size(); final Thread[] threads = new Thread[count]; for (int i = 0; i < count; i++) { final ConfigurationNode node = this.configuration.nodes.get(i); threads[i] = new Thread(new Runner(node, this.sqls)); threads[i].start(); } // wait for all the threads to finish. for (int i = 0; i < count; i++) DistributedDB.join(threads[i], this.configuration.nodes.get(i)); // Print message indicating if all commands completed successfully. if (this.success == null || this.success == false) System.out.println("Not all DDL commands executed successfully."); else System.out.println("All DDL commands executed successfully."); } catch (final ProgramException e) { System.err.println(e.getMessage()); System.exit(1); return; } } /** * A private runner object. */ private class Runner implements Runnable { /** The configuration node associated with a single thread. */ private final ConfigurationNode node; /** The SQL statement to be executed. */ private final SQLList commands; /** * Initializes a new instance of the RunSQL Runner. * * @param node * The cluster node associated with this instance. * * @param command * The command to execute. */ public Runner(final ConfigurationNode node, final SQLList commands) { assert null != node; assert null != commands; this.node = node; this.commands = commands; } /** * Executes the DDL commands for the node associated with this instance. */ public void run() { try { this.node.runStatement(new StatementRunner() { public void run(final Statement statement) throws ProgramException, SQLException { for (final String command : Runner.this.commands) { Runner.this.node.log(System.out, "Executing: " + command); statement.executeUpdate(command); CreateOrDrop.this.updateCatalog(command, Runner.this.node); Runner.this.node.log(System.out, "Statement executed successfully."); } } }); CreateOrDrop.this.success = true; } catch (final ProgramException e) { CreateOrDrop.this.success = false; this.node.log(System.err, e.getMessage()); } } } /** * Ensures the catalog 'DTABLES' table exists. * * @throws ProgramException * Thrown if there is a problem verifying the table exists or if * the table cannot be created. */ private static void prepareCatalog(final ConfigurationNode catalog) throws ProgramException { assert null != catalog; // Open the connection, check if the table exists, and create the table // if it does not exist. catalog.runStatement(new StatementRunner() { public void run(final Statement statement) throws ProgramException, SQLException { if (!DistributedDB.tableExists(statement.getConnection(), "DTABLES")) { catalog.log(System.out, "Creating table CATALOG.DTABLES."); statement.executeUpdate(newCreateTableCommand()); } } }); } /** * Updates the catalog database based on a successfully executed DDL command * by one of the threads. * * @param command * The DDL command that just completed successfully. * * @throws ProgramException * Thrown if there is an error updating the catalog database. */ private void updateCatalog(final String command, final ConfigurationNode node) throws ProgramException { assert null != command; // Get the table name. If there is none, return immediately. final String tableName; if (null == (tableName = DistributedDB.tableName(command))) return; try { this.catalog.runStatement(new StatementRunner() { public void run(final Statement statement) throws ProgramException, SQLException { // Check for CREATE and DROP commands; in these cases, // add or remove a row from the DTABLES table, respectively. final String catalogCommand; if (DistributedDB.isCommandType(command, "CREATE")) catalogCommand = CreateOrDrop.this.newInsertRowCommand(tableName, node); else if (DistributedDB.isCommandType(command, "DROP")) catalogCommand = CreateOrDrop.this.newDeleteRowCommand(tableName, node); else { CreateOrDrop.this.catalog.log(System.err, "Unsupported command: " + command); return; } CreateOrDrop.this.catalog.log(System.out, "Updating catalog: " + catalogCommand); statement.executeUpdate(catalogCommand); } }); } catch (final Exception e) { this.success = false; this.catalog.log(System.err, "Failed to update catalog dtables. " + e.getMessage()); } } /** * Creates and returns a new DDL command to create the 'DTABLES' catalog * table. * * @return The new DDL command to create the 'DTABLES' catalog table. */ private static String newCreateTableCommand() { final StringBuilder builder = new StringBuilder(); builder.append("CREATE TABLE DTABLES ("); builder.append("TNAME CHAR(32), "); builder.append("NODEDRIVER CHAR(64), "); builder.append("NODEURL CHAR(128), "); builder.append("NODEUSER CHAR(16), "); builder.append("NODEPASSWD CHAR(16), "); builder.append("PARTMTD INT, "); builder.append("NODEID INT, "); builder.append("PARTCOL CHAR(32), "); builder.append("PARTPARAM1 CHAR(32), "); builder.append("PARTPARAM2 CHAR(32))"); return builder.toString(); } /** * Creates and returns a new DDL command to delete a row from the 'DTABLES' * catalog table. * * @param tableName * The name of the table to delete. * * @return A new DDL command to delete a row from the 'DTABLES' catalog * table. */ private String newDeleteRowCommand(final String tableName, final ConfigurationNode node) { assert null != tableName; final StringBuilder builder = new StringBuilder(); builder.append("DELETE FROM DTABLES WHERE TNAME = "); builder.append(DistributedDB.quote(tableName)); builder.append(" AND NODEID = "); builder.append(node.name.substring(4)); return builder.toString(); } /** * Creates and returns a new DDL command to insert a row from the 'DTABLES' * catalog table. * * @param tableName * The name of the table to insert. * * @return A new DDL command to insert a row from the 'DTABLES' catalog * table. */ private String newInsertRowCommand(final String tableName, final ConfigurationNode node) { assert null != tableName; final StringBuilder builder = new StringBuilder(); builder.append("INSERT INTO DTABLES VALUES ("); builder.append(DistributedDB.quote(tableName)); builder.append(", "); builder.append(DistributedDB.quote(node.driver)); builder.append(", "); builder.append(DistributedDB.quote(node.hostname)); builder.append(", "); builder.append(DistributedDB.quote(node.username)); builder.append(", "); builder.append(DistributedDB.quote(node.password)); builder.append(", NULL, "); builder.append(node.name.substring(4)); builder.append(", NULL"); builder.append(", NULL"); builder.append(", NULL)"); return builder.toString(); } @Override public String toString() { return this.configuration.toString(); } }