package edu.hawaii.ics.yucheng;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.sql.SQLException;
import java.util.ArrayList;
/**
 * A class that loads data from a comma-separated (CSV) file into a distributed
 * table on the cluster. The program takes two command-line arguments,
 * clustercfg and csvfile. The clustercfg file contains access information for
 * the catalog DB, the name of the table to be loaded, and the partitioning
 * information. The csvfile contains the data to be loaded.
 *
 * @author Cheng Jade
 * @assignment ICS 421 Assignment 2-2
 * @date Feb 29, 2010
 * @bugs None
 */
public class LoadCSV {
    /** The list of column names for the specified table. */
    private static final ArrayList<String> columnNames = new ArrayList<String>();
    /** The configuration information. */
    private static Configuration configuration;
    /** The catalog DBMS, which is common to all threads. */
    private static ConfigurationNode catalog;
    /** The CSV file entries. */
    private static ArrayList<CSVNode> csvList;

    /**
     * The main entry point of the application.
     *
     * Reads the configuration and CSV files, discovers the table's nodes from
     * the catalog, reads the table's column names from one node, and then loads
     * the CSV rows into every node in parallel. Exits with status 1 on any
     * fatal error and with status 0 on success.
     *
     * @param args
     *            The command line arguments: the configuration file path and
     *            the CSV file path.
     */
    public static void main(final String[] args) {
        assert null != args;
        // Check for usage errors.
        if (args.length != 2) {
            final String name = LoadCSV.class.getSimpleName();
            System.err.println("Usage: java " + name + " <cfg> <csv>");
            System.err.println(" <cfg> path to a configuration file");
            System.err.println(" <csv> path to a CSV file");
            // A usage error is a failure, so report a non-zero status.
            System.exit(1);
            return;
        }
        try {
            assert null != args[0];
            assert null != args[1];
            // Read the configuration and CSV files.
            configuration = new Configuration(args[0]);
            csvList = readFile(args[1]);
            catalog = configuration.getCatalog();

            // Discover the nodes holding the table; exits on failure.
            readNodesFromCatalog();

            // The catalog must list at least one node, and exactly one node
            // per partition given in the configuration file.
            if (configuration.nodeListsize() == 0) {
                System.err.println("No nodes in the catalog dtables hold table '"
                        + configuration.getTableName() + "'");
                System.exit(1);
            }
            if (configuration.nodeListsize() != configuration.partitionListsize()) {
                System.err.println("The number of nodes in the dtables and the " +
                        "number of partitions in the config file doesn't match");
                System.exit(1);
            }

            // The runners need the column names; exits on failure.
            readColumnNames();

            loadAllNodes();
        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
            return;
        }
        // Exit cleanly while debugging from Eclipse.
        System.exit(0);
    }

