// LoadCSV.java

package edu.hawaii.ics.yucheng;

import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.sql.SQLException;
import java.util.ArrayList;

/**
 * A class that loads data from a comma-separated (CSV) file into a distributed
 * table on the cluster. The program takes two command-line arguments,
 * clustercfg and csvfile. The clustercfg file contains access information for
 * the catalog DB, the name of the table to be loaded, and the partitioning
 * information. The csvfile contains the data to be loaded.
 * 
 * @author Cheng Jade
 * @assignment ICS 421 Assignment 2-2
 * @date Feb 29, 2010
 * @bugs None
 */

public class LoadCSV {

    /** The list of column names for the specified table. */
    private static ArrayList<String> columnNames = new ArrayList<String>();

    /** The configuration information. */
    private static Configuration configuration;

    /** The catalog DBMS, which is common to all threads. */
    private static ConfigurationNode catalog;

    /** The CSV file entries. */
    private static ArrayList<CSVNode> csvList;

    /**
     * The main entry point of the application. Reads the configuration and the
     * CSV file, looks up the nodes holding the table in the catalog, reads the
     * table's column names from the first node, and then starts one loader
     * thread per node and waits for all of them.
     * 
     * @param args
     *            The command line arguments: the configuration file path and
     *            the CSV file path.
     */
    public static void main(final String[] args) {
        assert null != args;

        // Check for usage errors.
        if (args.length != 2) {
            final String name = LoadCSV.class.getSimpleName();
            System.err.println("Usage: java " + name + " <cfg> <csv>");
            System.err.println("       <cfg> path to a configuration file");
            System.err.println("       <csv> path to a CSV file");
            // A usage error is a failure, so report a non-zero status.
            System.exit(1);
            return;
        }

        // Read the configuration and CSV files.
        try {
            assert null != args[0];
            assert null != args[1];

            configuration = new Configuration(args[0]);

            csvList = readFile(args[1]);
            catalog = configuration.getCatalog();

            // Connect to the catalog and obtain the information of the nodes
            // that hold the specified table. Add these nodes to the global
            // configuration.
            try {
                catalog.runStatement(new StatementRunner() {
                    public void run(Statement statement) throws ProgramException, SQLException {
                        // Escape quotes so the table name cannot break the SQL literal.
                        final String table = escapeSql(configuration.getTableName());
                        final ResultSet set = statement.executeQuery(
                                "SELECT * FROM DTABLES WHERE TNAME = '"
                                + table + "' OR TNAME = "
                                + "UCASE('" + table + "')");
                        try {
                            while (set.next()) {
                                final String driver = set.getString("NODEDRIVER").trim();
                                final String url = set.getString("NODEURL").trim();
                                final String user = set.getString("NODEUSER").trim();
                                final String password = set.getString("NODEPASSWD").trim();
                                final String name = "node" + set.getString("NODEID").trim();
                                configuration.addNode(new ConfigurationNode(name, driver, url, user, password));
                            }
                        } finally {
                            set.close();
                        }
                    }
                });
                catalog.log(System.out, "Successfully connected and retrieved information from catalog dtables.");
            } catch (final Exception e) {
                catalog.log(System.err, e.getMessage());
            }

            // Check that the number of nodes in dtables is the same as the
            // number of partitions provided in the configuration file.
            if (configuration.nodeListsize() != configuration.partitionListsize()) {
                System.err.println("The number of nodes in the dtables and the " +
                        "number of partitions in the config file doesn't match");
                System.exit(1);
            }

            // Without any node there is nowhere to load the data (and
            // getNode(0) below would fail).
            if (configuration.nodeListsize() == 0) {
                System.err.println("No nodes in the catalog hold table "
                        + configuration.getTableName());
                System.exit(1);
            }

            // Connect to one node db to figure out the column names for the
            // table.
            final ConfigurationNode node = configuration.getNode(0);
            try {
                node.runStatement(new StatementRunner() {
                    public void run(Statement statement) throws ProgramException, SQLException {
                        // Only the result-set metadata is needed, so request no rows.
                        final ResultSet set = statement.executeQuery("SELECT * FROM "
                                + configuration.getTableName().toUpperCase() + " WHERE 1 = 0");
                        try {
                            final ResultSetMetaData meta = set.getMetaData();
                            for (int i = 1; i <= meta.getColumnCount(); i++)
                                columnNames.add(meta.getColumnName(i));
                        } finally {
                            set.close();
                        }
                    }
                });
                node.log(System.out, "Successfully connected and retrieved column names from a node db.");
            } catch (final Exception e) {
                node.log(System.err, e.getMessage());
            }

            // The loaders need the column names to partition rows; stop here
            // rather than starting threads that cannot do their job.
            if (columnNames.isEmpty()) {
                System.err.println("Unable to determine the column names of table "
                        + configuration.getTableName());
                System.exit(1);
            }

            // Start threads for each configuration node.
            final int count = configuration.nodeListsize();
            final Thread[] threads = new Thread[count];
            for (int i = 0; i < count; i++) {
                threads[i] = new Thread(new Runner(configuration, i, csvList, columnNames));
                threads[i].start();
            }

            // Wait for threads.
            for (int i = 0; i < count; i++)
                join(threads[i], configuration.getNode(i));

        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
            return;
        }

        // Exit cleanly while debugging from Eclipse.
        System.exit(0);
    }

