package edu.hawaii.ics.yucheng;

import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.sql.SQLException;
import java.util.ArrayList;

/**
 * A class that loads data from a comma-separated (CSV) file into a distributed
 * table on the cluster. The program takes two commandline arguments clustercfg
 * and csvfile. The clustercfg file contains access information for the catalog
 * DB, the name of the table to be loaded, and the partitioning information. The
 * csvfile contains the data to be loaded.
 *
 * @author Cheng Jade
 * @assignment ICS 421 Assignment 2-2
 * @date Feb 29, 2010
 * @bugs None
 */
public class LoadCSV {

    /** The list of column names for the specified table. */
    private static ArrayList<String> columnNames = new ArrayList<String>();

    /** The configuration information. */
    private static Configuration configuration;

    /** The catalog DBMS, which is common to all threads. */
    private static ConfigurationNode catalog;

    /** The CSV file entries. */
    private static ArrayList<CSVNode> csvList;

    /**
     * The main entry point of the application.
     *
     * @param args
     *            The command line arguments: the path to the configuration
     *            file followed by the path to the CSV file.
     */
    public static void main(final String[] args) {
        assert null != args;

        // Check for usage errors.
        if (args.length != 2) {
            final String name = LoadCSV.class.getSimpleName();
            System.err.println("Usage: java " + name + " <cfg> <csv>");
            System.err.println(" <cfg> path to a configuration file");
            System.err.println(" <csv> path to a CSV file");
            System.exit(0);
            return;
        }

        try {
            assert null != args[0];
            assert null != args[1];

            // Read the configuration and CSV files.
            configuration = new Configuration(args[0]);
            csvList = readFile(args[1]);
            catalog = configuration.getCatalog();

            // Add the nodes that hold the specified table to the global
            // configuration.
            loadNodesFromCatalog();

            // Check that the number of nodes in dtables is the same as the
            // number of partitions provided in the configuration file.
            if (configuration.nodeListsize() != configuration.partitionListsize()) {
                System.err.println("The number of nodes in the dtables and the "
                        + "number of partitions in the config file doesn't match");
                System.exit(1);
            }

            // Connect to one node db to figure out the column names for the
            // table.
            loadColumnNames(configuration.getNode(0));

            // Start one thread per configuration node, then wait for all of
            // them to finish.
            final int count = configuration.nodeListsize();
            final Thread[] threads = new Thread[count];
            for (int i = 0; i < count; i++) {
                threads[i] = new Thread(new Runner(configuration, i, csvList, columnNames));
                threads[i].start();
            }
            for (int i = 0; i < count; i++) {
                join(threads[i], configuration.getNode(i));
            }
        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
            return;
        }

        // Exit cleanly while debugging from Eclipse.
        System.exit(0);
    }

