CreateOrDrop.java

package edu.hawaii.ics.yucheng;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;

/**
 * A class implements Runnable. The run method takes a list of SQL statements.
 * They are either create or drop queries.  The queries will be executed 
 * accordingly on the distributed database system.
 * 
 * @author     Cheng Jade
 * @assignment ICS 421 Assignment 4
 * @date       Mar 22, 2010
 * @bugs       None
 */
public class CreateOrDrop implements Runnable {
    
    /* the create or drop configuration object. */
    public final CreateOrDropConfiguration configuration;

    /* the catalog node extracted from the overall configuration. */
    public final ConfigurationNode catalog;

    /* a list of SQL commands to be executed. */
    private final SQLList sqls;

    /* a record of whether all SQL statements have executed successfully. */
    private Boolean success = null;

    /**
     * Initialize a new instance of this object
     */
    public CreateOrDrop(final CreateOrDropConfiguration configuration, final ArrayList<String> sqls) {
        if (null == configuration)
            throw new NullPointerException("CreateOrDrop");
        this.configuration = configuration;
        this.catalog = this.configuration.catalog;
        this.sqls = (SQLList)sqls;
    }

    /**
     * The main routine that takes a list of commands and execute on all the
     * nodes in the configuration object.
     */
    public void run() {
        try {

            // Prepare the 'DTABLES' in the catalog database. If the table does
            // not exist, a new one is created.
            prepareCatalog(this.catalog);

            // Start threads for each configuration node.
            final int count = configuration.nodes.size();
            final Thread[] threads = new Thread[count];
            for (int i = 0; i < count; i++) {
                final ConfigurationNode node = configuration.nodes.get(i);
                threads[i] = new Thread(new Runner(node, this.sqls));
                threads[i].start();
            }

            // wait for all the threads to finish.
            for (int i = 0; i < count; i++)
                DistributedDB.join(threads[i], configuration.nodes.get(i));

            // Print message indicating if all commands completed successfully.
            if (this.success == null || this.success == false)
                System.out.println("Not all DDL commands executed successfully.");
            else
                System.out.println("All DDL commands executed successfully.");

        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
            return;
        }
    }

    /**
     * A private runner object.
     */
    private class Runner implements Runnable {

        /** The configuration node associated with a single thread. */
        private final ConfigurationNode node;

        /** The SQL statement to be executed. */
        private final SQLList commands;

        /**
         * Initializes a new instance of the RunSQL Runner.
         * 
         * @param node
         *            The cluster node associated with this instance.
         * 
         * @param command
         *            The command to execute.
         */
        public Runner(final ConfigurationNode node, final SQLList commands) {
            assert null != node;
            assert null != commands;

            this.node = node;
            this.commands = commands;
        }

        /**
         * Executes the DDL commands for the node associated with this instance.
         */
        public void run() {
            try {
                this.node.runStatement(new StatementRunner() {
                    public void run(Statement statement) throws ProgramException, SQLException {
                        for (final String command : commands) {
                            node.log(System.out, "Executing: " + command);
                            statement.executeUpdate(command);
                            updateCatalog(command, node);
                            node.log(System.out, "Statement executed successfully.");
                        }
                    }
                });
                success = true;
            } catch (final ProgramException e) {
                success = false;
                this.node.log(System.err, e.getMessage());
            }
        }
    }

    /**
     * Ensures the catalog 'DTABLES' table exists.
     * 
     * @throws ProgramException
     *             Thrown if there is a problem verifying the table exists or if
     *             the table cannot be created.
     */
    private static void prepareCatalog(final ConfigurationNode catalog) throws ProgramException {
        assert null != catalog;

        // Open the connection, check if the table exists, and create the table
        // if it does not exist.
        catalog.runStatement(new StatementRunner() {
            public void run(Statement statement) throws ProgramException, SQLException {
                if (!DistributedDB.tableExists(statement.getConnection(), "DTABLES")) {
                    catalog.log(System.out, "Creating table CATALOG.DTABLES.");
                    statement.executeUpdate(newCreateTableCommand());
                }
            }
        });
    }

