package edu.hawaii.ics.yucheng;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
/**
 * Executes a list of SQL statements against the distributed database system.
 * Only select statements are executed. A join statement is recognized but
 * rejected with an error message, and any other statement type is reported as
 * unsupported.
 *
 * @author Cheng Jade
 * @assignment ICS 421 Project
 * @date Mar 22, 2010
 * @bugs None
 */
public class SelectOrJoin implements Runnable {

    /** The select or join configuration object. */
    public final SelectOrJoinConfiguration configuration;

    /** The catalog node extracted from the overall configuration. */
    public final ConfigurationNode catalog;

    /**
     * The SQL commands to be executed. Kept as the declared parameter type so
     * that any list accepted by the constructor signature can be used.
     */
    private final ArrayList<String> sqls;

    /**
     * Initializes a new instance of this object.
     *
     * @param configuration
     *            The select or join configuration; its catalog node is used
     *            to look up which nodes hold a table.
     *
     * @param sqls
     *            The SQL statements to execute, in order.
     *
     * @throws NullPointerException
     *             If either argument is null.
     */
    public SelectOrJoin(final SelectOrJoinConfiguration configuration, final ArrayList<String> sqls) {
        if (null == configuration)
            throw new NullPointerException("configuration");
        // Fail here rather than with an uncaught exception later inside run().
        if (null == sqls)
            throw new NullPointerException("sqls");
        this.configuration = configuration;
        this.catalog = this.configuration.catalog;
        this.sqls = sqls;
    }

    /**
     * The main routine that executes the list of SQL statements. Every
     * statement is classified before any of them is executed, so a join or an
     * unsupported statement anywhere in the list prevents all execution. A
     * ProgramException is reported on standard error and ends the run.
     */
    public void run() {
        try {
            final ArrayList<SQLStructure> selectSqls = new ArrayList<SQLStructure>();
            for (final String sql : this.sqls) {
                final SQLStructure sqlStructure = new SQLStructure(sql);
                final SQLType sqlType = sqlStructure.type;
                if (sqlType == SQLType.JOIN) {
                    System.err.println("Join queries are not supported in this version of distributed DB.");
                    return;
                }
                if (sqlType == SQLType.SELECT) {
                    selectSqls.add(sqlStructure);
                    continue;
                }
                throw new ProgramException("unsupported sql: " + sql);
            }

            // Only reached when every statement in the list is a select.
            for (final SQLStructure select : selectSqls)
                this.runSelect(select);
        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
        }
    }

    /**
     * Executes one select statement on every node that the catalog lists for
     * the statement's table, then prints the combined results.
     *
     * @param sql
     *            The parsed select statement.
     *
     * @throws ProgramException
     *             If the catalog lookup or waiting on a node fails.
     */
    private void runSelect(final SQLStructure sql) throws ProgramException {
        final String tableName = sql.tableName1;
        final ArrayList<ConfigurationNode> nodes = this.findNodes(tableName);
        final int count = nodes.size();
        this.catalog.log(System.out, count + " nodes with table name '" + tableName + "' were found.");

        // Start one thread per node so the nodes are queried concurrently.
        final Thread[] threads = new Thread[count];
        final ArrayList<SelectRunner> runners = new ArrayList<SelectRunner>();
        for (int i = 0; i < count; i++) {
            final SelectRunner runner = new SelectRunner(nodes.get(i), sql.sql);
            runners.add(runner);
            threads[i] = new Thread(runner);
            threads[i].start();
        }

        // Wait for the threads in start order and collect their rows.
        final StringBuilder resultBuilder = new StringBuilder();
        for (int i = 0; i < count; i++) {
            DistributedDB.join(threads[i], nodes.get(i));
            for (final String result : runners.get(i).getResults())
                resultBuilder.append(result).append('\n');
        }

        // Print the collected result; nothing is printed when no node was found.
        if (count != 0) {
            System.out.println("\nResults of executing " + sql.sql + " are: ");
            System.out.println(resultBuilder.toString());
        }
    }

