View Javadoc
1   package net.sumaris.importation.dao;
2   
3   /*-
4    * #%L
5    * SUMARiS:: Core Importation
6    * %%
7    * Copyright (C) 2018 - 2019 SUMARiS Consortium
8    * %%
9    * This program is free software: you can redistribute it and/or modify
10   * it under the terms of the GNU General Public License as
11   * published by the Free Software Foundation, either version 3 of the
12   * License, or (at your option) any later version.
13   * 
14   * This program is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   * GNU General Public License for more details.
18   * 
19   * You should have received a copy of the GNU General Public
20   * License along with this program.  If not, see
21   * <http://www.gnu.org/licenses/gpl-3.0.html>.
22   * #L%
23   */
24  
25  import com.google.common.base.Preconditions;
26  import com.google.common.collect.Lists;
27  import com.google.common.collect.Sets;
28  import net.sumaris.core.dao.technical.Daos;
29  import net.sumaris.core.dao.technical.hibernate.HibernateDaoSupport;
30  import net.sumaris.core.dao.technical.schema.*;
31  import net.sumaris.core.exception.SumarisTechnicalException;
32  import net.sumaris.importation.service.vo.DataLoadError;
33  import net.sumaris.importation.service.vo.DataLoadResult;
34  import net.sumaris.importation.util.csv.FileMessageFormatter;
35  import net.sumaris.importation.util.csv.FileReader;
36  import org.apache.commons.lang3.StringUtils;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  import org.hibernate.type.BooleanType;
40  import org.hibernate.type.DoubleType;
41  import org.hibernate.type.FloatType;
42  import org.hibernate.type.IntegerType;
43  import org.nuiton.i18n.I18n;
44  import org.springframework.beans.factory.annotation.Autowired;
45  import org.springframework.context.NoSuchMessageException;
46  import org.springframework.dao.DataRetrievalFailureException;
47  import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
48  import org.springframework.jdbc.datasource.DataSourceUtils;
49  import org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator;
50  import org.springframework.stereotype.Repository;
51  
52  import javax.persistence.EntityManager;
53  import javax.sql.DataSource;
54  import java.io.IOException;
55  import java.io.Serializable;
56  import java.sql.*;
57  import java.text.DecimalFormatSymbols;
58  import java.text.NumberFormat;
59  import java.text.ParseException;
60  import java.util.HashSet;
61  import java.util.List;
62  import java.util.Locale;
63  import java.util.Set;
64  
65  import static net.sumaris.importation.service.vo.DataLoadError.ErrorType;
66  
67  @Repository("dataLoaderDao")
68  public class DataLoaderDaoImpl extends HibernateDaoSupport implements DataLoaderDao {
69  
	// Class logger
	private static final Logger log = LoggerFactory.getLogger(DataLoaderDaoImpl.class);
	// Logger named "org.hibernate.SQL": insert queries are echoed on it when its debug level is enabled
	private static final Logger hibernateLog = LoggerFactory.getLogger("org.hibernate.SQL");

	// Errors are no longer written to the log once the result holds this many errors (see logError)
	public final static int MAX_LOG_ERRORS = 500;
	// Number of batched inserts accumulated before executeBatch() is called (see load)
	private final static int BATCH_ROW_COUNT = 10000;

	// Marker returned by validateColumnValue for skipped, empty or invalid cells
	private final static Integer NULL_VALUE = -1;

	// Shared parser used for NUMERIC cells that have decimal digits
	// NOTE(review): java.text.NumberFormat instances are not synchronized - confirm loads never run concurrently
	protected static NumberFormat numberFormat = NumberFormat.getInstance();
	// Decimal separator of the default DecimalFormatSymbols (resolved in the static initializer below)
	protected static char decimalSeparator = '\0';
	// The "other" separator (',' or '.'), replaced by decimalSeparator before parsing NUMERIC cells
	protected static char inverseDecimalSeparator = ',';
	static {
		DecimalFormatSymbols decimalFormatSymbols = new DecimalFormatSymbols();
		decimalSeparator = decimalFormatSymbols.getDecimalSeparator();
		if (decimalSeparator == ',') {
			inverseDecimalSeparator = '.';
		}
	}

	// Also assigned explicitly by the constructor
	@Autowired
	protected DataSource dataSource;

	// Source of the table/column metadata used to build queries and to check cell values
	@Autowired
	protected SumarisDatabaseMetadata sumarisDatabaseMetadata;

	@Autowired
	protected NamedParameterJdbcTemplate namedParameterJdbcTemplate;

	// Created by the constructor; converts SQLException into Spring data access exceptions
	protected SQLErrorCodeSQLExceptionTranslator sqlExceptionTranslator;

	// NOTE(review): second DataSource field differing from 'dataSource' only by case; not read in this class - confirm it can be removed
	@Autowired
	protected DataSource datasource;

	// Snapshot of hibernateLog.isDebugEnabled(), taken once by the constructor
	private final boolean showSql;
104 
105 	@Autowired
106 	public DataLoaderDaoImpl(EntityManager entityManager, DataSource dataSource) {
107 		super();
108 		setEntityManager(entityManager);
109 		this.dataSource = dataSource;
110 		this.sqlExceptionTranslator = new SQLErrorCodeSQLExceptionTranslator(dataSource);
111 		this.showSql = hibernateLog.isDebugEnabled();
112 	}
113 
114 
115 	@Override
116 	public DataLoadError[] validate(FileReader reader, DatabaseTableEnum table) throws IOException {
117 
118 		DataLoadResult result = new DataLoadResult();
119 		validate(reader, table, result);
120 		return result.getErrors();
121 	}
122 
123 	@Override
124 	public DataLoadError[] load(FileReader fileReader, DatabaseTableEnum table) throws IOException {
125 		return load(fileReader, table, false);
126 	}
127 
128 	public DataLoadError[] load(FileReader reader, DatabaseTableEnum table, boolean validate) throws IOException {
129 		Preconditions.checkNotNull(reader);
130 		DataLoadResult result = new DataLoadResult();
131 
132 		// Validate file, if need
133 		if (validate) {
134 			validate(reader, table, result);
135 
136 			if (!result.isSuccess()) return result.getErrors();
137 		}
138 
139 		SumarisTableMetadata tableMetadata = sumarisDatabaseMetadata.getTable(table.name());
140 
141 		// Import file :
142 		log.info(FileMessageFormatter.format(tableMetadata, null, reader.getCurrentLine(), "Importing file: " + reader.getFileName()));
143 
144 
145 		try {
146 			// Read column headers :
147 			String[] headers = reader.getHeaders();
148 			SumarisColumnMetadata[] headerColumns = getColumnMetadataFromHeaders(reader, result, tableMetadata, headers);
149 
150 			if (headerColumns == null) {
151 				return result.getErrors();
152 			}
153 
154 			Connection conn = DataSourceUtils.getConnection(dataSource);
155 			boolean isTransactional = DataSourceUtils.isConnectionTransactional(conn, dataSource);
156 
157 			try {
158 				String insertQuery = tableMetadata.getInsertQuery(headerColumns);
159 				PreparedStatement insertStatement = conn.prepareStatement(insertQuery);
160 
161 				String[] cols = null;
162 				int insertCount = 0;
163 				boolean rowHasErrors = false;
164 				while ((cols = reader.readNext()) != null) {
165 					int parameterIndex = 1;
166 
167 					// Generate a new id (only if the previous row was not skipped)
168 					if (!rowHasErrors) {
169 						Serializable id = generateIdentifier(conn, tableMetadata);
170 						insertStatement.setObject(parameterIndex++, id);
171 					} else {
172 						parameterIndex++;
173 					}
174 					if (log.isTraceEnabled()) {
175 						log.trace(FileMessageFormatter.format(tableMetadata, null, reader.getCurrentLine(), "Importing line with values:"));
176 					}
177 					rowHasErrors = false;
178 					int colIndex = 0;
179 					for (SumarisColumnMetadata columnMetadata : headerColumns) {
180 						// If column is not skipped
181 						if (columnMetadata != null) {
182 							String cellValue = null;
183 							if (colIndex < cols.length) {
184 								cellValue = cols[colIndex];
185 								if (log.isTraceEnabled()) {
186 									log.trace("\t" + columnMetadata.getName() + "=" + cellValue);
187 								}
188 							} else {
189 								cellValue = columnMetadata.getDefaultValue();
190 							}
191 							boolean parameterOK = setParameterValue(parameterIndex++, insertStatement, reader, tableMetadata, columnMetadata, colIndex,
192 									cellValue, result);
193 							rowHasErrors = rowHasErrors || !parameterOK;
194 						}
195 						colIndex++;
196 					}
197 
198 					if (rowHasErrors) {
199 						if (log.isDebugEnabled()) {
200 							log.debug( FileMessageFormatter.format(tableMetadata, null,  reader.getCurrentLine(), " Errors found -> Line skipped"));
201 						}
202 					} else {
203 						if (showSql) {
204 							hibernateLog.debug(insertQuery);
205 						}
206 						insertStatement.addBatch();
207 						insertCount++;
208 
209 						if (insertCount > 0 && insertCount % BATCH_ROW_COUNT == 0) {
210 							log.debug(FileMessageFormatter.format(tableMetadata, null,  reader.getCurrentLine(), "read " + insertCount + " lines..."));
211 							insertStatement.executeBatch();
212 						}
213 					}
214 				}
215 				if (insertCount > 0 && insertCount % BATCH_ROW_COUNT != 0) {
216 					insertStatement.executeBatch();
217 				}
218 				if (log.isInfoEnabled()) {
219 					log.info(FileMessageFormatter.format(tableMetadata, null, reader.getCurrentLine(), "INSERT count: " + insertCount));
220 				}
221 
222 				if (!isTransactional) conn.commit();
223 
224 			} catch (BatchUpdateException bue) {
225 				if (!isTransactional) conn.rollback();
226 				bue.getNextException().printStackTrace();
227 				throw bue;
228 			} finally {
229 				if (!isTransactional) DataSourceUtils.releaseConnection(conn, dataSource);
230 				reader.close();
231 			}
232 
233 			return result.getErrors();
234 		} catch (SQLException e) {
235 			log.error(FileMessageFormatter.format(tableMetadata, null, reader.getCurrentLine(), "Error during file importation: " + reader.getFileName()), e);
236 			throw sqlExceptionTranslator.translate("Importing file: " + reader.getFileName(), null, e);
237 		}
238 	}
239 
240 
241 	/* -- protected methods -- */
242 
243 	protected void validate(FileReader reader, DatabaseTableEnum table, DataLoadResult result ) throws IOException {
244 
245 		SumarisTableMetadata tableMetadata = sumarisDatabaseMetadata.getTable(table.name());
246 
247 		if (log.isInfoEnabled()) {
248 			log.info(FileMessageFormatter.format(tableMetadata, null,  reader.getCurrentLine(), "Starting file validation... " + reader.getFileName()));
249 		}
250 
251 		// Read column headers :
252 		String[] headers = reader.getHeaders();
253 		SumarisColumnMetadata[] mappedColumns = getColumnMetadataFromHeaders(reader, result, tableMetadata, headers);
254 
255 		if (!result.isSuccess() || mappedColumns == null) {
256 			return;
257 		}
258 
259 		// Read rows
260 		String[] cols;
261 		while ((cols = reader.readNext()) != null) {
262 			int colIndex = 0;
263 
264 			for (String cellValue : cols) {
265 				SumarisColumnMetadata columnMetadata = mappedColumns[colIndex++];
266 				validateColumnValue(reader, result, tableMetadata, columnMetadata, colIndex, cellValue);
267 			}
268 		}
269 	}
270 
271 
272 	private boolean setParameterValue(int parameterIndex, PreparedStatement insertStatement, FileReader reader,
273                                       SumarisTableMetadata tableMetadata,
274                                       SumarisColumnMetadata columnMetadata, int columnNumber, String cellValue,
275                                       DataLoadResult result) throws SQLException {
276 		int sqlType = columnMetadata.getTypeCode();
277 		try {
278 
279 			if (StringUtils.isBlank(cellValue)
280 					|| "NULL".equals(cellValue)
281 					|| (sqlType == Types.NUMERIC && "na".equalsIgnoreCase(cellValue))
282 					|| (sqlType == Types.NUMERIC && "n/a".equalsIgnoreCase(cellValue))) {
283 				if (StringUtils.isNotBlank(columnMetadata.getDefaultValue())) {
284 					cellValue = columnMetadata.getDefaultValue();
285 				} else {
286 					// TODO BLA: manage this - ask IRL with NULL ?
287 					insertStatement.setNull(parameterIndex, sqlType);
288 
289 					// Mandatory
290 					if (!columnMetadata.isNullable()) {
291 
292 						addError(reader, result,
293 								tableMetadata, columnMetadata,
294 								columnNumber,
295 								ErrorType.ERROR,
296 								"NULL_VALUE",
297 								I18n.t("import.validation.error.NULL_VALUE",
298 										columnMetadata.getName() ));
299 						return false;
300 					}
301 
302 					return true;
303 				}
304 			}
305 
306 			// Remove spaces in numerical values
307 			if ((sqlType == Types.NUMERIC
308 					|| sqlType == Types.BIGINT
309 					|| sqlType == Types.INTEGER
310 					|| sqlType == Types.FLOAT
311 					|| sqlType == Types.REAL)
312 					&& cellValue.indexOf(' ') != -1) {
313 				cellValue = cellValue.replaceAll(" ", "");
314 			}
315 
316 			// Special case a boolean value (1:true, 0:false)
317 			if (sqlType == BooleanType.INSTANCE.sqlType()) {
318 				if ("yes".equalsIgnoreCase(cellValue) || "true".equalsIgnoreCase(cellValue)) {
319 					cellValue = "1";
320 				} else if ("no".equalsIgnoreCase(cellValue) || "false".equalsIgnoreCase(cellValue)) {
321 					cellValue = "0";
322 				}
323 			}
324 
325 			// Length
326 			int columnSize = getColumnSize(tableMetadata, columnMetadata);
327 			int decimalDigits = columnMetadata.getDecimalDigits();
328 			if (columnSize > 0 && cellValue.trim().length() > columnSize && (sqlType != Types.NUMERIC || decimalDigits == 0)) {
329 				addError(reader, result, tableMetadata, columnMetadata,
330 						columnNumber,
331 						ErrorType.ERROR,
332 						"TOO_LONG_VALUE",
333 						I18n.t("import.validation.error.TOO_LONG_VALUE",
334 								cellValue, columnMetadata.getName(), columnSize ));
335 				insertStatement.setNull(parameterIndex, sqlType);
336 				return false;
337 			}
338 			if (sqlType == Types.NUMERIC) {
339 				cellValue = StringUtils.deleteWhitespace(cellValue);
340 				if (decimalDigits == 0) {
341 					long value = Long.parseLong(cellValue);
342 					insertStatement.setLong(parameterIndex, value);
343 				} else {
344 					if (cellValue.indexOf(inverseDecimalSeparator) != -1) {
345 						cellValue = cellValue.replace(inverseDecimalSeparator, decimalSeparator);
346 					}
347 					// If value is too long (whole length)
348 					if (columnSize > 0 && cellValue.length() > columnSize - 1) {
349 						int integerLength = columnSize - decimalDigits;
350 						if (cellValue.indexOf(decimalSeparator) > integerLength) {
351 							addError(reader, result, tableMetadata, columnMetadata,
352 									columnNumber,
353 									ErrorType.ERROR,
354 									"TOO_LONG_VALUE_WITH_SCALE",
355 									I18n.t("import.validation.error.TOO_LONG_VALUE_WITH_SCALE",
356 											cellValue, columnMetadata.getName(), columnSize, decimalDigits ));
357 							insertStatement.setNull(parameterIndex, sqlType);
358 							return false;
359 						} else {
360 							addErrorOnce(reader, result, tableMetadata, columnMetadata,
361 									columnNumber,
362 									ErrorType.WARNING,
363 									"ROUND_VALUES",
364 									I18n.t("import.validation.error.ROUND_VALUES",
365 											columnMetadata.getName(), decimalDigits ));
366 						}
367 					}
368 					Number value = numberFormat.parse(cellValue);
369 					//DecimalFormat.getInstance().parse(cellValue);
370 					insertStatement.setObject(parameterIndex, value, sqlType);
371 				}
372 				return true;
373 			}
374 
375 			// Double
376 			if (sqlType == DoubleType.INSTANCE.sqlType()) {
377 				if (cellValue.indexOf(',') != -1) {
378 					cellValue = cellValue.replace(',', '.');
379 				}
380 				double value = Double.parseDouble(cellValue);
381 				insertStatement.setDouble(parameterIndex, value);
382 				return true;
383 			}
384 
385 			// Integer
386 			if (sqlType == IntegerType.INSTANCE.sqlType()) {
387 				int value = Integer.parseInt(cellValue);
388 				insertStatement.setInt(parameterIndex, value);
389 				return true;
390 			}
391 
392 			// Decimal
393 			if (sqlType == Types.DECIMAL) {
394 				//int value = DecimalFormat.parseInt(cellValue);
395 				//insertStatement.setInt(parameterIndex, value);
396 				return true;
397 			}
398 
399 			// Float
400 			if (sqlType == FloatType.INSTANCE.sqlType()) {
401 				if (cellValue.indexOf(',') != -1) {
402 					cellValue = cellValue.replace(',', '.');
403 				}
404 				float value = Float.parseFloat(cellValue);
405 				insertStatement.setFloat(parameterIndex, value);
406 				return true;
407 			}
408 
409 			// Boolean
410 			if (sqlType == BooleanType.INSTANCE.sqlType()) {
411 				boolean value = false;
412 				if ("yes".equalsIgnoreCase(cellValue)
413 						|| "true".equalsIgnoreCase(cellValue)
414 						|| "1".equalsIgnoreCase(cellValue)) {
415 					value = true;
416 				}
417 				insertStatement.setBoolean(parameterIndex, value);
418 				return true;
419 			}
420 
421 			insertStatement.setObject(parameterIndex++, cellValue);
422 			return true;
423 		} catch (SQLDataException sqlde) {
424 			addError(reader, result, tableMetadata, columnMetadata,
425 					columnNumber,
426 					ErrorType.ERROR,
427 					"SQL_DATA_EXCEPTION",
428 					I18n.t("import.validation.error.SQL_DATA_EXCEPTION",
429 							columnMetadata.getName(), cellValue, sqlde.getMessage(), sqlde.getNextException().getMessage()));
430 			insertStatement.setNull(parameterIndex, sqlType);
431 			return false;
432 		} catch (SQLException sqle) {
433 			addError(reader, result, tableMetadata, columnMetadata,
434 					columnNumber,
435 					ErrorType.ERROR,
436 					"SQL_EXCEPTION",
437 					I18n.t("import.validation.error.SQL_EXCEPTION",
438 							columnMetadata.getName(), cellValue, sqle.getMessage() ));
439 			insertStatement.setNull(parameterIndex, sqlType);
440 			return false;
441 		}
442 		catch (ParseException | NumberFormatException pe) {
443 			addError(reader, result, tableMetadata, columnMetadata,
444 					columnNumber,
445 					ErrorType.ERROR,
446 					"SQL_EXCEPTION",
447 					I18n.t("import.validation.error.PARSE_EXCEPTION",
448 							columnMetadata.getName(), cellValue, pe.getMessage() ));
449 			insertStatement.setNull(parameterIndex, sqlType);
450 			return false;
451 		}
452 	}
453 
454 	protected SumarisColumnMetadata[] getColumnMetadataFromHeaders(FileReader reader,
455                                                                          DataLoadResult result,
456                                                                          SumarisTableMetadata tableMetadata,
457                                                                          String[] headers) {
458 		if (headers == null || headers.length == 0) {
459 
460 			addError(reader, result, tableMetadata, null,
461 					-1,
462 					ErrorType.FATAL,
463 					"NO_HEADER",
464 					I18n.t("import.validation.error.NO_HEADER", null, null));
465 			return null;
466 		}
467 
468 		Set<String> notNullColumns = new HashSet<String>();
469 		notNullColumns.addAll(tableMetadata.getNotNullNames());
470 
471 		int colIndex = 0;
472 		boolean hasBadHeaders = false;
473 		List<SumarisColumnMetadata> mappedColumns = Lists.newLinkedList();
474 		for (String columnName : headers) {
475 			SumarisColumnMetadata columnMetadata = tableMetadata.getColumnMetadata(columnName);
476 			if (columnMetadata != null) {
477 				mappedColumns.add(columnMetadata);
478 				notNullColumns.remove(columnName.toLowerCase());
479 			} else {
480 				// Insert null, to have the good index in array
481 				mappedColumns.add(null);
482 				if (columnName.trim().isEmpty()) {
483 					addError(reader, result, tableMetadata, null,
484 							-1,
485 							ErrorType.WARNING,
486 							"EMPTY_COLUMN_NAME",
487 							I18n.t("import.validation.error.EMPTY_COLUMN_NAME",
488 									colIndex, tableMetadata.getColumnNames().toString() ));
489 				} else {
490 					addError(reader, result, tableMetadata, null,
491 							-1,
492 							ErrorType.WARNING,
493 							"UNKNOWN_COLUMN_NAME",
494 							I18n.t("import.validation.error.UNKNOWN_COLUMN_NAME",
495 									columnName, tableMetadata.getColumnNames().toString() ));
496 				}
497 			}
498 		}
499 
500 		// Check if mandatory col are presents
501 		if (!hasBadHeaders && notNullColumns.size() > 0) {
502 			StringBuilder sb = new StringBuilder();
503 			for (String notNullColumn : notNullColumns) {
504 				SumarisColumnMetadata columnMetadata = tableMetadata.getColumnMetadata(notNullColumn);
505 				// If a default value is present, add as header (Insertion will use default value)
506 				if (StringUtils.isNotEmpty(columnMetadata.getDefaultValue())) {
507 					mappedColumns.add(columnMetadata);
508 				} else {
509 					sb.append(", ").append(notNullColumn.toUpperCase());
510 					hasBadHeaders = true;
511 				}
512 			}
513 			if (hasBadHeaders) {
514 				addError(reader, result, tableMetadata, null,
515 						-1,
516 						ErrorType.FATAL,
517 						"MISSING_COLUMN",
518 						I18n.t("import.validation.error.MISSING_COLUMN",
519 								sb.substring(2) ));
520 			}
521 		}
522 
523 		if (hasBadHeaders) {
524 			return null;
525 		}
526 
527 		return mappedColumns.toArray(new SumarisColumnMetadata[mappedColumns.size()]);
528 	}
529 
530 	protected Object validateColumnValue(FileReader reader, DataLoadResult result, SumarisTableMetadata tableMetadata, SumarisColumnMetadata columnMetadata,
531 										 int columnNumber, String cellValue) {
532 		boolean hasError = false;
533 
534 		// Skipped column
535 		if (columnMetadata == null) {
536 			return NULL_VALUE;
537 		}
538 
539 		int sqlType = columnMetadata.getTypeCode();
540 		if (StringUtils.isBlank(cellValue)
541 				|| "NULL".equals(cellValue)
542 				|| (sqlType == Types.NUMERIC && "na".equalsIgnoreCase(cellValue))
543 				|| (sqlType == Types.NUMERIC && "n/a".equalsIgnoreCase(cellValue))) {
544 			return NULL_VALUE;
545 		}
546 
547 		// Default value : apply default value if need
548 		if (StringUtils.isBlank(cellValue) && columnMetadata.getDefaultValue() != null) {
549 			cellValue = columnMetadata.getDefaultValue();
550 		}
551 
552 		// Mandatory
553 		if (StringUtils.isBlank(cellValue) && !columnMetadata.isNullable()) {
554 			addError(reader, result, tableMetadata, columnMetadata,
555 					columnNumber,
556 					ErrorType.ERROR,
557 					"NULL_VALUE",
558 					I18n.t("import.validation.error.NULL_VALUE",
559 							columnMetadata.getName() ));
560 			hasError = true;
561 		}
562 
563 		// Length
564 		int columnSize = getColumnSize(tableMetadata, columnMetadata);
565 		int decimalDigits = columnMetadata.getDecimalDigits();
566 		if (cellValue != null && columnSize > 0 && cellValue.trim().length() > columnSize) {
567 
568 			addError(reader, result, tableMetadata, columnMetadata,
569 					columnNumber,
570 					ErrorType.ERROR,
571 					"TOO_LONG_VALUE",
572 					I18n.t("import.validation.error.TOO_LONG_VALUE",
573 							cellValue, columnMetadata.getName(), columnSize ));
574 			hasError = true;
575 		}
576 
577 
578 		try {
579 			if (columnMetadata.getTypeCode() == DoubleType.INSTANCE.sqlType()) {
580 				if (cellValue.contains(",")) {
581 					cellValue = cellValue.replace(',', '.');
582 				}
583 				return Double.parseDouble(cellValue);
584 			}
585 
586 			if (columnMetadata.getTypeCode() == IntegerType.INSTANCE.sqlType()) {
587 				return Integer.parseInt(cellValue);
588 			}
589 		}
590 		catch (NumberFormatException pe) {
591 			addError(reader, result, tableMetadata, columnMetadata,
592 					columnNumber,
593 					ErrorType.ERROR,
594 					"SQL_EXCEPTION",
595 					I18n.t("import.validation.error.PARSE_EXCEPTION",
596 							columnMetadata.getName(), cellValue, pe.getMessage()));
597 			hasError = true;
598 		}
599 
600 		if (hasError) {
601 			return NULL_VALUE;
602 		}
603 
604 		return cellValue;
605 	}
606 
607 	public Set<List<String>> getExistingPrimaryKeys(Connection connection,
608 			SumarisTableMetadata table) throws SQLException {
609 
610 		Set<String> pkNames = table.getPkNames();
611 		int pkCount = pkNames.size();
612 		String sql = table.getExistingPrimaryKeysQuery();
613 
614 		PreparedStatement statement = connection.prepareStatement(sql);
615 
616 		Set<List<String>> result = Sets.newHashSet();
617 		try {
618 			ResultSet resultSet = statement.executeQuery();
619 			while (resultSet.next()) {
620 				List<String> pk = Lists.newArrayListWithCapacity(pkCount);
621 				for (int i = 1; i <= pkCount; i++) {
622 					pk.add(String.valueOf(resultSet.getObject(i)));
623 				}
624 				result.add(pk);
625 			}
626 			statement.close();
627 			return result;
628 		} finally {
629 			// close statement silently
630 			if (statement != null) {
631 				try {
632 					statement.close();
633 				}
634 				catch(Exception e) {}
635 			}
636 		}
637 	}
638 
639 	protected List<String> getPk(ResultSet incomingData, int[] pkIndexs) throws SQLException {
640 		List<String> result = Lists.newArrayListWithCapacity(pkIndexs.length);
641 		for (int pkIndex : pkIndexs) {
642 			Object pk = incomingData.getObject(pkIndex);
643 			result.add(pk == null ? null : String.valueOf(pk));
644 		}
645 		return result;
646 	}
647 
648 	protected int getColumnSize(SumarisTableMetadata tableMetadata, SumarisColumnMetadata columnMetadata) {
649 		StringBuilder sb = new StringBuilder();
650 		sb.append(tableMetadata.getName());
651 		sb.append('.');
652 		sb.append(columnMetadata.getName());
653 		sb.append(".length");
654 		try {
655 			String propValue = I18n.t(sb.toString(), null, Locale.getDefault());
656 			int overrideSize = Integer.parseInt(propValue);
657 			if (overrideSize != -1) {
658 				return overrideSize;
659 			}
660 		} catch (NoSuchMessageException | NumberFormatException e) {
661 			// continue
662 		}
663 		return columnMetadata.getColumnSize();
664 
665 	}
666 
667 	@Override
668 	public int removeData(DatabaseTableEnum table, String[] filteredColumns, Object[] filteredValues) {
669 		SumarisTableMetadata tableMetadata = sumarisDatabaseMetadata.getTable(table.name());
670 		Preconditions.checkNotNull(tableMetadata);
671 
672 		Connection conn = DataSourceUtils.getConnection(dataSource);
673 		String deleteQuery = null;
674 		try {
675 
676 			if (filteredColumns != null && filteredColumns.length > 0) {
677 				Preconditions.checkNotNull(filteredValues);
678 				Preconditions.checkArgument(filteredColumns.length == filteredValues.length);
679 
680 				Set<SumarisColumnMetadata> columns = Sets.newLinkedHashSet();
681 				StringBuilder logBuilder = new StringBuilder();
682 				int i = 0;
683 				for (String columnName : filteredColumns) {
684 					SumarisColumnMetadata column = tableMetadata.getColumnMetadata(columnName);
685 					if (column == null) {
686 						throw new DataRetrievalFailureException("Unknown column name '" + columnName + "' for table " + table.name());
687 					}
688 					columns.add(column);
689 					if (log.isInfoEnabled()) {
690 						logBuilder.append(columnName).append("=").append(filteredValues[i]).append(", ");
691 					}
692 					i++;
693 				}
694 
695 				if (log.isInfoEnabled()) {
696 					log.info(String.format("Removing rows on table {%s} using tripFilter {%s}", table.name(), logBuilder.substring(0, logBuilder.length() - 2)));
697 				}
698 
699 				deleteQuery = tableMetadata.getDeleteQuery(filteredColumns);
700 				PreparedStatement deleteStatement = conn.prepareStatement(deleteQuery);
701 				int paramIndex = 1;
702 				for (SumarisColumnMetadata column : columns) {
703 					deleteStatement.setObject(paramIndex, filteredValues[paramIndex - 1], column.getTypeCode());
704 					paramIndex++;
705 				}
706 				return deleteStatement.executeUpdate();
707 			} else {
708 				if (log.isInfoEnabled()) {
709 					log.info(String.format("Removing ALL rows on table {%s}", table.name()));
710 				}
711 
712 				deleteQuery = tableMetadata.getDeleteQuery();
713 				PreparedStatement deleteStatement = conn.prepareStatement(deleteQuery);
714 				return deleteStatement.executeUpdate();
715 			}
716 		} catch (SQLException e) {
717 			throw sqlExceptionTranslator.translate("Unable to delete table " + table.name(), deleteQuery, e);
718 		} finally {
719 			DataSourceUtils.releaseConnection(conn, dataSource);
720 		}
721 	}
722 
723 
724 	protected void addError(
725 			FileReader reader,
726 			DataLoadResult result,
727 			SumarisTableMetadata tableMetadata, SumarisColumnMetadata colMeta,
728 			int columnNumber,
729 			ErrorType errorType,
730 			String errorCode, String description) {
731 		DataLoadError error = DataLoadError.Builder.create(reader, tableMetadata, colMeta, description)
732 				.setColumnNumber(columnNumber != -1 ? columnNumber : null)
733 				.setErrorCode(errorCode)
734 				.setErrorType(errorType)
735 				.build();
736 
737 		result.addError(error);
738 		logError(error, result);
739 	}
740 
741 	protected void addErrorOnce(
742 			FileReader reader,
743 			DataLoadResult result,
744 			SumarisTableMetadata tableMetadata, SumarisColumnMetadata colMeta,
745 			int columnNumber,
746 			ErrorType errorType,
747 			String errorCode, String description) {
748 		DataLoadError error = DataLoadError.Builder.create(reader, tableMetadata, colMeta, description)
749 				.setColumnNumber(columnNumber != -1 ? columnNumber : null)
750 				.setErrorCode(errorCode)
751 				.setErrorType(errorType)
752 				.build();
753 
754 		result.addErrorOnce(error);
755 		logError(error, result);
756 	}
757 
758 	protected void logError(DataLoadError error, DataLoadResult result) {
759 		if (result.errorCount() < MAX_LOG_ERRORS) {
760 
761 			// log
762 			switch (error.getErrorType()) {
763 				case WARNING:
764 					log.warn(error.getDescription());
765 					break;
766 				case ERROR:
767 					log.error(error.getDescription());
768 					break;
769 				case FATAL:
770 					log.error(error.getDescription());
771 					break;
772 			}
773 		}
774 	}
775 
776 	protected Serializable generateIdentifier(Connection conn, SumarisTableMetadata table) throws SQLException {
777 		String sequenceNextValQuery = table.getSequenceNextValQuery();
778 
779 		if (sequenceNextValQuery == null) {
780 			throw new SumarisTechnicalException(String.format("No sequence found on table {%s}. Unable to generate identifier.", table.getName()));
781 		}
782 		PreparedStatement statement = conn.prepareStatement(sequenceNextValQuery);
783 		try {
784 			ResultSet rs = statement.executeQuery();
785 			rs.next();
786 			return rs.getInt(1);
787 		} catch(SQLException e){
788 			Daos.closeSilently(statement);
789 			throw e;
790 		}
791 	}
792 }