package edu.hawaii.ics.yucheng; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; /** * A class implements Runnable. The run method takes a list of SQL statements. * Only select and join statements are supported. The queries will be executed * accordingly on the distributed database system. * * @author Cheng Jade * @assignment ICS 421 Project * @date Mar 22, 2010 * @bugs None */ public class SelectOrJoin implements Runnable { /* the create or drop configuration object. */ public final SelectOrJoinConfiguration configuration; /* the catalog node extracted from the overall configuration. */ public final ConfigurationNode catalog; /* a list of SQL commands to be executed. */ private final SQLList sqls; /** * Initialize a new instance of this object */ public SelectOrJoin(final SelectOrJoinConfiguration configuration, final ArrayList<String> sqls) { if (null == configuration) throw new NullPointerException("CreateOrDrop"); this.configuration = configuration; this.catalog = this.configuration.catalog; this.sqls = (SQLList) sqls; } /** * The main routine of that executes a list of SQL statements. They are * either select statements or join statements. */ public void run() { try { final ArrayList<SQLStructure> selectSqls = new ArrayList<SQLStructure>(); for (final String sql : this.sqls) { final SQLStructure sqlStructure = new SQLStructure(sql); final SQLType sqlType = sqlStructure.type; if (sqlType == SQLType.JOIN) { System.err.println("Join queries are not supported in this version of distributed DB."); return; } if (sqlType == SQLType.SELECT) { selectSqls.add(sqlStructure); continue; } throw new ProgramException("unsupported sql: " + sql); } for (final SQLStructure sql : selectSqls) this.runSelect(sql); } catch (final ProgramException e) { System.err.println(e.getMessage()); return; } } /** * Execute one select statement and print the results. */ private void runSelect(final SQLStructure sql) throws ProgramException { // the nodes read from the catalog. final ArrayList<ConfigurationNode> nodes = new ArrayList<ConfigurationNode>(); final String tableName = sql.tableName1; // Connect to the catalog. this.catalog.runStatement(new StatementRunner() { public void run(final Statement statement) throws ProgramException, SQLException { final String sql = "SELECT DISTINCT * FROM DTABLES WHERE TNAME = '" + tableName + "' OR TNAME = UCASE('" + tableName + "')"; SelectOrJoin.this.catalog.log(System.out, "Executing " + sql); final ResultSet set = statement.executeQuery(sql); try { // For all cluster node found that have this table, add // the node info to the configuration. while (set.next()) { final String driver = set.getString("NODEDRIVER").trim(); final String url = set.getString("NODEURL").trim(); final String user = set.getString("NODEUSER").trim(); final String password = set.getString("NODEPASSWD").trim(); final String name = "node " + set.getString("NODEID").trim(); nodes.add(new ConfigurationNode(name, driver, url, user, password)); } } finally { set.close(); } } }); final int count = nodes.size(); this.catalog.log(System.out, nodes.size() + " nodes with table name '" + tableName + "' were found."); // start threads on each node final Thread[] threads = new Thread[count]; final ArrayList<SelectRunner> runners = new ArrayList<SelectRunner>(); for (int i = 0; i < count; i++) { final SelectRunner runner = new SelectRunner(nodes.get(i), sql.sql); runners.add(runner); threads[i] = new Thread(runner); threads[i].start(); } // wait for the threads to complete and collect the results. final StringBuilder resultBuilder = new StringBuilder(); for (int i = 0; i < count; i++) { DistributedDB.join(threads[i], nodes.get(i)); final String[] results = runners.get(i).getResults(); for (final String result : results) resultBuilder.append(result + "\n"); } // print the collected result if (count != 0) { System.out.println("\nResults of executing " + sql.sql + " are: "); System.out.println(resultBuilder.toString()); } } } /** * A runner class that implements a run method that executes the select SQL * statement on a node. */ class SelectRunner implements Runnable { /** The configuration node associated with a single thread. */ private final ConfigurationNode node; /** The SQL statement to be executed. */ private final String sql; /** The results from the query. */ private final ArrayList<String> results = new ArrayList<String>(); /** * returns the select statement results on this node as an array of strings. */ public String[] getResults() { final String[] results = new String[this.results.size()]; this.results.toArray(results); return results; } /** * Initializes a new instance of the RunSQL Runner. * * @param node * The cluster node associated with this instance. * * @param sql * The select SQL statement to execute. */ public SelectRunner(final ConfigurationNode node, final String sql) { assert null != node; assert null != sql; this.node = node; this.sql = sql; } /** * Executes the select statement on this node. */ public void run() { // Connect to the specific node and execute the select statement. try { this.node.runStatement(new StatementRunner() { public void run(final Statement statement) throws ProgramException, SQLException { // Execute the select statement. SelectRunner.this.node.log(System.out, "Executing " + SelectRunner.this.sql); final ResultSet set = statement.executeQuery(SelectRunner.this.sql); try { while (set.next()) { final StringBuilder builder = new StringBuilder(); final int count = set.getMetaData().getColumnCount(); for (int i = 1; i <= count; i++) { builder.append(set.getString(i).trim()); if (i < count) builder.append("\t"); } SelectRunner.this.results.add(builder.toString()); } // SelectRunner.this.node.log(System.out, // "Select statement executed successfully"); } finally { set.close(); } } }); } catch (final ProgramException e) { this.node.log(System.err, "SelectRunner run " + e.getMessage()); } } }