package edu.hawaii.ics.yucheng;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * A class implements Runnable. The run method takes a list of SQL statements.
 * Only select and join statements are supported. The queries will be executed
 * accordingly on the distributed database system.
 *
 * @author Cheng Jade
 * @assignment ICS 421 Assignment 4
 * @date Mar 22, 2010
 * @bugs None
 */
public class SelectOrJoin implements Runnable {

    /* text used in place of a SQL NULL column value. */
    private static final String NULL_TEXT = "null";

    /* prefix of the temporary table names. */
    private static final String TEMP_PREFIX = "TEMP_";

    /* schema qualified prefix used to reference the temporary tables. */
    private static final String SESSION_TEMP_PREFIX = "SESSION." + TEMP_PREFIX;

    /* the select or join configuration object. */
    public final SelectOrJoinConfiguration configuration;

    /* the catalog node extracted from the overall configuration. */
    public final ConfigurationNode catalog;

    /* a list of SQL commands to be executed. */
    private final ArrayList<String> sqls;

    /**
     * Initialize a new instance of this object
     *
     * @param configuration
     *            The select or join configuration; must not be null.
     * @param sqls
     *            The SQL statements to execute; must not be null. The list is
     *            only iterated, so any ArrayList of strings is accepted.
     *
     * @throws NullPointerException
     *             Thrown if either argument is null.
     */
    public SelectOrJoin(final SelectOrJoinConfiguration configuration, final ArrayList<String> sqls) {
        if (null == configuration)
            throw new NullPointerException("configuration");
        if (null == sqls)
            throw new NullPointerException("sqls");
        this.configuration = configuration;
        this.catalog = this.configuration.catalog;
        this.sqls = sqls;
    }

    /**
     * The main routine of that executes a list of SQL statements. They are
     * either select statements or join statements. All statements are
     * classified first; then every select statement is executed, followed by
     * every join statement. A ProgramException stops the processing and its
     * message is printed to the standard error stream.
     */
    public void run() {
        try {
            final ArrayList<SQLStructure> selectSqls = new ArrayList<SQLStructure>();
            final ArrayList<SQLStructure> joinSqls = new ArrayList<SQLStructure>();
            for (final String sql : this.sqls) {
                final SQLStructure sqlStructure = new SQLStructure(sql);
                final SQLType sqlType = sqlStructure.type;
                if (sqlType == SQLType.SELECT) {
                    selectSqls.add(sqlStructure);
                    continue;
                }
                if (sqlType == SQLType.JOIN) {
                    joinSqls.add(sqlStructure);
                    continue;
                }
                throw new ProgramException("unsupported sql: " + sql);
            }
            for (final SQLStructure sql : selectSqls)
                this.runSelect(sql);
            for (final SQLStructure sql : joinSqls)
                this.runJoin(sql);
        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            return;
        }
    }

