1 package fr.ifremer.quadrige2.synchro.service.data;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import com.google.common.base.Preconditions;
27 import com.google.common.collect.*;
28 import fr.ifremer.common.synchro.SynchroTechnicalException;
29 import fr.ifremer.common.synchro.config.SynchroConfiguration;
30 import fr.ifremer.common.synchro.dao.DaoFactory;
31 import fr.ifremer.common.synchro.dao.Daos;
32 import fr.ifremer.common.synchro.dao.SynchroTableDao;
33 import fr.ifremer.common.synchro.meta.SynchroDatabaseMetadata;
34 import fr.ifremer.common.synchro.meta.SynchroTableMetadata;
35 import fr.ifremer.common.synchro.service.*;
36 import fr.ifremer.common.synchro.type.ProgressionModel;
37 import fr.ifremer.quadrige2.core.dao.technical.Dates;
38 import fr.ifremer.quadrige2.core.exception.Quadrige2TechnicalException;
39 import fr.ifremer.quadrige2.core.config.Quadrige2Configuration;
40 import fr.ifremer.quadrige2.core.dao.ObjectTypes;
41 import fr.ifremer.quadrige2.core.dao.system.synchronization.SynchronizationStatus;
42 import fr.ifremer.quadrige2.core.dao.technical.hibernate.ConfigurationHelper;
43 import fr.ifremer.quadrige2.synchro.meta.DatabaseColumns;
44 import fr.ifremer.quadrige2.synchro.meta.data.DataSynchroTables;
45 import fr.ifremer.quadrige2.synchro.service.SynchroDirection;
46 import org.apache.commons.collections4.CollectionUtils;
47 import org.apache.commons.collections4.MapUtils;
48 import org.apache.commons.io.IOUtils;
49 import org.apache.commons.lang3.StringUtils;
50 import org.apache.commons.logging.Log;
51 import org.apache.commons.logging.LogFactory;
52 import org.hibernate.dialect.Dialect;
53 import org.springframework.beans.factory.annotation.Autowired;
54 import org.springframework.context.annotation.Lazy;
55 import org.springframework.stereotype.Service;
56
57 import javax.sql.DataSource;
58 import java.beans.PropertyChangeEvent;
59 import java.beans.PropertyChangeListener;
60 import java.io.File;
61 import java.sql.Connection;
62 import java.sql.ResultSet;
63 import java.sql.SQLException;
64 import java.sql.Timestamp;
65 import java.util.*;
66
67 import static org.nuiton.i18n.I18n.t;
68
69
70
71
72
73
74
75 @Service("dataSynchroService")
76 @Lazy
77 public class DataSynchroServiceImpl
78 extends SynchroServiceImpl
79 implements DataSynchroService {
80
81 private static final Log log = LogFactory.getLog(DataSynchroServiceImpl.class);
82
83 private static boolean DISABLE_INTEGRITY_CONSTRAINTS = false;
84 private static boolean ALLOW_MISSING_OPTIONAL_COLUMN = true;
85 private static boolean ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA = false;
86 private static boolean KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS = true;
87
88 private final int exportUpdateDateDelayInSecond;
89
90
91
92
93
94
95
96
97
98
99
100 @Autowired
101 public DataSynchroServiceImpl(DataSource dataSource, SynchroConfiguration config) {
102 super(dataSource, config,
103 DISABLE_INTEGRITY_CONSTRAINTS,
104 ALLOW_MISSING_OPTIONAL_COLUMN,
105 ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA,
106 KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS);
107 exportUpdateDateDelayInSecond = Quadrige2Configuration.getInstance().getExportDataUpdateDateDelayInSecond();
108 }
109
110
111
112
113
114
115 public DataSynchroServiceImpl() {
116 super(DISABLE_INTEGRITY_CONSTRAINTS,
117 ALLOW_MISSING_OPTIONAL_COLUMN,
118 ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA,
119 KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS);
120 exportUpdateDateDelayInSecond = Quadrige2Configuration.getInstance().getExportDataUpdateDateDelayInSecond();
121 }
122
123
124 @Override
125 public DataSynchroContext createSynchroContext(File sourceDbDirectory, SynchroDirection direction, int userId) {
126 Preconditions.checkNotNull(sourceDbDirectory);
127 Preconditions.checkArgument(sourceDbDirectory.exists() && sourceDbDirectory.isDirectory());
128 return createSynchroContext(sourceDbDirectory, direction, userId, null, true, true);
129 }
130
131
132 @Override
133 public DataSynchroContext createSynchroContext(File sourceDbDirectory, SynchroDirection direction, int userId,
134 Timestamp lastSynchronizationDate) {
135 return createSynchroContext(sourceDbDirectory, direction, userId, lastSynchronizationDate, true, true);
136 }
137
138
139 @Override
140 public DataSynchroContext createSynchroContext(File sourceDbDirectory, SynchroDirection direction, int userId,
141 Timestamp lastSynchronizationDate, boolean enableDelete, boolean enableInsertUpdate) {
142 SynchroContext delegate = super.createSynchroContext(sourceDbDirectory, DataSynchroTables.getImportTablesIncludes());
143
144
145 delegate.getTarget().putAllProperties(Quadrige2Configuration.getInstance().getConnectionProperties());
146
147 DataSynchroContext result = new DataSynchroContext(delegate, direction, userId);
148 result.setLastSynchronizationDate(lastSynchronizationDate);
149 result.setEnableDelete(enableDelete);
150 result.setEnableInsertOrUpdate(enableInsertUpdate);
151 initContext(result);
152 return result;
153 }
154
155
156 @Override
157 public DataSynchroContext createSynchroContext(Properties sourceConnectionProperties, SynchroDirection direction, int userId) {
158 return createSynchroContext(sourceConnectionProperties, direction, userId, null, true, true);
159 }
160
161
162 @Override
163 public DataSynchroContext createSynchroContext(Properties sourceConnectionProperties, SynchroDirection direction, int userId,
164 Timestamp lastSynchronizationDate) {
165 return createSynchroContext(sourceConnectionProperties, direction, userId, lastSynchronizationDate, true, true);
166 }
167
168
169 @Override
170 public DataSynchroContext createSynchroContext(Properties sourceConnectionProperties, SynchroDirection direction, int userId,
171 Timestamp lastSynchronizationDate, boolean enableDelete, boolean enableInsertUpdate) {
172 Preconditions.checkNotNull(sourceConnectionProperties);
173 SynchroContext delegate = super.createSynchroContext(sourceConnectionProperties, DataSynchroTables.getImportTablesIncludes());
174
175
176 delegate.getTarget().putAllProperties(Quadrige2Configuration.getInstance().getConnectionProperties());
177
178 DataSynchroContext result = new DataSynchroContext(delegate, direction, userId);
179 result.setLastSynchronizationDate(lastSynchronizationDate);
180 result.setEnableDelete(enableDelete);
181 result.setEnableInsertOrUpdate(enableInsertUpdate);
182 initContext(result);
183 return result;
184 }
185
186
187 @Override
188 public void prepare(SynchroContext synchroContext) {
189 Preconditions.checkArgument(synchroContext != null && synchroContext instanceof DataSynchroContext,
190 String.format("The context must be a instance of %s", DataSynchroContext.class.getName()));
191
192 DataSynchroContext dataSynchroContext = (DataSynchroContext) synchroContext;
193 SynchroResult result = dataSynchroContext.getResult();
194 SynchroDirection direction = dataSynchroContext.getDirection();
195
196 DataSynchroDatabaseConfiguration target = (DataSynchroDatabaseConfiguration) dataSynchroContext.getTarget();
197 DataSynchroDatabaseConfiguration source = (DataSynchroDatabaseConfiguration) dataSynchroContext.getSource();
198
199
200 if (direction == SynchroDirection.IMPORT_SERVER2TEMP) {
201
202
203 source.excludeMeasurementUnusedColumns();
204
205
206 target.setIsMirrorDatabase(true);
207
208
209 target.excludeUnusedColumns();
210
211
212 {
213 disableIntegrityConstraints(target.getConnectionProperties(), result);
214 if (!result.isSuccess()) {
215 return;
216 }
217
218
219 source.setCheckUniqueConstraintBetweenInputRows(false);
220 target.setCheckUniqueConstraintBetweenInputRows(false);
221 }
222
223
224 if (dataSynchroContext.getDataEndDate() == null) {
225 fillDefaultDataPeriod(dataSynchroContext);
226 }
227
228
229 if (dataSynchroContext.getPkIncludes() == null) {
230 fillDefaultPkIncludes(dataSynchroContext);
231 }
232 }
233
234
235 else if (direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
236
237
238 target.setIsMirrorDatabase(false);
239
240
241 source.setCheckUniqueConstraintBetweenInputRows(false);
242 target.setCheckUniqueConstraintBetweenInputRows(false);
243 }
244
245
246 if (direction == SynchroDirection.IMPORT_NO_TEMP) {
247
248 throw new Quadrige2TechnicalException("Direction IMPORT_NO_TEMP not implemented yet for Data tables");
249 }
250
251
252 else if (direction == SynchroDirection.EXPORT_LOCAL2TEMP) {
253
254
255 target.setIsMirrorDatabase(true);
256
257
258 {
259 disableIntegrityConstraints(target.getConnectionProperties(), result);
260 if (!result.isSuccess()) {
261 return;
262 }
263 }
264
265
266 source.setCheckUniqueConstraintBetweenInputRows(false);
267 target.setCheckUniqueConstraintBetweenInputRows(false);
268 }
269
270
271 else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
272
273
274 source.setFullMetadataEnable(true);
275 source.excludeUnusedColumns();
276
277
278 target.setIsMirrorDatabase(false);
279 target.setFullMetadataEnable(false);
280
281
282 fillSystemTimestampWithDelay(result, target);
283 source.setSystemTimestamp(target.getSystemTimestamp());
284 }
285
286
287 else if (direction == SynchroDirection.EXPORT_LOCAL2FILE) {
288
289
290 target.setIsMirrorDatabase(true);
291 }
292
293
294 else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
295
296
297 target.setIsMirrorDatabase(false);
298 }
299
300 super.prepare(synchroContext);
301 }
302
303
304 @Override
305 public void synchronize(SynchroContext synchroContext) {
306 Preconditions.checkArgument(synchroContext != null && synchroContext instanceof DataSynchroContext,
307 String.format("The context must be a instance of %s", DataSynchroContext.class.getName()));
308
309 DataSynchroContext dataSynchroContext = (DataSynchroContext) synchroContext;
310 SynchroResult result = dataSynchroContext.getResult();
311 List<Multimap<String, String>> pkIncludesListForBatch = null;
312
313 DataSynchroDatabaseConfiguration target = (DataSynchroDatabaseConfiguration) dataSynchroContext.getTarget();
314 DataSynchroDatabaseConfiguration source = (DataSynchroDatabaseConfiguration) dataSynchroContext.getSource();
315
316
317 if (dataSynchroContext.getDirection() == SynchroDirection.IMPORT_SERVER2TEMP) {
318
319
320 target.removeColumnExclude(target.getColumnSynchronizationStatus());
321 target.removeColumnExclude(target.getColumnRemoteId());
322
323
324 {
325 disableIntegrityConstraints(dataSynchroContext.getTarget().getConnectionProperties(), result);
326 if (!result.isSuccess()) {
327 return;
328 }
329 }
330 }
331
332
333 else if (dataSynchroContext.getDirection() == SynchroDirection.EXPORT_TEMP2SERVER) {
334
335
336 source.removeColumnExclude(source.getColumnSynchronizationStatus());
337 source.removeColumnExclude(source.getColumnRemoteId());
338 }
339
340
341 else if (dataSynchroContext.getDirection() == SynchroDirection.IMPORT_TEMP2LOCAL) {
342
343
344
345
346
347 if (dataSynchroContext.isEnableInsertOrUpdate()
348 && (dataSynchroContext.getPkIncludes() == null || dataSynchroContext.getPkIncludes().isEmpty())) {
349 pkIncludesListForBatch = computePkIncludesListForBatch(result, dataSynchroContext);
350 }
351 }
352
353 boolean synchronizeUsingBatch = CollectionUtils.isNotEmpty(pkIncludesListForBatch);
354
355
356 if (!synchronizeUsingBatch
357 || !dataSynchroContext.isEnableInsertOrUpdate()) {
358 super.synchronize(synchroContext);
359 }
360
361
362 else {
363 synchronizeUsingBatch(synchroContext, pkIncludesListForBatch);
364 }
365 }
366
367
368 @Override
369 public void finish(SynchroContext synchroContext,
370 SynchroResult resultWithPendingOperation,
371 Map<RejectedRow.Cause, RejectedRow.ResolveStrategy> rejectStrategies) {
372 super.finish(synchroContext, resultWithPendingOperation, rejectStrategies);
373 }
374
375 @Override
376 protected void cleanSynchroResult(SynchroContext context, SynchroResult result) {
377 super.cleanSynchroResult(context, result);
378 }
379
380
381
382
383 @Override
384 protected void prepareRootTable(
385 DaoFactory sourceDaoFactory,
386 DaoFactory targetDaoFactory,
387 SynchroTableMetadata table,
388 SynchroContext context,
389 SynchroResult result) throws SQLException {
390
391 DataSynchroContext dataContext = (DataSynchroContext) context;
392
393
394 if (dataContext.isEnableInsertOrUpdate()) {
395 super.prepareRootTable(sourceDaoFactory,
396 targetDaoFactory,
397 table,
398 context,
399 result);
400 }
401
402
403 if (dataContext.isEnableDelete()) {
404 prepareDeletedRootTable(sourceDaoFactory,
405 targetDaoFactory,
406 table,
407 context,
408 result);
409 }
410 }
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428 protected void prepareDeletedRootTable(
429 DaoFactory sourceDaoFactory,
430 DaoFactory targetDaoFactory,
431 SynchroTableMetadata table,
432 SynchroContext context,
433 SynchroResult result) throws SQLException {
434
435 String tableName = table.getName();
436 Set<String> objectTypeFks = ObjectTypes.getObjectTypeFromTableName(tableName);
437
438 if (CollectionUtils.isEmpty(objectTypeFks)) {
439 return;
440 }
441
442 SynchroTableDao dihSourceDao = sourceDaoFactory.getSourceDao(DataSynchroTables.DELETED_ITEM_HISTORY.name());
443
444 List<List<Object>> columnValues = Lists.newArrayListWithCapacity(objectTypeFks.size());
445 for (String objectTypeFk : objectTypeFks) {
446 columnValues.add(ImmutableList.<Object> of(objectTypeFk));
447 }
448
449
450 Map<String, Object> bindings = createSelectBindingsForTable(context, DataSynchroTables.DELETED_ITEM_HISTORY.name());
451 long count = dihSourceDao.countDataByFks(
452 ImmutableSet.of(DatabaseColumns.OBJECT_TYPE_CD.name()),
453 columnValues,
454 bindings
455 );
456 if (count > 0) {
457 result.addRows(tableName, (int) count);
458 }
459 }
460
461
462 @Override
463 protected Map<String, Object> createDefaultSelectBindings(SynchroContext context) {
464
465 Map<String, Object> bindings = super.createDefaultSelectBindings(context);
466
467 if (context instanceof DataSynchroContext) {
468 DataSynchroContext dataContext = (DataSynchroContext) context;
469
470
471 bindings.put("userId", dataContext.getUserId());
472
473
474 if (dataContext.getDataStartDate() != null) {
475 bindings.put("startDate", new Timestamp(dataContext.getDataStartDate().getTime()));
476 }
477
478 if (dataContext.getDataEndDate() != null) {
479 bindings.put("endDate", new Timestamp(dataContext.getDataEndDate().getTime()));
480 }
481 }
482
483 return bindings;
484 }
485
486
487
488
489
490
491
492 @Override
493 protected Map<String, Object> createSelectBindingsForTable(SynchroContext context, String tableName) {
494
495 Map<String, Object> bindings = Maps.newHashMap(getSelectBindings(context));
496
497 Timestamp lastSynchronizationDate = context.getLastSynchronizationDate();
498 boolean notEmptyTargetTable = (context.getResult().getUpdateDate(tableName) != null);
499 boolean enableUpdateDateFilter = (lastSynchronizationDate != null)
500 && (context.getTarget().isMirrorDatabase() || notEmptyTargetTable);
501
502 if (enableUpdateDateFilter) {
503 bindings.put(SynchroTableMetadata.UPDATE_DATE_BINDPARAM, lastSynchronizationDate);
504 }
505
506 return bindings;
507 }
508
509
510
511
512
513
514
515
516
517
518
519 protected void fillSystemTimestampWithDelay(SynchroResult result, SynchroDatabaseConfiguration databaseConfiguration) {
520
521 result.getProgressionModel().setMessage(t("quadrige2.synchro.synchronizeData.initSystemTimestamp"));
522 if (log.isInfoEnabled()) {
523 log.info(t("quadrige2.synchro.synchronizeData.initSystemTimestamp"));
524 }
525
526 Connection connection = null;
527 try {
528 connection = createConnection(databaseConfiguration);
529 Dialect dialect = databaseConfiguration.getDialect();
530 Timestamp currentTimestamp = Daos.getCurrentTimestamp(connection, dialect);
531
532
533 Dates.addSeconds(currentTimestamp, exportUpdateDateDelayInSecond);
534
535 if (log.isDebugEnabled()) {
536 log.debug(String.format("Current timestamp: %s", currentTimestamp));
537 }
538
539 databaseConfiguration.setSystemTimestamp(currentTimestamp);
540 } catch (Quadrige2TechnicalException e) {
541 result.setError(e);
542 } catch (SQLException e) {
543 result.setError(e);
544 } finally {
545 closeSilently(connection);
546 }
547 }
548
549
550
551
552
553
554
555
556
557 protected void fillDefaultDataPeriod(DataSynchroContext dataSynchroContext) {
558 int nbYearDataHistory = Quadrige2Configuration.getInstance().getImportNbYearDataHistory();
559
560
561 Date dataStartDate = Dates.addYears(new Date(), -1 * nbYearDataHistory);
562 dataStartDate = Dates.truncate(dataStartDate, Calendar.YEAR);
563 dataSynchroContext.setDataStartDate(dataStartDate);
564
565
566 Date dataEndDate = Dates.lastSecondOfTheDay(new Date());
567 dataSynchroContext.setDataEndDate(dataEndDate);
568 }
569
570
571
572
573
574
575
576
577
578 protected void fillDefaultPkIncludes(DataSynchroContext dataSynchroContext) {
579 String confProperty = Quadrige2Configuration.getInstance().getImportDataPkIncludes();
580 Multimap<String, String> pkIncludes = ConfigurationHelper.getMultimap(confProperty);
581 dataSynchroContext.setPkIncludes(pkIncludes);
582 }
583
584
585 @Override
586 protected List<SynchroTableOperation> getRootOperations(
587 DaoFactory sourceDaoFactory,
588 DaoFactory targetDaoFactory,
589 SynchroDatabaseMetadata dbMeta,
590 SynchroContext context) throws SQLException {
591 DataSynchroContext dataContext = (DataSynchroContext) context;
592 List<SynchroTableOperation> result = Lists.newArrayList();
593
594
595
596 if (dataContext.isEnableDelete()
597 && (dataContext.getDirection() == SynchroDirection.EXPORT_TEMP2SERVER
598 || dataContext.getDirection() == SynchroDirection.IMPORT_TEMP2LOCAL)) {
599
600 Collection<SynchroTableOperation> deletedItemOperations = getDeleteOperations(sourceDaoFactory, targetDaoFactory, dbMeta, dataContext);
601 result.addAll(deletedItemOperations);
602 }
603
604
605 if (dataContext.isEnableInsertOrUpdate()) {
606 Collection<SynchroTableOperation> defaultOperations = super.getRootOperations(sourceDaoFactory, targetDaoFactory, dbMeta, context);
607 result.addAll(defaultOperations);
608 }
609
610 return result;
611 }
612
613 private Collection<SynchroTableOperation> getDeleteOperations(
614 DaoFactory sourceDaoFactory,
615 DaoFactory targetDaoFactory,
616 SynchroDatabaseMetadata dbMeta,
617 DataSynchroContext context) throws SQLException {
618 Preconditions.checkArgument(dbMeta.getConfiguration().isFullMetadataEnable());
619
620 Deque<SynchroTableOperation> result = Queues.newArrayDeque();
621
622 SynchroTableDao dihSourceDao = sourceDaoFactory.getSourceDao(DataSynchroTables.DELETED_ITEM_HISTORY.name());
623 Set<String> includedDataTables = DataSynchroTables.getImportTablesIncludes();
624 if (CollectionUtils.isEmpty(includedDataTables)) {
625 return result;
626 }
627
628
629 Map<String, Object> bindings = createSelectBindingsForTable(context, DataSynchroTables.DELETED_ITEM_HISTORY.name());
630 ResultSet dihResultSet = dihSourceDao.getData(bindings);
631
632 List<Object> dihIdsToRemove = Lists.newArrayList();
633
634 try {
635 dihResultSet = dihSourceDao.getData(bindings);
636
637 while (dihResultSet.next()) {
638
639 long dihId = dihResultSet.getLong(DatabaseColumns.DEL_ITEM_HIST_ID.name());
640 String objectType = dihResultSet.getString(DatabaseColumns.OBJECT_TYPE_CD.name());
641 String tableName = ObjectTypes.getTableNameFromObjectType(objectType);
642
643 boolean isDataTable = StringUtils.isNotBlank(tableName)
644 && includedDataTables.contains(tableName.toUpperCase());
645
646 if (isDataTable) {
647 SynchroTableDao targetDao = targetDaoFactory.getSourceDao(tableName);
648 SynchroTableMetadata table = targetDao.getTable();
649 boolean shouldDeleteUsingRemoteId = (context.getDirection() == SynchroDirection.IMPORT_TEMP2LOCAL);
650
651 String objectCode = dihResultSet.getString(DatabaseColumns.OBJECT_CD.name());
652 String objectId = dihResultSet.getString(DatabaseColumns.OBJECT_ID.name());
653
654
655 if (StringUtils.isNotBlank(objectCode) || StringUtils.isNotBlank(objectId)) {
656 List<Object> pk = StringUtils.isNotBlank(objectCode)
657 ? ImmutableList.<Object> of(objectCode)
658 : ImmutableList.<Object> of(objectId);
659
660 boolean hasChildTables = targetDao.getTable().hasChildJoins();
661
662 SynchroTableOperation operation = new SynchroTableOperation(tableName, context);
663 operation.setEnableProgress(true);
664 result.add(operation);
665
666
667 if (shouldDeleteUsingRemoteId) {
668 if (table.getColumnIndex(DatabaseColumns.REMOTE_ID.name()) != -1) {
669 operation.addChildrenToDeleteFromOneColumn(tableName, DatabaseColumns.REMOTE_ID.name(), pk);
670
671
672 dihIdsToRemove.add(dihId);
673 }
674 else if (log.isWarnEnabled()) {
675
676 log.warn(String
677 .format("[%s] Found deleted items on table [%s] (pk: [%s]) but no column [%s] exists on this table. Skipping this deleted items.",
678 DataSynchroTables.DELETED_ITEM_HISTORY.name(),
679 tableName,
680 SynchroTableMetadata.toPkStr(pk),
681 DatabaseColumns.REMOTE_ID.name()
682 ));
683 }
684
685
686
687 }
688
689
690 else {
691 operation.addMissingDelete(pk);
692
693
694 if (hasChildTables) {
695 addDeleteChildrenToDeque(table, ImmutableList.of(pk), result, context);
696 }
697 }
698 }
699 }
700 }
701 } finally {
702 Daos.closeSilently(dihResultSet);
703 }
704
705 if (CollectionUtils.isNotEmpty(dihIdsToRemove)) {
706 SynchroTableOperation operation = new SynchroTableOperation(DataSynchroTables.DELETED_ITEM_HISTORY.name(), context);
707 operation.addChildrenToDeleteFromOneColumn(DataSynchroTables.DELETED_ITEM_HISTORY.name(), DatabaseColumns.REMOTE_ID.name(),
708 dihIdsToRemove);
709 result.add(operation);
710 }
711
712 return result;
713 }
714
715
716 @Override
717 protected void resolveTableRow(SynchroTableMetadata table,
718 RejectedRow reject,
719 RejectedRow.ResolveStrategy rejectStrategy,
720 DaoFactory daoFactory,
721 SynchroTableOperation operation,
722 SynchroContext context,
723 SynchroResult result) {
724
725
726 super.resolveTableRow(
727 table,
728 reject,
729 rejectStrategy,
730 daoFactory,
731 operation,
732 context,
733 result
734 );
735
736 boolean hasSynchronizationStatus = table.getColumnNames().contains(DatabaseColumns.SYNCHRONIZATION_STATUS.name());
737 DataSynchroContext dataSynchroContext = (DataSynchroContext) context;
738
739
740
741
742 if (hasSynchronizationStatus
743 && dataSynchroContext.getDirection().isExport()
744 && reject.cause == RejectedRow.Cause.BAD_UPDATE_DATE
745 && rejectStrategy == RejectedRow.ResolveStrategy.KEEP_LOCAL) {
746
747 operation.addMissingColumnUpdate(
748 DatabaseColumns.SYNCHRONIZATION_STATUS.name(),
749 reject.targetPkStr,
750 SynchronizationStatus.READY_TO_SYNCHRONIZE.getValue());
751 }
752 }
753
754
755 @Override
756 protected void detachRows(
757 SynchroTableDao targetDao,
758 List<List<Object>> pks,
759 SynchroContext context,
760 SynchroResult result,
761 Deque<SynchroTableOperation> pendingOperations)
762 throws SQLException {
763
764
765 super.detachRows(targetDao, pks, context, result, pendingOperations);
766
767
768
769 SynchroTableMetadata table = targetDao.getTable();
770 boolean hasSynchronizationStatus = table.getColumnNames().contains(DatabaseColumns.SYNCHRONIZATION_STATUS.name());
771 boolean hasRemoteId = table.getColumnNames().contains(DatabaseColumns.REMOTE_ID.name());
772 if (!hasSynchronizationStatus && !hasRemoteId) {
773 return;
774 }
775
776 String tableName = table.getName();
777 String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName);
778 String readyToSynchronize = SynchronizationStatus.READY_TO_SYNCHRONIZE.getValue();
779 int countR = 0;
780
781
782 if (hasRemoteId) {
783 for (List<Object> pk : pks) {
784 targetDao.executeUpdateColumn(DatabaseColumns.REMOTE_ID.name(), pk, null);
785 countR++;
786 reportProgress(result, targetDao, countR, tablePrefix);
787 }
788 }
789
790
791
792 if (hasSynchronizationStatus) {
793 for (List<Object> pk : pks) {
794 targetDao.executeUpdateColumn(DatabaseColumns.SYNCHRONIZATION_STATUS.name(), pk, readyToSynchronize);
795 countR++;
796 reportProgress(result, targetDao, countR, tablePrefix);
797 }
798 }
799
800 if (countR == 0) {
801 return;
802 }
803
804 targetDao.flush();
805
806 result.addTableName(tableName);
807 result.addUpdates(tableName, countR);
808
809 if (log.isInfoEnabled()) {
810 log.info(String.format("%s done: %s (detachment: %s)", tablePrefix, countR, countR));
811 }
812
813 if (targetDao.getCurrentOperation().isEnableProgress()) {
814 result.getProgressionModel().increments(countR % batchSize);
815 }
816
817 }
818
819
820
821
822
823
824
825
826
827
828
829 protected void initContext(SynchroContext context) {
830
831
832 context.getSource().setColumnUpdateDate(DatabaseColumns.UPDATE_DT.name().toLowerCase());
833 context.getTarget().setColumnUpdateDate(DatabaseColumns.UPDATE_DT.name().toLowerCase());
834
835 if (context instanceof DataSynchroContext) {
836 DataSynchroDatabaseConfiguration source = (DataSynchroDatabaseConfiguration) context.getSource();
837 DataSynchroDatabaseConfiguration target = (DataSynchroDatabaseConfiguration) context.getTarget();
838
839
840 source.setColumnRemoteId(DatabaseColumns.REMOTE_ID.name().toLowerCase());
841 target.setColumnRemoteId(DatabaseColumns.REMOTE_ID.name().toLowerCase());
842
843
844 source.setColumnSynchronizationStatus(DatabaseColumns.SYNCHRONIZATION_STATUS.name().toLowerCase());
845 target.setColumnSynchronizationStatus(DatabaseColumns.SYNCHRONIZATION_STATUS.name().toLowerCase());
846 }
847 }
848
849
850
851
852
853
854
855
856
857
858 protected List<Multimap<String, String>> computePkIncludesListForBatch(SynchroResult resultAfterPrepare,
859 SynchroContext context) {
860 int maxRootRowCount = Quadrige2Configuration.getInstance().getImportDataMaxRootRowCount();
861 if (MapUtils.isEmpty(resultAfterPrepare.getUpdateDateHits())
862 || maxRootRowCount <= 0
863 || resultAfterPrepare.getTotalRows() <= maxRootRowCount) {
864 return null;
865 }
866
867 Preconditions.checkNotNull(context);
868 if (log.isDebugEnabled()) {
869 log.debug(String.format("Compute PKs to import, by batch of %s rows", maxRootRowCount));
870 }
871
872 SynchroDatabaseConfiguration source = context.getSource();
873 Preconditions.checkNotNull(source);
874
875 Connection sourceConnection = null;
876 DaoFactory sourceDaoFactory = null;
877 SynchroDatabaseMetadata dbMeta = null;
878 ResultSet dataToUpdate = null;
879
880 try {
881
882 Set<String> tableNames = resultAfterPrepare.getUpdateDateHits().keySet();
883 List<Multimap<String, String>> result = Lists.newArrayList();
884
885
886 sourceConnection = createConnection(source);
887
888 log.debug("Loading source database metadata...");
889 boolean isFullMetadataEnable = source.isFullMetadataEnable();
890 source.setFullMetadataEnable(true);
891 dbMeta = loadDatabaseMetadata(
892 sourceConnection,
893 source,
894 tableNames);
895 source.setFullMetadataEnable(isFullMetadataEnable);
896
897
898 sourceDaoFactory = newDaoFactory(sourceConnection, source, dbMeta);
899
900
901 Multimap<String, String> currentBatch = ArrayListMultimap.create();
902 int currentBatchSize = 0;
903 for (String tableName : tableNames) {
904
905 SynchroTableDao sourceDao = sourceDaoFactory.getSourceDao(tableName);
906
907 Map<String, Object> bindings = createSelectBindingsForTable(context, tableName);
908 dataToUpdate = sourceDao.getData(bindings);
909
910 while (dataToUpdate.next()) {
911
912 String pkStr = SynchroTableMetadata.toPkStr(sourceDao.getPk(dataToUpdate));
913
914
915 if (currentBatchSize == maxRootRowCount) {
916 result.add(currentBatch);
917 currentBatch = ArrayListMultimap.create();
918 currentBatchSize = 0;
919 }
920
921 currentBatch.put(tableName, pkStr);
922 currentBatchSize++;
923 }
924
925
926 Daos.closeSilently(dataToUpdate);
927 dataToUpdate = null;
928 }
929
930
931 if (currentBatchSize > 0) {
932 result.add(currentBatch);
933 }
934
935 return result;
936
937 } catch (Exception e) {
938 if (log.isDebugEnabled()) {
939 log.debug(e);
940 }
941 throw new SynchroTechnicalException(e);
942 } finally {
943 Daos.closeSilently(dataToUpdate);
944 IOUtils.closeQuietly(sourceDaoFactory);
945 closeSilently(dbMeta);
946 closeSilently(sourceConnection);
947 }
948 }
949
950
951
952
953
954
955
956
957
958
959
960 protected void synchronizeUsingBatch(SynchroContext synchroContext,
961 List<Multimap<String, String>> pkIncludesListForBatch) {
962
963 DataSynchroContext dataSynchroContext = (DataSynchroContext) synchroContext;
964 SynchroResult result = dataSynchroContext.getResult();
965
966 boolean savedEnableDelete = dataSynchroContext.isEnableDelete();
967 boolean currentBatchEnableDelete = savedEnableDelete;
968
969
970
971 ProgressionModel progressionModel = result.getProgressionModel();
972 progressionModel.setCurrent(0);
973 progressionModel.setTotal(result.getTotalRows() + pkIncludesListForBatch.size());
974
975
976 result.clear();
977
978 for (Multimap<String, String> currentBatchPkIncludes : pkIncludesListForBatch) {
979
980
981 SynchroResult currentBatchResult = new SynchroResult(result.getLocalUrl(), result.getRemoteUrl());
982 for (String tableName : currentBatchPkIncludes.keySet()) {
983 currentBatchResult.setUpdateDate(tableName, null);
984 currentBatchResult.addRows(tableName, currentBatchPkIncludes.get(tableName).size());
985 }
986 dataSynchroContext.setResult(currentBatchResult);
987 dataSynchroContext.setPkIncludes(currentBatchPkIncludes);
988 dataSynchroContext.setEnableDelete(currentBatchEnableDelete);
989
990 int batchProgressionModelOffset = progressionModel.getCurrent();
991 addProgressionListeners(progressionModel,
992 currentBatchResult.getProgressionModel(),
993 batchProgressionModelOffset);
994
995
996 super.synchronize(dataSynchroContext);
997
998
999 result.addAll(currentBatchResult);
1000
1001
1002
1003
1004
1005 if (!currentBatchResult.isSuccess()) {
1006 result.setError(currentBatchResult.getError());
1007 break;
1008 }
1009
1010
1011
1012 currentBatchEnableDelete = false;
1013
1014
1015 try {
1016 commitAndFreeMemory(synchroContext.getTarget());
1017 } catch (SQLException e) {
1018 result.setError(e);
1019 break;
1020 }
1021 }
1022
1023
1024 progressionModel.setCurrent(progressionModel.getTotal());
1025
1026
1027 dataSynchroContext.setResult(result);
1028 dataSynchroContext.setPkIncludes(null);
1029 dataSynchroContext.setEnableDelete(savedEnableDelete);
1030 }
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041 protected void addProgressionListeners(
1042 final ProgressionModel mainProgressionModel,
1043 final ProgressionModel batchProgressionModel,
1044 final int batchProgressionModelOffset) {
1045
1046 batchProgressionModel.addPropertyChangeListener(ProgressionModel.PROPERTY_CURRENT,
1047 new PropertyChangeListener() {
1048 @Override
1049 public void propertyChange(PropertyChangeEvent evt) {
1050 Integer current = (Integer) evt.getNewValue();
1051 mainProgressionModel.setCurrent(batchProgressionModelOffset + current);
1052 }
1053 });
1054
1055
1056 batchProgressionModel.addPropertyChangeListener(ProgressionModel.PROPERTY_MESSAGE,
1057 new PropertyChangeListener() {
1058 @Override
1059 public void propertyChange(PropertyChangeEvent evt) {
1060 String message = (String) evt.getNewValue();
1061 mainProgressionModel.setMessage(message);
1062 }
1063 });
1064 }
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076 protected void commitAndFreeMemory(SynchroDatabaseConfiguration dbConfig) throws SQLException {
1077
1078 Connection connection = null;
1079 try {
1080 connection = createConnection(dbConfig);
1081 connection.commit();
1082 System.gc();
1083 } finally {
1084 closeSilently(connection);
1085 }
1086 }
1087
1088 @Override
1089 protected Connection createConnection(String jdbcUrl, String user, String password) throws SQLException {
1090 Connection connection = super.createConnection(jdbcUrl, user, password);
1091
1092
1093 fr.ifremer.quadrige2.core.dao.technical.Daos.setTimezone(connection, Quadrige2Configuration.getInstance().getDbTimezone());
1094
1095 return connection;
1096 }
1097 }