package edu.hawaii.ics.yucheng; import java.sql.ResultSet; import java.sql.Statement; import java.sql.SQLException; import java.util.ArrayList; import org.antlr.runtime.ANTLRStringStream; import org.antlr.runtime.CharStream; import org.antlr.runtime.CommonTokenStream; import org.antlr.runtime.ParserRuleReturnScope; import org.antlr.runtime.RecognitionException; import org.antlr.runtime.TokenStream; import org.antlr.runtime.tree.BaseTree; import org.antlr.runtime.tree.Tree; /** * A class that executes a given SQL statements on a cluster of computers, each * running an instance of a DBMS. This program executes the same DDL statements * on the database instance of each of the computers on the cluster concurrently * using threads. * * @author Cheng Jade * @assignment ICS 421 Assignment 2-1 * @date Feb 29, 2010 * @bugs None */ public class RunSQL { /** * Parses an SQL command and returns the table name for a SELECT statement. * * @param command * The command to parse. * * @return The table name. * * @throws ProgramException * Thrown if the table name cannot be determined. */ private static String selectTableName(String command) throws ProgramException { assert null != command; // Get ready for ANTLR! final CharStream charStream = new ANTLRStringStream(command); final SQLLexer lexer = new SQLLexer(charStream); final TokenStream tokenStream = new CommonTokenStream(lexer); final SQLParser parser = new SQLParser(tokenStream); try { final ParserRuleReturnScope scope = parser.selectStatement(); final BaseTree tree = (BaseTree) scope.getTree(); if (tree == null) throw new ProgramException("Cannot parse SQL statement."); final Tree from = tree.getFirstChildWithType(SQLParser.SQLFROM); if (from == null) throw new ProgramException("Cannot parse SQL statement."); final int fromIndex = from.getChildIndex(); if (fromIndex + 1 >= tree.getChildCount()) throw new ProgramException("Cannot parse SQL statement."); final String result = tree.getChild(fromIndex + 1).getText(); // Note: ANTLR could do this. if (result.startsWith("'")) return result.substring(1, result.length() - 1); return result; } catch (final RecognitionException e) { throw new ProgramException(e); } } /** * The main entry point of the application. * * @param args * The command line arguments. */ public static void main(final String[] args) { assert null != args; // Check for usage errors. if (args.length != 2) { final String name = RunSQL.class.getSimpleName(); System.err.println("Usage: java " + name + " <cfg> <ddl>"); System.err.println(" <cfg> path to a configuration file"); System.err.println(" <ddl> path to a SQL file"); System.exit(0); return; } // the nodes read from the catalog. final ArrayList<ConfigurationNode> nodes = new ArrayList<ConfigurationNode>(); // Read the configuration and SQL command files. try { assert null != args[0]; final ConfigurationNode catalog = new ConfigurationNode(args[0], "catalog"); assert null != args[1]; final CommandList commands = new CommandList(args[1]); if (commands.size() != 1) throw new ProgramException( "This program supports only one SQL file statement."); final String command = commands.get(0); final String tableName = selectTableName(command); // Connect to the catalog. catalog.runStatement(new StatementRunner() { public void run(Statement statement) throws ProgramException, SQLException { final String lowerSql = "SELECT DISTINCT * FROM DTABLES WHERE TNAME = '" + tableName + "'"; catalog.log(System.out, "Executing " + lowerSql); ResultSet set = statement.executeQuery(lowerSql); if (!set.next()) { final String upperSql = "SELECT DISTINCT * FROM DTABLES WHERE TNAME = '" + tableName.toUpperCase() + "'"; catalog.log(System.out, "Executing " + upperSql); set = statement.executeQuery(upperSql); } else { set = statement.executeQuery(lowerSql); } try { // For all cluster node found that have this table, add // the node info to the configuration. while (set.next()) { final String driver = set.getString("NODEDRIVER").trim(); final String url = set.getString("NODEURL").trim(); final String user = set.getString("NODEUSER").trim(); final String password = set.getString("NODEPASSWD").trim(); final String name = "node " + set.getString("NODEID").trim(); nodes.add(new ConfigurationNode(name, driver, url, user, password)); } } finally { set.close(); } } }); if (nodes.size() == 0) throw new ProgramException( "No cluster node with talbe name '" + tableName + "' was found."); catalog.log(System.out, nodes.size() + " nodes with table name '" + tableName + "' were found."); // Start threads for each configuration node. final ArrayList<Runner> runners = new ArrayList<Runner>(); for (final ConfigurationNode node : nodes) { final Runner runner = new Runner(node, command); runner.start(); runners.add(runner); } // Wait for threads the complete. for (final Runner runner : runners) runner.join(); // Print the results. for (final Runner runner : runners) for (final String result : runner.getResults()) System.out.println(result); } catch (final ProgramException e) { System.err.println(e.getMessage()); System.exit(1); return; } // Exit cleanly while debugging from Eclipse. System.exit(0); } } class Runner implements Runnable { /** The configuration node associated with a single thread. */ private final ConfigurationNode node; /** The SQL statement to be executed. */ private final String command; /** The thread that executes the query statement. */ private final Thread thread = new Thread(this); /** The results from the query. */ private final ArrayList<String> results = new ArrayList<String>(); /** * Initializes a new instance of the RunSQL Runner. * * @param node * The cluster node associated with this instance. * * @param command * The command to execute. */ public Runner(final ConfigurationNode node, final String command) { assert null != node; assert null != command; this.node = node; this.command = command; } /** * Gets the results. * * @return The array of records from the query. */ public String[] getResults() { synchronized (this) { final String[] result = new String[this.results.size()]; this.results.toArray(result); return result; } } /** * Joins the thread. If there are any errors, a warning is displayed, but * the method returns successfully. */ public void join() { try { this.thread.join(); } catch (final InterruptedException e) { this.node.log(System.err, "Thread did not respond."); } } /** * Starts the thread. */ public void start() { // Clear any previous results. synchronized (this) { this.results.clear(); } this.thread.start(); } /** * Executes the DDL commands for the node associated with this instance. */ public void run() { // Connect to the specific node and execute the select statement. try { this.node.runStatement(new StatementRunner() { public void run(Statement statement) throws ProgramException, SQLException { // Execute the select statement. Runner.this.node.log(System.out, "Executing " + Runner.this.command); ResultSet set = statement.executeQuery(Runner.this.command); try { synchronized (Runner.this) { while (set.next()) { final StringBuilder builder = new StringBuilder(); final int count = set.getMetaData().getColumnCount(); for (int i = 1; i <= count; i++) { builder.append(set.getString(i).trim()); if (i < count) builder.append("\t"); } Runner.this.results.add(builder.toString()); } } Runner.this.node.log(System.out, "Success"); } finally { set.close(); } } }); } catch (final ProgramException e) { this.node.log(System.err, e.getMessage()); } } }