    /**
     * Queries the catalog's DTABLES for every row whose TNAME matches the
     * configured table name (as given, or upper-cased by the database) and
     * registers one configuration node per row. Failures are logged against
     * the catalog node and are not rethrown.
     *
     * @throws ProgramException
     *             Declared so that configuration accessors remain free to
     *             report errors to {@code main}.
     */
    private static void loadNodesFromCatalog() throws ProgramException {
        try {
            catalog.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    final String tableName = configuration.getTableName();
                    final ResultSet set = statement.executeQuery(
                            "SELECT * FROM DTABLES WHERE TNAME = '" + tableName
                                    + "' OR TNAME = UCASE('" + tableName + "')");
                    try {
                        while (set.next()) {
                            final String driver = set.getString("NODEDRIVER").trim();
                            final String url = set.getString("NODEURL").trim();
                            final String user = set.getString("NODEUSER").trim();
                            final String password = set.getString("NODEPASSWD").trim();
                            // Node names are "node" followed by the NODEID.
                            final String name = "node" + set.getString("NODEID").trim();
                            configuration.addNode(
                                    new ConfigurationNode(name, driver, url, user, password));
                        }
                    } finally {
                        set.close();
                    }
                }
            });
            catalog.log(System.out,
                    "Successfully connected and retrieved information from catalog dtables.");
        } catch (final Exception e) {
            catalog.log(System.err, e.getMessage());
        }
    }

    /**
     * Reads the column names of the configured table from the given node and
     * appends them to {@link #columnNames}. Failures are logged against the
     * node and are not rethrown.
     *
     * @param node
     *            The node db to query for the table's metadata.
     *
     * @throws ProgramException
     *             Declared so that configuration accessors remain free to
     *             report errors to {@code main}.
     */
    private static void loadColumnNames(final ConfigurationNode node) throws ProgramException {
        try {
            node.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    final ResultSet set = statement.executeQuery(
                            "SELECT * FROM " + configuration.getTableName().toUpperCase());
                    try {
                        final ResultSetMetaData meta = set.getMetaData();
                        final int columns = meta.getColumnCount();
                        // JDBC column indexes are 1-based.
                        for (int i = 1; i <= columns; i++) {
                            columnNames.add(meta.getColumnName(i));
                        }
                    } finally {
                        set.close();
                    }
                }
            });
            node.log(System.out,
                    "Successfully connected and retrieved column names from a node db.");
        } catch (final Exception e) {
            node.log(System.err, e.getMessage());
        }
    }

    /**
     * Reads a CSV file and returns a list of CSV nodes.
     *
     * @param path
     *            The path to the file.
     *
     * @return The list of CSV nodes, one per record returned by the parser.
     *
     * @throws ProgramException
     *             Thrown if there is an error reading the file or parsing the
     *             data.
     */
    private static ArrayList<CSVNode> readFile(final String path) throws ProgramException {
        assert null != path;
        final ArrayList<CSVNode> nodes = new ArrayList<CSVNode>();
        try {
            final Reader reader = new FileReader(path);
            try {
                String[] fields;
                while (null != (fields = CSVParser.parse(reader))) {
                    nodes.add(new CSVNode(fields));
                }
            } finally {
                // Release the file handle whether or not parsing succeeded.
                reader.close();
            }
            return nodes;
        } catch (final IOException e) {
            throw new ProgramException(e);
        }
    }

    /**
     * Joins the thread and checks for errors. In the case of an error, a
     * warning is displayed, but no exception is thrown.
     *
     * @param thread
     *            The thread to wait for.
     * @param node
     *            The node the thread works on; used only for logging.
     */
    private static void join(Thread thread, ConfigurationNode node) {
        try {
            thread.join();
        } catch (final InterruptedException e) {
            node.log(System.err, "Failed to respond");
        }
    }
}

/**
 * Each runner works on one node. It loops over the CSV list, asks each CSV
 * node which partition it belongs to, and inserts the rows that belong to this
 * runner's node. Afterwards it connects to the catalog db and updates the
 * partition columns of the DTABLES entries for the configured table.
 */
class Runner implements Runnable {

    /** Index of the node id for a single thread. */
    private final int index;

    /** The CSV list to be evaluated and inserted. */
    private final ArrayList<CSVNode> csvList;

    /** The column names, passed to each CSV node when computing its partition. */
    private final ArrayList<String> columnNames;

    /** The configuration holding the nodes, catalog and partition settings. */
    private final Configuration configuration;

    /**
     * Initializes a new runner.
     *
     * @param configuration
     *            The shared configuration.
     * @param index
     *            The index of this runner's node in the configuration.
     * @param csvList
     *            The CSV entries to distribute.
     * @param columnNames
     *            The column names of the target table.
     */
    public Runner(final Configuration configuration, int index,
            ArrayList<CSVNode> csvList, ArrayList<String> columnNames) {
        assert null != configuration;
        assert null != csvList;
        assert null != columnNames;
        this.configuration = configuration;
        this.index = index;
        this.csvList = csvList;
        this.columnNames = columnNames;
    }