    /**
     * Escapes a value for use inside a single-quoted SQL string literal by
     * doubling every single quote.
     * 
     * @param value
     *            The raw value.
     * 
     * @return The escaped value (without surrounding quotes).
     */
    private static String escapeSql(final String value) {
        assert null != value;
        return value.replace("'", "''");
    }

    /**
     * Reads a CSV file and returns a list of CSV nodes, one per parsed record.
     * The file is always closed before returning.
     * 
     * @param path
     *            The path to the file.
     * 
     * @return The list of CSV nodes.
     * 
     * @throws ProgramException
     *             Thrown if there is an error reading the file or parsing the
     *             data.
     */
    private static ArrayList<CSVNode> readFile(final String path) throws ProgramException {
        assert null != path;

        final ArrayList<CSVNode> nodes = new ArrayList<CSVNode>();
        Reader reader = null;
        try {
            reader = new FileReader(path);
            String[] fields;
            while (null != (fields = CSVParser.parse(reader)))
                nodes.add(new CSVNode(fields));

            return nodes;
        } catch (final IOException e) {
            throw new ProgramException(e);
        } finally {
            if (null != reader) {
                try {
                    reader.close();
                } catch (final IOException ignored) {
                    // Nothing useful can be done if closing fails. Any data
                    // read is already in memory, and a read error has
                    // already been reported.
                }
            }
        }
    }

    /**
     * Joins the thread and checks for errors. In the case of an error, a
     * warning is displayed, but no exception is thrown.
     * 
     * @param thread
     *            The thread to wait for.
     * @param node
     *            The node the thread works on; used for the warning message.
     */
    private static void join(final Thread thread, final ConfigurationNode node) {
        try {
            thread.join();
        } catch (final InterruptedException e) {
            node.log(System.err, "Failed to respond");
        }
    }

}

/**
 * Loads the part of the CSV data that belongs to one node.
 * <p>
 * Each runner works on one node. It walks the CSV list and asks every CSV row
 * which partition it belongs to. When the row belongs to this runner's node,
 * the runner builds an INSERT statement from the row and executes it on the
 * node. Afterwards it connects to the catalog DB and records this table's
 * partitioning information in this node's DTABLES entry.
 */
class Runner implements Runnable {

    /** Prefix of node names; the remainder of the name is the NODEID. */
    private static final String NODE_PREFIX = "node";

    /** Index of the node this runner works on. */
    private final int index;

    /** The CSV list to be evaluated and inserted. */
    private final ArrayList<CSVNode> csvList;

    /** The column names, passed to each CSV row to compute its partition. */
    private final ArrayList<String> columnNames;

    /** The configuration holding the nodes, the catalog and partition info. */
    private final Configuration configuration;

    /**
     * Creates a runner for one node.
     * 
     * @param configuration
     *            The cluster configuration.
     * @param index
     *            The index of this runner's node (and partition) in the
     *            configuration.
     * @param csvList
     *            All CSV rows to be loaded; only those partitioned to this
     *            node are inserted.
     * @param columnNames
     *            The column names of the target table.
     */
    public Runner(final Configuration configuration,
            int index,
            ArrayList<CSVNode> csvList,
            ArrayList<String> columnNames) {
        assert null != configuration;
        assert null != csvList;
        assert null != columnNames;

        this.configuration = configuration;
        this.index = index;
        this.csvList = csvList;
        this.columnNames = columnNames;
    }

