package edu.hawaii.ics.yucheng;
import java.io.StringReader;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
/**
* A class implements Runnable. The run method takes a list of CSV entries and
* upload them into corresponding tables on the distributed database system.
*
* @author Cheng Jade
* @assignment ICS 421 Project
* @date Mar 22, 2010
* @bugs None
*/
public class LoadCSV implements Runnable {

    /** The load-CSV configuration (table name, partition method, node list). */
    public final LoadCSVConfiguration configuration;

    /** The catalog node extracted from the overall configuration. */
    public final ConfigurationNode catalog;

    /** The parsed CSV entries; each element holds the fields of one row. */
    private final ArrayList<String[]> csvs = new ArrayList<String[]>();

    /**
     * Tri-state outcome flag shared with the worker threads:
     * {@code null} = no worker has reported yet, {@code TRUE} = every worker
     * that reported so far succeeded, {@code FALSE} = at least one worker
     * failed. Declared {@code volatile} so writes made on worker threads are
     * visible to the thread that reads the flag after joining them.
     */
    private volatile Boolean success = null;

    /**
     * Initializes a new instance of this object by parsing the raw CSV text.
     *
     * @param configuration
     *            The load-CSV configuration; must not be null.
     *
     * @param rawCSV
     *            The raw CSV lines to parse; must not be null.
     *
     * @throws ProgramException
     *             Thrown if the CSV text cannot be parsed.
     *
     * @throws NullPointerException
     *             Thrown if either argument is null.
     */
    public LoadCSV(final LoadCSVConfiguration configuration, final ArrayList<String> rawCSV) throws ProgramException {
        if (null == configuration)
            throw new NullPointerException("configuration");
        if (null == rawCSV)
            throw new NullPointerException("rawCSV");
        // initialize configuration and catalog.
        this.configuration = configuration;
        this.catalog = this.configuration.catalog;
        // Stitch the raw lines into one newline-separated string so the CSV
        // parser can consume it as a single character stream.
        final StringBuilder builder = new StringBuilder();
        for (final String item : rawCSV)
            builder.append(item).append('\n');
        final StringReader reader = new StringReader(builder.toString());
        // Parse the stream; CSVParser.parse returns null at end of input.
        String[] fields;
        while (null != (fields = CSVParser.parse(reader)))
            this.csvs.add(fields);
    }

    /**
     * The main routine: partitions the CSV rows across the configured nodes,
     * bulk-inserts each node's rows on its own thread, and has the worker
     * threads update the catalog dtables. Prints the overall outcome; on a
     * setup failure it prints the error and terminates the process.
     */
    public void run() {
        try {
            // For each configured node, prepare the list of CSV rows that
            // will be partitioned to that node.
            final ArrayList<CSVsNodePair> csvsToNodes = new ArrayList<CSVsNodePair>();
            for (int i = 0; i < this.configuration.nodes.size(); i++)
                csvsToNodes.add(new CSVsNodePair(this.configuration.nodes.get(i).node));
            // Examine every CSV row and route it to the matching node list.
            final ArrayList<String> columnNames = this.columnNames();
            for (final String[] item : this.csvs) {
                final int toID = this.partitionTo(columnNames, item);
                for (final CSVsNodePair pair : csvsToNodes) {
                    // Node names appear to follow the form "nodeN" where the
                    // suffix after the first 4 characters is the numeric node
                    // ID -- TODO confirm against the configuration format.
                    if (Integer.parseInt(pair.node.name.substring(4)) == toID)
                        pair.nodeCSVs.add(item);
                }
            }
            // Start one worker per node that actually received rows. Nodes
            // with no rows are skipped: an INSERT statement with an empty
            // VALUES list is invalid SQL and would always fail.
            final int count = csvsToNodes.size();
            final Thread[] threads = new Thread[count];
            for (int i = 0; i < count; i++) {
                final CSVsNodePair pair = csvsToNodes.get(i);
                if (pair.nodeCSVs.isEmpty())
                    continue;
                final String bulkInsertStatement = this.bulkInsertStatement(pair);
                threads[i] = new Thread(new Runner(pair.node, bulkInsertStatement));
                threads[i].start();
            }
            // Wait for every worker that was started (skipped slots are null).
            for (int i = 0; i < count; i++)
                if (null != threads[i])
                    DistributedDB.join(threads[i], csvsToNodes.get(i).node);
            // Report the overall outcome. Guard against null before unboxing:
            // the flag stays null when no worker ran (nothing to load), which
            // counts as success rather than triggering a NullPointerException.
            if (null == this.success || this.success.booleanValue())
                System.out.println("All CSV were loaded successfully.");
            else
                System.out.println("Not all CSV were loaded successfully.");
        } catch (final ProgramException e) {
            System.err.println(e.getMessage());
            System.exit(1);
            return;
        }
    }

