package edu.hawaii.ics.yucheng; import java.io.StringReader; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; /** * A class implements Runnable. The run method takes a list of CSV entries and * upload them into corresponding tables on the distributed database system. * * @author Cheng Jade * @assignment ICS 421 Assignment 4 * @date Mar 22, 2010 * @bugs None */ public class LoadCSV implements Runnable { /* the create or drop configuration object. */ public final LoadCSVConfiguration configuration; /* the catalog node extracted from the overall configuration. */ public final ConfigurationNode catalog; /* a list of SQL commands to be executed. */ private final ArrayList<String[]> csvs = new ArrayList<String[]>(); /* a record of whether all CSV have been loaded successfully. */ private Boolean success = null; /** * Initialize a new instance of this object */ public LoadCSV( final LoadCSVConfiguration configuration, final ArrayList<String> rawCSV) throws ProgramException { if (null == configuration) throw new NullPointerException("configuration"); if (null == rawCSV) throw new NullPointerException("rawCSV"); // initialize configuration and catalog. this.configuration = configuration; this.catalog = this.configuration.catalog; // stitch rawCSV lines into one big string. final StringBuilder builder = new StringBuilder(); for (String item : rawCSV) builder.append(item + "\n"); final StringReader reader = new StringReader(builder.toString()); // parse the string and obtain all CSV entries and fields of each entry. String[] fields; while (null != (fields = CSVParser.parse(reader))) csvs.add(fields); } /** * The main routine that insert rows to corresponding nodes and update the * catalog dtables. */ public void run() { try { // For each node, construct a list of CSVs that will be partitioned // to this node. final ArrayList<CSVsNodePair> csvsToNodes = new ArrayList<CSVsNodePair>(); for (int i = 0; i < this.configuration.nodes.size(); i++) csvsToNodes.add(new CSVsNodePair(this.configuration.nodes.get(i).node)); // Exam every CSV and decide which list this CSV belongs. final ArrayList<String> columnNames = columnNames(); for (String[] item : csvs) { final int toID = partitionTo(columnNames, item); for (CSVsNodePair pair : csvsToNodes) { if (Integer.parseInt(pair.node.name.substring(4)) == toID) pair.nodeCSVs.add(item); } } // Create a bulk insert statement for each node and start a thread // on each node to execute this insert statement. Threads also // update the catalog dtables. final int count = csvsToNodes.size(); final Thread[] threads = new Thread[count]; for (int i = 0; i < count; i++) { CSVsNodePair pair = csvsToNodes.get(i); final String bulkInsertStatement = bulkInsertStatement(pair); threads[i] = new Thread(new Runner(pair.node, bulkInsertStatement)); threads[i].start(); } // wait for threads. for (int i = 0; i < csvsToNodes.size(); i++) DistributedDB.join(threads[i], csvsToNodes.get(i).node); // Print message indicating if all commands completed successfully. if (this.success) System.out.println("All CSV were loaded successfully."); else System.out.println("Not all CSV were loaded successfully."); } catch (final ProgramException e) { System.err.println(e.getMessage()); System.exit(1); return; } } /** * Connect to a node and retrieve the columns names of this table. * * @return The list of column names in the target table. */ private ArrayList<String> columnNames() throws ProgramException { final ConfigurationNode node = configuration.nodes.get(0).node; final ArrayList<String> columnNames = new ArrayList<String>(); try { node.runStatement(new StatementRunner() { public void run(Statement statement) throws ProgramException, SQLException { ResultSet set = statement.executeQuery("SELECT * FROM " + configuration.tableName); ResultSetMetaData meta = set.getMetaData(); for (int i = 1; i <= meta.getColumnCount(); i++) columnNames.add(meta.getColumnName(i)); } }); node.log(System.out, "Successfully connected and retrieved column names from a node db."); return columnNames; } catch (final Exception e) { node.log(System.err, e.getMessage()); throw new ProgramException("columnNames " + e); } } /** * Calculates the node ID that a specified CSV entry will be partitioned to. * * @param columnNames * The list of columns names retrieved from target table. * * @param csv * The specified CSV entry to be calculated. * * @return The node ID that this CSV entry will be partitioned to. * * @throws ProgramException * Thrown if there are any errors processing the CSV file. * @throws NullPointerException * Thrown if the argument is null. */ private int partitionTo( final ArrayList<String> columnNames, final String[] csv) throws ProgramException { if (null == columnNames) throw new NullPointerException("columnNames"); if (null == csv) throw new NullPointerException("csv"); int indexOfPartitionColumn = -1; String partitionColumn = this.configuration.partitionColumn; for (int i = 0; i < columnNames.size(); i++) { if (columnNames.get(i).equalsIgnoreCase(partitionColumn)) { indexOfPartitionColumn = i; break; } } if (indexOfPartitionColumn == -1) throw new ProgramException("Mismatch partition table name and the dtables"); String partitionValue = csv[indexOfPartitionColumn]; try { int intPartitionValue = Integer.parseInt(partitionValue); if (this.configuration.partitionMethod.equalsIgnoreCase("range")) { for (int i = 0; i < this.configuration.nodes.size(); i++) { int param1 = Integer.parseInt(this.configuration.nodes.get(i).param1); int param2 = Integer.parseInt(this.configuration.nodes.get(i).param2); if (intPartitionValue > param1 && intPartitionValue <= param2) return i + 1; } throw new ProgramException("Partition value out of range"); } if (configuration.partitionMethod.equalsIgnoreCase("hash")) { int param1 = Integer.parseInt(configuration.nodes.get(0).param1); return (intPartitionValue % param1) + 1; } throw new ProgramException("Support only range and hash partition methods."); } catch (NumberFormatException e) { throw new ProgramException("Support only numeric partion values"); } } /** * A private class that groups a node and its list of CSVs together. */ private class CSVsNodePair { public final ConfigurationNode node; public final ArrayList<String[]> nodeCSVs = new ArrayList<String[]>(); public CSVsNodePair(ConfigurationNode node) { this.node = node; } } /** * Create and return a bulk insert statement. * * @param pair * A CSVsNodepair used to generate insert statement. * * @return The bulk insert statement created. */ private String bulkInsertStatement(CSVsNodePair pair) { final StringBuilder builder = new StringBuilder(); builder.append("INSERT INTO " + this.configuration.tableName + " VALUES \n"); final int nodeNum = pair.nodeCSVs.size(); for (int i = 0; i < nodeNum; i++) { String[] csv = pair.nodeCSVs.get(i); builder.append("\t\t\t\t("); for (int j = 0; j < csv.length - 1; j++) builder.append(DistributedDB.quote(csv[j]) + ", "); builder.append(DistributedDB.quote(csv[csv.length - 1]) + ")"); if (i != nodeNum - 1) builder.append(", \n"); } return builder.toString(); } /** * Create and return an update statement based on the partition information. * This SQL statement will be used to modify dtables. * * @param partmtd * An dtable entry, 1 for range, 2 for hash * * @param partparam1 * An dtable entry, partition parameter 1 * * @param partparam2 * An dtable entry, partition parameter 2 * * @param node * A ConfigurationNode to figure out the node ID entry of dtables * * @return The the update statement */ private String updateStatement( final String partmtd, final String partparam1, String partparam2, final ConfigurationNode node) { assert null != partmtd; assert null != partparam1; partparam2 = partparam2 == null ? "" : partparam2; StringBuilder builder = new StringBuilder(); builder.append("UPDATE DTABLES"); builder.append(" SET PARTMTD = "); builder.append(DistributedDB.quote(partmtd)); builder.append(", PARTCOL = "); builder.append(DistributedDB.quote(this.configuration.partitionColumn)); builder.append(", PARTPARAM1 = "); builder.append(DistributedDB.quote(partparam1)); builder.append(", PARTPARAM2 = "); builder.append(DistributedDB.quote(partparam2)); builder.append(" WHERE (TNAME = "); builder.append(DistributedDB.quote(this.configuration.tableName)); builder.append(" OR TNAME = UCASE("); builder.append(DistributedDB.quote(configuration.tableName) + "))"); if (node == null) return builder.toString(); builder.append(" AND NODEID = "); builder.append(DistributedDB.quote(node.name.substring(4))); return builder.toString(); } /** * A private runner object. */ private class Runner implements Runnable { /** The configuration node associated with a single thread. */ private final ConfigurationNode node; /** The SQL statement to be executed. */ private final String bulkInsertStatement; /** * Initializes a new instance of the RunSQL Runner. * * @param node * The cluster node associated with this instance. * * @param command * The command to execute. */ public Runner(final ConfigurationNode node, final String bulkInsertStatement) { assert null != node; assert null != bulkInsertStatement; this.node = node; this.bulkInsertStatement = bulkInsertStatement; } /** * Executes the insert commands for the node associated with this * instance, and update the catalog datables when insertion finishes. */ public void run() { try { // connect to the node and execute the bulk insert statement. this.node.runStatement(new StatementRunner() { public void run(Statement statement) throws ProgramException, SQLException { node.log(System.out, "Executing: " + bulkInsertStatement); statement.execute(bulkInsertStatement); node.log(System.out, "Statement executed successfully."); } }); // connect to the catalog and update dtables. catalog.runStatement(new StatementRunner() { public void run(Statement statement) throws ProgramException, SQLException { // dispatch if the partition method is hash. if (configuration.partitionMethod.equalsIgnoreCase("hash")) { String updateStatement = updateStatement("2", configuration.nodes.get(0).param1, configuration.nodes.get(0).param2, null); // test code catalog.log(System.out, "Executing a update statement " + updateStatement); statement.execute(updateStatement); catalog.log(System.out, "Updated for '" + node.hostname + "'"); if (success == null) success = true; return; } // dispatch if the partition method is range. if (configuration.partitionMethod.equalsIgnoreCase("range")) { for (int i = 0; i < configuration.nodes.size(); i++) { String updateStatement = updateStatement("1", configuration.nodes.get(i).param1, configuration.nodes.get(i).param2, configuration.nodes.get(i).node); // test code catalog.log(System.out, "Executing a update statement " + updateStatement); statement.execute(updateStatement); } catalog.log(System.out, "Updated for '" + node.hostname + "'"); if (success == null) success = true; return; } // no other partition method is supported. throw new ProgramException("Support only range and hash two partition methods."); } }); } catch (final ProgramException e) { success = false; this.node.log(System.err, e.getMessage()); } } } public String toString() { return this.configuration.toString(); } }