    /**
     * Queries the catalog's DTABLES for the rows of the configured table
     * (matching its name as given or upper-cased) and adds one configuration
     * node per row, named "node" + NODEID. Exits with status 1 if the catalog
     * query fails, because without the nodes there is nowhere to load data.
     */
    private static void readNodesFromCatalog() {
        try {
            catalog.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    final ResultSet set = statement.executeQuery(
                            "SELECT * FROM DTABLES WHERE TNAME = '"
                            + configuration.getTableName() + "' OR TNAME = "
                            + "UCASE('" + configuration.getTableName() + "')");
                    while (set.next()) {
                        final String driver = set.getString("NODEDRIVER").trim();
                        final String url = set.getString("NODEURL").trim();
                        final String user = set.getString("NODEUSER").trim();
                        final String password = set.getString("NODEPASSWD").trim();
                        final String name = "node" + set.getString("NODEID").trim();
                        configuration.addNode(new ConfigurationNode(name, driver, url, user, password));
                    }
                }
            });
            catalog.log(System.out, "Successfully connected and retrieved information from catalog dtables.");
        } catch (final Exception e) {
            catalog.log(System.err, e.getMessage());
            System.exit(1);
        }
    }

    /**
     * Connects to the first node and appends the target table's column names,
     * taken from the result-set metadata, to {@link #columnNames}. Exits with
     * status 1 on failure, since the runners pass these names to
     * {@code CSVNode.partitionTo} and cannot route rows without them.
     */
    private static void readColumnNames() {
        final ConfigurationNode node = configuration.getNode(0);
        try {
            node.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    final ResultSet set = statement.executeQuery("SELECT * FROM "
                            + configuration.getTableName().toUpperCase());
                    final ResultSetMetaData meta = set.getMetaData();
                    final int columnCount = meta.getColumnCount();
                    for (int i = 1; i <= columnCount; i++)
                        columnNames.add(meta.getColumnName(i));
                }
            });
            node.log(System.out, "Successfully connected and retrieved column names from a node db.");
        } catch (final Exception e) {
            node.log(System.err, e.getMessage());
            System.exit(1);
        }
    }

    /**
     * Starts one {@link Runner} thread per configured node and waits for all
     * of them to finish.
     *
     * @throws ProgramException
     *             Propagated from the configuration, if it reports an error.
     */
    private static void loadAllNodes() throws ProgramException {
        final int count = configuration.nodeListsize();
        final Thread[] threads = new Thread[count];
        for (int i = 0; i < count; i++) {
            threads[i] = new Thread(new Runner(configuration, i, csvList, columnNames));
            threads[i].start();
        }
        // Wait for every loader thread.
        for (int i = 0; i < count; i++)
            join(threads[i], configuration.getNode(i));
    }

    /**
     * Reads a CSV file and returns a list of CSV nodes, one per parsed record.
     * The file is always closed, even if parsing fails.
     *
     * @param path
     *            The path to the file.
     *
     * @return The list of CSV nodes.
     *
     * @throws ProgramException
     *             Thrown if there is an error reading the file or parsing the
     *             data.
     */
    private static ArrayList<CSVNode> readFile(final String path) throws ProgramException {
        assert null != path;
        final ArrayList<CSVNode> nodes = new ArrayList<CSVNode>();
        try {
            final Reader reader = new FileReader(path);
            try {
                String[] fields;
                while (null != (fields = CSVParser.parse(reader)))
                    nodes.add(new CSVNode(fields));
            } finally {
                // Release the file handle whether or not parsing succeeded.
                reader.close();
            }
            return nodes;
        } catch (final IOException e) {
            throw new ProgramException(e);
        }
    }

    /**
     * Joins the thread and checks for errors. In the case of an error, a
     * warning is displayed, but no exception is thrown.
     *
     * @param thread
     *            The loader thread to wait for.
     * @param node
     *            The node the thread works on; used for the warning message.
     */
    private static void join(final Thread thread, final ConfigurationNode node) {
        try {
            thread.join();
        } catch (final InterruptedException e) {
            node.log(System.err, "Failed to respond");
        }
    }
}
/**
 * Loads the CSV rows that belong to one node of a distributed table, then
 * records the table's partitioning information for that node in the catalog's
 * DTABLES.
 * <p>
 * Each runner works on exactly one node. For every CSV row it asks the row
 * which partition it maps to ({@code CSVNode.partitionTo}). Rows whose
 * partition equals this node's NODEID become INSERT statements executed on the
 * node. Afterwards the runner connects to the catalog and updates this node's
 * DTABLES entry with the partition method, column and parameters.
 */
class Runner implements Runnable {
    /** Prefix of node names; the rest of the name is the catalog NODEID. */
    private static final String NODE_NAME_PREFIX = "node";
    /** Index of this runner's node in the configuration's node list. */
    private final int index;
    /** The CSV rows to be evaluated and possibly inserted. */
    private final ArrayList<CSVNode> csvList;
    /** The table's column names, passed to {@code CSVNode.partitionTo}. */
    private final ArrayList<String> columnNames;
    /** The configuration holding node, catalog and partition information. */
    private final Configuration configuration;

    public Runner(final Configuration configuration,
            int index,
            ArrayList<CSVNode> csvList,
            ArrayList<String> columnNames) {
        assert null != configuration;
        assert null != csvList;
        assert null != columnNames;
        this.configuration = configuration;
        this.index = index;
        this.csvList = csvList;
        this.columnNames = columnNames;
    }

