View Javadoc
1   package fr.ifremer.quadrige2.synchro.service.data;
2   
3   /*-
4    * #%L
5    * Quadrige2 Core :: Quadrige2 Synchro Core
6    * $Id:$
7    * $HeadURL:$
8    * %%
9    * Copyright (C) 2017 Ifremer
10   * %%
11   * This program is free software: you can redistribute it and/or modify
12   * it under the terms of the GNU Affero General Public License as published by
13   * the Free Software Foundation, either version 3 of the License, or
14   * (at your option) any later version.
15   * 
16   * This program is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU General Public License for more details.
20   * 
21   * You should have received a copy of the GNU Affero General Public License
22   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23   * #L%
24   */
25  
26  import com.google.common.base.Preconditions;
27  import com.google.common.collect.*;
28  import fr.ifremer.common.synchro.SynchroTechnicalException;
29  import fr.ifremer.common.synchro.config.SynchroConfiguration;
30  import fr.ifremer.common.synchro.dao.DaoFactory;
31  import fr.ifremer.common.synchro.dao.Daos;
32  import fr.ifremer.common.synchro.dao.SynchroTableDao;
33  import fr.ifremer.common.synchro.meta.SynchroDatabaseMetadata;
34  import fr.ifremer.common.synchro.meta.SynchroTableMetadata;
35  import fr.ifremer.common.synchro.service.*;
36  import fr.ifremer.common.synchro.type.ProgressionModel;
37  import fr.ifremer.quadrige2.core.dao.technical.Dates;
38  import fr.ifremer.quadrige2.core.exception.Quadrige2TechnicalException;
39  import fr.ifremer.quadrige2.core.config.Quadrige2Configuration;
40  import fr.ifremer.quadrige2.core.dao.ObjectTypes;
41  import fr.ifremer.quadrige2.core.dao.system.synchronization.SynchronizationStatus;
42  import fr.ifremer.quadrige2.core.dao.technical.hibernate.ConfigurationHelper;
43  import fr.ifremer.quadrige2.synchro.meta.DatabaseColumns;
44  import fr.ifremer.quadrige2.synchro.meta.data.DataSynchroTables;
45  import fr.ifremer.quadrige2.synchro.service.SynchroDirection;
46  import org.apache.commons.collections4.CollectionUtils;
47  import org.apache.commons.collections4.MapUtils;
48  import org.apache.commons.io.IOUtils;
49  import org.apache.commons.lang3.StringUtils;
50  import org.apache.commons.logging.Log;
51  import org.apache.commons.logging.LogFactory;
52  import org.hibernate.dialect.Dialect;
53  import org.springframework.beans.factory.annotation.Autowired;
54  import org.springframework.context.annotation.Lazy;
55  import org.springframework.stereotype.Service;
56  
57  import javax.sql.DataSource;
58  import java.beans.PropertyChangeEvent;
59  import java.beans.PropertyChangeListener;
60  import java.io.File;
61  import java.sql.Connection;
62  import java.sql.ResultSet;
63  import java.sql.SQLException;
64  import java.sql.Timestamp;
65  import java.util.*;
66  
67  import static org.nuiton.i18n.I18n.t;
68  
69  /**
70   * <p>
71   * DataSynchroServiceImpl class.
72   * </p>
73   * 
74   */
75  @Service("dataSynchroService")
76  @Lazy
77  public class DataSynchroServiceImpl
78  		extends SynchroServiceImpl
79  		implements DataSynchroService {
80  
81  	private static final Log log = LogFactory.getLog(DataSynchroServiceImpl.class);
82  
83  	private static boolean DISABLE_INTEGRITY_CONSTRAINTS = false;
84  	private static boolean ALLOW_MISSING_OPTIONAL_COLUMN = true;
85  	private static boolean ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA = false;
86  	private static boolean KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS = true;
87  
88  	private final int exportUpdateDateDelayInSecond;
89  
90  	/**
91  	 * <p>
92  	 * Constructor for DataSynchroServiceImpl.
93  	 * </p>
94  	 * 
95  	 * @param dataSource
96  	 *            a {@link javax.sql.DataSource} object.
97  	 * @param config
98  	 *            a {@link fr.ifremer.common.synchro.config.SynchroConfiguration} object.
99  	 */
100 	@Autowired
101 	public DataSynchroServiceImpl(DataSource dataSource, SynchroConfiguration config) {
102 		super(dataSource, config,
103 				DISABLE_INTEGRITY_CONSTRAINTS,
104 				ALLOW_MISSING_OPTIONAL_COLUMN,
105 				ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA,
106 				KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS);
107 		exportUpdateDateDelayInSecond = Quadrige2Configuration.getInstance().getExportDataUpdateDateDelayInSecond();
108 	}
109 
110 	/**
111 	 * <p>
112 	 * Constructor for DataSynchroServiceImpl.
113 	 * </p>
114 	 */
115 	public DataSynchroServiceImpl() {
116 		super(DISABLE_INTEGRITY_CONSTRAINTS,
117 				ALLOW_MISSING_OPTIONAL_COLUMN,
118 				ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA,
119 				KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS);
120 		exportUpdateDateDelayInSecond = Quadrige2Configuration.getInstance().getExportDataUpdateDateDelayInSecond();
121 	}
122 
123 	/** {@inheritDoc} */
124 	@Override
125 	public DataSynchroContext createSynchroContext(File sourceDbDirectory, SynchroDirection direction, int userId) {
126 		Preconditions.checkNotNull(sourceDbDirectory);
127 		Preconditions.checkArgument(sourceDbDirectory.exists() && sourceDbDirectory.isDirectory());
128 		return createSynchroContext(sourceDbDirectory, direction, userId, null, true, true);
129 	}
130 
131 	/** {@inheritDoc} */
132 	@Override
133 	public DataSynchroContext createSynchroContext(File sourceDbDirectory, SynchroDirection direction, int userId,
134 			Timestamp lastSynchronizationDate) {
135 		return createSynchroContext(sourceDbDirectory, direction, userId, lastSynchronizationDate, true, true);
136 	}
137 
138 	/** {@inheritDoc} */
139 	@Override
140 	public DataSynchroContext createSynchroContext(File sourceDbDirectory, SynchroDirection direction, int userId,
141 			Timestamp lastSynchronizationDate, boolean enableDelete, boolean enableInsertUpdate) {
142 		SynchroContext delegate = super.createSynchroContext(sourceDbDirectory, DataSynchroTables.getImportTablesIncludes());
143 
144 		// Make sure q2 connection properties are always used
145 		delegate.getTarget().putAllProperties(Quadrige2Configuration.getInstance().getConnectionProperties());
146 
147 		DataSynchroContext result = new DataSynchroContext(delegate, direction, userId);
148 		result.setLastSynchronizationDate(lastSynchronizationDate);
149 		result.setEnableDelete(enableDelete);
150 		result.setEnableInsertOrUpdate(enableInsertUpdate);
151 		initContext(result);
152 		return result;
153 	}
154 
155 	/** {@inheritDoc} */
156 	@Override
157 	public DataSynchroContext createSynchroContext(Properties sourceConnectionProperties, SynchroDirection direction, int userId) {
158 		return createSynchroContext(sourceConnectionProperties, direction, userId, null, true, true);
159 	}
160 
161 	/** {@inheritDoc} */
162 	@Override
163 	public DataSynchroContext createSynchroContext(Properties sourceConnectionProperties, SynchroDirection direction, int userId,
164 			Timestamp lastSynchronizationDate) {
165 		return createSynchroContext(sourceConnectionProperties, direction, userId, lastSynchronizationDate, true, true);
166 	}
167 
168 	/** {@inheritDoc} */
169 	@Override
170 	public DataSynchroContext createSynchroContext(Properties sourceConnectionProperties, SynchroDirection direction, int userId,
171 			Timestamp lastSynchronizationDate, boolean enableDelete, boolean enableInsertUpdate) {
172 		Preconditions.checkNotNull(sourceConnectionProperties);
173 		SynchroContext delegate = super.createSynchroContext(sourceConnectionProperties, DataSynchroTables.getImportTablesIncludes());
174 
175 		// Make sure q2 connection properties are always used
176 		delegate.getTarget().putAllProperties(Quadrige2Configuration.getInstance().getConnectionProperties());
177 
178 		DataSynchroContext result = new DataSynchroContext(delegate, direction, userId);
179 		result.setLastSynchronizationDate(lastSynchronizationDate);
180 		result.setEnableDelete(enableDelete);
181 		result.setEnableInsertOrUpdate(enableInsertUpdate);
182 		initContext(result);
183 		return result;
184 	}
185 
186 	/** {@inheritDoc} */
187 	@Override
188 	public void prepare(SynchroContext synchroContext) {
189 		Preconditions.checkArgument(synchroContext != null && synchroContext instanceof DataSynchroContext,
190 				String.format("The context must be a instance of %s", DataSynchroContext.class.getName()));
191 
192 		DataSynchroContext dataSynchroContext = (DataSynchroContext) synchroContext;
193 		SynchroResult result = dataSynchroContext.getResult();
194 		SynchroDirection direction = dataSynchroContext.getDirection();
195 
196 		DataSynchroDatabaseConfiguration target = (DataSynchroDatabaseConfiguration) dataSynchroContext.getTarget();
197 		DataSynchroDatabaseConfiguration source = (DataSynchroDatabaseConfiguration) dataSynchroContext.getSource();
198 
199 		// If server DB -> Temp DB
200 		if (direction == SynchroDirection.IMPORT_SERVER2TEMP) {
201 
202 			// Ignore some not used columns, on measurement tables
203 			source.excludeMeasurementUnusedColumns();
204 
205 			// Set target database as mirror AND temporary
206 			target.setIsMirrorDatabase(true);
207 
208 			// Ignore some not used columns
209 			target.excludeUnusedColumns();
210 
211 			// Disable integrity constraints
212 			{
213 				disableIntegrityConstraints(target.getConnectionProperties(), result);
214 				if (!result.isSuccess()) {
215 					return;
216 				}
217 
218 				// No need to check unicity of input rows (server rows are safe)
219 				source.setCheckUniqueConstraintBetweenInputRows(false);
220 				target.setCheckUniqueConstraintBetweenInputRows(false);
221 			}
222 
223 			// Compute the default period, if need
224 			if (dataSynchroContext.getDataEndDate() == null) {
225 				fillDefaultDataPeriod(dataSynchroContext);
226 			}
227 
228 			// Compute the default PK restriction, if need
229 			if (dataSynchroContext.getPkIncludes() == null) {
230 				fillDefaultPkIncludes(dataSynchroContext);
231 			}
232 		}
233 
234 		// If Temp DB -> Local DB
235 		else if (direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
236 
237 			// Set target database as not a Mirror
238 			target.setIsMirrorDatabase(false);
239 
240 			// No need to check unicity of input rows (rows come from server and are safe)
241 			source.setCheckUniqueConstraintBetweenInputRows(false);
242 			target.setCheckUniqueConstraintBetweenInputRows(false);
243 		}
244 
245 		// If server DB -> Local DB
246 		if (direction == SynchroDirection.IMPORT_NO_TEMP) {
247 			// TODO : add implement
248 			throw new Quadrige2TechnicalException("Direction IMPORT_NO_TEMP not implemented yet for Data tables");
249 		}
250 
251 		// If local -> temporary database
252 		else if (direction == SynchroDirection.EXPORT_LOCAL2TEMP) {
253 
254 			// Set target database as mirror AND temporary
255 			target.setIsMirrorDatabase(true);
256 
257 			// Disable integrity constraints
258 			{
259 				disableIntegrityConstraints(target.getConnectionProperties(), result);
260 				if (!result.isSuccess()) {
261 					return;
262 				}
263 			}
264 
265 			// No need to check unicity of input rows (will be done on temp->server)
266 			source.setCheckUniqueConstraintBetweenInputRows(false);
267 			target.setCheckUniqueConstraintBetweenInputRows(false);
268 		}
269 
270 		// If Temp DB -> server DB
271 		else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
272 
273 			// Ignore some not used columns
274 			source.setFullMetadataEnable(true);
275 			source.excludeUnusedColumns();
276 
277 			// Set target database as not a Mirror
278 			target.setIsMirrorDatabase(false);
279 			target.setFullMetadataEnable(false);
280 
281 			// Fill system timestamp into context, and copy it to source
282 			fillSystemTimestampWithDelay(result, target);
283 			source.setSystemTimestamp(target.getSystemTimestamp());
284 		}
285 
286 		// If Local DB -> file
287 		else if (direction == SynchroDirection.EXPORT_LOCAL2FILE) {
288 
289 			// Set target database as a Mirror
290 			target.setIsMirrorDatabase(true);
291 		}
292 
293 		// If file -> Local DB
294 		else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
295 
296 			// Set target database as a NOT a mirror (to avoid constraints disabling)
297 			target.setIsMirrorDatabase(false);
298 		}
299 
300 		super.prepare(synchroContext);
301 	}
302 
303 	/** {@inheritDoc} */
304 	@Override
305 	public void synchronize(SynchroContext synchroContext) {
306 		Preconditions.checkArgument(synchroContext != null && synchroContext instanceof DataSynchroContext,
307 				String.format("The context must be a instance of %s", DataSynchroContext.class.getName()));
308 
309 		DataSynchroContext dataSynchroContext = (DataSynchroContext) synchroContext;
310 		SynchroResult result = dataSynchroContext.getResult();
311 		List<Multimap<String, String>> pkIncludesListForBatch = null;
312 
313 		DataSynchroDatabaseConfiguration target = (DataSynchroDatabaseConfiguration) dataSynchroContext.getTarget();
314 		DataSynchroDatabaseConfiguration source = (DataSynchroDatabaseConfiguration) dataSynchroContext.getSource();
315 
316 		// If server DB -> temp DB
317 		if (dataSynchroContext.getDirection() == SynchroDirection.IMPORT_SERVER2TEMP) {
318 
319 			// Enable protected columns
320 			target.removeColumnExclude(target.getColumnSynchronizationStatus());
321 			target.removeColumnExclude(target.getColumnRemoteId());
322 
323 			// Disable integrity constraints
324 			{
325 				disableIntegrityConstraints(dataSynchroContext.getTarget().getConnectionProperties(), result);
326 				if (!result.isSuccess()) {
327 					return;
328 				}
329 			}
330 		}
331 
332 		// If temp DB -> server DB
333 		else if (dataSynchroContext.getDirection() == SynchroDirection.EXPORT_TEMP2SERVER) {
334 
335 			// Enable protected columns
336 			source.removeColumnExclude(source.getColumnSynchronizationStatus());
337 			source.removeColumnExclude(source.getColumnRemoteId());
338 		}
339 
340 		// If Temp DB -> Local DB
341 		else if (dataSynchroContext.getDirection() == SynchroDirection.IMPORT_TEMP2LOCAL) {
342 
343 			// Use batch (split import - see mantis #30765)
344 			// only if:
345 			// - insert/update are enable
346 			// - pk filter not already exists
347 			if (dataSynchroContext.isEnableInsertOrUpdate()
348 					&& (dataSynchroContext.getPkIncludes() == null || dataSynchroContext.getPkIncludes().isEmpty())) {
349 				pkIncludesListForBatch = computePkIncludesListForBatch(result, dataSynchroContext);
350 			}
351 		}
352 
353 		boolean synchronizeUsingBatch = CollectionUtils.isNotEmpty(pkIncludesListForBatch);
354 
355 		// Synchronize all
356 		if (!synchronizeUsingBatch
357 				|| !dataSynchroContext.isEnableInsertOrUpdate()) {
358 			super.synchronize(synchroContext);
359 		}
360 
361 		// Synchronize using batch (see mantis #30765)
362 		else {
363 			synchronizeUsingBatch(synchroContext, pkIncludesListForBatch);
364 		}
365 	}
366 
367 	/** {@inheritDoc} */
368 	@Override
369 	public void finish(SynchroContext synchroContext,
370 			SynchroResult resultWithPendingOperation,
371 			Map<RejectedRow.Cause, RejectedRow.ResolveStrategy> rejectStrategies) {
372 		super.finish(synchroContext, resultWithPendingOperation, rejectStrategies);
373 	}
374 
375 	@Override
376 	protected void cleanSynchroResult(SynchroContext context, SynchroResult result) {
377 		super.cleanSynchroResult(context, result);
378 	}
379 
380 	/* -- Internal methods -- */
381 
382 	/** {@inheritDoc} */
383 	@Override
384 	protected void prepareRootTable(
385 			DaoFactory sourceDaoFactory,
386 			DaoFactory targetDaoFactory,
387 			SynchroTableMetadata table,
388 			SynchroContext context,
389 			SynchroResult result) throws SQLException {
390 
391 		DataSynchroContext dataContext = (DataSynchroContext) context;
392 
393 		// Prepare from super class (insert and update)
394 		if (dataContext.isEnableInsertOrUpdate()) {
395 			super.prepareRootTable(sourceDaoFactory,
396 					targetDaoFactory,
397 					table,
398 					context,
399 					result);
400 		}
401 
402 		// Add deleted items
403 		if (dataContext.isEnableDelete()) {
404 			prepareDeletedRootTable(sourceDaoFactory,
405 					targetDaoFactory,
406 					table,
407 					context,
408 					result);
409 		}
410 	}
411 
412 	/**
413 	 * Count number of row to delete, for the given table
414 	 * 
415 	 * @param sourceDaoFactory
416 	 *            a {@link fr.ifremer.common.synchro.dao.DaoFactory} object.
417 	 * @param targetDaoFactory
418 	 *            a {@link fr.ifremer.common.synchro.dao.DaoFactory} object.
419 	 * @param table
420 	 *            a {@link fr.ifremer.common.synchro.meta.SynchroTableMetadata} object.
421 	 * @param context
422 	 *            a {@link fr.ifremer.common.synchro.service.SynchroContext} object.
423 	 * @param result
424 	 *            a {@link fr.ifremer.common.synchro.service.SynchroResult} object.
425 	 * @throws java.sql.SQLException
426 	 *             if any.
427 	 */
428 	protected void prepareDeletedRootTable(
429 			DaoFactory sourceDaoFactory,
430 			DaoFactory targetDaoFactory,
431 			SynchroTableMetadata table,
432 			SynchroContext context,
433 			SynchroResult result) throws SQLException {
434 
435 		String tableName = table.getName();
436 		Set<String> objectTypeFks = ObjectTypes.getObjectTypeFromTableName(tableName);
437 
438 		if (CollectionUtils.isEmpty(objectTypeFks)) {
439 			return;
440 		}
441 
442 		SynchroTableDao dihSourceDao = sourceDaoFactory.getSourceDao(DataSynchroTables.DELETED_ITEM_HISTORY.name());
443 
444 		List<List<Object>> columnValues = Lists.newArrayListWithCapacity(objectTypeFks.size());
445 		for (String objectTypeFk : objectTypeFks) {
446 			columnValues.add(ImmutableList.<Object> of(objectTypeFk));
447 		}
448 
449 		// Read rows of DELETED_ITEM_HISTORY (from the temp DB)
450 		Map<String, Object> bindings = createSelectBindingsForTable(context, DataSynchroTables.DELETED_ITEM_HISTORY.name());
451 		long count = dihSourceDao.countDataByFks(
452 				ImmutableSet.of(DatabaseColumns.OBJECT_TYPE_CD.name()),
453 				columnValues,
454 				bindings
455 				);
456 		if (count > 0) {
457 			result.addRows(tableName, (int) count);
458 		}
459 	}
460 
461 	/** {@inheritDoc} */
462 	@Override
463 	protected Map<String, Object> createDefaultSelectBindings(SynchroContext context) {
464 
465 		Map<String, Object> bindings = super.createDefaultSelectBindings(context);
466 
467 		if (context instanceof DataSynchroContext) {
468 			DataSynchroContext dataContext = (DataSynchroContext) context;
469 
470 			// Fill with the current person_fk (need for Dao and insert into TempQueryParemeter)
471 			bindings.put("userId", dataContext.getUserId());
472 
473 			// Fill the period (using in interceptor queries - see SurveyInterceptor)
474 			if (dataContext.getDataStartDate() != null) {
475 				bindings.put("startDate", new Timestamp(dataContext.getDataStartDate().getTime()));
476 			}
477 			// (endDate could be null - see Mantis #27242)
478 			if (dataContext.getDataEndDate() != null) {
479 				bindings.put("endDate", new Timestamp(dataContext.getDataEndDate().getTime()));
480 			}
481 		}
482 
483 		return bindings;
484 	}
485 
486 	/**
487 	 * {@inheritDoc}
488 	 * 
489 	 * Override the default method, to disable this binding: <br/>
490 	 * <code>bindings.put(SynchroTableMetadata.UPDATE_DATE_BINDPARAM, context.getResult().getUpdateDate(tableName))</code>
491 	 */
492 	@Override
493 	protected Map<String, Object> createSelectBindingsForTable(SynchroContext context, String tableName) {
494 		// Get defaults binding from context
495 		Map<String, Object> bindings = Maps.newHashMap(getSelectBindings(context));
496 
497 		Timestamp lastSynchronizationDate = context.getLastSynchronizationDate();
498 		boolean notEmptyTargetTable = (context.getResult().getUpdateDate(tableName) != null);
499 		boolean enableUpdateDateFilter = (lastSynchronizationDate != null)
500 				&& (context.getTarget().isMirrorDatabase() || notEmptyTargetTable);
501 
502 		if (enableUpdateDateFilter) {
503 			bindings.put(SynchroTableMetadata.UPDATE_DATE_BINDPARAM, lastSynchronizationDate);
504 		}
505 
506 		return bindings;
507 	}
508 
509 	/**
510 	 * <p>
511 	 * fillSystemTimestampWithDelay.
512 	 * </p>
513 	 * 
514 	 * @param result
515 	 *            a {@link fr.ifremer.common.synchro.service.SynchroResult} object.
516 	 * @param databaseConfiguration
517 	 *            a {@link fr.ifremer.common.synchro.service.SynchroDatabaseConfiguration} object.
518 	 */
519 	protected void fillSystemTimestampWithDelay(SynchroResult result, SynchroDatabaseConfiguration databaseConfiguration) {
520 
521 		result.getProgressionModel().setMessage(t("quadrige2.synchro.synchronizeData.initSystemTimestamp"));
522 		if (log.isInfoEnabled()) {
523 			log.info(t("quadrige2.synchro.synchronizeData.initSystemTimestamp"));
524 		}
525 
526 		Connection connection = null;
527 		try {
528 			connection = createConnection(databaseConfiguration);
529 			Dialect dialect = databaseConfiguration.getDialect();
530 			Timestamp currentTimestamp = Daos.getCurrentTimestamp(connection, dialect);
531 
532 			// Add a delay, to allow concurrent importation to re-import this data later
533 			Dates.addSeconds(currentTimestamp, exportUpdateDateDelayInSecond);
534 
535 			if (log.isDebugEnabled()) {
536 				log.debug(String.format("Current timestamp: %s", currentTimestamp));
537 			}
538 
539 			databaseConfiguration.setSystemTimestamp(currentTimestamp);
540 		} catch (Quadrige2TechnicalException e) {
541 			result.setError(e);
542 		} catch (SQLException e) {
543 			result.setError(e);
544 		} finally {
545 			closeSilently(connection);
546 		}
547 	}
548 
549 	/**
550 	 * <p>
551 	 * fillDefaultDataPeriod.
552 	 * </p>
553 	 * 
554 	 * @param dataSynchroContext
555 	 *            a {@link fr.ifremer.quadrige2.synchro.service.data.DataSynchroContext} object.
556 	 */
557 	protected void fillDefaultDataPeriod(DataSynchroContext dataSynchroContext) {
558 		int nbYearDataHistory = Quadrige2Configuration.getInstance().getImportNbYearDataHistory();
559 
560 		// Compute the start date : X years ago, starting at January 1th
561 		Date dataStartDate = Dates.addYears(new Date(), -1 * nbYearDataHistory);
562 		dataStartDate = Dates.truncate(dataStartDate, Calendar.YEAR);
563 		dataSynchroContext.setDataStartDate(dataStartDate);
564 
565 		// Compute the end : last hour of the day
566 		Date dataEndDate = Dates.lastSecondOfTheDay(new Date());
567 		dataSynchroContext.setDataEndDate(dataEndDate);
568 	}
569 
570 	/**
571 	 * <p>
572 	 * fillDefaultPkIncludes.
573 	 * </p>
574 	 * 
575 	 * @param dataSynchroContext
576 	 *            a {@link fr.ifremer.quadrige2.synchro.service.data.DataSynchroContext} object.
577 	 */
578 	protected void fillDefaultPkIncludes(DataSynchroContext dataSynchroContext) {
579 		String confProperty = Quadrige2Configuration.getInstance().getImportDataPkIncludes();
580 		Multimap<String, String> pkIncludes = ConfigurationHelper.getMultimap(confProperty);
581 		dataSynchroContext.setPkIncludes(pkIncludes);
582 	}
583 
584 	/** {@inheritDoc} */
585 	@Override
586 	protected List<SynchroTableOperation> getRootOperations(
587 			DaoFactory sourceDaoFactory,
588 			DaoFactory targetDaoFactory,
589 			SynchroDatabaseMetadata dbMeta,
590 			SynchroContext context) throws SQLException {
591 		DataSynchroContext dataContext = (DataSynchroContext) context;
592 		List<SynchroTableOperation> result = Lists.newArrayList();
593 
594 		// Add delete items history operation - if enable in context
595 		// NOTE : deletion should be done BEFORE insertion (to avoid duplication error)
596 		if (dataContext.isEnableDelete()
597 				&& (dataContext.getDirection() == SynchroDirection.EXPORT_TEMP2SERVER
598 				|| dataContext.getDirection() == SynchroDirection.IMPORT_TEMP2LOCAL)) {
599 
600 			Collection<SynchroTableOperation> deletedItemOperations = getDeleteOperations(sourceDaoFactory, targetDaoFactory, dbMeta, dataContext);
601 			result.addAll(deletedItemOperations);
602 		}
603 
604 		// Add default operations (insert/update) - if enable in context
605 		if (dataContext.isEnableInsertOrUpdate()) {
606 			Collection<SynchroTableOperation> defaultOperations = super.getRootOperations(sourceDaoFactory, targetDaoFactory, dbMeta, context);
607 			result.addAll(defaultOperations);
608 		}
609 
610 		return result;
611 	}
612 
613 	private Collection<SynchroTableOperation> getDeleteOperations(
614 			DaoFactory sourceDaoFactory,
615 			DaoFactory targetDaoFactory,
616 			SynchroDatabaseMetadata dbMeta,
617 			DataSynchroContext context) throws SQLException {
618 		Preconditions.checkArgument(dbMeta.getConfiguration().isFullMetadataEnable());
619 
620 		Deque<SynchroTableOperation> result = Queues.newArrayDeque();
621 
622 		SynchroTableDao dihSourceDao = sourceDaoFactory.getSourceDao(DataSynchroTables.DELETED_ITEM_HISTORY.name());
623 		Set<String> includedDataTables = DataSynchroTables.getImportTablesIncludes();
624 		if (CollectionUtils.isEmpty(includedDataTables)) {
625 			return result;
626 		}
627 
628 		// Read rows of DELETED_ITEM_HISTORY (from the temp DB)
629 		Map<String, Object> bindings = createSelectBindingsForTable(context, DataSynchroTables.DELETED_ITEM_HISTORY.name());
630 		ResultSet dihResultSet = dihSourceDao.getData(bindings);
631 
632 		List<Object> dihIdsToRemove = Lists.newArrayList();
633 
634 		try {
635 			dihResultSet = dihSourceDao.getData(bindings);
636 
637 			while (dihResultSet.next()) {
638 
639 				long dihId = dihResultSet.getLong(DatabaseColumns.DEL_ITEM_HIST_ID.name());
640 				String objectType = dihResultSet.getString(DatabaseColumns.OBJECT_TYPE_CD.name());
641 				String tableName = ObjectTypes.getTableNameFromObjectType(objectType);
642 
643 				boolean isDataTable = StringUtils.isNotBlank(tableName)
644 						&& includedDataTables.contains(tableName.toUpperCase());
645 
646 				if (isDataTable) {
647 					SynchroTableDao targetDao = targetDaoFactory.getSourceDao(tableName);
648 					SynchroTableMetadata table = targetDao.getTable();
649 					boolean shouldDeleteUsingRemoteId = (context.getDirection() == SynchroDirection.IMPORT_TEMP2LOCAL);
650 
651 					String objectCode = dihResultSet.getString(DatabaseColumns.OBJECT_CD.name());
652 					String objectId = dihResultSet.getString(DatabaseColumns.OBJECT_ID.name());
653 
654 					// Delete by a PK (id or code)
655 					if (StringUtils.isNotBlank(objectCode) || StringUtils.isNotBlank(objectId)) {
656 						List<Object> pk = StringUtils.isNotBlank(objectCode)
657 								? ImmutableList.<Object> of(objectCode)
658 								: ImmutableList.<Object> of(objectId);
659 
660 						boolean hasChildTables = targetDao.getTable().hasChildJoins();
661 
662 						SynchroTableOperation operation = new SynchroTableOperation(tableName, context);
663 						operation.setEnableProgress(true);
664 						result.add(operation);
665 
666 						// If import to local DB : object_id = table's remote_id
667 						if (shouldDeleteUsingRemoteId) {
668 							if (table.getColumnIndex(DatabaseColumns.REMOTE_ID.name()) != -1) {
669 								operation.addChildrenToDeleteFromOneColumn(tableName, DatabaseColumns.REMOTE_ID.name(), pk);
670 
671 								// Make sure DIH is deleted, if exported from this local DB
672 								dihIdsToRemove.add(dihId);
673 							}
674 							else if (log.isWarnEnabled()) {
675 								// This should never happen !
676 								log.warn(String
677 										.format("[%s] Found deleted items on table [%s] (pk: [%s]) but no column [%s] exists on this table. Skipping this deleted items.",
678 												DataSynchroTables.DELETED_ITEM_HISTORY.name(),
679 												tableName,
680 												SynchroTableMetadata.toPkStr(pk),
681 												DatabaseColumns.REMOTE_ID.name()
682 										));
683 							}
684 
685 							// Children table should NOT be add here - mantis #23799
686 							// This is done later, inside method SynchroServiceImpl.synchronizeChildrenToDeletes()
687 						}
688 
689 						// If export to server DB : object_id = table's id
690 						else {
691 							operation.addMissingDelete(pk);
692 
693 							// If has children, add child deletion to result
694 							if (hasChildTables) {
695 								addDeleteChildrenToDeque(table, ImmutableList.of(pk), result, context);
696 							}
697 						}
698 					}
699 				}
700 			}
701 		} finally {
702 			Daos.closeSilently(dihResultSet);
703 		}
704 
705 		if (CollectionUtils.isNotEmpty(dihIdsToRemove)) {
706 			SynchroTableOperation operation = new SynchroTableOperation(DataSynchroTables.DELETED_ITEM_HISTORY.name(), context);
707 			operation.addChildrenToDeleteFromOneColumn(DataSynchroTables.DELETED_ITEM_HISTORY.name(), DatabaseColumns.REMOTE_ID.name(),
708 					dihIdsToRemove);
709 			result.add(operation);
710 		}
711 
712 		return result;
713 	}
714 
715 	/** {@inheritDoc} */
716 	@Override
717 	protected void resolveTableRow(SynchroTableMetadata table,
718 			RejectedRow reject,
719 			RejectedRow.ResolveStrategy rejectStrategy,
720 			DaoFactory daoFactory,
721 			SynchroTableOperation operation,
722 			SynchroContext context,
723 			SynchroResult result) {
724 
725 		// Execute default implementation
726 		super.resolveTableRow(
727 				table,
728 				reject,
729 				rejectStrategy,
730 				daoFactory,
731 				operation,
732 				context,
733 				result
734 				);
735 
736 		boolean hasSynchronizationStatus = table.getColumnNames().contains(DatabaseColumns.SYNCHRONIZATION_STATUS.name());
737 		DataSynchroContext dataSynchroContext = (DataSynchroContext) context;
738 
739 		// Add a special case for Exportation :
740 		// when keep local data : make sure all entity will be re-export,
741 		// by changing synchronizaton_status to READ_TO_SYNC
742 		if (hasSynchronizationStatus
743 				&& dataSynchroContext.getDirection().isExport()
744 				&& reject.cause == RejectedRow.Cause.BAD_UPDATE_DATE
745 				&& rejectStrategy == RejectedRow.ResolveStrategy.KEEP_LOCAL) {
746 
747 			operation.addMissingColumnUpdate(
748 					DatabaseColumns.SYNCHRONIZATION_STATUS.name(),
749 					reject.targetPkStr,
750 					SynchronizationStatus.READY_TO_SYNCHRONIZE.getValue());
751 		}
752 	}
753 
754 	/** {@inheritDoc} */
755 	@Override
756 	protected void detachRows(
757 			SynchroTableDao targetDao,
758 			List<List<Object>> pks,
759 			SynchroContext context,
760 			SynchroResult result,
761 			Deque<SynchroTableOperation> pendingOperations)
762 			throws SQLException {
763 
764 		// Call default implementation
765 		super.detachRows(targetDao, pks, context, result, pendingOperations);
766 
767 		// Then override to reset remote_id:
768 
769 		SynchroTableMetadata table = targetDao.getTable();
770 		boolean hasSynchronizationStatus = table.getColumnNames().contains(DatabaseColumns.SYNCHRONIZATION_STATUS.name());
771 		boolean hasRemoteId = table.getColumnNames().contains(DatabaseColumns.REMOTE_ID.name());
772 		if (!hasSynchronizationStatus && !hasRemoteId) {
773 			return;
774 		}
775 
776 		String tableName = table.getName();
777 		String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName);
778 		String readyToSynchronize = SynchronizationStatus.READY_TO_SYNCHRONIZE.getValue();
779 		int countR = 0;
780 
781 		// For each pk to detach: reset remote_id
782 		if (hasRemoteId) {
783 			for (List<Object> pk : pks) {
784 				targetDao.executeUpdateColumn(DatabaseColumns.REMOTE_ID.name(), pk, null);
785 				countR++;
786 				reportProgress(result, targetDao, countR, tablePrefix);
787 			}
788 		}
789 
790 		// For each pk to detach: set synchronization_status to READY_TO_SYNC
791 		// (in case the status was SYNC)
792 		if (hasSynchronizationStatus) {
793 			for (List<Object> pk : pks) {
794 				targetDao.executeUpdateColumn(DatabaseColumns.SYNCHRONIZATION_STATUS.name(), pk, readyToSynchronize);
795 				countR++;
796 				reportProgress(result, targetDao, countR, tablePrefix);
797 			}
798 		}
799 
800 		if (countR == 0) {
801 			return;
802 		}
803 
804 		targetDao.flush();
805 
806 		result.addTableName(tableName);
807 		result.addUpdates(tableName, countR);
808 
809 		if (log.isInfoEnabled()) {
810 			log.info(String.format("%s done: %s (detachment: %s)", tablePrefix, countR, countR));
811 		}
812 
813 		if (targetDao.getCurrentOperation().isEnableProgress()) {
814 			result.getProgressionModel().increments(countR % batchSize);
815 		}
816 
817 	}
818 
819 	/* -- internal methods -- */
820 
821 	/**
822 	 * <p>
823 	 * initContext.
824 	 * </p>
825 	 * 
826 	 * @param context
827 	 *            a {@link fr.ifremer.common.synchro.service.SynchroContext} object.
828 	 */
829 	protected void initContext(SynchroContext context) {
830 
831 		// Set update date
832 		context.getSource().setColumnUpdateDate(DatabaseColumns.UPDATE_DT.name().toLowerCase());
833 		context.getTarget().setColumnUpdateDate(DatabaseColumns.UPDATE_DT.name().toLowerCase());
834 
835 		if (context instanceof DataSynchroContext) {
836 			DataSynchroDatabaseConfiguration source = (DataSynchroDatabaseConfiguration) context.getSource();
837 			DataSynchroDatabaseConfiguration target = (DataSynchroDatabaseConfiguration) context.getTarget();
838 
839 			// Set the column remote_id name
840 			source.setColumnRemoteId(DatabaseColumns.REMOTE_ID.name().toLowerCase());
841 			target.setColumnRemoteId(DatabaseColumns.REMOTE_ID.name().toLowerCase());
842 
843 			// Set the column synchronization_status name
844 			source.setColumnSynchronizationStatus(DatabaseColumns.SYNCHRONIZATION_STATUS.name().toLowerCase());
845 			target.setColumnSynchronizationStatus(DatabaseColumns.SYNCHRONIZATION_STATUS.name().toLowerCase());
846 		}
847 	}
848 
849 	/**
850 	 * Check if a split is need, for import (see mantis #30765)
851 	 * 
852 	 * @param resultAfterPrepare
853 	 *            a {@link fr.ifremer.common.synchro.service.SynchroResult} object.
854 	 * @param context
855 	 *            a {@link fr.ifremer.common.synchro.service.SynchroContext} object.
856 	 * @return a {@link java.util.List} object.
857 	 */
858 	protected List<Multimap<String, String>> computePkIncludesListForBatch(SynchroResult resultAfterPrepare,
859 			SynchroContext context) {
860 		int maxRootRowCount = Quadrige2Configuration.getInstance().getImportDataMaxRootRowCount();
861 		if (MapUtils.isEmpty(resultAfterPrepare.getUpdateDateHits())
862 				|| maxRootRowCount <= 0 /* Skip if disable */
863 				|| resultAfterPrepare.getTotalRows() <= maxRootRowCount) {
864 			return null;
865 		}
866 
867 		Preconditions.checkNotNull(context);
868 		if (log.isDebugEnabled()) {
869 			log.debug(String.format("Compute PKs to import, by batch of %s rows", maxRootRowCount));
870 		}
871 
872 		SynchroDatabaseConfiguration source = context.getSource();
873 		Preconditions.checkNotNull(source);
874 
875 		Connection sourceConnection = null;
876 		DaoFactory sourceDaoFactory = null;
877 		SynchroDatabaseMetadata dbMeta = null;
878 		ResultSet dataToUpdate = null;
879 
880 		try {
881 
882 			Set<String> tableNames = resultAfterPrepare.getUpdateDateHits().keySet();
883 			List<Multimap<String, String>> result = Lists.newArrayList();
884 
885 			// create source Connection
886 			sourceConnection = createConnection(source);
887 
888 			log.debug("Loading source database metadata...");
889 			boolean isFullMetadataEnable = source.isFullMetadataEnable();
890 			source.setFullMetadataEnable(true);
891 			dbMeta = loadDatabaseMetadata(
892 					sourceConnection,
893 					source,
894 					tableNames);
895 			source.setFullMetadataEnable(isFullMetadataEnable);
896 
897 			// Create DAO factories
898 			sourceDaoFactory = newDaoFactory(sourceConnection, source, dbMeta);
899 
900 			// For each table found during preparation phase
901 			Multimap<String, String> currentBatch = ArrayListMultimap.create();
902 			int currentBatchSize = 0;
903 			for (String tableName : tableNames) {
904 
905 				SynchroTableDao sourceDao = sourceDaoFactory.getSourceDao(tableName);
906 
907 				Map<String, Object> bindings = createSelectBindingsForTable(context, tableName);
908 				dataToUpdate = sourceDao.getData(bindings);
909 
910 				while (dataToUpdate.next()) {
911 
912 					String pkStr = SynchroTableMetadata.toPkStr(sourceDao.getPk(dataToUpdate));
913 
914 					// If current batch is full: add it to result list
915 					if (currentBatchSize == maxRootRowCount) {
916 						result.add(currentBatch);
917 						currentBatch = ArrayListMultimap.create();
918 						currentBatchSize = 0;
919 					}
920 
921 					currentBatch.put(tableName, pkStr);
922 					currentBatchSize++;
923 				}
924 
925 				// Close the result set
926 				Daos.closeSilently(dataToUpdate);
927 				dataToUpdate = null;
928 			}
929 
930 			// Add last batch to result
931 			if (currentBatchSize > 0) {
932 				result.add(currentBatch);
933 			}
934 
935 			return result;
936 
937 		} catch (Exception e) {
938 			if (log.isDebugEnabled()) {
939 				log.debug(e);
940 			}
941 			throw new SynchroTechnicalException(e);
942 		} finally {
943 			Daos.closeSilently(dataToUpdate);
944 			IOUtils.closeQuietly(sourceDaoFactory);
945 			closeSilently(dbMeta);
946 			closeSilently(sourceConnection);
947 		}
948 	}
949 
950 	/**
951 	 * <p>
952 	 * synchronizeUsingBatch.
953 	 * </p>
954 	 * 
955 	 * @param synchroContext
956 	 *            a {@link fr.ifremer.common.synchro.service.SynchroContext} object.
957 	 * @param pkIncludesListForBatch
958 	 *            a {@link java.util.List} object.
959 	 */
960 	protected void synchronizeUsingBatch(SynchroContext synchroContext,
961 			List<Multimap<String, String>> pkIncludesListForBatch) {
962 
963 		DataSynchroContext dataSynchroContext = (DataSynchroContext) synchroContext;
964 		SynchroResult result = dataSynchroContext.getResult();
965 
966 		boolean savedEnableDelete = dataSynchroContext.isEnableDelete();
967 		boolean currentBatchEnableDelete = savedEnableDelete;
968 
969 		// Init the progression model (only current+total, because message will be delegate to a child progression
970 		// model)
971 		ProgressionModel progressionModel = result.getProgressionModel();
972 		progressionModel.setCurrent(0);
973 		progressionModel.setTotal(result.getTotalRows() + pkIncludesListForBatch.size());
974 
975 		// Clear result (will be updated after each batch, by calling 'addAll()')
976 		result.clear();
977 
978 		for (Multimap<String, String> currentBatchPkIncludes : pkIncludesListForBatch) {
979 
980 			// Update the context to process only PK included in the current batch
981 			SynchroResult currentBatchResult = new SynchroResult(result.getLocalUrl(), result.getRemoteUrl());
982 			for (String tableName : currentBatchPkIncludes.keySet()) {
983 				currentBatchResult.setUpdateDate(tableName, null);
984 				currentBatchResult.addRows(tableName, currentBatchPkIncludes.get(tableName).size());
985 			}
986 			dataSynchroContext.setResult(currentBatchResult);
987 			dataSynchroContext.setPkIncludes(currentBatchPkIncludes);
988 			dataSynchroContext.setEnableDelete(currentBatchEnableDelete);
989 
990 			int batchProgressionModelOffset = progressionModel.getCurrent();
991 			addProgressionListeners(progressionModel,
992 					currentBatchResult.getProgressionModel(),
993 					batchProgressionModelOffset);
994 
995 			// Call synchronize
996 			super.synchronize(dataSynchroContext);
997 
998 			// Add the current result to the final result
999 			result.addAll(currentBatchResult);
1000 
1001 			// Update progression model
1002 			// progressionModel.setCurrent(batchProgressionModelOffset + currentBatchResult.getTotalRows());
1003 
1004 			// If error, then store this error into final result, and stop iteration
1005 			if (!currentBatchResult.isSuccess()) {
1006 				result.setError(currentBatchResult.getError());
1007 				break;
1008 			}
1009 
1010 			// Disable deletions import, for the next iteration
1011 			// because all deletions processed during the first iteration
1012 			currentBatchEnableDelete = false;
1013 
1014 			// Do an intermediate commit (+free memory)
1015 			try {
1016 				commitAndFreeMemory(synchroContext.getTarget());
1017 			} catch (SQLException e) {
1018 				result.setError(e);
1019 				break;
1020 			}
1021 		}
1022 
1023 		// Update progression model
1024 		progressionModel.setCurrent(progressionModel.getTotal());
1025 
1026 		// Restore the context: result and configuration (pkIncludes and enableDelete)
1027 		dataSynchroContext.setResult(result);
1028 		dataSynchroContext.setPkIncludes(null);
1029 		dataSynchroContext.setEnableDelete(savedEnableDelete);
1030 	}
1031 
1032 	/**
1033 	 * Remap batch progression model to the main progression model
1034 	 * 
1035 	 * @param mainProgressionModel
1036 	 *            a {@link fr.ifremer.common.synchro.type.ProgressionModel} object.
1037 	 * @param batchProgressionModel
1038 	 * @param batchProgressionModelOffset
1039 	 *            a int.
1040 	 */
1041 	protected void addProgressionListeners(
1042 			final ProgressionModel mainProgressionModel,
1043 			final ProgressionModel batchProgressionModel,
1044 			final int batchProgressionModelOffset) {
1045 		// Listen 'current' attribute changes
1046 		batchProgressionModel.addPropertyChangeListener(ProgressionModel.PROPERTY_CURRENT,
1047 				new PropertyChangeListener() {
1048 					@Override
1049 					public void propertyChange(PropertyChangeEvent evt) {
1050 						Integer current = (Integer) evt.getNewValue();
1051 						mainProgressionModel.setCurrent(batchProgressionModelOffset + current);
1052 					}
1053 				});
1054 
1055 		// Listen message changes
1056 		batchProgressionModel.addPropertyChangeListener(ProgressionModel.PROPERTY_MESSAGE,
1057 				new PropertyChangeListener() {
1058 					@Override
1059 					public void propertyChange(PropertyChangeEvent evt) {
1060 						String message = (String) evt.getNewValue();
1061 						mainProgressionModel.setMessage(message);
1062 					}
1063 				});
1064 	}
1065 
1066 	/**
1067 	 * THis method will do an intermediate commit on current connection.
1068 	 * If HsqlDB, will compact the database.
1069 	 * At the end, GC is calling
1070 	 * 
1071 	 * @param dbConfig
1072 	 *            a {@link fr.ifremer.common.synchro.service.SynchroDatabaseConfiguration} object.
1073 	 * @throws java.sql.SQLException
1074 	 *             if any.
1075 	 */
1076 	protected void commitAndFreeMemory(SynchroDatabaseConfiguration dbConfig) throws SQLException {
1077 		// Do an intermediate commit
1078 		Connection connection = null;
1079 		try {
1080 			connection = createConnection(dbConfig);
1081 			connection.commit();
1082 			System.gc();
1083 		} finally {
1084 			closeSilently(connection);
1085 		}
1086 	}
1087 
1088 	@Override
1089 	protected Connection createConnection(String jdbcUrl, String user, String password) throws SQLException {
1090 		Connection connection = super.createConnection(jdbcUrl, user, password);
1091 
1092 		// Set timezone - mantis #36465
1093 		fr.ifremer.quadrige2.core.dao.technical.Daos.setTimezone(connection, Quadrige2Configuration.getInstance().getDbTimezone());
1094 
1095 		return connection;
1096 	}
1097 }