    /**
     * Updates the catalog database based on a successfully executed DDL command
     * by one of the threads.
     * 
     * @param command
     *            The DDL command that just completed successfully.
     * 
     * @throws ProgramException
     *             Thrown if there is an error updating the catalog database.
     */
    private void updateCatalog(final String command, final ConfigurationNode node)
            throws ProgramException {
        assert null != command;

        // Get the table name. If there is none, return immediately.
        final String tableName;
        if (null == (tableName = DistributedDB.tableName(command)))
            return;
        try {
            this.catalog.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {

                    // Check for CREATE and DROP commands; in these cases,
                    // add or remove a row from the DTABLES table, respectively.
                    final String catalogCommand;
                    if (DistributedDB.isCommandType(command, "CREATE"))
                        catalogCommand = newInsertRowCommand(tableName, node);
                    else if (DistributedDB.isCommandType(command, "DROP"))
                        catalogCommand = newDeleteRowCommand(tableName, node);
                    else {
                        catalog.log(System.err, "Unsupported command: " + command);
                        return;
                    }

                    catalog.log(System.out, "Updating catalog: " + catalogCommand);
                    statement.executeUpdate(catalogCommand);
                }
            });
        } catch (final Exception e) {
            this.success = false;
            this.catalog.log(System.err, "Failed to update catalog dtables. " + e.getMessage());
        }
    }
    

    /**
     * Creates and returns a new DDL command to create the 'DTABLES' catalog
     * table.
     * 
     * @return The new DDL command to create the 'DTABLES' catalog table.
     */
    private static String newCreateTableCommand() {
        final StringBuilder builder = new StringBuilder();
        builder.append("CREATE TABLE DTABLES (");
        builder.append("TNAME      CHAR(32), ");
        builder.append("NODEDRIVER CHAR(64), ");
        builder.append("NODEURL    CHAR(128), ");
        builder.append("NODEUSER   CHAR(16), ");
        builder.append("NODEPASSWD CHAR(16), ");
        builder.append("PARTMTD    INT, ");
        builder.append("NODEID     INT, ");
        builder.append("PARTCOL    CHAR(32), ");
        builder.append("PARTPARAM1 CHAR(32), ");
        builder.append("PARTPARAM2 CHAR(32))");
        return builder.toString();
    }

    /**
     * Creates and returns a new DDL command to delete a row from the 'DTABLES'
     * catalog table.
     * 
     * @param tableName
     *            The name of the table to delete.
     * 
     * @return A new DDL command to delete a row from the 'DTABLES' catalog
     *         table.
     */
    private String newDeleteRowCommand(final String tableName, final ConfigurationNode node) {
        assert null != tableName;

        final StringBuilder builder = new StringBuilder();
        builder.append("DELETE FROM DTABLES WHERE TNAME = ");
        builder.append(DistributedDB.quote(tableName));
        builder.append(" AND NODEID = ");
        builder.append(node.name.substring(4));
        return builder.toString();
    }

    /**
     * Creates and returns a new DDL command to insert a row from the 'DTABLES'
     * catalog table.
     * 
     * @param tableName
     *            The name of the table to insert.
     * 
     * @return A new DDL command to insert a row from the 'DTABLES' catalog
     *         table.
     */
    private String newInsertRowCommand(final String tableName, final ConfigurationNode node) {
        assert null != tableName;

        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO DTABLES VALUES (");
        builder.append(DistributedDB.quote(tableName));
        builder.append(", ");
        builder.append(DistributedDB.quote(node.driver));
        builder.append(", ");
        builder.append(DistributedDB.quote(node.hostname));
        builder.append(", ");
        builder.append(DistributedDB.quote(node.username));
        builder.append(", ");
        builder.append(DistributedDB.quote(node.password));
        builder.append(", NULL, ");
        builder.append(node.name.substring(4));
        builder.append(", NULL");
        builder.append(", NULL");
        builder.append(", NULL)");
        return builder.toString();
    }

    public String toString() {
        return this.configuration.toString();
    }
}
Valid HTML 4.01 Valid CSS