    /**
     * Connects to the first configured node and retrieves the column names of
     * the target table via a SELECT's result-set metadata.
     *
     * @return The list of column names in the target table.
     *
     * @throws ProgramException
     *             Thrown if the connection or the query fails.
     */
    private ArrayList<String> columnNames() throws ProgramException {
        final ConfigurationNode node = this.configuration.nodes.get(0).node;
        final ArrayList<String> columnNames = new ArrayList<String>();
        try {
            node.runStatement(new StatementRunner() {
                public void run(final Statement statement) throws ProgramException, SQLException {
                    // Only the metadata is consumed; the rows themselves are
                    // never iterated.
                    final ResultSet set = statement.executeQuery("SELECT * FROM " + LoadCSV.this.configuration.tableName);
                    final ResultSetMetaData meta = set.getMetaData();
                    // JDBC column indices are 1-based.
                    for (int i = 1; i <= meta.getColumnCount(); i++)
                        columnNames.add(meta.getColumnName(i));
                }
            });
            node.log(System.out, "Successfully connected and retrieved column names from a node db.");
            return columnNames;
        } catch (final Exception e) {
            node.log(System.err, e.getMessage());
            throw new ProgramException("columnNames " + e);
        }
    }

    /**
     * Calculates the node ID that a specified CSV entry will be partitioned to.
     *
     * @param columnNames
     *            The list of column names retrieved from the target table.
     *
     * @param csv
     *            The specified CSV entry to be calculated.
     *
     * @return The 1-based node ID that this CSV entry will be partitioned to.
     *
     * @throws ProgramException
     *             Thrown if the partition column is missing, the row is too
     *             short, the value is out of range, the value is not numeric,
     *             or the partition method is unsupported.
     *
     * @throws NullPointerException
     *             Thrown if an argument is null.
     */
    private int partitionTo(final ArrayList<String> columnNames, final String[] csv) throws ProgramException {
        if (null == columnNames)
            throw new NullPointerException("columnNames");
        if (null == csv)
            throw new NullPointerException("csv");
        // Locate the partition column by case-insensitive name match.
        int indexOfPartitionColumn = -1;
        final String partitionColumn = this.configuration.partitionColumn;
        for (int i = 0; i < columnNames.size(); i++) {
            if (columnNames.get(i).equalsIgnoreCase(partitionColumn)) {
                indexOfPartitionColumn = i;
                break;
            }
        }
        if (indexOfPartitionColumn == -1)
            throw new ProgramException("Mismatch partition table name and the dtables");
        // Guard against rows shorter than the table's column list; indexing
        // blindly would throw an ArrayIndexOutOfBoundsException.
        if (indexOfPartitionColumn >= csv.length)
            throw new ProgramException("CSV entry has fewer fields than the table has columns");
        final String partitionValue = csv[indexOfPartitionColumn];
        try {
            // Values are parsed as floats and truncated so inputs like "3.0"
            // are accepted.
            final int intPartitionValue = (int) Float.parseFloat(partitionValue);
            if (this.configuration.partitionMethod.equalsIgnoreCase("range")) {
                // Range partitioning: each node covers the half-open interval
                // (param1, param2].
                for (int i = 0; i < this.configuration.nodes.size(); i++) {
                    final int param1 = (int) Float.parseFloat(this.configuration.nodes.get(i).param1);
                    final int param2 = (int) Float.parseFloat(this.configuration.nodes.get(i).param2);
                    if (intPartitionValue > param1 && intPartitionValue <= param2)
                        return i + 1;
                }
                throw new ProgramException("Partition value out of range");
            }
            if (this.configuration.partitionMethod.equalsIgnoreCase("hash")) {
                // Hash partitioning: param1 is the bucket count. floorMod
                // keeps the bucket non-negative even for negative partition
                // values, which the % operator would map to an invalid ID.
                final int param1 = Integer.parseInt(this.configuration.nodes.get(0).param1);
                return Math.floorMod(intPartitionValue, param1) + 1;
            }
            throw new ProgramException("Support only range and hash partition methods.");
        } catch (final NumberFormatException e) {
            throw new ProgramException("Support only numeric partion values");
        }
    }