    /**
     * Execute one join statement and print the results. For every pair of
     * nodes (one holding a piece of table 1, one holding a piece of table 2)
     * the relevant columns and rows are copied into two temporary tables on
     * the local node, and the rewritten join statement is executed there.
     *
     * @param sql
     *            The parsed join statement.
     *
     * @throws ProgramException
     *             Thrown if a statement fails on the local node, or if the
     *             schema of a temporary table cannot be determined.
     */
    private void runJoin(final SQLStructure sql) throws ProgramException {
        // Obtain the table names and the projection push down column names
        // for each table.
        final String tableName1 = sql.tableName1;
        final String tableName2 = sql.tableName2;
        final ArrayList<String> table1RelevantColumns = stripTableName(sql.table1RelevantColumns);
        final ArrayList<String> table2RelevantColumns = stripTableName(sql.table2RelevantColumns);

        // Use the catalog as localhost if the configuration carries no local
        // node.
        final ConfigurationNode configuredLocalNode = this.configuration.localNode;
        final ConfigurationNode localNode = configuredLocalNode != null ? configuredLocalNode : this.catalog;

        // Connect to the catalog and retrieve the nodes that contain pieces
        // of each target table.
        final ArrayList<ConfigurationNode> table1RelevantNodes = new ArrayList<ConfigurationNode>();
        final ArrayList<ConfigurationNode> table2RelevantNodes = new ArrayList<ConfigurationNode>();
        this.fetchRelevantNodes(tableName1, tableName2, table1RelevantNodes, table2RelevantNodes);

        // The join statement does not depend on the nodes, build it once.
        final String joinStatement = joinStatement(sql.sql, tableName1, tableName2);

        // an array list collecting final join results.
        final ArrayList<String> result = new ArrayList<String>();

        for (final ConfigurationNode nodei : table1RelevantNodes) {
            final String createTempTableStatement1 =
                    createTempTableStatement(nodei, tableName1, table1RelevantColumns);
            final String insertTempTableStatement1 =
                    insertTempTableStatement(nodei, tableName1, table1RelevantColumns,
                            sql.table1QualificationColumns);

            for (final ConfigurationNode nodej : table2RelevantNodes) {
                final String createTempTableStatement2 =
                        createTempTableStatement(nodej, tableName2, table2RelevantColumns);
                final String insertTempTableStatement2 =
                        insertTempTableStatement(nodej, tableName2, table2RelevantColumns,
                                sql.table2QualificationColumns);

                // execute the join statement.
                localNode.runStatement(new StatementRunner() {
                    public void run(final Statement statement) throws ProgramException, SQLException {
                        // create global temporary table with only the relevant
                        // columns for table 1
                        localNode.log(System.out, "Executing " + createTempTableStatement1);
                        statement.executeUpdate(createTempTableStatement1);

                        // bulk insert the relevant rows from node i to
                        // temporary table 1; null means there are no rows.
                        if (insertTempTableStatement1 != null) {
                            localNode.log(System.out, "Executing " + insertTempTableStatement1);
                            statement.executeUpdate(insertTempTableStatement1);
                        }

                        // create global temporary table with only the relevant
                        // columns for table 2
                        localNode.log(System.out, "Executing " + createTempTableStatement2);
                        statement.executeUpdate(createTempTableStatement2);

                        // bulk insert the relevant rows from node j to
                        // temporary table 2; null means there are no rows.
                        if (insertTempTableStatement2 != null) {
                            localNode.log(System.out, "Executing " + insertTempTableStatement2);
                            statement.executeUpdate(insertTempTableStatement2);
                        }

                        // execute the join on the two temporary tables.
                        localNode.log(System.out, "Executing " + joinStatement);
                        final ResultSet set = statement.executeQuery(joinStatement);
                        try {
                            final int count = set.getMetaData().getColumnCount();
                            while (set.next()) {
                                final StringBuilder builder = new StringBuilder();
                                for (int column = 1; column <= count; column++)
                                    builder.append(textOf(set.getString(column))).append(' ');
                                result.add(builder.toString());
                            }
                        } finally {
                            set.close();
                        }
                    }
                });
            }
        }

        // Print the final result.
        System.out.println("\nStatement, " + sql.sql + ", executing result: ");
        for (final String item : result)
            System.out.println(item.trim());
        System.out.println();
    }

