1 package fr.ifremer.quadrige2.synchro.service.referential;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import com.google.common.base.Joiner;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Predicate;
29 import com.google.common.base.Splitter;
30 import com.google.common.collect.*;
31 import fr.ifremer.common.synchro.SynchroTechnicalException;
32 import fr.ifremer.common.synchro.config.SynchroConfiguration;
33 import fr.ifremer.common.synchro.dao.DaoFactory;
34 import fr.ifremer.common.synchro.dao.Daos;
35 import fr.ifremer.common.synchro.dao.SynchroBaseDao;
36 import fr.ifremer.common.synchro.dao.SynchroTableDao;
37 import fr.ifremer.common.synchro.meta.*;
38 import fr.ifremer.common.synchro.service.*;
39 import fr.ifremer.quadrige2.core.config.Quadrige2Configuration;
40 import fr.ifremer.quadrige2.core.dao.ObjectTypes;
41 import fr.ifremer.quadrige2.core.dao.referential.StatusCode;
42 import fr.ifremer.quadrige2.core.dao.technical.hibernate.TemporaryDataHelper;
43 import fr.ifremer.quadrige2.synchro.meta.DatabaseColumns;
44 import fr.ifremer.quadrige2.synchro.meta.administration.ProgramStrategySynchroTables;
45 import fr.ifremer.quadrige2.synchro.meta.referential.ReferentialSynchroTables;
46 import fr.ifremer.quadrige2.synchro.meta.system.RuleSynchroTables;
47 import fr.ifremer.quadrige2.synchro.service.SynchroDirection;
48 import org.apache.commons.collections4.CollectionUtils;
49 import org.apache.commons.lang3.StringUtils;
50 import org.apache.commons.logging.Log;
51 import org.apache.commons.logging.LogFactory;
52 import org.nuiton.i18n.I18n;
53 import org.springframework.beans.factory.annotation.Autowired;
54 import org.springframework.stereotype.Service;
55
56 import javax.annotation.Nullable;
57 import javax.sql.DataSource;
58 import java.io.File;
59 import java.sql.Connection;
60 import java.sql.ResultSet;
61 import java.sql.SQLException;
62 import java.sql.Timestamp;
63 import java.util.*;
64
65
66
67
68
69
70
71 @Service("referentialSynchroService")
72 public class ReferentialSynchroServiceImpl extends SynchroServiceImpl implements ReferentialSynchroService {
73
74 private static final Log LOG =
75 LogFactory.getLog(ReferentialSynchroServiceImpl.class);
76
77
78
79
80 private static final String TQP_DELETE_BY_COMPARISON_PREFIX = "DELETE#";
81 private static final int TQP_DEFAULT_PERSON_ID = -1;
82
83 private static boolean DISABLE_INTEGRITY_CONSTRAINTS = true;
84 private static boolean ALLOW_MISSING_OPTIONAL_COLUMN = true;
85 private static boolean ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA = true;
86 private static boolean KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS = true;
87
88
89
90 private static int DAO_CACHE_SIZE = 2;
91
92
93
94
95
96
97
98
99
100
101
/**
 * Creates the service on top of an explicit data source and synchronization
 * configuration (constructor used for dependency injection).
 *
 * @param dataSource data source forwarded to the parent service
 * @param config     synchronization configuration forwarded to the parent service
 */
@Autowired
public ReferentialSynchroServiceImpl(DataSource dataSource, SynchroConfiguration config) {
    super(
            dataSource,
            config,
            DISABLE_INTEGRITY_CONSTRAINTS,
            ALLOW_MISSING_OPTIONAL_COLUMN,
            ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA,
            KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS);

    // Apply the DAO cache size chosen for referential synchronization
    setDaoCacheSize(DAO_CACHE_SIZE);
}
111
112
113
114
115
116
/**
 * Creates the service without an explicit data source or configuration; only the
 * fixed synchronization options of this class are given to the parent service.
 */
public ReferentialSynchroServiceImpl() {
    super(
            DISABLE_INTEGRITY_CONSTRAINTS,
            ALLOW_MISSING_OPTIONAL_COLUMN,
            ALLOW_ADDITIONAL_MANDATORY_COLUMN_IN_SOURCE_SCHEMA,
            KEEP_WHERE_CLAUSE_ON_QUERIES_BY_FKS);

    // Apply the DAO cache size chosen for referential synchronization
    setDaoCacheSize(DAO_CACHE_SIZE);
}
124
125
126 @Override
127 public ReferentialSynchroContext createSynchroContext(File sourceDbDirectory,
128 Timestamp lastSynchronizationDate,
129 boolean enableDelete,
130 boolean enableInsertUpdate,
131 SynchroDirection direction,
132 int userId,
133 Set<String> statusCodeIncludes) {
134
135 SynchroContext delegate = super.createSynchroContext(sourceDbDirectory,
136 ReferentialSynchroTables.getImportTablesIncludes());
137
138
139 delegate.getTarget().putAllProperties(Quadrige2Configuration.getInstance().getConnectionProperties());
140
141 ReferentialSynchroContext result = new ReferentialSynchroContext(delegate, direction, userId);
142 result.setLastSynchronizationDate(lastSynchronizationDate);
143 result.setEnableDelete(enableDelete);
144 result.setEnableInsertOrUpdate(enableInsertUpdate);
145 result.setStatusCodeIncludes(statusCodeIncludes);
146 initContext(result);
147
148 return result;
149 }
150
151
152 @Override
153 public ReferentialSynchroContext createSynchroContext(Properties sourceConnectionProperties,
154 Timestamp lastSynchronizationDate,
155 boolean enableDelete,
156 boolean enableInsertUpdate,
157 SynchroDirection direction,
158 int userId,
159 Set<String> statusCodeIncludes) {
160
161 SynchroContext delegate = super.createSynchroContext(sourceConnectionProperties,
162 ReferentialSynchroTables.getImportTablesIncludes());
163
164
165 delegate.getTarget().putAllProperties(Quadrige2Configuration.getInstance().getConnectionProperties());
166
167 ReferentialSynchroContext result = new ReferentialSynchroContext(delegate, direction, userId);
168 result.setLastSynchronizationDate(lastSynchronizationDate);
169 result.setEnableDelete(enableDelete);
170 result.setEnableInsertOrUpdate(enableInsertUpdate);
171 result.setStatusCodeIncludes(statusCodeIncludes);
172 initContext(result);
173
174 return result;
175 }
176
177
178 @Override
179 public void prepare(SynchroContext synchroContext) {
180 Preconditions.checkArgument(synchroContext != null && synchroContext instanceof ReferentialSynchroContext,
181 String.format("The context must be a instance of %s", ReferentialSynchroContext.class.getName()));
182
183 ReferentialSynchroContext referentialSynchroContext = (ReferentialSynchroContext) synchroContext;
184 SynchroDirection direction = referentialSynchroContext.getDirection();
185
186 ReferentialSynchroDatabaseConfiguration target = (ReferentialSynchroDatabaseConfiguration) referentialSynchroContext.getTarget();
187 ReferentialSynchroDatabaseConfiguration source = (ReferentialSynchroDatabaseConfiguration) referentialSynchroContext.getSource();
188
189
190 target.excludeUnusedColumns();
191 source.excludeUnusedColumns();
192
193
194 if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
195 source.setIsMirrorDatabase(true);
196 target.setIsMirrorDatabase(false);
197
198 source.setIsTemporary(false);
199 target.setIsTemporary(false);
200
201
202 target.excludeUnusedColumnsForFileImport();
203 source.excludeUnusedColumnsForFileImport();
204 }
205
206 super.prepare(synchroContext);
207 }
208
209
210 @Override
211 public void synchronize(SynchroContext synchroContext) {
212 Preconditions.checkArgument(synchroContext != null && synchroContext instanceof ReferentialSynchroContext,
213 String.format("The context must be a instance of %s", ReferentialSynchroContext.class.getName()));
214
215 ReferentialSynchroContext referentialSynchroContext = (ReferentialSynchroContext) synchroContext;
216 SynchroDirection direction = referentialSynchroContext.getDirection();
217
218 ReferentialSynchroDatabaseConfiguration target = (ReferentialSynchroDatabaseConfiguration) referentialSynchroContext.getTarget();
219 ReferentialSynchroDatabaseConfiguration source = (ReferentialSynchroDatabaseConfiguration) referentialSynchroContext.getSource();
220
221
222 if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
223 source.setIsMirrorDatabase(true);
224 target.setIsMirrorDatabase(false);
225
226 source.setIsTemporary(false);
227 target.setIsTemporary(false);
228 }
229
230 super.synchronize(synchroContext);
231 }
232
233
234 @Override
235 public Timestamp getSourceLastUpdateDate(SynchroContext synchroContext) {
236 return super.getSourceLastUpdateDate(synchroContext);
237 }
238
239
240 @Override
241 public void finish(SynchroContext synchroContext, SynchroResult resultWithPendingOperation,
242 Map<RejectedRow.Cause, RejectedRow.ResolveStrategy> rejectStrategies) {
243 super.finish(synchroContext, resultWithPendingOperation, rejectStrategies);
244 }
245
246
247
248
249
250
251
252
253
254
255
256 protected void initContext(SynchroContext context) {
257
258
259 context.getTarget().setColumnUpdateDate(DatabaseColumns.UPDATE_DT.name().toLowerCase());
260 context.getSource().setColumnUpdateDate(DatabaseColumns.UPDATE_DT.name().toLowerCase());
261
262 if (context instanceof ReferentialSynchroContext) {
263 ReferentialSynchroContext referentialSynchroContext = (ReferentialSynchroContext) context;
264
265
266 {
267 Set<String> statusCodeIncludes = referentialSynchroContext.getStatusCodeIncludes();
268
269 if (CollectionUtils.isEmpty(statusCodeIncludes)) {
270 String configValue = Quadrige2Configuration.getInstance().getImportReferentialStatusIncludes();
271 if (StringUtils.isNotBlank(configValue)) {
272 statusCodeIncludes = Sets.newHashSet(Splitter.on(',').split(configValue));
273 referentialSynchroContext.setStatusCodeIncludes(statusCodeIncludes);
274 }
275 }
276 }
277 }
278 }
279
280
281 @Override
282 protected void prepareRootTable(
283 DaoFactory sourceDaoFactory,
284 DaoFactory targetDaoFactory,
285 SynchroTableMetadata table,
286 SynchroContext context,
287 SynchroResult result) throws SQLException {
288
289 ReferentialSynchroContext referentialContext = (ReferentialSynchroContext) context;
290
291
292
293 if (referentialContext.isEnableInsertOrUpdate()
294 && (referentialContext.isEnableDelete()
295 || !ReferentialSynchroTables.DELETED_ITEM_HISTORY.name().equalsIgnoreCase(table.getName()))) {
296
297 super.prepareRootTable(sourceDaoFactory,
298 targetDaoFactory,
299 table,
300 context,
301 result);
302 }
303
304
305
306 if (referentialContext.isEnableDelete()
307 && !ReferentialSynchroTables.DELETED_ITEM_HISTORY.name().equalsIgnoreCase(table.getName())) {
308 prepareRootTableDeletes(sourceDaoFactory,
309 targetDaoFactory,
310 table,
311 context,
312 result);
313 }
314
315
316 if (referentialContext.isEnableDelete() && referentialContext.getDirection() == SynchroDirection.IMPORT_FILE2LOCAL) {
317 prepareRootTableDeletesFromFile(sourceDaoFactory,
318 targetDaoFactory,
319 table,
320 context,
321 result);
322 }
323 }
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341 protected void prepareRootTableDeletes(
342 DaoFactory sourceDaoFactory,
343 DaoFactory targetDaoFactory,
344 SynchroTableMetadata table,
345 SynchroContext context,
346 SynchroResult result) throws SQLException {
347
348 String tableName = table.getName();
349 Set<String> objectTypeFks = ObjectTypes.getObjectTypeFromTableName(tableName, tableName);
350
351 if (CollectionUtils.isEmpty(objectTypeFks)) {
352 return;
353 }
354
355 SynchroTableDao dihSourceDao = sourceDaoFactory.getSourceDao(ReferentialSynchroTables.DELETED_ITEM_HISTORY.name());
356
357 List<List<Object>> columnValues = Lists.newArrayListWithCapacity(objectTypeFks.size());
358 for (String objectTypeFk : objectTypeFks) {
359 columnValues.add(ImmutableList.<Object> of(objectTypeFk));
360 }
361
362
363 Map<String, Object> bindings = createSelectBindingsForTable(context, ReferentialSynchroTables.DELETED_ITEM_HISTORY.name());
364 long count = dihSourceDao.countDataByFks(
365 ImmutableSet.of(DatabaseColumns.OBJECT_TYPE_CD.name()),
366 columnValues,
367 bindings
368 );
369 if (count > 0) {
370 result.addRows(tableName, (int) count);
371 }
372 }
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390 protected void prepareRootTableDeletesFromFile(
391 DaoFactory sourceDaoFactory,
392 DaoFactory targetDaoFactory,
393 SynchroTableMetadata table,
394 SynchroContext context,
395 SynchroResult result) throws SQLException {
396
397 String tableName = table.getName();
398
399
400 if (!ProgramStrategySynchroTables.PROGRAMME.name().equalsIgnoreCase(tableName)
401 && !RuleSynchroTables.RULE_LIST.name().equalsIgnoreCase(tableName)) {
402 return;
403 }
404
405 int count = countMissingPks(sourceDaoFactory, targetDaoFactory, tableName, context);
406 if (count > 0) {
407 result.addRows(tableName, count);
408 }
409
410 }
411
412 private int countMissingPks(DaoFactory sourceDaoFactory,
413 DaoFactory targetDaoFactory,
414 String tableName,
415 SynchroContext context) throws SQLException {
416
417 return getPksStrToDeleteFromFile(sourceDaoFactory, targetDaoFactory, tableName, context).size();
418 }
419
420 private Set<String> getPksStrToDeleteFromFile(DaoFactory sourceDaoFactory,
421 DaoFactory targetDaoFactory,
422 String tableName,
423 SynchroContext context) throws SQLException {
424
425 SynchroTableDao sourceDao = sourceDaoFactory.getSourceDao(tableName);
426 SynchroTableDao targetDao = targetDaoFactory.getSourceDao(tableName);
427 Set<String> sourcePks;
428 Set<String> targetPks;
429 SynchroTableMetadata table = targetDao.getTable();
430
431 if (table.getColumn(DatabaseColumns.STATUS_CD.name().toLowerCase()) != null) {
432
433
434 Set<String> statusCodeColumn = ImmutableSet.of(DatabaseColumns.STATUS_CD.name());
435 List<List<Object>> localStatusCodes = new ArrayList<>();
436 localStatusCodes.add(ImmutableList.<Object> of(StatusCode.LOCAL_ENABLE.getValue()));
437 localStatusCodes.add(ImmutableList.<Object> of(StatusCode.LOCAL_DISABLE.getValue()));
438
439 Map<String, Object> bindings = createSelectBindingsForTable(context, tableName);
440 sourcePks = sourceDao.getPksStrByFks(statusCodeColumn, localStatusCodes, bindings);
441 targetPks = targetDao.getPksStrByFks(statusCodeColumn, localStatusCodes, bindings);
442 } else {
443
444
445 sourcePks = sourceDao.getPksStr();
446 targetPks = targetDao.getPksStr();
447 }
448
449 if (targetPks == null) {
450 targetPks = new HashSet<>();
451 }
452 if (sourcePks == null) {
453 sourcePks = new HashSet<>();
454 }
455 targetPks.removeAll(sourcePks);
456
457 return targetPks;
458 }
459
460
461 @Override
462 protected List<SynchroTableOperation> getRootOperations(
463 DaoFactory sourceDaoFactory,
464 DaoFactory targetDaoFactory,
465 SynchroDatabaseMetadata dbMeta,
466 SynchroContext context) throws SQLException {
467 List<SynchroTableOperation> result = Lists.newArrayList();
468
469 ReferentialSynchroContext referentialContext = (ReferentialSynchroContext) context;
470
471
472 if (referentialContext.isEnableInsertOrUpdate()) {
473 Collection<SynchroTableOperation> defaultOperations = super.getRootOperations(sourceDaoFactory, targetDaoFactory, dbMeta, context);
474 result.addAll(defaultOperations);
475 }
476
477
478 if (referentialContext.isEnableDelete()) {
479 Collection<SynchroTableOperation> deletedItemOperations = getRootDeleteOperations(sourceDaoFactory, targetDaoFactory, dbMeta, context);
480 result.addAll(deletedItemOperations);
481 }
482
483
484 if (referentialContext.isEnableDelete() && referentialContext.getDirection() == SynchroDirection.IMPORT_FILE2LOCAL) {
485 Collection<SynchroTableOperation> deletedItemOperations = getRootDeleteOperationsFromFile(sourceDaoFactory, targetDaoFactory, dbMeta,
486 context);
487 result.addAll(deletedItemOperations);
488 }
489
490 return result;
491 }
492
493 private Collection<SynchroTableOperation> getRootDeleteOperations(
494 DaoFactory sourceDaoFactory,
495 DaoFactory targetDaoFactory,
496 SynchroDatabaseMetadata dbMeta,
497 SynchroContext context) throws SQLException {
498 Preconditions.checkArgument(dbMeta.getConfiguration().isFullMetadataEnable());
499
500 Deque<SynchroTableOperation> result = Queues.newArrayDeque();
501 Map<String, SynchroTableOperation> deletedOperationByTable = Maps.newHashMap();
502 Map<String, SynchroTableOperation> deletedChildrenOperationByTable = Maps.newHashMap();
503
504 SynchroTableDao dihSourceDao = sourceDaoFactory.getSourceDao(ReferentialSynchroTables.DELETED_ITEM_HISTORY.name());
505 Set<String> includedDataTables = context.getTableNames();
506
507
508 Map<String, Object> bindings = createSelectBindingsForTable(context, ReferentialSynchroTables.DELETED_ITEM_HISTORY.name());
509
510 ResultSet dihResultSet = null;
511 List<Object> dihIdsToRemove = Lists.newArrayList();
512 Set<String> tableNamesWithDelete = Sets.newHashSet();
513 boolean doDelete = !context.getTarget().isMirrorDatabase();
514
515 try {
516 LOG.debug(I18n.t("quadrige2.synchro.synchronizeReferential.deletedItems"));
517
518 dihResultSet = dihSourceDao.getData(bindings);
519
520 while (dihResultSet.next()) {
521
522 String objectType = dihResultSet.getString(DatabaseColumns.OBJECT_TYPE_CD.name());
523 String tableName = ObjectTypes.getTableNameFromObjectType(objectType);
524
525 boolean isReferentialTable = StringUtils.isNotBlank(tableName)
526 && includedDataTables.contains(tableName.toUpperCase());
527
528 if (isReferentialTable) {
529 SynchroTableDao targetDao = targetDaoFactory.getSourceDao(tableName);
530 SynchroTableMetadata table = targetDao.getTable();
531
532 String objectCode = dihResultSet.getString(DatabaseColumns.OBJECT_CD.name());
533 String objectId = dihResultSet.getString(DatabaseColumns.OBJECT_ID.name());
534
535
536 if (StringUtils.isNotBlank(objectCode) || StringUtils.isNotBlank(objectId)) {
537
538 if (doDelete) {
539 List<Object> pk;
540
541 if (StringUtils.isNotBlank(objectCode) && !table.isSimpleKey()) {
542 pk = SynchroTableMetadata.fromPkStr(objectCode);
543 }
544 else {
545 pk = StringUtils.isNotBlank(objectCode)
546 ? ImmutableList.<Object> of(objectCode)
547 : ImmutableList.<Object> of(objectId);
548 }
549
550 boolean hasChildTables = table.hasChildJoins();
551
552
553 if (hasChildTables) {
554
555 SynchroTableOperation childrenOperation = deletedChildrenOperationByTable.get(tableName);
556 if (childrenOperation == null) {
557 childrenOperation = new SynchroTableOperation(tableName, context);
558 deletedChildrenOperationByTable.put(tableName, childrenOperation);
559 result.add(childrenOperation);
560 }
561 addChildrenToDelete(table, childrenOperation, ImmutableList.of(pk), result, context);
562 }
563
564
565
566
567
568 SynchroTableOperation operation = new SynchroTableOperation(tableName, context);
569 deletedOperationByTable.put(tableName, operation);
570 operation.setEnableProgress(true);
571 result.add(operation);
572
573 operation.addMissingDelete(pk);
574 }
575 }
576
577
578
579 else {
580 tableNamesWithDelete.add(tableName);
581 }
582 }
583 }
584 } finally {
585 Daos.closeSilently(dihResultSet);
586 }
587
588 if (CollectionUtils.isNotEmpty(dihIdsToRemove)) {
589 SynchroTableOperation operation = new SynchroTableOperation(ReferentialSynchroTables.DELETED_ITEM_HISTORY.name(), context);
590 operation.addChildrenToDeleteFromOneColumn(ReferentialSynchroTables.DELETED_ITEM_HISTORY.name(), DatabaseColumns.REMOTE_ID.name(),
591 dihIdsToRemove);
592 result.add(operation);
593 }
594
595 if (CollectionUtils.isNotEmpty(tableNamesWithDelete)) {
596
597
598 if (!doDelete) {
599 saveTablesWithDelete(tableNamesWithDelete,
600 sourceDaoFactory,
601 targetDaoFactory,
602 dbMeta,
603 context);
604 }
605
606
607 else if (doDelete && context.getSource().isMirrorDatabase()) {
608 addDeletedItemsFromTables(
609 tableNamesWithDelete,
610 result,
611 sourceDaoFactory,
612 targetDaoFactory,
613 dbMeta,
614 context);
615 }
616
617
618
619 else if (doDelete) {
620 saveTablesWithDelete(tableNamesWithDelete,
621 sourceDaoFactory,
622 targetDaoFactory,
623 dbMeta,
624 context);
625
626 addDeletedItemsFromTables(
627 tableNamesWithDelete,
628 result,
629 targetDaoFactory,
630 targetDaoFactory,
631 dbMeta,
632 context);
633 }
634 }
635
636 return result;
637 }
638
639 private Collection<SynchroTableOperation> getRootDeleteOperationsFromFile(
640 DaoFactory sourceDaoFactory,
641 DaoFactory targetDaoFactory,
642 SynchroDatabaseMetadata dbMeta,
643 SynchroContext context) throws SQLException {
644 Preconditions.checkArgument(dbMeta.getConfiguration().isFullMetadataEnable());
645
646 Deque<SynchroTableOperation> result = Queues.newArrayDeque();
647
648
649 result.addAll(getRootDeleteOperationsFromFileOfTable(sourceDaoFactory, targetDaoFactory, dbMeta, context,
650 ProgramStrategySynchroTables.PROGRAMME.name()));
651
652
653 result.addAll(getRootDeleteOperationsFromFileOfTable(sourceDaoFactory, targetDaoFactory, dbMeta, context, RuleSynchroTables.RULE_LIST.name()));
654
655 return result;
656 }
657
658 private Collection<SynchroTableOperation> getRootDeleteOperationsFromFileOfTable(
659 DaoFactory sourceDaoFactory,
660 DaoFactory targetDaoFactory,
661 SynchroDatabaseMetadata dbMeta,
662 SynchroContext context,
663 String tableName) throws SQLException {
664 Preconditions.checkArgument(dbMeta.getConfiguration().isFullMetadataEnable());
665
666 Deque<SynchroTableOperation> result = Queues.newArrayDeque();
667
668
669 Set<String> pksStrToDelete = getPksStrToDeleteFromFile(sourceDaoFactory, targetDaoFactory, tableName, context);
670
671 if (CollectionUtils.isNotEmpty(pksStrToDelete)) {
672 SynchroTableOperation operation = null;
673 SynchroTableOperation childrenOperation = null;
674 SynchroTableDao targetDao = targetDaoFactory.getSourceDao(tableName);
675 SynchroTableMetadata table = targetDao.getTable();
676 boolean hasChildTables = table.hasChildJoins();
677
678 for (String pkStr : pksStrToDelete) {
679
680 List<Object> pk = SynchroTableMetadata.fromPkStr(pkStr);
681
682
683 if (hasChildTables) {
684
685 if (childrenOperation == null) {
686 childrenOperation = new SynchroTableOperation(tableName, context);
687 result.add(childrenOperation);
688 }
689 addChildrenToDelete(table, childrenOperation, ImmutableList.of(pk), result, context);
690 }
691
692
693 if (operation == null) {
694 operation = new SynchroTableOperation(tableName, context);
695
696 operation.setEnableProgress(true);
697 result.add(operation);
698 }
699
700 operation.addMissingDelete(pk);
701 }
702 }
703
704 return result;
705 }
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723 protected final void addChildrenToDelete(
724 SynchroTableMetadata parentTable,
725 SynchroTableOperation childrenOperation,
726 List<List<Object>> parentPks,
727 Deque<SynchroTableOperation> pendingOperations,
728 SynchroContext context) {
729
730 Set<String> pkNames = parentTable.getPkNames();
731
732
733 if (pkNames.size() > 1) {
734 throw new UnsupportedOperationException("Not sure of this implementation: please check before comment out this exception !");
735 }
736
737
738 for (SynchroJoinMetadata join : parentTable.getChildJoins()) {
739 SynchroTableMetadata childTable = join.getTargetTable();
740 SynchroColumnMetadata childTableColumn = join.getTargetColumn();
741
742
743 childrenOperation.addChildrenToDeleteFromManyColumns(childTable.getName(), ImmutableSet.of(childTableColumn.getName()), parentPks);
744 }
745 }
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765 protected void saveTablesWithDelete(
766 Set<String> tableNames,
767 DaoFactory sourceDaoFactory,
768 DaoFactory targetDaoFactory,
769 SynchroDatabaseMetadata dbMeta,
770 SynchroContext context) throws SQLException {
771
772 SynchroBaseDao targetBaseDao = targetDaoFactory.getDao();
773
774
775 targetBaseDao.executeDeleteTempQueryParameter(TQP_DELETE_BY_COMPARISON_PREFIX + "%", true, TQP_DEFAULT_PERSON_ID);
776
777 for (String tableName : tableNames) {
778
779 SynchroTableDao sourceDao = sourceDaoFactory.getSourceDao(tableName);
780 Set<String> pkStrs = sourceDao.getPksStr();
781
782 targetBaseDao.executeInsertIntoTempQueryParameter(
783 ImmutableList.<Object> copyOf(pkStrs),
784 TQP_DELETE_BY_COMPARISON_PREFIX + tableName,
785 TQP_DEFAULT_PERSON_ID
786 );
787 }
788 }
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809 protected void addDeletedItemsFromTables(
810 Set<String> tableNames,
811 Deque<SynchroTableOperation> operations,
812 DaoFactory sourceDaoFactory,
813 DaoFactory targetDaoFactory,
814 SynchroDatabaseMetadata dbMeta,
815 SynchroContext context) throws SQLException {
816
817
818 SynchroTableDao tqpDao = sourceDaoFactory.getSourceDao(SynchroBaseDao.TEMP_QUERY_PARAMETER_TABLE);
819 int tqpValueColumnIndex = tqpDao.getTable().getColumnIndex(SynchroBaseDao.TEMP_QUERY_PARAMETER_VALUE_COLUMN);
820 Preconditions.checkArgument(tqpValueColumnIndex != -1);
821
822
823 Map<String, Object> emptyBinding = Maps.newHashMap();
824 Set<String> fkNames = ImmutableSet.of(SynchroBaseDao.TEMP_QUERY_PARAMETER_PARAM_COLUMN);
825
826
827 for (String tableName : tableNames) {
828 SynchroTableMetadata table = dbMeta.getTable(tableName);
829 boolean hasChildTables = table.hasChildJoins();
830 Set<String> tablePkNames = table.getPkNames();
831 int pkCount = tablePkNames.size();
832
833
834 List<Object> fkValue = ImmutableList.<Object> of(TQP_DELETE_BY_COMPARISON_PREFIX + tableName);
835 ResultSet rs = tqpDao.getDataByFks(
836 fkNames,
837 ImmutableList.of(fkValue),
838 emptyBinding);
839 List<List<Object>> pks = Lists.newArrayList();
840 while (rs.next()) {
841 String pkStr = rs.getString(tqpValueColumnIndex + 1);
842 List<Object> pk = SynchroTableMetadata.fromPkStr(pkStr);
843
844
845 if (pkCount != pk.size()) {
846 String expectedPkStrExample = Joiner.on(String.format(">%s<", SynchroTableMetadata.PK_SEPARATOR)).join(tablePkNames);
847 throw new SynchroTechnicalException(String.format(
848 "Unable to import delete on %s: invalid PK found in the source database (in %s). Should have %s column (e.g. %s).",
849 tableName,
850 SynchroBaseDao.TEMP_QUERY_PARAMETER_TABLE,
851 pkCount,
852 expectedPkStrExample));
853 }
854 pks.add(pk);
855 }
856 rs.close();
857
858
859 if (pks.size() == 0) {
860 throw new SynchroTechnicalException(String.format(
861 "Unable to import delete on %s: No PK found in the source database (in %s). Unable to compare and find PKs to delete.",
862 tableName,
863 SynchroBaseDao.TEMP_QUERY_PARAMETER_TABLE));
864 }
865
866 SynchroTableOperation operation = new SynchroTableOperation(tableName, context);
867 operation.setEnableProgress(true);
868 SynchroTableDao targetTableDao = targetDaoFactory.getTargetDao(tableName, null, operation);
869
870 List<List<Object>> pksToDelete = targetTableDao.getPksByNotFoundFks(
871 tablePkNames,
872 pks,
873 emptyBinding);
874
875
876 pksToDelete = filterExcludeTemporary(table, pksToDelete);
877
878
879 if (CollectionUtils.isNotEmpty(pksToDelete)) {
880
881 operation.addAllMissingDelete(pksToDelete);
882
883
884 if (hasChildTables) {
885 addDeleteChildrenToDeque(table, pksToDelete, operations, context);
886 }
887
888
889 operations.add(operation);
890 }
891
892 }
893
894
895 targetDaoFactory.getDao().executeDeleteTempQueryParameter(TQP_DELETE_BY_COMPARISON_PREFIX + "%", true, TQP_DEFAULT_PERSON_ID);
896 }
897
898
899
900
901
902
903
904
905
906
907
908
909 protected List<List<Object>> filterExcludeTemporary(
910 SynchroTableMetadata table,
911 List<List<Object>> pks) {
912 Set<String> tablePkNames = table.getPkNames();
913
914 if (tablePkNames.size() > 1) {
915 return pks;
916 }
917
918 SynchroColumnMetadata pkColumn = table.getColumn(tablePkNames.iterator().next());
919 Collection<List<Object>> result;
920
921
922 if (SynchroMetadataUtils.isNumericType(pkColumn)) {
923 result = Collections2.filter(pks,
924 new Predicate<List<Object>>() {
925 @Override
926 public boolean apply(@Nullable List<Object> input) {
927 long pk = Long.parseLong(input.get(0).toString());
928 return !TemporaryDataHelper.isTemporaryId(pk);
929 }
930 });
931 }
932
933
934 else {
935
936 result = Collections2.filter(pks,
937 new Predicate<List<Object>>() {
938 @Override
939 public boolean apply(@Nullable List<Object> input) {
940 String pk = input.get(0).toString();
941 return !TemporaryDataHelper.isTemporaryCode(pk);
942 }
943 });
944 }
945
946 return ImmutableList.copyOf(result);
947 }
948
949
950 @Override
951 protected Map<String, Object> createDefaultSelectBindings(SynchroContext context) {
952
953 Map<String, Object> bindings = super.createDefaultSelectBindings(context);
954
955 if (context instanceof ReferentialSynchroContext) {
956 ReferentialSynchroContext dataContext = (ReferentialSynchroContext) context;
957
958
959 bindings.put("userId", dataContext.getUserId());
960 }
961
962 return bindings;
963 }
964
965
966 @Override
967 protected Map<String, Object> createSelectBindingsForTable(SynchroContext context, String tableName) {
968
969 Map<String, Object> result = super.createSelectBindingsForTable(context, tableName);
970
971
972
973 ReferentialSynchroContext referentialSynchroContext = (ReferentialSynchroContext) context;
974 if (CollectionUtils.isNotEmpty(referentialSynchroContext.getTableNamesForced())
975 && referentialSynchroContext.getTableNamesForced().contains(tableName.toUpperCase())) {
976 LOG.debug(String.format("[%s] Forced synchronization (last synchronization date ignored)", tableName));
977 result.remove(SynchroTableMetadata.UPDATE_DATE_BINDPARAM);
978 }
979
980 return result;
981 }
982
983 @Override
984 protected Connection createConnection(String jdbcUrl, String user, String password) throws SQLException {
985 Connection connection = super.createConnection(jdbcUrl, user, password);
986
987
988 fr.ifremer.quadrige2.core.dao.technical.Daos.setTimezone(connection, Quadrige2Configuration.getInstance().getDbTimezone());
989
990 return connection;
991 }
992 }