    /**
     * Inserts the CSV rows that belong to this runner's node, then updates the
     * partition information in the catalog. A failure while inserting is
     * logged and terminates the program with exit code 1; a failure while
     * updating the catalog is only logged.
     */
    public void run() {
        final ConfigurationNode node = configuration.getNode(index);
        final ConfigurationNode catalog = configuration.getCatalog();

        // Loop over the csv list and load the matching rows.
        try {
            node.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    int count = 0;
                    for (final CSVNode csvNode : csvList) {
                        final int partitionTo = csvNode.partitionTo(configuration, columnNames);
                        if (!isTargetNode(node.name, partitionTo)) {
                            continue;
                        }
                        final String sql = insertStatement(csvNode);
                        node.log(System.out, "Executing a insert statement " + sql);
                        statement.execute(sql);
                        count++;
                    }
                    node.log(System.out, count + " items inserted.");
                }
            });
        } catch (final Exception e) {
            node.log(System.err, e.getMessage());
            System.exit(1);
        }

        // Connect to the catalog db and update the corresponding entries with
        // the partition related columns.
        try {
            catalog.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    final String method = configuration.getPartitionMethod();

                    // Hash: one update without a NODEID filter, using the
                    // parameters of partition 0.
                    if (method.equalsIgnoreCase("hash")) {
                        final String sql = updateStatement("2",
                                configuration.getPartitionColumn(),
                                configuration.getPartition(0).param1,
                                configuration.getPartition(0).param2,
                                null);
                        catalog.log(System.out, "Executing a update statement " + sql);
                        statement.execute(sql);
                        catalog.log(System.out, "Updated for '" + node.hostname + "'");
                        return;
                    }

                    // Range: one update per node, each with that node's own
                    // partition parameters. Every runner issues the updates
                    // for all nodes.
                    if (method.equalsIgnoreCase("range")) {
                        for (int i = 0; i < configuration.nodeListsize(); i++) {
                            final String sql = updateStatement("1",
                                    configuration.getPartitionColumn(),
                                    configuration.getPartition(i).param1,
                                    configuration.getPartition(i).param2,
                                    configuration.getNode(i));
                            catalog.log(System.out, "Executing a update statement " + sql);
                            statement.execute(sql);
                        }
                        catalog.log(System.out, "Updated for '" + node.hostname + "'");
                        return;
                    }

                    // No other partition method is supported.
                    throw new ProgramException(
                            "Support only range and hash two partition methods.");
                }
            });
        } catch (final Exception e) {
            catalog.log(System.err, e.getMessage());
        }
    }

    /**
     * Decides whether a row assigned to the given partition belongs to the
     * node with the given name. Node names built by LoadCSV are "node"
     * followed by the NODEID, so the id is compared numerically; a plain
     * suffix test would make "node11" also accept rows for partition 1.
     *
     * @param nodeName
     *            The name of the node.
     * @param partition
     *            The partition number computed for a CSV row.
     *
     * @return True if the row should be inserted into the node.
     */
    private static boolean isTargetNode(final String nodeName, final int partition) {
        final String id = nodeName.startsWith("node") ? nodeName.substring(4) : nodeName;
        try {
            return Integer.parseInt(id.trim()) == partition;
        } catch (final NumberFormatException e) {
            // Non-numeric id: keep the original suffix comparison.
            return nodeName.endsWith(Integer.toString(partition));
        }
    }

    /**
     * Creates an insert statement based on the csv info. Every field is
     * emitted as a quoted string literal, in the order stored in the CSV node.
     *
     * @param csvNode
     *            The CSV record to insert.
     *
     * @return The SQL INSERT statement.
     */
    private String insertStatement(CSVNode csvNode) {
        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO ");
        builder.append(configuration.getTableName().toUpperCase());
        builder.append(" VALUES (");
        for (int i = 0; i < csvNode.size(); i++) {
            if (i > 0) {
                builder.append(", ");
            }
            builder.append(quoteString(csvNode.get(i)));
        }
        builder.append(")");
        return builder.toString();
    }

    /**
     * Creates an update statement based on the partition info to modify
     * dtables.
     *
     * @param partmtd
     *            The value for PARTMTD.
     * @param partcol
     *            The value for PARTCOL.
     * @param partparam1
     *            The value for PARTPARAM1.
     * @param partparam2
     *            The value for PARTPARAM2; null is stored as an empty string.
     * @param node
     *            The node whose entry is updated, or null to update the
     *            entries of every node for the table.
     *
     * @return The SQL UPDATE statement.
     */
    private String updateStatement(
            String partmtd,
            String partcol,
            String partparam1,
            String partparam2,
            ConfigurationNode node) {
        assert null != partmtd;
        assert null != partcol;
        assert null != partparam1;
        partparam2 = partparam2 == null ? "" : partparam2;

        final String tableName = quoteString(configuration.getTableName());
        final StringBuilder builder = new StringBuilder();
        builder.append("UPDATE DTABLES");
        builder.append(" SET PARTMTD = ").append(quoteString(partmtd));
        builder.append(", PARTCOL = ").append(quoteString(partcol));
        builder.append(", PARTPARAM1 = ").append(quoteString(partparam1));
        builder.append(", PARTPARAM2 = ").append(quoteString(partparam2));
        builder.append(" WHERE (TNAME = ").append(tableName);
        builder.append(" OR TNAME = UCASE(").append(tableName).append("))");
        if (node != null) {
            // The NODEID is the part of the node name after the "node" prefix.
            builder.append(" AND NODEID = ");
            builder.append(quoteString(node.name.substring(4)));
        }
        return builder.toString();
    }

    /**
     * Wraps a value in single quotes for use as a SQL string literal. Embedded
     * single quotes are doubled so that values such as O'Brien do not end the
     * literal early.
     *
     * @param toQuote
     *            The raw value.
     *
     * @return The quoted literal.
     */
    private static String quoteString(String toQuote) {
        assert null != toQuote;
        return "'" + toQuote.replace("'", "''") + "'";
    }
}