1 package fr.ifremer.quadrige3.synchro.service.data;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import com.google.common.collect.*;
27 import fr.ifremer.common.synchro.SynchroTechnicalException;
28 import fr.ifremer.common.synchro.config.SynchroConfiguration;
29 import fr.ifremer.common.synchro.dao.DaoFactory;
30 import fr.ifremer.common.synchro.dao.SynchroTableDao;
31 import fr.ifremer.common.synchro.meta.SynchroDatabaseMetadata;
32 import fr.ifremer.common.synchro.meta.SynchroTableMetadata;
33 import fr.ifremer.common.synchro.service.*;
34 import fr.ifremer.common.synchro.type.ProgressionModel;
35 import fr.ifremer.quadrige3.core.config.QuadrigeConfiguration;
36 import fr.ifremer.quadrige3.core.dao.ObjectTypes;
37 import fr.ifremer.quadrige3.core.dao.system.synchronization.SynchronizationStatus;
38 import fr.ifremer.quadrige3.core.dao.technical.Assert;
39 import fr.ifremer.quadrige3.core.dao.technical.Daos;
40 import fr.ifremer.quadrige3.core.dao.technical.Dates;
41 import fr.ifremer.quadrige3.core.dao.technical.hibernate.ConfigurationHelper;
42 import fr.ifremer.quadrige3.core.exception.QuadrigeTechnicalException;
43 import fr.ifremer.quadrige3.synchro.meta.DatabaseColumns;
44 import fr.ifremer.quadrige3.synchro.meta.data.DataSynchroTables;
45 import fr.ifremer.quadrige3.synchro.service.AbstractSynchroDatabaseConfiguration;
46 import fr.ifremer.quadrige3.synchro.service.SynchroDirection;
47 import oracle.jdbc.OracleConnection;
48 import org.apache.commons.collections4.CollectionUtils;
49 import org.apache.commons.collections4.MapUtils;
50 import org.apache.commons.io.IOUtils;
51 import org.apache.commons.lang3.StringUtils;
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.hibernate.dialect.Dialect;
55 import org.postgis.PGgeometry;
56 import org.postgresql.PGConnection;
57 import org.springframework.beans.factory.annotation.Autowired;
58 import org.springframework.context.annotation.Lazy;
59 import org.springframework.stereotype.Service;
60
61 import javax.sql.DataSource;
62 import java.io.File;
63 import java.sql.Connection;
64 import java.sql.ResultSet;
65 import java.sql.SQLException;
66 import java.sql.Timestamp;
67 import java.util.*;
68
69 import static org.nuiton.i18n.I18n.t;
70
71
72
73
74
75
76
77 @Service("dataSynchroService")
78 @Lazy
79 public class DataSynchroServiceImpl
80 extends SynchroServiceImpl<DataSynchroDatabaseConfiguration, DataSynchroContext>
81 implements DataSynchroService {
82
/** Logger of this service. */
private static final Log log = LogFactory.getLog(DataSynchroServiceImpl.class);

// The four flags below are handed, unchanged and in this order, to the SynchroServiceImpl
// constructor by both constructors of this class.
private static final boolean DISABLE_INTEGRITY_CONSTRAINTS = false;
private static final boolean ALLOW_MISSING_OPTIONAL_COLUMN = true;
private static final boolean ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA = false;
private static final boolean KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS = true;

/**
 * Delay (in seconds) read once from {@link QuadrigeConfiguration} at construction time;
 * consumed by {@code fillSystemTimestampWithDelay}.
 */
private final int exportUpdateDateDelayInSecond;
91
92
93
94
95
96
97
98
99
100
101
/**
 * Builds the service from an injected data source and synchro configuration.
 * <p>
 * The class-level behaviour flags are forwarded to the parent service, and the export
 * update-date delay is read once from {@link QuadrigeConfiguration}.
 *
 * @param dataSource data source forwarded to the parent service
 * @param config     synchro configuration forwarded to the parent service
 */
@Autowired
public DataSynchroServiceImpl(DataSource dataSource, SynchroConfiguration config) {
    super(
            dataSource,
            config,
            DISABLE_INTEGRITY_CONSTRAINTS,
            ALLOW_MISSING_OPTIONAL_COLUMN,
            ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA,
            KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS
    );

    this.exportUpdateDateDelayInSecond =
            QuadrigeConfiguration.getInstance().getExportDataUpdateDateDelayInSecond();
}
111
112
113
114
115
116
/**
 * Builds the service without an explicit data source or synchro configuration.
 * <p>
 * Only the class-level behaviour flags are forwarded to the parent service; the export
 * update-date delay is read once from {@link QuadrigeConfiguration}.
 */
public DataSynchroServiceImpl() {
    super(
            DISABLE_INTEGRITY_CONSTRAINTS,
            ALLOW_MISSING_OPTIONAL_COLUMN,
            ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA,
            KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS
    );

    this.exportUpdateDateDelayInSecond =
            QuadrigeConfiguration.getInstance().getExportDataUpdateDateDelayInSecond();
}
124
125
126 @Override
127 public DataSynchroContext createSynchroContext(File sourceDbDirectory, SynchroDirection direction, int userId) {
128 Assert.notNull(sourceDbDirectory);
129 Assert.isTrue(sourceDbDirectory.isDirectory());
130 return createSynchroContext(sourceDbDirectory, direction, userId, null, true, true);
131 }
132
133
134 @Override
135 public DataSynchroContext createSynchroContext(File sourceDbDirectory, SynchroDirection direction, int userId,
136 Timestamp lastSynchronizationDate) {
137 return createSynchroContext(sourceDbDirectory, direction, userId, lastSynchronizationDate, true, true);
138 }
139
140
141 @Override
142 public DataSynchroContext createSynchroContext(File sourceDbDirectory, SynchroDirection direction, int userId,
143 Timestamp lastSynchronizationDate, boolean enableDelete, boolean enableInsertUpdate) {
144
145 DataSynchroContext context = super.createSynchroContext(sourceDbDirectory, DataSynchroTables.getImportTablesIncludes());
146
147 context.getTarget().putAllProperties(QuadrigeConfiguration.getInstance().getConnectionProperties());
148 context.setDirection(direction);
149 context.setUserId(userId);
150 context.setLastSynchronizationDate(lastSynchronizationDate);
151 context.setEnableDelete(enableDelete);
152 context.setEnableInsertOrUpdate(enableInsertUpdate);
153 initContext(context);
154 return context;
155 }
156
157
158 @Override
159 public DataSynchroContext createSynchroContext(Properties sourceConnectionProperties, SynchroDirection direction, int userId) {
160 return createSynchroContext(sourceConnectionProperties, direction, userId, null, true, true);
161 }
162
163
164 @Override
165 public DataSynchroContext createSynchroContext(Properties sourceConnectionProperties, SynchroDirection direction, int userId,
166 Timestamp lastSynchronizationDate) {
167 return createSynchroContext(sourceConnectionProperties, direction, userId, lastSynchronizationDate, true, true);
168 }
169
170
171 @Override
172 public DataSynchroContext createSynchroContext(Properties sourceConnectionProperties, SynchroDirection direction, int userId,
173 Timestamp lastSynchronizationDate, boolean enableDelete, boolean enableInsertUpdate) {
174 Assert.notNull(sourceConnectionProperties);
175
176 DataSynchroContext context = super.createSynchroContext(sourceConnectionProperties, DataSynchroTables.getImportTablesIncludes());
177
178 context.getTarget().putAllProperties(QuadrigeConfiguration.getInstance().getConnectionProperties());
179 context.setDirection(direction);
180 context.setUserId(userId);
181 context.setLastSynchronizationDate(lastSynchronizationDate);
182 context.setEnableDelete(enableDelete);
183 context.setEnableInsertOrUpdate(enableInsertUpdate);
184 initContext(context);
185 return context;
186 }
187
188
189 @Override
190 public void prepare(DataSynchroContext synchroContext) {
191 Assert.isInstanceOf(DataSynchroContext.class, synchroContext,
192 String.format("The context must be a instance of %s", DataSynchroContext.class.getName()));
193
194 SynchroResult result = synchroContext.getResult();
195 SynchroDirection direction = synchroContext.getDirection();
196
197 DataSynchroDatabaseConfiguration target = synchroContext.getTarget();
198 DataSynchroDatabaseConfiguration source = synchroContext.getSource();
199
200
201 if (direction == SynchroDirection.IMPORT_SERVER2TEMP) {
202
203
204 source.excludeMeasurementUnusedColumns();
205
206
207 target.setIsMirrorDatabase(true);
208
209
210 target.excludeUnusedColumns();
211
212
213 {
214 disableIntegrityConstraints(target.getConnectionProperties(), result);
215 if (!result.isSuccess()) {
216 return;
217 }
218
219
220 source.setCheckUniqueConstraintBetweenInputRows(false);
221 target.setCheckUniqueConstraintBetweenInputRows(false);
222 }
223
224
225 if (synchroContext.getDataEndDate() == null) {
226 fillDefaultDataPeriod(synchroContext);
227 }
228
229
230 if (synchroContext.getPkIncludes() == null) {
231 fillDefaultPkIncludes(synchroContext);
232 }
233
234
235 if (source.getDbPhotoDirectory() == null) {
236 source.setDbPhotoDirectory(QuadrigeConfiguration.getInstance().getDbPhotoDirectory());
237 }
238 }
239
240
241 else if (direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
242
243
244 target.setIsMirrorDatabase(false);
245
246
247 source.setCheckUniqueConstraintBetweenInputRows(false);
248 target.setCheckUniqueConstraintBetweenInputRows(false);
249
250
251 if (target.getDbPhotoDirectory() == null) {
252 target.setDbPhotoDirectory(QuadrigeConfiguration.getInstance().getDbPhotoDirectory());
253 }
254 }
255
256
257 if (direction == SynchroDirection.IMPORT_NO_TEMP) {
258
259 throw new QuadrigeTechnicalException("Direction IMPORT_NO_TEMP not implemented yet for Data tables");
260 }
261
262
263 else if (direction == SynchroDirection.EXPORT_LOCAL2TEMP) {
264
265
266 target.setIsMirrorDatabase(true);
267
268
269 {
270 disableIntegrityConstraints(target.getConnectionProperties(), result);
271 if (!result.isSuccess()) {
272 return;
273 }
274 }
275
276
277 source.setCheckUniqueConstraintBetweenInputRows(false);
278 target.setCheckUniqueConstraintBetweenInputRows(false);
279
280
281 if (source.getDbPhotoDirectory() == null) {
282 source.setDbPhotoDirectory(QuadrigeConfiguration.getInstance().getDbPhotoDirectory());
283 }
284 }
285
286
287 else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
288
289
290 source.setFullMetadataEnable(true);
291 source.excludeUnusedColumns();
292
293
294 target.setIsMirrorDatabase(false);
295 target.setFullMetadataEnable(false);
296
297
298 fillSystemTimestampWithDelay(result, target);
299 source.setSystemTimestamp(target.getSystemTimestamp());
300
301
302 if (target.getDbPhotoDirectory() == null) {
303 target.setDbPhotoDirectory(QuadrigeConfiguration.getInstance().getDbPhotoDirectory());
304 }
305 }
306
307
308 else if (direction == SynchroDirection.EXPORT_LOCAL2FILE) {
309
310
311 target.setIsMirrorDatabase(true);
312 }
313
314
315 else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
316
317
318 target.setIsMirrorDatabase(false);
319 }
320
321 super.prepare(synchroContext);
322 }
323
324
325 @Override
326 public void synchronize(DataSynchroContext synchroContext) {
327 Assert.isInstanceOf(DataSynchroContext.class, synchroContext,
328 String.format("The context must be a instance of %s", DataSynchroContext.class.getName()));
329
330 SynchroResult result = synchroContext.getResult();
331 List<Multimap<String, String>> pkIncludesListForBatch = null;
332
333 DataSynchroDatabaseConfiguration target = synchroContext.getTarget();
334 DataSynchroDatabaseConfiguration source = synchroContext.getSource();
335
336
337 if (synchroContext.getDirection() == SynchroDirection.IMPORT_SERVER2TEMP) {
338
339
340 target.removeColumnExclude(target.getColumnSynchronizationStatus());
341 target.removeColumnExclude(target.getColumnRemoteId());
342
343
344 {
345 disableIntegrityConstraints(synchroContext.getTarget().getConnectionProperties(), result);
346 if (!result.isSuccess()) {
347 return;
348 }
349 }
350
351
352 if (synchroContext.isEnableInsertOrUpdate()
353 && (synchroContext.getPkIncludes() == null || synchroContext.getPkIncludes().isEmpty())) {
354 pkIncludesListForBatch = computePkIncludesListForBatch(result, synchroContext, true);
355 }
356
357 }
358
359
360 else if (synchroContext.getDirection() == SynchroDirection.EXPORT_TEMP2SERVER) {
361
362
363 source.removeColumnExclude(source.getColumnSynchronizationStatus());
364 source.removeColumnExclude(source.getColumnRemoteId());
365 }
366
367
368 else if (synchroContext.getDirection() == SynchroDirection.IMPORT_TEMP2LOCAL) {
369
370
371
372
373
374 if (synchroContext.isEnableInsertOrUpdate()
375 && (synchroContext.getPkIncludes() == null || synchroContext.getPkIncludes().isEmpty())) {
376 pkIncludesListForBatch = computePkIncludesListForBatch(result, synchroContext, false);
377 }
378 }
379
380 boolean synchronizeUsingBatch = CollectionUtils.isNotEmpty(pkIncludesListForBatch);
381
382
383 if (!synchronizeUsingBatch
384 || !synchroContext.isEnableInsertOrUpdate()) {
385 super.synchronize(synchroContext);
386 }
387
388
389 else {
390 synchronizeUsingBatch(synchroContext, pkIncludesListForBatch);
391 }
392 }
393
394
395
396
397 @Override
398 protected void prepareRootTable(
399 DaoFactory sourceDaoFactory,
400 DaoFactory targetDaoFactory,
401 SynchroTableMetadata table,
402 DataSynchroContext context,
403 SynchroResult result) throws SQLException {
404
405
406 if (context.isEnableInsertOrUpdate()) {
407 super.prepareRootTable(sourceDaoFactory,
408 targetDaoFactory,
409 table,
410 context,
411 result);
412
413
414 if (table.getName().toUpperCase().equals(DataSynchroTables.SURVEY.name())
415 && context.isEnablePhoto()
416 && result.getNbRows(DataSynchroTables.SURVEY.name()) > 0
417 && result.getNbRows(DataSynchroTables.PHOTO.name()) == 0) {
418 preparePhotoTable(sourceDaoFactory, context, result);
419 }
420 }
421
422
423 if (context.isEnableDelete()) {
424 prepareDeletedRootTable(sourceDaoFactory,
425 targetDaoFactory,
426 table,
427 context,
428 result);
429 }
430 }
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448 protected void prepareDeletedRootTable(
449 DaoFactory sourceDaoFactory,
450 DaoFactory targetDaoFactory,
451 SynchroTableMetadata table,
452 DataSynchroContext context,
453 SynchroResult result) throws SQLException {
454
455 String tableName = table.getName();
456 Set<String> objectTypeFks = ObjectTypes.getObjectTypeFromTableName(tableName);
457
458 if (CollectionUtils.isEmpty(objectTypeFks)) {
459 return;
460 }
461
462 SynchroTableDao dihSourceDao = sourceDaoFactory.getSourceDao(DataSynchroTables.DELETED_ITEM_HISTORY.name());
463
464 List<List<Object>> columnValues = Lists.newArrayListWithCapacity(objectTypeFks.size());
465 for (String objectTypeFk : objectTypeFks) {
466 columnValues.add(ImmutableList.of(objectTypeFk));
467 }
468
469
470 Map<String, Object> bindings = createSelectBindingsForTable(context, DataSynchroTables.DELETED_ITEM_HISTORY.name());
471 long count = dihSourceDao.countDataByFks(
472 ImmutableSet.of(DatabaseColumns.OBJECT_TYPE_CD.name()),
473 columnValues,
474 bindings
475 );
476 if (count > 0) {
477 result.addRows(tableName, (int) count);
478 }
479 }
480
481 protected void preparePhotoTable(
482 DaoFactory sourceDaoFactory,
483 DataSynchroContext context,
484 SynchroResult result) throws SQLException {
485
486 SynchroTableDao surveySourceDao = sourceDaoFactory.getSourceDao(DataSynchroTables.SURVEY.name());
487 SynchroTableDao photoSourceDao = sourceDaoFactory.getSourceDao(DataSynchroTables.PHOTO.name());
488
489
490 Map<String, Object> bindings = createSelectBindingsForTable(context, DataSynchroTables.SURVEY.name());
491 List<List<Object>> surveyPks = new ArrayList<>();
492 try (ResultSet resultSet = surveySourceDao.getData(bindings)) {
493 while (resultSet.next()) surveyPks.add(surveySourceDao.getPk(resultSet));
494 }
495
496
497 long photoCount = photoSourceDao.countDataByFks(
498 ImmutableSet.of(DatabaseColumns.SURVEY_ID.name()),
499 surveyPks,
500 new HashMap<>()
501 );
502
503 result.addRows(DataSynchroTables.PHOTO.name(), (int) photoCount);
504
505 }
506
507
508 @Override
509 protected Map<String, Object> createDefaultSelectBindings(DataSynchroContext context) {
510
511 Map<String, Object> bindings = super.createDefaultSelectBindings(context);
512
513
514 bindings.put("userId", context.getUserId());
515
516
517 if (context.getDataStartDate() != null) {
518 bindings.put("startDate", new Timestamp(context.getDataStartDate().getTime()));
519 }
520
521 if (context.getDataEndDate() != null) {
522 bindings.put("endDate", new Timestamp(context.getDataEndDate().getTime()));
523 }
524
525 return bindings;
526 }
527
528
529
530
531
532
533
534 @Override
535 protected Map<String, Object> createSelectBindingsForTable(DataSynchroContext context, String tableName) {
536
537 Map<String, Object> bindings = Maps.newHashMap(getSelectBindings(context));
538
539 Timestamp lastSynchronizationDate = context.getLastSynchronizationDate();
540 boolean notEmptyTargetTable = (context.getResult().getUpdateDate(tableName) != null);
541 boolean enableUpdateDateFilter = (lastSynchronizationDate != null)
542 && (context.getTarget().isMirrorDatabase() || notEmptyTargetTable);
543
544 if (enableUpdateDateFilter) {
545 bindings.put(SynchroTableMetadata.UPDATE_DATE_BINDPARAM, lastSynchronizationDate);
546 }
547
548 return bindings;
549 }
550
551
552
553
554
555
556
557
558
559
560
561 protected void fillSystemTimestampWithDelay(SynchroResult result, SynchroDatabaseConfiguration databaseConfiguration) {
562
563 result.getProgressionModel().setMessage(t("quadrige3.synchro.synchronizeData.initSystemTimestamp"));
564 if (log.isInfoEnabled()) {
565 log.info(t("quadrige3.synchro.synchronizeData.initSystemTimestamp"));
566 }
567
568 Connection connection = null;
569 try {
570 connection = createConnection(databaseConfiguration);
571 Dialect dialect = databaseConfiguration.getDialect();
572 Timestamp currentTimestamp = Daos.getDatabaseCurrentTimestamp(connection, dialect);
573
574
575 Dates.addSeconds(currentTimestamp, exportUpdateDateDelayInSecond);
576
577 if (log.isDebugEnabled()) {
578 log.debug(String.format("Current timestamp: %s", currentTimestamp));
579 }
580
581 databaseConfiguration.setSystemTimestamp(currentTimestamp);
582 } catch (QuadrigeTechnicalException | SQLException e) {
583 result.setError(e);
584 } finally {
585 closeSilently(connection);
586 }
587 }
588
589
590
591
592
593
594
595
596
597 protected void fillDefaultDataPeriod(DataSynchroContext dataSynchroContext) {
598 int nbYearDataHistory = QuadrigeConfiguration.getInstance().getImportNbYearDataHistory();
599
600
601 Date dataStartDate = Dates.addYears(new Date(), -1 * nbYearDataHistory);
602 dataStartDate = Dates.truncate(dataStartDate, Calendar.YEAR);
603 dataSynchroContext.setDataStartDate(dataStartDate);
604
605
606 Date dataEndDate = Dates.lastSecondOfTheDay(new Date());
607 dataSynchroContext.setDataEndDate(dataEndDate);
608 }
609
610
611
612
613
614
615
616
617
618 protected void fillDefaultPkIncludes(DataSynchroContext dataSynchroContext) {
619 String confProperty = QuadrigeConfiguration.getInstance().getImportDataPkIncludes();
620 Multimap<String, String> pkIncludes = ConfigurationHelper.getMultimap(confProperty);
621 dataSynchroContext.setPkIncludes(pkIncludes);
622 }
623
624
625 @Override
626 protected List<SynchroTableOperation> getRootOperations(
627 DaoFactory sourceDaoFactory,
628 DaoFactory targetDaoFactory,
629 SynchroDatabaseMetadata dbMeta,
630 DataSynchroContext context) throws SQLException {
631 List<SynchroTableOperation> result = Lists.newArrayList();
632
633
634
635 if (context.isEnableDelete()
636 && (context.getDirection() == SynchroDirection.EXPORT_TEMP2SERVER
637 || context.getDirection() == SynchroDirection.IMPORT_TEMP2LOCAL)) {
638
639 Collection<SynchroTableOperation> deletedItemOperations = getDeleteOperations(sourceDaoFactory, targetDaoFactory, dbMeta, context);
640 result.addAll(deletedItemOperations);
641 }
642
643
644 if (context.isEnableInsertOrUpdate()) {
645 Collection<SynchroTableOperation> defaultOperations = super.getRootOperations(sourceDaoFactory, targetDaoFactory, dbMeta, context);
646 result.addAll(defaultOperations);
647 }
648
649 return result;
650 }
651
652 private Collection<SynchroTableOperation> getDeleteOperations(
653 DaoFactory sourceDaoFactory,
654 DaoFactory targetDaoFactory,
655 SynchroDatabaseMetadata dbMeta,
656 DataSynchroContext context) throws SQLException {
657 Assert.isTrue(dbMeta.getConfiguration().isFullMetadataEnable());
658
659 Deque<SynchroTableOperation> result = Queues.newArrayDeque();
660
661 boolean shouldDeleteUsingRemoteId = (context.getDirection() == SynchroDirection.IMPORT_TEMP2LOCAL);
662 SynchroTableDao dihSourceDao = sourceDaoFactory.getSourceDao(DataSynchroTables.DELETED_ITEM_HISTORY.name());
663 DataSynchroDatabaseConfiguration target = context.getTarget();
664 Set<String> includedDataTables = DataSynchroTables.getImportTablesIncludes();
665 if (CollectionUtils.isEmpty(includedDataTables)) {
666 return result;
667 }
668
669
670 Map<String, SynchroTableOperation> operationByTableName = new HashMap<>();
671
672
673 Map<String, Object> bindings = createSelectBindingsForTable(context, DataSynchroTables.DELETED_ITEM_HISTORY.name());
674 ResultSet dihResultSet = dihSourceDao.getData(bindings);
675
676 List<Object> dihIdsToRemove = Lists.newArrayList();
677
678 try {
679 dihResultSet = dihSourceDao.getData(bindings);
680
681 while (dihResultSet.next()) {
682
683 long dihId = dihResultSet.getLong(DatabaseColumns.DEL_ITEM_HIST_ID.name());
684 String objectType = dihResultSet.getString(DatabaseColumns.OBJECT_TYPE_CD.name());
685 String tableName = ObjectTypes.getTableNameFromObjectType(objectType);
686
687 boolean isDataTable = StringUtils.isNotBlank(tableName)
688 && includedDataTables.contains(tableName.toUpperCase());
689
690 if (isDataTable) {
691 SynchroTableDao targetDao = targetDaoFactory.getSourceDao(tableName);
692 SynchroTableMetadata table = targetDao.getTable();
693
694 String objectCode = dihResultSet.getString(DatabaseColumns.OBJECT_CD.name());
695 String objectId = dihResultSet.getString(DatabaseColumns.OBJECT_ID.name());
696
697
698 if (StringUtils.isNotBlank(objectCode) || StringUtils.isNotBlank(objectId)) {
699 List<Object> pk = StringUtils.isNotBlank(objectCode)
700 ? ImmutableList.of(objectCode)
701 : ImmutableList.of(objectId);
702
703 boolean hasChildTables = targetDao.getTable().hasChildJoins();
704
705
706 SynchroTableOperation operation = operationByTableName.get(tableName);
707 if (operation == null) {
708 operation = new SynchroTableOperation(tableName, context);
709 operationByTableName.put(tableName, operation);
710 operation.setEnableProgress(true);
711 result.add(operation);
712 }
713
714
715 if (shouldDeleteUsingRemoteId) {
716 if (table.getColumnIndex(target.getColumnRemoteId()) != -1) {
717 operation.addChildrenToDeleteFromOneColumn(tableName, target.getColumnRemoteId(), pk);
718
719
720 dihIdsToRemove.add(dihId);
721 }
722 else if (log.isWarnEnabled()) {
723
724 log.warn(String
725 .format("[%s] Found deleted items on table [%s] (pk: [%s]) but no column [%s] exists on this table. Skipping this deleted items.",
726 DataSynchroTables.DELETED_ITEM_HISTORY.name(),
727 tableName,
728 SynchroTableMetadata.toPkStr(pk),
729 target.getColumnRemoteId()
730 ));
731 }
732
733
734
735 }
736
737
738 else {
739 operation.addMissingDelete(pk);
740
741
742 if (hasChildTables) {
743 addDeleteChildrenToDeque(table, ImmutableList.of(pk), result, context);
744 }
745 }
746 }
747 }
748 }
749 } finally {
750 Daos.closeSilently(dihResultSet);
751 }
752
753 if (CollectionUtils.isNotEmpty(dihIdsToRemove)) {
754 SynchroTableOperation operation = new SynchroTableOperation(DataSynchroTables.DELETED_ITEM_HISTORY.name(), context);
755 operation.addChildrenToDeleteFromOneColumn(DataSynchroTables.DELETED_ITEM_HISTORY.name(), target.getColumnRemoteId(),
756 dihIdsToRemove);
757 result.add(operation);
758 }
759
760 return result;
761 }
762
763
764 @Override
765 protected void resolveTableRow(SynchroTableMetadata table,
766 RejectedRow reject,
767 RejectedRow.ResolveStrategy rejectStrategy,
768 DaoFactory daoFactory,
769 SynchroTableOperation operation,
770 DataSynchroContext context,
771 SynchroResult result) {
772
773
774 super.resolveTableRow(
775 table,
776 reject,
777 rejectStrategy,
778 daoFactory,
779 operation,
780 context,
781 result
782 );
783
784 DataSynchroDatabaseConfiguration target = context.getTarget();
785
786 boolean hasSynchronizationStatus = table.getColumnNames().contains(target.getColumnSynchronizationStatus());
787
788
789
790
791 if (hasSynchronizationStatus
792 && context.getDirection().isExport()
793 && reject.cause == RejectedRow.Cause.BAD_UPDATE_DATE
794 && rejectStrategy == RejectedRow.ResolveStrategy.KEEP_LOCAL) {
795
796 operation.addMissingColumnUpdate(
797 target.getColumnSynchronizationStatus(),
798 reject.targetPkStr,
799 SynchronizationStatus.READY_TO_SYNCHRONIZE.getValue());
800 }
801 }
802
803
804 @Override
805 protected void detachRows(
806 SynchroTableDao targetDao,
807 List<List<Object>> pks,
808 DataSynchroContext context,
809 SynchroResult result,
810 Deque<SynchroTableOperation> pendingOperations)
811 throws SQLException {
812
813
814 super.detachRows(targetDao, pks, context, result, pendingOperations);
815
816
817
818 DataSynchroDatabaseConfiguration target = context.getTarget();
819 SynchroTableMetadata table = targetDao.getTable();
820 boolean hasSynchronizationStatus = table.getColumnNames().contains(target.getColumnSynchronizationStatus());
821 boolean hasRemoteId = table.getColumnNames().contains(target.getColumnRemoteId());
822 if (!hasSynchronizationStatus && !hasRemoteId) {
823 return;
824 }
825
826 String tableName = table.getName();
827 String tablePrefix = table.getTableLogPrefix() + " - " + result.getNbRows(tableName);
828 String readyToSynchronize = SynchronizationStatus.READY_TO_SYNCHRONIZE.getValue();
829 int countR = 0;
830
831
832 if (hasRemoteId) {
833 for (List<Object> pk : pks) {
834 targetDao.executeUpdateColumn(target.getColumnRemoteId(), pk, null);
835 countR++;
836 reportProgress(result, targetDao, countR, tablePrefix);
837 }
838 }
839
840
841
842 if (hasSynchronizationStatus) {
843 for (List<Object> pk : pks) {
844 targetDao.executeUpdateColumn(target.getColumnSynchronizationStatus(), pk, readyToSynchronize);
845 countR++;
846 reportProgress(result, targetDao, countR, tablePrefix);
847 }
848 }
849
850 if (countR == 0) {
851 return;
852 }
853
854 targetDao.flush();
855
856 result.addTableName(tableName);
857 result.addUpdates(tableName, countR);
858
859 if (log.isInfoEnabled()) {
860 log.info(String.format("%s done: %s (detachment: %s)", tablePrefix, countR, countR));
861 }
862
863 if (targetDao.getCurrentOperation().isEnableProgress()) {
864 result.getProgressionModel().increments(countR % batchSize);
865 }
866
867 }
868
869
870
871
872
873
874
875
876
877
878
879 protected void initContext(DataSynchroContext context) {
880
881
882 context.getSource().setColumnUpdateDate(DatabaseColumns.UPDATE_DT.name().toLowerCase());
883 context.getTarget().setColumnUpdateDate(DatabaseColumns.UPDATE_DT.name().toLowerCase());
884
885 DataSynchroDatabaseConfiguration source = context.getSource();
886 DataSynchroDatabaseConfiguration target = context.getTarget();
887
888
889 source.setColumnRemoteId(DatabaseColumns.REMOTE_ID.name().toLowerCase());
890 target.setColumnRemoteId(DatabaseColumns.REMOTE_ID.name().toLowerCase());
891
892
893 source.setColumnSynchronizationStatus(DatabaseColumns.SYNCHRONIZATION_STATUS.name().toLowerCase());
894 target.setColumnSynchronizationStatus(DatabaseColumns.SYNCHRONIZATION_STATUS.name().toLowerCase());
895
896
897 Multimap<String, String> allowedMissingMandatoryColumns = ArrayListMultimap.create();
898 List<String> pmfmColumns = ImmutableList.of(
899 DatabaseColumns.PAR_CD.name().toUpperCase(),
900 DatabaseColumns.MATRIX_ID.name().toUpperCase(),
901 DatabaseColumns.FRACTION_ID.name().toUpperCase(),
902 DatabaseColumns.METHOD_ID.name().toUpperCase(),
903 DatabaseColumns.UNIT_ID.name().toUpperCase()
904 );
905 allowedMissingMandatoryColumns.putAll(DataSynchroTables.MEASUREMENT.name().toUpperCase(), pmfmColumns);
906 allowedMissingMandatoryColumns.putAll(DataSynchroTables.TAXON_MEASUREMENT.name().toUpperCase(), pmfmColumns);
907 allowedMissingMandatoryColumns.putAll(DataSynchroTables.MEASUREMENT_FILE.name().toUpperCase(), pmfmColumns);
908 setAllowedMissingMandatoryColumns(allowedMissingMandatoryColumns);
909
910
911 if (Daos.isPostgresqlDatabase(context.getSource().getJdbcUrl())) {
912 context.getSource().setTempQueryParameterGenerated(config.isTempQueryParameterGenerated());
913 }
914 if (Daos.isPostgresqlDatabase(context.getTarget().getJdbcUrl())) {
915 context.getTarget().setTempQueryParameterGenerated(config.isTempQueryParameterGenerated());
916 }
917
918 }
919
920
921
922
923
924
925
926
927
928
929
930 protected List<Multimap<String, String>> computePkIncludesListForBatch(SynchroResult resultAfterPrepare,
931 DataSynchroContext context, boolean useTargetMetadata) {
932 int maxRootRowCount = QuadrigeConfiguration.getInstance().getImportDataMaxRootRowCount();
933 if (MapUtils.isEmpty(resultAfterPrepare.getUpdateDateHits())
934 || maxRootRowCount <= 0
935 || resultAfterPrepare.getTotalRows() <= maxRootRowCount) {
936 return null;
937 }
938
939 Assert.notNull(context);
940 if (log.isDebugEnabled()) {
941 log.debug(String.format("Compute PKs to import, by batch of %s rows", maxRootRowCount));
942 }
943
944 SynchroDatabaseConfiguration source = context.getSource();
945 Assert.notNull(source);
946
947 Connection sourceConnection = null;
948 DaoFactory sourceDaoFactory = null;
949 SynchroDatabaseMetadata dbMeta = null;
950 ResultSet dataToUpdate = null;
951
952 try {
953
954 Set<String> tableNames = resultAfterPrepare.getUpdateDateHits().keySet();
955 Assert.notEmpty(tableNames);
956
957
958 List<String> tableNamesToIgnore = ImmutableList.of(DataSynchroTables.DELETED_ITEM_HISTORY.name().toUpperCase());
959 List<String> ignoredTableNames = new ArrayList<>();
960 for (String tableName: tableNames) {
961 if (tableNamesToIgnore.contains(tableName.toUpperCase())) {
962 ignoredTableNames.add(tableName);
963 }
964 }
965 tableNames.removeAll(ignoredTableNames);
966
967 List<Multimap<String, String>> result = Lists.newArrayList();
968
969
970 sourceConnection = createConnection(source);
971
972 log.debug("Loading source database metadata...");
973 if (useTargetMetadata) {
974 SynchroDatabaseConfiguration target = context.getTarget();
975 Assert.notNull(target);
976 Connection targetConnection = createConnection(target);
977 boolean isFullMetadataEnable = target.isFullMetadataEnable();
978 target.setFullMetadataEnable(true);
979 dbMeta = loadDatabaseMetadata(
980 targetConnection,
981 target,
982 tableNames);
983 target.setFullMetadataEnable(isFullMetadataEnable);
984 } else {
985 boolean isFullMetadataEnable = source.isFullMetadataEnable();
986 source.setFullMetadataEnable(true);
987 dbMeta = loadDatabaseMetadata(
988 sourceConnection,
989 source,
990 tableNames);
991 source.setFullMetadataEnable(isFullMetadataEnable);
992 }
993
994
995 sourceDaoFactory = newDaoFactory(sourceConnection, source, dbMeta);
996
997
998 Multimap<String, String> currentBatch = ArrayListMultimap.create();
999 int currentBatchSize = 0;
1000 for (String tableName : tableNames) {
1001
1002 SynchroTableDao sourceDao = sourceDaoFactory.getSourceDao(tableName);
1003
1004 Map<String, Object> bindings = createSelectBindingsForTable(context, tableName);
1005 dataToUpdate = sourceDao.getData(bindings);
1006
1007 while (dataToUpdate.next()) {
1008
1009 String pkStr = SynchroTableMetadata.toPkStr(sourceDao.getPk(dataToUpdate));
1010
1011
1012 if (currentBatchSize == maxRootRowCount) {
1013 result.add(currentBatch);
1014 currentBatch = ArrayListMultimap.create();
1015 currentBatchSize = 0;
1016 }
1017
1018 currentBatch.put(tableName, pkStr);
1019 currentBatchSize++;
1020 }
1021
1022
1023 Daos.closeSilently(dataToUpdate);
1024 dataToUpdate = null;
1025 }
1026
1027
1028 if (currentBatchSize > 0) {
1029 result.add(currentBatch);
1030 }
1031
1032
1033 for (String tableName : ignoredTableNames) {
1034
1035 currentBatch = ArrayListMultimap.create();
1036
1037 SynchroTableDao sourceDao = sourceDaoFactory.getSourceDao(tableName);
1038 Map<String, Object> bindings = createSelectBindingsForTable(context, tableName);
1039 dataToUpdate = sourceDao.getData(bindings);
1040
1041 while (dataToUpdate.next()) {
1042 String pkStr = SynchroTableMetadata.toPkStr(sourceDao.getPk(dataToUpdate));
1043 currentBatch.put(tableName, pkStr);
1044 }
1045
1046 result.add(currentBatch);
1047
1048
1049 Daos.closeSilently(dataToUpdate);
1050 dataToUpdate = null;
1051 }
1052
1053
1054 return result;
1055
1056 } catch (Exception e) {
1057 if (log.isDebugEnabled()) {
1058 log.debug(e);
1059 }
1060 throw new SynchroTechnicalException(e);
1061 } finally {
1062 Daos.closeSilently(dataToUpdate);
1063 IOUtils.closeQuietly(sourceDaoFactory);
1064 closeSilently(dbMeta);
1065 closeSilently(sourceConnection);
1066 }
1067 }
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079 protected void synchronizeUsingBatch(DataSynchroContext synchroContext,
1080 List<Multimap<String, String>> pkIncludesListForBatch) {
1081
1082 SynchroResult result = synchroContext.getResult();
1083
1084 boolean savedEnableDelete = synchroContext.isEnableDelete();
1085 boolean currentBatchEnableDelete = savedEnableDelete;
1086
1087
1088 ProgressionModel mainProgressionModel = result.getProgressionModel();
1089 int mainTotal = 0;
1090 for (Multimap<String, String> currentBatchPkIncludes : pkIncludesListForBatch) {
1091 mainTotal += currentBatchPkIncludes.size();
1092 }
1093 mainProgressionModel.setTotal(mainTotal);
1094
1095
1096 result.clear();
1097
1098 for (Multimap<String, String> currentBatchPkIncludes : pkIncludesListForBatch) {
1099
1100
1101 SynchroResult currentBatchResult = new SynchroResult(result.getLocalUrl(), result.getRemoteUrl());
1102 for (String tableName : currentBatchPkIncludes.keySet()) {
1103 currentBatchResult.setUpdateDate(tableName, null);
1104 currentBatchResult.addRows(tableName, currentBatchPkIncludes.get(tableName).size());
1105 }
1106 synchroContext.setResult(currentBatchResult);
1107 synchroContext.setPkIncludes(currentBatchPkIncludes);
1108 synchroContext.setEnableDelete(currentBatchEnableDelete);
1109
1110 int batchProgressionModelOffset = mainProgressionModel.getCurrent();
1111 addProgressionListeners(mainProgressionModel,
1112 currentBatchResult.getProgressionModel(),
1113 batchProgressionModelOffset);
1114
1115
1116 super.synchronize(synchroContext);
1117
1118
1119 result.addAll(currentBatchResult);
1120
1121
1122
1123
1124
1125 if (!currentBatchResult.isSuccess()) {
1126 result.setError(currentBatchResult.getError());
1127 break;
1128 }
1129
1130
1131
1132 currentBatchEnableDelete = false;
1133
1134
1135 try {
1136 commitAndFreeMemory(synchroContext.getTarget());
1137 } catch (SQLException e) {
1138 result.setError(e);
1139 break;
1140 }
1141 }
1142
1143
1144 mainProgressionModel.setCurrent(mainProgressionModel.getTotal());
1145
1146
1147 synchroContext.setResult(result);
1148 synchroContext.setPkIncludes(null);
1149 synchroContext.setEnableDelete(savedEnableDelete);
1150 }
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161 protected void addProgressionListeners(
1162 final ProgressionModel mainProgressionModel,
1163 final ProgressionModel batchProgressionModel,
1164 final int batchProgressionModelOffset) {
1165
1166 batchProgressionModel.addPropertyChangeListener(ProgressionModel.PROPERTY_CURRENT,
1167 evt -> {
1168 Integer current = (Integer) evt.getNewValue();
1169 mainProgressionModel.setCurrent(batchProgressionModelOffset + current);
1170 });
1171
1172
1173 batchProgressionModel.addPropertyChangeListener(ProgressionModel.PROPERTY_MESSAGE,
1174 evt -> {
1175 String message = (String) evt.getNewValue();
1176 mainProgressionModel.setMessage(message);
1177 });
1178 }
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190 protected void commitAndFreeMemory(SynchroDatabaseConfiguration dbConfig) throws SQLException {
1191
1192 Connection connection = null;
1193 try {
1194 connection = createConnection(dbConfig);
1195 connection.commit();
1196 System.gc();
1197 } finally {
1198 closeSilently(connection);
1199 }
1200 }
1201
1202 @Override
1203 protected Connection createConnection(SynchroDatabaseConfiguration databaseConfiguration) throws SQLException {
1204 Connection connection = super.createConnection(databaseConfiguration);
1205
1206
1207 SynchroDirection synchroDirection = null;
1208
1209 if (databaseConfiguration instanceof AbstractSynchroDatabaseConfiguration) {
1210 synchroDirection = ((AbstractSynchroDatabaseConfiguration) databaseConfiguration).getDirection();
1211 }
1212 if ((synchroDirection == SynchroDirection.IMPORT_SERVER2TEMP || synchroDirection == SynchroDirection.EXPORT_LOCAL2TEMP)
1213 && !databaseConfiguration.isRollbackOnly() && Daos.isHsqlFileDatabase(databaseConfiguration.getJdbcUrl()) && databaseConfiguration.isTarget()) {
1214 connection.setAutoCommit(true);
1215 }
1216
1217
1218 if (Daos.isOracleDatabase(databaseConfiguration.getJdbcUrl()) && databaseConfiguration.isSynonymsEnable()) {
1219 ((OracleConnection) Daos.unwrapConnection(connection)).setIncludeSynonyms(true);
1220 }
1221
1222
1223 if (Daos.isPostgresqlDatabase(databaseConfiguration.getJdbcUrl())) {
1224 ((PGConnection) Daos.unwrapConnection(connection)).addDataType(String.format("\"%s\".\"geometry\"", QuadrigeConfiguration.getInstance().getPostgisSchema()), PGgeometry.class);
1225 }
1226
1227
1228 Daos.setTimezone(connection, QuadrigeConfiguration.getInstance().getDbTimezone());
1229
1230 return connection;
1231 }
1232
1233 @Override
1234 protected DataSynchroDatabaseConfiguration newSynchroDatabaseConfiguration(DataSynchroContext context, Properties sourceConnectionProperties, boolean isTarget) {
1235 return new DataSynchroDatabaseConfiguration(context, sourceConnectionProperties, isTarget);
1236 }
1237
1238 @Override
1239 protected DataSynchroContext newSynchroContext() {
1240 return new DataSynchroContext();
1241 }
1242 }