    /**
     * Reads from the catalog's DTABLES table the nodes registered for a table
     * name, matching the name as given or its upper-cased form.
     *
     * @param tableName
     *            The table name to look up.
     *
     * @return One configuration node per matching catalog row; empty when the
     *         catalog has no entry for the table.
     *
     * @throws ProgramException
     *             If running the catalog statement fails.
     */
    private ArrayList<ConfigurationNode> findNodes(final String tableName) throws ProgramException {
        final ArrayList<ConfigurationNode> nodes = new ArrayList<ConfigurationNode>();

        // The name is embedded in a SQL string literal, so embedded single
        // quotes are doubled (standard SQL escaping). A PreparedStatement would
        // be preferable, but only a Statement is handed to the runner here.
        final String literal = String.valueOf(tableName).replace("'", "''");

        this.catalog.runStatement(new StatementRunner() {
            public void run(final Statement statement) throws ProgramException, SQLException {
                final String query = "SELECT DISTINCT * FROM DTABLES WHERE TNAME = '" + literal + "' OR TNAME = UCASE('" + literal + "')";
                SelectOrJoin.this.catalog.log(System.out, "Executing " + query);
                final ResultSet set = statement.executeQuery(query);
                try {
                    // Each row describes one cluster node that has this table.
                    while (set.next()) {
                        final String driver = set.getString("NODEDRIVER").trim();
                        final String url = set.getString("NODEURL").trim();
                        final String user = set.getString("NODEUSER").trim();
                        final String password = set.getString("NODEPASSWD").trim();
                        final String name = "node " + set.getString("NODEID").trim();
                        nodes.add(new ConfigurationNode(name, driver, url, user, password));
                    }
                } finally {
                    set.close();
                }
            }
        });
        return nodes;
    }
}
/**
 * A runner that executes one select SQL statement on a single node and keeps
 * the returned rows as tab-separated strings. The rows are stored in a plain
 * list with no synchronization in this class, so callers are expected to call
 * {@link #getResults()} only after the thread running this object has
 * finished.
 */
class SelectRunner implements Runnable {

    /** The text stored for a column whose value is SQL NULL. */
    private static final String NULL_TEXT = "NULL";

    /** The configuration node associated with a single thread. */
    private final ConfigurationNode node;

    /** The SQL statement to be executed. */
    private final String sql;

    /** The results from the query, one formatted string per row. */
    private final ArrayList<String> results = new ArrayList<String>();

    /**
     * Initializes a new instance of the select runner.
     *
     * @param node
     *            The cluster node associated with this instance.
     *
     * @param sql
     *            The select SQL statement to execute.
     */
    public SelectRunner(final ConfigurationNode node, final String sql) {
        assert null != node;
        assert null != sql;
        this.node = node;
        this.sql = sql;
    }

    /**
     * Returns the select statement results on this node as an array of
     * strings.
     *
     * @return A new array holding one string per result row, with the row's
     *         column values separated by tabs; empty when no row was stored.
     */
    public String[] getResults() {
        return this.results.toArray(new String[this.results.size()]);
    }

    /**
     * Executes the select statement on this node and stores every row. A
     * ProgramException is logged against the node on standard error instead
     * of being propagated.
     */
    public void run() {
        try {
            this.node.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    SelectRunner.this.node.log(System.out, "Executing " + SelectRunner.this.sql);
                    final ResultSet set = statement.executeQuery(SelectRunner.this.sql);
                    try {
                        // The column count is the same for every row, so read it once.
                        final int count = set.getMetaData().getColumnCount();
                        while (set.next()) {
                            final StringBuilder builder = new StringBuilder();
                            for (int i = 1; i <= count; i++) {
                                // getString returns null for SQL NULL; trimming
                                // it directly would throw and lose the whole query.
                                final String value = set.getString(i);
                                builder.append(null == value ? NULL_TEXT : value.trim());
                                if (i < count)
                                    builder.append("\t");
                            }
                            SelectRunner.this.results.add(builder.toString());
                        }
                    } finally {
                        set.close();
                    }
                }
            });
        } catch (final ProgramException e) {
            this.node.log(System.err, "SelectRunner run " + e.getMessage());
        }
    }
}