    /**
     * A private class that groups a node and its list of CSV rows together.
     * Static because it never touches the enclosing instance.
     */
    private static class CSVsNodePair {

        /** The node the rows are destined for. */
        public final ConfigurationNode node;

        /** The CSV rows routed to this node. */
        public final ArrayList<String[]> nodeCSVs = new ArrayList<String[]>();

        public CSVsNodePair(final ConfigurationNode node) {
            this.node = node;
        }
    }

    /**
     * Creates and returns a bulk insert statement for one node's rows.
     *
     * @param pair
     *            A CSVsNodePair whose row list must be non-empty; an empty
     *            list would yield an INSERT with no VALUES tuples, which is
     *            invalid SQL (callers skip empty nodes).
     *
     * @return The bulk insert statement created.
     */
    private String bulkInsertStatement(final CSVsNodePair pair) {
        assert !pair.nodeCSVs.isEmpty();
        final StringBuilder builder = new StringBuilder();
        // NOTE(review): the table name is concatenated directly into the SQL.
        // It comes from the job configuration rather than end users, but
        // validating or quoting it would harden this against SQL injection.
        builder.append("INSERT INTO " + this.configuration.tableName + " VALUES \n");
        final int nodeNum = pair.nodeCSVs.size();
        for (int i = 0; i < nodeNum; i++) {
            final String[] csv = pair.nodeCSVs.get(i);
            builder.append("\t\t\t\t(");
            // Join the quoted fields with ", "; guarding length == 0 avoids
            // an out-of-bounds access on a degenerate empty row.
            for (int j = 0; j < csv.length - 1; j++)
                builder.append(DistributedDB.quote(csv[j]) + ", ");
            if (csv.length > 0)
                builder.append(DistributedDB.quote(csv[csv.length - 1]));
            builder.append(")");
            if (i != nodeNum - 1)
                builder.append(", \n");
        }
        return builder.toString();
    }

    /**
     * Creates and returns an update statement based on the partition
     * information. This SQL statement will be used to modify dtables.
     *
     * @param partmtd
     *            A dtable entry, 1 for range, 2 for hash.
     *
     * @param partparam1
     *            A dtable entry, partition parameter 1.
     *
     * @param partparam2
     *            A dtable entry, partition parameter 2; null is treated as an
     *            empty string.
     *
     * @param node
     *            A ConfigurationNode used for the NODEID restriction, or null
     *            to update the rows for every node.
     *
     * @return The update statement.
     */
    private String updateStatement(final String partmtd, final String partparam1, String partparam2, final ConfigurationNode node) {
        assert null != partmtd;
        assert null != partparam1;
        partparam2 = partparam2 == null ? "" : partparam2;
        final StringBuilder builder = new StringBuilder();
        builder.append("UPDATE DTABLES");
        builder.append(" SET PARTMTD = ");
        builder.append(DistributedDB.quote(partmtd));
        builder.append(", PARTCOL = ");
        builder.append(DistributedDB.quote(this.configuration.partitionColumn));
        builder.append(", PARTPARAM1 = ");
        builder.append(DistributedDB.quote(partparam1));
        builder.append(", PARTPARAM2 = ");
        builder.append(DistributedDB.quote(partparam2));
        // Match the table name either verbatim or upper-cased, since dtables
        // may store it in either form.
        builder.append(" WHERE (TNAME = ");
        builder.append(DistributedDB.quote(this.configuration.tableName));
        builder.append(" OR TNAME = UCASE(");
        builder.append(DistributedDB.quote(this.configuration.tableName) + "))");
        if (node == null)
            return builder.toString();
        builder.append(" AND NODEID = ");
        builder.append(DistributedDB.quote(node.name.substring(4)));
        return builder.toString();
    }