    /**
     * Execute one select statement and print the results. The catalog is
     * queried for the nodes holding the table, the statement is executed on
     * each of these nodes in its own thread, and the collected rows are
     * printed once all threads have completed.
     *
     * @param sql
     *            The parsed select statement.
     *
     * @throws ProgramException
     *             Thrown if the catalog lookup or joining a thread fails.
     */
    private void runSelect(final SQLStructure sql) throws ProgramException {
        // the nodes read from the catalog.
        final ArrayList<ConfigurationNode> nodes = new ArrayList<ConfigurationNode>();
        final String tableName = sql.tableName1;

        // Connect to the catalog.
        this.catalog.runStatement(new StatementRunner() {
            public void run(final Statement statement) throws ProgramException, SQLException {
                final String query = "SELECT DISTINCT * FROM DTABLES WHERE TNAME = '" + tableName
                        + "' OR TNAME = UCASE('" + tableName + "')";
                SelectOrJoin.this.catalog.log(System.out, "Executing " + query);
                // For all cluster node found that have this table, add
                // the node info to the list.
                collectNodes(statement, query, nodes);
            }
        });

        final int count = nodes.size();
        this.catalog.log(System.out, nodes.size() + " nodes with table name '" + tableName
                + "' were found.");

        // start threads on each node
        final Thread[] threads = new Thread[count];
        final ArrayList<SelectRunner> runners = new ArrayList<SelectRunner>();
        for (int i = 0; i < count; i++) {
            final SelectRunner runner = new SelectRunner(nodes.get(i), sql.sql);
            runners.add(runner);
            threads[i] = new Thread(runner);
            threads[i].start();
        }

        // wait for the threads to complete and collect the results.
        final StringBuilder resultBuilder = new StringBuilder();
        for (int i = 0; i < count; i++) {
            DistributedDB.join(threads[i], nodes.get(i));
            for (final String result : runners.get(i).getResults())
                resultBuilder.append(result).append("\n");
        }

        // print the collected result
        if (count != 0) {
            System.out.println("\nResults of executing " + sql.sql + " are: ");
            System.out.println(resultBuilder.toString());
        }
    }

