RunSQL.java

package edu.hawaii.ics.yucheng;

import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.SQLException;
import java.util.ArrayList;

import org.antlr.runtime.ANTLRStringStream;
import org.antlr.runtime.CharStream;
import org.antlr.runtime.CommonTokenStream;
import org.antlr.runtime.ParserRuleReturnScope;
import org.antlr.runtime.RecognitionException;
import org.antlr.runtime.TokenStream;
import org.antlr.runtime.tree.BaseTree;
import org.antlr.runtime.tree.Tree;

/**
 * A class that executes a given SQL statements on a cluster of computers, each
 * running an instance of a DBMS. This program executes the same DDL statements
 * on the database instance of each of the computers on the cluster concurrently
 * using threads.
 * 
 * @author     Cheng Jade
 * @assignment ICS 421 Assignment 2-1
 * @date       Feb 29, 2010
 * @bugs       None
 */
public class RunSQL {

    /**
     * Parses an SQL command and returns the table name for a SELECT statement.
     * 
     * @param command
     *            The command to parse.
     * 
     * @return The table name.
     * 
     * @throws ProgramException
     *             Thrown if the table name cannot be determined.
     */
    private static String selectTableName(String command)
            throws ProgramException {
        assert null != command;

        // Get ready for ANTLR!
        final CharStream charStream = new ANTLRStringStream(command);
        final SQLLexer lexer = new SQLLexer(charStream);
        final TokenStream tokenStream = new CommonTokenStream(lexer);
        final SQLParser parser = new SQLParser(tokenStream);

        try {
            final ParserRuleReturnScope scope = parser.selectStatement();

            final BaseTree tree = (BaseTree) scope.getTree();
            if (tree == null)
                throw new ProgramException("Cannot parse SQL statement.");

            final Tree from = tree.getFirstChildWithType(SQLParser.SQLFROM);
            if (from == null)
                throw new ProgramException("Cannot parse SQL statement.");

            final int fromIndex = from.getChildIndex();
            if (fromIndex + 1 >= tree.getChildCount())
                throw new ProgramException("Cannot parse SQL statement.");

            final String result = tree.getChild(fromIndex + 1).getText();

            // Note: ANTLR could do this.
            if (result.startsWith("'"))
                return result.substring(1, result.length() - 1);

            return result;

        } catch (final RecognitionException e) {
            throw new ProgramException(e);
        }
    }

    /**
     * The main entry point of the application.
     * 
     * @param args
     *            The command line arguments.
     */
    public static void main(final String[] args) {
        assert null != args;

        // Check for usage errors.
        if (args.length != 2) {
            final String name = RunSQL.class.getSimpleName();
            System.err.println("Usage: java " + name + " <cfg> <ddl>");
            System.err.println("       <cfg> path to a configuration file");
            System.err.println("       <ddl> path to a SQL file");
            System.exit(0);
            return;
        }

        // the nodes read from the catalog.
        final ArrayList<ConfigurationNode> nodes = new ArrayList<ConfigurationNode>();

        // Read the configuration and SQL command files.
        try {
            assert null != args[0];
            final ConfigurationNode catalog = new ConfigurationNode(args[0], "catalog");

            assert null != args[1];
            final CommandList commands = new CommandList(args[1]);
            if (commands.size() != 1)
                throw new ProgramException(
                        "This program supports only one SQL file statement.");
            final String command = commands.get(0);
            final String tableName = selectTableName(command);

            // Connect to the catalog.
            catalog.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {

                    final String lowerSql = "SELECT DISTINCT * FROM DTABLES WHERE TNAME = '" + tableName + "'";
                    catalog.log(System.out, "Executing " + lowerSql);
                    ResultSet set = statement.executeQuery(lowerSql);
                    
                    if (!set.next()) {
                        final String upperSql = "SELECT DISTINCT * FROM DTABLES WHERE TNAME = '" + tableName.toUpperCase() + "'";
                        catalog.log(System.out, "Executing " + upperSql);
                        set = statement.executeQuery(upperSql);
                    } else {
                        set = statement.executeQuery(lowerSql);
                    }
                    
                    try {
                        // For all cluster node found that have this table, add
                        // the node info to the configuration.
                        while (set.next()) {
                            final String driver = set.getString("NODEDRIVER").trim();
                            final String url = set.getString("NODEURL").trim();
                            final String user = set.getString("NODEUSER").trim();
                            final String password = set.getString("NODEPASSWD").trim();
                            final String name = "node " + set.getString("NODEID").trim();
                            
                            nodes.add(new ConfigurationNode(name, driver, url, user, password));
                        }
                    } finally {
                        set.close();
                    }
                }
            });
            if (nodes.size() == 0)
                throw new ProgramException(
                    "No cluster node with talbe name '" + tableName + "' was found.");
            catalog.log(System.out, nodes.size() + " nodes with table name '" + tableName + "' were found.");