    /**
     * Inserts this node's share of the CSV rows into the node, then updates
     * this node's catalog entry with the partitioning information. An insert
     * failure terminates the whole program with status 1. A catalog-update
     * failure is only logged.
     */
    public void run() {
        final ConfigurationNode node = configuration.getNode(index);
        final ConfigurationNode catalog = configuration.getCatalog();
        final String ownNodeId = nodeId(node);
        // Loop over the CSV list and load the matching rows.
        try {
            node.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    int count = 0;
                    for (final CSVNode csvNode : csvList) {
                        final int partitionTo = csvNode.partitionTo(configuration, columnNames);
                        // Compare the whole NODEID. A suffix test would also route
                        // partition 1's rows to node11, node21, ...
                        if (!ownNodeId.equals(Integer.toString(partitionTo)))
                            continue;
                        final String insertStatement = insertStatement(csvNode);
                        node.log(System.out, "Executing a insert statement " + insertStatement);
                        statement.execute(insertStatement);
                        count++;
                    }
                    node.log(System.out, count + " items inserted.");
                }
            });
        } catch (final Exception e) {
            node.log(System.err, e.getMessage());
            System.exit(1);
        }
        // Connect to the catalog db and update this node's entry with the
        // partition-related columns. Every runner handles only its own node,
        // so together the runners update each entry exactly once.
        try {
            catalog.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    final String method = configuration.getPartitionMethod();
                    final String updateStatement;
                    if (method.equalsIgnoreCase("hash")) {
                        // The hash parameters are read from the first partition entry.
                        updateStatement = updateStatement("2",
                                configuration.getPartitionColumn(),
                                configuration.getPartition(0).param1,
                                configuration.getPartition(0).param2,
                                node);
                    } else if (method.equalsIgnoreCase("range")) {
                        // Partition i describes the range stored on node i.
                        updateStatement = updateStatement("1",
                                configuration.getPartitionColumn(),
                                configuration.getPartition(index).param1,
                                configuration.getPartition(index).param2,
                                node);
                    } else {
                        // No other partition method is supported.
                        throw new ProgramException("Support only range and hash two partition methods.");
                    }
                    catalog.log(System.out, "Executing a update statement " + updateStatement);
                    statement.execute(updateStatement);
                    catalog.log(System.out, "Updated for '" + node.hostname + "'");
                }
            });
        } catch (final Exception e) {
            catalog.log(System.err, e.getMessage());
        }
    }

    /**
     * Builds an INSERT statement that inserts one CSV row, as quoted string
     * literals, into the configured table (upper-cased name).
     *
     * @param csvNode
     *            The CSV row to insert.
     * @return The SQL INSERT statement.
     */
    private String insertStatement(final CSVNode csvNode) {
        final StringBuilder builder = new StringBuilder();
        builder.append("INSERT INTO ");
        builder.append(configuration.getTableName().toUpperCase());
        builder.append(" VALUES (");
        for (int i = 0; i < csvNode.size(); i++) {
            if (i > 0)
                builder.append(", ");
            builder.append(quoteString(csvNode.get(i)));
        }
        builder.append(")");
        return builder.toString();
    }

    /**
     * Builds an UPDATE statement that writes partition information into
     * DTABLES for the configured table (matching its name as given or
     * upper-cased).
     *
     * @param partmtd
     *            The partition method code ("1" for range, "2" for hash).
     * @param partcol
     *            The partitioning column.
     * @param partparam1
     *            The first partition parameter.
     * @param partparam2
     *            The second partition parameter; null is stored as "".
     * @param node
     *            The node whose entry is updated, or null to update every
     *            entry of the table.
     * @return The SQL UPDATE statement.
     */
    private String updateStatement(
            String partmtd,
            String partcol,
            String partparam1,
            String partparam2,
            ConfigurationNode node) {
        assert null != partmtd;
        assert null != partcol;
        assert null != partparam1;
        partparam2 = partparam2 == null ? "" : partparam2;
        final StringBuilder builder = new StringBuilder();
        builder.append("UPDATE DTABLES");
        builder.append(" SET PARTMTD = ");
        builder.append(quoteString(partmtd));
        builder.append(", PARTCOL = ");
        builder.append(quoteString(partcol));
        builder.append(", PARTPARAM1 = ");
        builder.append(quoteString(partparam1));
        builder.append(", PARTPARAM2 = ");
        builder.append(quoteString(partparam2));
        builder.append(" WHERE (TNAME = ");
        builder.append(quoteString(configuration.getTableName()));
        builder.append(" OR TNAME = UCASE(");
        builder.append(quoteString(configuration.getTableName()) + "))");
        if (node == null)
            return builder.toString();
        builder.append(" AND NODEID = ");
        builder.append(quoteString(nodeId(node)));
        return builder.toString();
    }

    /** Returns the catalog NODEID encoded in a node's name ("node" + NODEID). */
    private static String nodeId(final ConfigurationNode node) {
        return node.name.substring(NODE_NAME_PREFIX.length());
    }

    /**
     * Wraps a value in single quotes as an SQL string literal. Embedded single
     * quotes are doubled so that values such as O'Brien stay inside the literal.
     */
    private static String quoteString(final String toQuote) {
        assert null != toQuote;
        return "'" + toQuote.replace("'", "''") + "'";
    }
}