    /**
     * Method returns a bulk insert statement based on the relevant columns and
     * relevant qualifications. This method applies both projection and
     * selection push down. A failure while reading the source table is logged
     * on the source node and the rows read so far are used.
     *
     * @param srcNode
     *            The node that's to be copied from.
     * @param tableName
     *            The table whose contents are to be copied.
     * @param relevantColumns
     *            The list of columns that're related to this table.
     * @param relevantQualifications
     *            The list of qualifications that're related to this table.
     *            They are combined with OR in the where clause.
     *
     * @return The bulk insert statement, or null if no rows were read.
     */
    private static String insertTempTableStatement(
            final ConfigurationNode srcNode,
            final String tableName,
            final ArrayList<String> relevantColumns,
            final ArrayList<String> relevantQualifications) {

        final int relevantColumnsCount = relevantColumns.size();

        // retrieve all relevant records from the target table in the source
        // node.
        final ArrayList<String[]> records = new ArrayList<String[]>();
        try {
            srcNode.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    final StringBuilder select = new StringBuilder("SELECT ");
                    select.append(commaSeparatedColumnNames(relevantColumns));
                    select.append(" FROM ").append(tableName);
                    final int size = relevantQualifications.size();
                    for (int i = 0; i < size; i++) {
                        select.append(i == 0 ? " WHERE " : " OR ");
                        select.append(relevantQualifications.get(i));
                    }

                    final ResultSet set = statement.executeQuery(select.toString());
                    try {
                        while (set.next()) {
                            final String[] record = new String[relevantColumnsCount];
                            for (int i = 1; i <= relevantColumnsCount; i++)
                                record[i - 1] = textOf(set.getString(i));
                            records.add(record);
                        }
                    } finally {
                        set.close();
                    }
                }
            });
        } catch (final Exception e) {
            srcNode.log(System.err, e.toString());
        }

        // nothing to insert.
        final int recordsCount = records.size();
        if (recordsCount == 0)
            return null;

        // construct the bulk insert statement.
        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO SESSION." + TEMP_PREFIX + tableName + " VALUES \n");
        for (int i = 0; i < recordsCount; i++) {
            final String[] record = records.get(i);
            builder.append("\t\t\t\t(");
            for (int j = 0; j < relevantColumnsCount; j++) {
                if (j != 0)
                    builder.append(", ");
                builder.append(DistributedDB.quote(record[j]));
            }
            builder.append(")");
            if (i != recordsCount - 1)
                builder.append(", \n");
        }
        return builder.toString();
    }

    /**
     * Method returns a creates table statement that makes a temporary table.
     * The temporary table contains only the necessary columns. The column
     * names and types are read from the meta data of a query against the
     * original table.
     *
     * @param srcNode
     *            The node that the original table is located.
     * @param tableName
     *            The table whose schema is used for the temporary table.
     * @param relevantColumns
     *            The list of columns that're related to this table.
     *
     * @return The statement declaring the temporary table.
     *
     * @throws ProgramException
     *             Thrown if the columns of the table could not be read from
     *             the source node.
     */
    private static String createTempTableStatement(
            final ConfigurationNode srcNode,
            final String tableName,
            final ArrayList<String> relevantColumns) throws ProgramException {

        final ArrayList<String> columnNames = new ArrayList<String>();
        final ArrayList<String> columnTypes = new ArrayList<String>();
        try {
            srcNode.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    final String query = "SELECT " + commaSeparatedColumnNames(relevantColumns)
                            + " FROM " + tableName;
                    srcNode.log(System.out, "Excecuting " + query);
                    final ResultSet set = statement.executeQuery(query);
                    try {
                        // retrieve the column names and column types.
                        final ResultSetMetaData meta = set.getMetaData();
                        if (!set.next())
                            srcNode.log(System.out, String.format("Table '%s' is empty", tableName));

                        final ArrayList<String> names = new ArrayList<String>();
                        final ArrayList<String> types = new ArrayList<String>();
                        final int count = meta.getColumnCount();
                        for (int i = 1; i <= count; i++) {
                            String columnType = meta.getColumnTypeName(i);
                            // if column type is char, make its size to be 128
                            // long in the temp table.
                            // NOTE(review): other sized types (e.g. VARCHAR)
                            // are emitted without a length -- confirm that
                            // the target database accepts this.
                            if (columnType.equalsIgnoreCase("CHAR"))
                                columnType = columnType + "(128)";
                            names.add(meta.getColumnName(i));
                            types.add(columnType);
                        }

                        // publish the schema only once it is complete, so a
                        // failure never leaves a partial column list behind.
                        columnNames.addAll(names);
                        columnTypes.addAll(types);
                    } finally {
                        set.close();
                    }
                }
            });
        } catch (final Exception e) {
            srcNode.log(System.err, e.getMessage());
        }

        // without any column there is no valid statement to build.
        final int columnNumber = columnNames.size();
        if (columnNumber == 0)
            throw new ProgramException("unable to determine the columns of table '" + tableName + "'");

        // construct the create table statement.
        final StringBuilder builder = new StringBuilder();
        builder.append("DECLARE GLOBAL TEMPORARY TABLE ");
        builder.append(TEMP_PREFIX + tableName + " (");
        for (int i = 0; i < columnNumber; i++) {
            if (i != 0)
                builder.append(", ");
            builder.append(columnNames.get(i)).append(" ").append(columnTypes.get(i));
        }
        builder.append(")");
        builder.append(" ON COMMIT PRESERVE ROWS NOT LOGGED");
        return builder.toString();
    }

    /**
     * A helper method that takes a list of columns names and append them into
     * to a comma separated string.
     *
     * @param columns
     *            The list of columns names
     *
     * @return The comma separated column names; an empty string if the list
     *         is empty.
     */
    private static String commaSeparatedColumnNames(final ArrayList<String> columns) {
        final StringBuilder builder = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i != 0)
                builder.append(", ");
            builder.append(columns.get(i));
        }
        return builder.toString();
    }

    /**
     * A helper method that takes a list of columns names in the format of
     * "tableName.columnName", and returns a list of columns without the leading
     * qualifiers. The last non-empty-terminated segment of each name is kept,
     * so names with more than one qualifier are stripped as well instead of
     * being dropped.
     *
     * @param columns
     *            The list of columns in the format of "tableName.columnName"
     *
     * @return A list of column names in the format of "columnName"
     */
    private static ArrayList<String> stripTableName(final ArrayList<String> columns) {
        final ArrayList<String> strippedColumns = new ArrayList<String>();
        for (final String item : columns) {
            // '.' and ':' both act as separators; trailing empty segments are
            // discarded by split.
            final String[] segments = item.split("[.:]");
            if (segments.length == 0)
                continue;
            strippedColumns.add(segments[segments.length - 1].trim());
        }
        return strippedColumns;
    }

    /**
     * Connect to the catalog node and retrieves the node information. It
     * fills two ArrayLists with nodes that contain pieces of each of the two
     * specified input tables. A failure is logged on the catalog node and the
     * lists keep whatever was added before the failure.
     *
     * @param tableName1
     *            The first table name that it's looking for.
     * @param tableName2
     *            The second table name that it's looking for.
     * @param table1RelevantNodes
     *            The ArrayList receiving the nodes that are relevant to
     *            tableName1.
     * @param table2RelevantNodes
     *            The ArrayList receiving the nodes that are relevant to
     *            tableName2.
     */
    private void fetchRelevantNodes(
            final String tableName1,
            final String tableName2,
            final ArrayList<ConfigurationNode> table1RelevantNodes,
            final ArrayList<ConfigurationNode> table2RelevantNodes) {
        // Connect to the catalog.
        try {
            this.catalog.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    // fetch the relevant nodes in table1
                    String sql = SelectOrJoin.this.fetchRelevantNodesHelper(tableName1, statement,
                            table1RelevantNodes);
                    SelectOrJoin.this.catalog.log(System.out, "Eexcuted " + sql);

                    // fetch the relevant nodes in table2
                    sql = SelectOrJoin.this.fetchRelevantNodesHelper(tableName2, statement,
                            table2RelevantNodes);
                    SelectOrJoin.this.catalog.log(System.out, "Eexcuted " + sql);
                }
            });
        } catch (final Exception e) {
            this.catalog.log(System.err, "failed in fetchRelevantNodes " + e.toString());
        }
    }

    /**
     * This is a helper method for fetchRelevantNodes. It appends the nodes
     * that are related to a specified table name into an ArrayList.
     *
     * @param tableName
     *            The table name that it's looking for.
     * @param statement
     *            The statement created by catalog connection.
     * @param relevantNodes
     *            The ArrayList receiving the nodes found.
     *
     * @return The query that was executed against the catalog.
     *
     * @throws SQLException
     *             Thrown if the query fails or a column cannot be read.
     * @throws ProgramException
     *             Declared for the construction of the configuration nodes.
     */
    private String fetchRelevantNodesHelper(
            final String tableName,
            final Statement statement,
            final ArrayList<ConfigurationNode> relevantNodes) throws SQLException, ProgramException {
        final String sql = "SELECT * FROM DTABLES WHERE TNAME = '" + tableName
                + "' OR TNAME = UCASE ('" + tableName + "')";
        collectNodes(statement, sql, relevantNodes);
        return sql;
    }

    /**
     * Executes a catalog query and appends one configuration node for each
     * returned row. The result set is always closed.
     *
     * @param statement
     *            The statement created by catalog connection.
     * @param sql
     *            The query to execute; its rows must provide the NODEDRIVER,
     *            NODEURL, NODEUSER, NODEPASSWD and NODEID columns.
     * @param nodes
     *            The ArrayList receiving the nodes found.
     *
     * @throws SQLException
     *             Thrown if the query fails or a column cannot be read.
     * @throws ProgramException
     *             Declared for the construction of the configuration nodes.
     */
    private static void collectNodes(
            final Statement statement,
            final String sql,
            final ArrayList<ConfigurationNode> nodes) throws SQLException, ProgramException {
        final ResultSet set = statement.executeQuery(sql);
        try {
            while (set.next()) {
                final String driver = set.getString("NODEDRIVER").trim();
                final String url = set.getString("NODEURL").trim();
                final String user = set.getString("NODEUSER").trim();
                final String password = set.getString("NODEPASSWD").trim();
                final String name = "node " + set.getString("NODEID").trim();
                nodes.add(new ConfigurationNode(name, driver, url, user, password));
            }
        } finally {
            set.close();
        }
    }

    /**
     * Create and return a join statement that runs against the temporary
     * tables: every occurrence of the two table names in the original
     * statement is prefixed with "SESSION.TEMP_".
     *
     * The statement is rewritten in a single pass and only whole identifiers
     * are replaced. A table name that is part of a longer identifier or of a
     * keyword is left alone, and text that has been rewritten is never
     * rewritten again. The table names are matched literally and case
     * sensitively.
     *
     * @param sql
     *            The original join statement.
     * @param tableName1
     *            The first table to be joined
     * @param tableName2
     *            The second table to be joined
     *
     * @return The join statement referencing the temporary tables.
     */
    private static String joinStatement(
            final String sql,
            final String tableName1,
            final String tableName2) {
        // Try the longer name first so that a name which is a prefix of the
        // other one can never shadow it.
        final boolean firstIsLonger = tableName1.length() >= tableName2.length();
        final String longer = firstIsLonger ? tableName1 : tableName2;
        final String shorter = firstIsLonger ? tableName2 : tableName1;

        // The look-arounds reject matches that are glued to other identifier
        // characters.
        final Pattern pattern = Pattern.compile(
                "(?<![\\w$])(?:" + Pattern.quote(longer) + "|" + Pattern.quote(shorter) + ")(?![\\w$])");

        final Matcher matcher = pattern.matcher(sql);
        final StringBuffer buffer = new StringBuffer();
        while (matcher.find())
            matcher.appendReplacement(buffer,
                    Matcher.quoteReplacement(SESSION_TEMP_PREFIX + matcher.group()));
        matcher.appendTail(buffer);
        return buffer.toString();
    }

    /**
     * Returns the trimmed text of a column value, or "null" for a SQL NULL.
     *
     * @param value
     *            The column value as returned by the result set.
     *
     * @return The text to use for the value.
     */
    private static String textOf(final String value) {
        return value == null ? NULL_TEXT : value.trim();
    }
}

