package edu.hawaii.ics.yucheng; import java.io.PrintStream; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.Statement; import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; /** * A class that executes a given set of DDL statements on a cluster of * computers, each running an instance of a DBMS. This program executes the same * DDL statements on the database instance of each of the computers on the * cluster concurrently using threads. * * @author Cheng Jade * @assignment ICS 421 Assignment 1 * @date Feb 10, 2010 * @bugs None */ class RunDDL implements Runnable { /** The catalog DBMS, which is common to all threads. */ private static ConfigurationNode catalog; /** The list of commands to execute, which is common to all threads. */ private static CommandList commands; /** The configuration node associated with a single thread. */ private final ConfigurationNode node; /** The thread executing the commands on a single node. */ private final Thread thread; /** A flag indicating if all commands executed successfully by the thread. */ private boolean success = false; /** * The main entry point of the application. * * @param args * The command line arguments. */ public static void main(final String[] args) { assert null != args; // Check for usage errors. if (args.length != 2) { final String name = RunDDL.class.getSimpleName(); System.out.println("Usage: java " + name + " <cfg> <ddl>"); System.out.println(" <cfg> path to a configuration file"); System.out.println(" <ddl> path to a DDL file"); System.exit(0); return; } // Read the configuration and DDL command files. try { assert null != args[0]; assert null != args[1]; final Configuration configuration = new Configuration(args[0]); commands = new CommandList(args[1]); catalog = configuration.getCatalog(); // Prepare the 'DTABLES' in the catalog database. If the table does // not exist, a new one is created. prepareCatalog(); // Start threads for each configuration node. final ArrayList<RunDDL> instances = new ArrayList<RunDDL>(); for (final ConfigurationNode node : configuration.getNodes()) instances.add(new RunDDL(node)); // Wait for all threads to terminate. boolean success = true; for (final RunDDL instance : instances) { instance.join(); // Check that all commands completed successfully. success &= instance.success; } // Print message indicating if all commands completed successfully. if (success) System.out.println("All DDL commands executed successfully."); else System.out.println("Not all DDL commands executed successfully."); } catch (final ProgramException e) { System.err.println(e.getMessage()); System.exit(1); return; } // Exit cleanly while debugging from Eclipse. System.exit(0); } /** * Initializes a new instance of the RunDLL class. * * @param node * The cluster node associated with this instance. */ private RunDDL(final ConfigurationNode node) { assert null != node; // Assign the class field to the parameter values. this.node = node; // Create a new thread for this instance. this.thread = new Thread(this); // Start the thread. this.thread.start(); } /** * Executes the DDL commands for the node associated with this instance. */ public void run() { try { StatementFactory.newStatement(this.node, new StatementRunner() { public void run(Statement statement) throws ProgramException, SQLException { for (final String command : commands) { log(System.out, "Executing statement: " + command); statement.executeUpdate(command); updateCatalog(command); log(System.out, "Statement executed successfully."); } } }); // Arriving here indicates all commands executed successfully. this.success = true; } catch (final Exception e) { this.log(System.err, e.getMessage()); } } /** * Ensures the catalog 'DTABLES' table exists. * * @throws ProgramException * Thrown if there is a problem verifying the table exists or if * the table cannot be created. */ private static void prepareCatalog() throws ProgramException { assert null != catalog; // Open the connection, check if the table exists, and create the table // if it does not exist. try { StatementFactory.newStatement(catalog, new StatementRunner() { public void run(Statement statement) throws ProgramException, SQLException { if (!tableExists(statement.getConnection(), "DTABLES")) { System.out.println("Creating table CATALOG.DTABLES."); statement.executeUpdate(newCreateTableCommand()); } } }); } catch (final Exception e) { throw new ProgramException(e); } } /** * Returns true if a specified command is of a specified type. * * @param command * The DDL command, e.g. "CREATE TABLE ...". * @param type * The command type, e.g. "CREATE". */ private static boolean isCommandType( final String command, final String type) { assert null != command; assert null != type; return command.toUpperCase().startsWith(type.toUpperCase()); } /** * Creates and returns a new DDL command to create the 'DTABLES' catalog * table. * * @return The new DDL command to create the 'DTABLES' catalog table. */ private static String newCreateTableCommand() { final StringBuilder builder = new StringBuilder(); builder.append("CREATE TABLE DTABLES ("); builder.append("TNAME CHAR(32), "); builder.append("NODEDRIVER CHAR(64), "); builder.append("NODEURL CHAR(128), "); builder.append("NODEUSER CHAR(16), "); builder.append("NODEPASSWD CHAR(16), "); builder.append("PARTMTD INT, "); builder.append("NODEID INT, "); builder.append("PARTCOL CHAR(32), "); builder.append("PARTPARAM1 CHAR(32), "); builder.append("PARTPARAM2 CHAR(32))"); return builder.toString(); } /** * Surrounds a specified string with single quotes. * * @param text * Some text to quote. * * @return The quoted text. */ private static String quote(final String text) { assert null != text; return "'" + text + "'"; } /** * Returns true if a specified table exists in a database. * * @param connection * The DBMS connection. * * @param tableName * The name of the table. * * @return True indicates the table exists. * * @throws ProgramException * Thrown if there is an error checking the meta-data of the * DBMS connection. */ private static boolean tableExists( final Connection connection, final String tableName) throws ProgramException { assert null != connection; assert null != tableName; try { // Get the tables matching the specified name. Upper-case seems to // work best! final DatabaseMetaData meta = connection.getMetaData(); final ResultSet set = meta.getTables( null, null, tableName.toUpperCase(), null); try { // Return true if there are any records found, i.e. the table // exists. return set.next(); } finally { set.close(); } } catch (final SQLException e) { throw new ProgramException(e); } } /** * Parses a command and returns the table name for the command, or null if * there is none. * * @param command * The DDL command. * * @return The table name within the command. */ private static String tableName(final String command) { assert null != command; // Work with the command in upper-case letters. String result = command.toUpperCase(); // Search for the word TABLE. int index; if (-1 == (index = result.indexOf("TABLE") + 5)) return null; // Remove whitespace around the remaining portion of the command. result = result.substring(index).trim(); // Look for the first occurrence of a special character. If a '.' is // encountered, remove this portion of the text, which is most likely // the schema. index = 0; while (index < result.length()) { final char ch = result.charAt(index); if (ch == '.') { result = result.substring(index + 1); index = 0; continue; } if (!Character.isLetterOrDigit(result.charAt(index))) break; index++; } // The remaining portion is the table name. result = result.substring(0, index); return result.length() == 0 ? null : result; } /** * Logs a message to a specified output stream. * * @param stream * The output stream, e.g. System.out. * * @param message * The message to log. */ private void log(final PrintStream stream, final String message) { assert null != stream; assert null != message; stream.println("[" + node.hostname + "] " + message); } /** * Joins the thread and checks for errors. In the case of an error, a * warning is displayed, but no exception is thrown. */ private void join() { try { this.thread.join(); } catch (final InterruptedException e) { System.err.println( "WARNING: Thread [" + this.node.name + "] failed to respond."); } } /** * Creates and returns a new DDL command to delete a row from the 'DTABLES' * catalog table. * * @param tableName * The name of the table to delete. * * @return A new DDL command to delete a row from the 'DTABLES' catalog * table. */ private String newDeleteRowCommand(final String tableName) { assert null != tableName; final StringBuilder builder = new StringBuilder(); builder.append("DELETE FROM DTABLES WHERE TNAME = "); builder.append(quote(tableName)); builder.append(" AND NODEURL = "); builder.append(quote(this.node.hostname)); return builder.toString(); } /** * Creates and returns a new DDL command to insert a row from the 'DTABLES' * catalog table. * * @param tableName * The name of the table to insert. * * @return A new DDL command to insert a row from the 'DTABLES' catalog * table. */ private String newInsertRowCommand(final String tableName) { assert null != tableName; final StringBuilder builder = new StringBuilder(); builder.append("INSERT INTO DTABLES VALUES ("); builder.append(quote(tableName)); builder.append(", "); builder.append(quote(this.node.driver)); builder.append(", "); builder.append(quote(this.node.hostname)); builder.append(", "); builder.append(quote(this.node.username)); builder.append(", "); builder.append(quote(this.node.password)); builder.append(", NULL, "); builder.append(this.node.name.substring(4)); builder.append(", NULL"); builder.append(", NULL"); builder.append(", NULL)"); return builder.toString(); } /** * Updates the catalog database based on a successfully executed DDL command * by one of the threads. * * @param command * The DDL command that just completed successfully. * * @throws ProgramException * Thrown if there is an error updating the catalog database. */ private void updateCatalog(final String command) throws ProgramException { assert null != command; // Get the table name. If there is none, return immediately. final String tableName; if (null == (tableName = tableName(command))) return; // Open a connection to the catalog database. try { StatementFactory.newStatement(catalog, new StatementRunner() { public void run(Statement statement) throws ProgramException, SQLException { // Check for CREATE and DROP commands; in these cases, // add or remove a row from the DTABLES table, respectively. final String catalogCommand; if (isCommandType(command, "CREATE")) catalogCommand = newInsertRowCommand(tableName); else if (isCommandType(command, "DROP")) catalogCommand = newDeleteRowCommand(tableName); else { log(System.err, "Unsupported command: " + command); return; } log(System.out, "Updating catalog: " + catalogCommand); statement.executeUpdate(catalogCommand); } }); // Arriving here indicates all commands executed successfully. this.success = true; } catch (final Exception e) { this.log(System.err, "Failed to update catalog dtables. " + e.getMessage()); } } }