    /**
     * A private runner object executed on one worker thread per node.
     */
    private class Runner implements Runnable {

        /** The configuration node associated with a single thread. */
        private final ConfigurationNode node;

        /** The SQL statement to be executed. */
        private final String bulkInsertStatement;

        /**
         * Initializes a new instance of the runner.
         *
         * @param node
         *            The cluster node associated with this instance.
         *
         * @param bulkInsertStatement
         *            The bulk insert statement to execute.
         */
        public Runner(final ConfigurationNode node, final String bulkInsertStatement) {
            assert null != node;
            assert null != bulkInsertStatement;
            this.node = node;
            this.bulkInsertStatement = bulkInsertStatement;
        }

        /**
         * Executes the insert command for the node associated with this
         * instance, then updates the catalog dtables. On any failure the
         * shared success flag is forced to FALSE; on success it is set to
         * TRUE only while still unset, so one failure anywhere makes the
         * overall outcome a failure.
         */
        public void run() {
            try {
                // connect to the node and execute the bulk insert statement.
                this.node.runStatement(new StatementRunner() {
                    public void run(final Statement statement) throws ProgramException, SQLException {
                        Runner.this.node.log(System.out, "Executing: " + Runner.this.bulkInsertStatement);
                        statement.execute(Runner.this.bulkInsertStatement);
                        Runner.this.node.log(System.out, "Statement executed successfully.");
                    }
                });
                // connect to the catalog and update dtables.
                LoadCSV.this.catalog.runStatement(new StatementRunner() {
                    public void run(final Statement statement) throws ProgramException, SQLException {
                        // Hash partitioning: one update covering every node
                        // (partmtd code "2").
                        if (LoadCSV.this.configuration.partitionMethod.equalsIgnoreCase("hash")) {
                            final String updateStatement = LoadCSV.this.updateStatement("2", LoadCSV.this.configuration.nodes.get(0).param1, LoadCSV.this.configuration.nodes.get(0).param2, null);
                            LoadCSV.this.catalog.log(System.out, "Executing a update statement " + updateStatement);
                            statement.execute(updateStatement);
                            LoadCSV.this.catalog.log(System.out, "Updated for '" + Runner.this.node.hostname + "'");
                            if (LoadCSV.this.success == null)
                                LoadCSV.this.success = true;
                            return;
                        }
                        // Range partitioning: one update per node, each with
                        // its own range parameters (partmtd code "1").
                        if (LoadCSV.this.configuration.partitionMethod.equalsIgnoreCase("range")) {
                            for (int i = 0; i < LoadCSV.this.configuration.nodes.size(); i++) {
                                final String updateStatement = LoadCSV.this.updateStatement("1", LoadCSV.this.configuration.nodes.get(i).param1, LoadCSV.this.configuration.nodes.get(i).param2, LoadCSV.this.configuration.nodes.get(i).node);
                                LoadCSV.this.catalog.log(System.out, "Executing a update statement " + updateStatement);
                                statement.execute(updateStatement);
                            }
                            LoadCSV.this.catalog.log(System.out, "Updated for '" + Runner.this.node.hostname + "'");
                            if (LoadCSV.this.success == null)
                                LoadCSV.this.success = true;
                            return;
                        }
                        // no other partition method is supported.
                        throw new ProgramException("Support only range and hash two partition methods.");
                    }
                });
            } catch (final ProgramException e) {
                LoadCSV.this.success = false;
                this.node.log(System.err, e.getMessage());
            }
        }
    }

    @Override
    public String toString() {
        return this.configuration.toString();
    }
}