/**
 * A runner class that implements a run method that executes the select SQL
 * statement on a node.
 */
class SelectRunner implements Runnable {

    /** The configuration node associated with a single thread. */
    private final ConfigurationNode node;

    /** The SQL statement to be executed. */
    private final String sql;

    /** The results from the query. */
    private final ArrayList<String> results = new ArrayList<String>();

    /**
     * returns the select statement results on this node as an array of strings.
     * Each string holds the tab separated column values of one row.
     */
    public String[] getResults() {
        return this.results.toArray(new String[this.results.size()]);
    }

    /**
     * Initializes a new instance of the RunSQL Runner.
     *
     * @param node
     *            The cluster node associated with this instance.
     *
     * @param sql
     *            The select SQL statement to execute.
     */
    public SelectRunner(final ConfigurationNode node, final String sql) {
        assert null != node;
        assert null != sql;

        this.node = node;
        this.sql = sql;
    }

    /**
     * Executes the select statement on this node. Column values are trimmed
     * and a SQL NULL is reported as "null". A ProgramException is logged on
     * the node.
     */
    public void run() {
        // Connect to the specific node and execute the select statement.
        try {
            this.node.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    // Execute the select statement.
                    SelectRunner.this.node.log(System.out, "Executing " + SelectRunner.this.sql);
                    final ResultSet set = statement.executeQuery(SelectRunner.this.sql);
                    try {
                        final int count = set.getMetaData().getColumnCount();
                        while (set.next()) {
                            final StringBuilder builder = new StringBuilder();
                            for (int i = 1; i <= count; i++) {
                                final String value = set.getString(i);
                                builder.append(value == null ? "null" : value.trim());
                                if (i < count)
                                    builder.append("\t");
                            }
                            SelectRunner.this.results.add(builder.toString());
                        }
                    } finally {
                        set.close();
                    }
                }
            });
        } catch (final ProgramException e) {
            this.node.log(System.err, "SelectRunner run " + e.getMessage());
        }
    }
}