    /**
     * Inserts the CSV rows that belong to this node, then updates this node's
     * catalog entry with the partitioning information. If the insert phase
     * fails, the whole program exits with status 1. A failure while updating
     * the catalog is only logged.
     */
    public void run() {
        final ConfigurationNode node = configuration.getNode(index);
        final ConfigurationNode catalog = configuration.getCatalog();

        // Loop over the CSV list and load the matching rows.
        try {
            node.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {

                    int count = 0;
                    for (int i = 0; i < csvList.size(); i++) {
                        final CSVNode csvNode = csvList.get(i);
                        final int partitionTo = csvNode.partitionTo(configuration, columnNames);
                        if (!belongsToNode(node, partitionTo))
                            continue;
                        final String insertStatement = insertStatement(csvNode);
                        // Trace every statement that is sent to the node.
                        node.log(System.out, "Executing a insert statement " + insertStatement);
                        statement.execute(insertStatement);
                        count++;
                    }
                    node.log(System.out, count + " items inserted.");
                }
            });

        } catch (final Exception e) {
            node.log(System.err, e.getMessage());
            System.exit(1);
        }

        // Connect to the catalog db and update this node's entry with the
        // partition-related columns. Each runner updates only its own row;
        // together the runners cover every node of the table.
        try {
            catalog.runStatement(new StatementRunner() {
                public void run(Statement statement) throws ProgramException, SQLException {
                    final String method = configuration.getPartitionMethod();
                    final String updateStatement;
                    if (method.equalsIgnoreCase("hash")) {
                        // Hash partitioning: all nodes share the same parameters.
                        updateStatement = updateStatement("2",
                                configuration.getPartitionColumn(),
                                configuration.getPartition(0).param1,
                                configuration.getPartition(0).param2,
                                node);
                    } else if (method.equalsIgnoreCase("range")) {
                        // Range partitioning: partition i belongs to node i.
                        updateStatement = updateStatement("1",
                                configuration.getPartitionColumn(),
                                configuration.getPartition(index).param1,
                                configuration.getPartition(index).param2,
                                node);
                    } else {
                        // No other partition method is supported.
                        throw new ProgramException("Support only range and hash two partition methods.");
                    }
                    // Trace the statement that is sent to the catalog.
                    catalog.log(System.out, "Executing a update statement " + updateStatement);
                    statement.execute(updateStatement);
                    catalog.log(System.out, "Updated for '" + node.hostname + "'");
                }
            });

        } catch (final Exception e) {
            catalog.log(System.err, e.getMessage());
        }
    }

    /**
     * Tells whether a partition number designates the given node. The whole
     * NODEID is compared (numerically when possible), not just a suffix of the
     * node name, so that partition 1 does not also match node11 or node21.
     */
    private static boolean belongsToNode(final ConfigurationNode node, final int partition) {
        final String id = nodeId(node).trim();
        try {
            return Integer.parseInt(id) == partition;
        } catch (final NumberFormatException e) {
            // A non-numeric NODEID can never equal a numeric partition.
            return false;
        }
    }

    /** Returns the NODEID part of a node name, i.e. the name without its prefix. */
    private static String nodeId(final ConfigurationNode node) {
        return node.name.substring(NODE_PREFIX.length());
    }

    /**
     * Creates an INSERT statement for the target table whose values are the
     * CSV row's fields, in order, each as a quoted SQL string literal.
     */
    private String insertStatement(final CSVNode csvNode) {
        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO ");
        builder.append(configuration.getTableName().toUpperCase());
        builder.append(" VALUES (");
        for (int i = 0; i < csvNode.size(); i++) {
            if (i > 0)
                builder.append(", ");
            builder.append(quoteString(csvNode.get(i)));
        }
        builder.append(")");
        return builder.toString();
    }

    /**
     * Creates an UPDATE statement that stores the partition information in
     * DTABLES for the target table. When {@code node} is non-null, only that
     * node's entry (matched by NODEID) is updated; otherwise every entry of
     * the table is updated. A null {@code partparam2} is stored as ''.
     */
    private String updateStatement(
            String partmtd,
            String partcol,
            String partparam1,
            String partparam2,
            ConfigurationNode node) {
        assert null != partmtd;
        assert null != partcol;
        assert null != partparam1;

        partparam2 = partparam2 == null ? "" : partparam2;

        final StringBuilder builder = new StringBuilder();
        builder.append("UPDATE DTABLES");
        builder.append(" SET PARTMTD = ");
        builder.append(quoteString(partmtd));
        builder.append(", PARTCOL = ");
        builder.append(quoteString(partcol));
        builder.append(", PARTPARAM1 = ");
        builder.append(quoteString(partparam1));
        builder.append(", PARTPARAM2 = ");
        builder.append(quoteString(partparam2));
        builder.append(" WHERE (TNAME = ");
        builder.append(quoteString(configuration.getTableName()));
        builder.append(" OR TNAME = UCASE(");
        builder.append(quoteString(configuration.getTableName()) + "))");
        if (node == null)
            return builder.toString();
        builder.append(" AND NODEID = ");
        builder.append(quoteString(nodeId(node)));
        return builder.toString();
    }

    /**
     * Wraps a value in single quotes to form an SQL string literal. Embedded
     * single quotes are doubled so that values such as O'Brien neither break
     * the statement nor inject SQL.
     */
    private static String quoteString(final String toQuote) {
        assert null != toQuote;
        return "'" + toQuote.replace("'", "''") + "'";
    }

}
// Valid HTML 4.01 Valid CSS