            // Start threads for each configuration node.
            final ArrayList<Runner> runners = new ArrayList<Runner>();
            for (final ConfigurationNode node : nodes) {
                final Runner runner = new Runner(node, command);
                runner.start();
                runners.add(runner);
            }

            // Wait for threads the complete.
            for (final Runner runner : runners)
                runner.join();

            // Print the results.
            for (final Runner runner : runners)
                for (final String result : runner.getResults())
                    System.out.println(result);

        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
            return;
        }

        // Exit cleanly while debugging from Eclipse.
        System.exit(0);
    }
}

class Runner implements Runnable {

    /** The configuration node associated with a single thread. */
    private final ConfigurationNode node;

    /** The SQL statement to be executed. */
    private final String command;

    /** The thread that executes the query statement. */
    private final Thread thread = new Thread(this);

    /** The results from the query. */
    private final ArrayList<String> results = new ArrayList<String>();

    /**
     * Initializes a new instance of the RunSQL Runner.
     * 
     * @param node
     *            The cluster node associated with this instance.
     * 
     * @param command
     *            The command to execute.
     */
    public Runner(final ConfigurationNode node, final String command) {
        assert null != node;
        assert null != command;

        this.node = node;
        this.command = command;
    }

    /**
     * Gets the results.
     * 
     * @return The array of records from the query.
     */
    public String[] getResults() {
        synchronized (this) {
            final String[] result = new String[this.results.size()];
            this.results.toArray(result);
            return result;
        }
    }

    /**
     * Joins the thread. If there are any errors, a warning is displayed, but
     * the method returns successfully.
     */
    public void join() {
        try {
            this.thread.join();
        } catch (final InterruptedException e) {
            this.node.log(System.err, "Thread did not respond.");
        }
    }

    /**
     * Starts the thread.
     */
    public void start() {

        // Clear any previous results.
        synchronized (this) {
            this.results.clear();
        }

        this.thread.start();
    }

    /**
     * Executes the DDL commands for the node associated with this instance.
     */
    public void run() {

        // Connect to the specific node and execute the select statement.
        try {
            this.node.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {

                    // Execute the select statement.
                    Runner.this.node.log(System.out, "Executing " + Runner.this.command);
                    ResultSet set = statement.executeQuery(Runner.this.command);
                    
                    try {
                        synchronized (Runner.this) {
                            while (set.next()) {
                                final StringBuilder builder = new StringBuilder();
                                final int count = set.getMetaData().getColumnCount();
                                for (int i = 1; i <= count; i++) {
                                    builder.append(set.getString(i).trim());
                                    if (i < count)
                                        builder.append("\t");
                                }

                                Runner.this.results.add(builder.toString());
                            }
                        }

                        Runner.this.node.log(System.out, "Success");
                    } finally {
                        set.close();
                    }
                }
            });

        } catch (final ProgramException e) {
            this.node.log(System.err, e.getMessage());
        }
    }
}
Valid HTML 4.01 Valid CSS