1 package fr.ifremer.quadrige2.synchro.intercept.data;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import com.google.common.base.Preconditions;
27 import com.google.common.collect.ImmutableList;
28 import com.google.common.collect.Maps;
29 import com.google.common.eventbus.Subscribe;
30 import fr.ifremer.common.synchro.intercept.SynchroInterceptorBase;
31 import fr.ifremer.common.synchro.meta.SynchroDatabaseMetadata;
32 import fr.ifremer.common.synchro.meta.SynchroJoinMetadata;
33 import fr.ifremer.common.synchro.meta.SynchroTableMetadata;
34 import fr.ifremer.common.synchro.meta.SynchroTableMetadata.DuplicateKeyStrategy;
35 import fr.ifremer.common.synchro.meta.event.CreateQueryEvent;
36 import fr.ifremer.common.synchro.meta.event.LoadJoinEvent;
37 import fr.ifremer.common.synchro.meta.event.LoadTableEvent;
38 import fr.ifremer.common.synchro.query.SynchroQueryBuilder;
39 import fr.ifremer.common.synchro.query.SynchroQueryOperator;
40 import fr.ifremer.common.synchro.service.SynchroDatabaseConfiguration;
41 import fr.ifremer.quadrige2.core.dao.system.synchronization.SynchronizationStatus;
42 import fr.ifremer.quadrige2.synchro.intercept.data.internal.*;
43 import fr.ifremer.quadrige2.synchro.meta.data.DataSynchroTables;
44 import fr.ifremer.quadrige2.synchro.service.SynchroDirection;
45 import org.hibernate.tool.hbm2ddl.TableMetadata;
46
47 import java.io.IOException;
48 import java.sql.Timestamp;
49 import java.util.Map;
50
51
52
53
54
55
56
57
58
59
60
61 public class DataTableInterceptor extends AbstractDataInterceptor {
62
63 private Timestamp systimestamp = null;
64
65 private Map<String, Map<Long, Long>> temporaryLocalIdMap = null;
66
67
68
69
70
71
/**
 * Default constructor: forwards the table list returned by
 * {@code DataSynchroTables.getImportTablesIncludes()} to the parent interceptor.
 */
public DataTableInterceptor() {
    super(DataSynchroTables.getImportTablesIncludes());
}
75
76
77 @Override
78 public boolean doApply(SynchroDatabaseMetadata meta, TableMetadata table) {
79 return hasColumns(table, getConfig().getColumnRemoteId());
80 }
81
82
83 @Override
84 public SynchroInterceptorBase clone() {
85 DataTableInterceptor result = (DataTableInterceptor) super.clone();
86 result.systimestamp = this.systimestamp;
87 return result;
88 }
89
90
91 @Override
92 protected void doClose() throws IOException {
93 super.doClose();
94
95 temporaryLocalIdMap = null;
96 }
97
98
99
100
101
102
103
104
105
106 @Subscribe
107 public void handleQuery(CreateQueryEvent e) {
108 SynchroDirection direction = getConfig().getDirection();
109 SynchroTableMetadata table = e.source;
110
111
112 String remoteIdColumn = getConfig().getColumnRemoteId();
113 boolean hasSynchronizationStatusColumn = table.getColumnIndex(getConfig().getColumnSynchronizationStatus()) != -1;
114 boolean hasRemoteIdColumn = table.getColumnIndex(remoteIdColumn) != -1;
115
116 String columnId = "NOT_EXISTING_COLUMN";
117 if (table.getPkNames() != null && table.getPkNames().size() == 1) {
118 columnId = table.getPkNames().iterator().next();
119 }
120
121
122 if (hasRemoteIdColumn && direction == SynchroDirection.IMPORT_SERVER2TEMP) {
123
124 switch (e.queryName) {
125
126 case select:
127 case selectFromUpdateDate:
128 e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
129 .deleteColumnIfExists(remoteIdColumn)
130 .deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
131 .build();
132 break;
133 case insert:
134 case update: {
135 SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
136 .deleteColumnIfExists(remoteIdColumn);
137 if (hasSynchronizationStatusColumn) {
138 qb.setColumnValue(getConfig().getColumnSynchronizationStatus(),
139 String.format("'%s'", SynchronizationStatus.SYNCHRONIZED.value()));
140 }
141 e.sql = qb.build();
142 break;
143 }
144 default:
145 break;
146 }
147 }
148
149
150 else if (hasRemoteIdColumn && direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
151
152 switch (e.queryName) {
153
154 case select:
155 case selectFromUpdateDate:
156 e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
157 .deleteColumnIfExists(remoteIdColumn)
158 .deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
159 .build();
160 break;
161 case insert: {
162 SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
163 .deleteColumn(remoteIdColumn)
164 .replaceColumn(columnId, remoteIdColumn);
165 if (hasSynchronizationStatusColumn) {
166 qb.setColumnValue(getConfig().getColumnSynchronizationStatus(), "'SYNC'");
167 }
168 qb.addColumn(columnId, e.source.getSelectSequenceNextValString());
169 e.sql = qb.build();
170 break;
171 }
172 case update: {
173 SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
174 .deleteColumn(remoteIdColumn)
175 .replaceColumn(columnId, remoteIdColumn);
176 if (hasSynchronizationStatusColumn) {
177 qb.setColumnValue(getConfig().getColumnSynchronizationStatus(), "'SYNC'");
178 }
179 e.sql = qb.build();
180 break;
181 }
182 default:
183 break;
184 }
185 }
186
187
188 else if (direction == SynchroDirection.EXPORT_LOCAL2TEMP) {
189
190
191 }
192
193
194 else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
195
196 switch (e.queryName) {
197 case selectMaxUpdateDate:
198
199 e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
200 .addWhere(SynchroQueryOperator.AND, "1=2")
201 .build();
202 break;
203
204 case select:
205 case selectFromUpdateDate:
206 e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
207 .deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
208 .deleteColumnIfExists(remoteIdColumn)
209 .build();
210 break;
211 case insert: {
212 SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
213 .deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
214 .deleteColumnIfExists(remoteIdColumn);
215 e.sql = qb.build();
216 break;
217 }
218 case update: {
219 SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
220 .deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
221 .deleteColumnIfExists(remoteIdColumn);
222 e.sql = qb.build();
223 break;
224 }
225 default:
226 break;
227 }
228 }
229
230
231 else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
232
233
234 }
235
236
237 else if (direction == SynchroDirection.EXPORT_LOCAL2FILE) {
238
239
240 }
241 }
242
243
244
245
246
247
248
249
250
251 @Subscribe
252 public void handleTableLoad(LoadTableEvent e) {
253
254 SynchroTableMetadata table = e.table;
255 String remoteIdColumn = getConfig().getColumnRemoteId();
256 String updateDateColumn = getConfig().getColumnUpdateDate();
257
258
259 boolean hasSynchronizationStatusColumn = table.getColumnIndex(getConfig().getColumnSynchronizationStatus()) != -1;
260 boolean hasUpdateDateColumn = table.getColumnIndex(updateDateColumn) != -1;
261
262 String pkName = table.getPkNames().iterator().next();
263
264 SynchroDirection direction = getConfig().getDirection();
265
266
267 if (direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
268
269
270 if (!table.hasUniqueConstraint(remoteIdColumn)) {
271 table.addUniqueConstraint(remoteIdColumn, ImmutableList.of(remoteIdColumn), DuplicateKeyStrategy.REPLACE);
272 }
273
274
275 if (!getConfig().isForceEditedRowOverride()
276 && hasSynchronizationStatusColumn
277 && hasUpdateDateColumn) {
278
279 ImportEditedRowInterceptor synchronizationStatusInterceptor = new ImportEditedRowInterceptor(
280 getConfig(),
281 table.getName().toLowerCase(),
282 pkName,
283 table.getSelectColumnIndex(pkName),
284 table.getSelectColumnIndex(updateDateColumn)
285 );
286 table.addInterceptor(synchronizationStatusInterceptor);
287 }
288 }
289
290
291 else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
292 if (systimestamp == null) {
293 systimestamp = checkAndGetSystemTimestamp(getConfig());
294 }
295
296
297
298 if (!table.hasUniqueConstraint(remoteIdColumn)) {
299 table.addUniqueConstraint(remoteIdColumn, ImmutableList.of(pkName), DuplicateKeyStrategy.REPLACE);
300 }
301
302 int updateDateColumnIndex = table.getSelectColumnIndex(getConfig().getColumnUpdateDate());
303 boolean hasUpdateDate = updateDateColumnIndex != -1;
304
305 ExportPkRemoteIdInterceptor remoteIdInterceptor = new ExportPkRemoteIdInterceptor(
306 getConfig(),
307 table.getName(),
308 pkName,
309 table.getSelectColumnIndex(pkName),
310 hasSynchronizationStatusColumn,
311 hasUpdateDate,
312 updateDateColumnIndex,
313 systimestamp,
314 table.hasChildJoins());
315
316 table.addInterceptor(remoteIdInterceptor);
317 }
318
319
320 else if (direction == SynchroDirection.EXPORT_LOCAL2FILE) {
321
322
323
324 if (!table.hasUniqueConstraint(remoteIdColumn)) {
325 table.addUniqueConstraint(remoteIdColumn, ImmutableList.of(pkName), DuplicateKeyStrategy.REPLACE);
326 }
327
328 int synchronizationStatusColumnIndex = table.getSelectColumnIndex(getConfig().getColumnSynchronizationStatus());
329 boolean hasSynchronizationStatus = synchronizationStatusColumnIndex != -1;
330
331
332 if (hasSynchronizationStatus) {
333 int[] selectPkIndexes = table.getSelectPkIndexs();
334
335 ExportToFileSynchronizationStatusInterceptor sourceSynchronizationStatusUpdateInterceptor = new ExportToFileSynchronizationStatusInterceptor(
336 table.getName(),
337 selectPkIndexes,
338 synchronizationStatusColumnIndex);
339 table.addInterceptor(sourceSynchronizationStatusUpdateInterceptor);
340 }
341 }
342
343
344 else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
345
346
347 ImportFromFilePkInterceptor pkInterceptor = new ImportFromFilePkInterceptor(
348 table.getName().toLowerCase(),
349 table.getSelectColumnIndex(pkName),
350 getTemporaryLocalIdMap()
351 );
352 table.addInterceptor(pkInterceptor);
353
354 }
355 }
356
357
358
359
360
361
362
363
364
365 @Subscribe
366 public void handleJoinLoad(LoadJoinEvent e) {
367 SynchroJoinMetadata join = e.join;
368
369 if (!join.isValid()) {
370 return;
371 }
372
373 SynchroTableMetadata fkTable = join.getFkTable();
374 SynchroDirection direction = getConfig().getDirection();
375
376
377 if (fkTable == e.source || isNotInterceptedTable(fkTable)) {
378
379
380 SynchroTableMetadata pkTable = join.getPkTable();
381 String pkTableName = pkTable.getName().toLowerCase();
382 String columnId = "";
383 if (pkTable.getPkNames().size() == 1) {
384 columnId = pkTable.getPkNames().iterator().next();
385 }
386
387 if (direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
388
389 String fkColumnName = join.getFkColumn().getName().toLowerCase();
390 int fkColumnIndex = fkTable.getSelectColumnIndex(fkColumnName);
391
392
393 ImportRemoteIdInterceptor remoteIdInterceptor = new ImportRemoteIdInterceptor(
394 getConfig(),
395 pkTableName,
396 fkColumnName,
397 fkColumnIndex,
398 columnId,
399 join.getFkColumn().isNullable());
400
401 if (!fkTable.containsInterceptor(remoteIdInterceptor)) {
402 fkTable.addInterceptor(remoteIdInterceptor);
403 }
404
405 } else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
406 String fkColumnName = join.getFkColumn().getName().toLowerCase();
407 int fkColumnIndex = fkTable.getSelectColumnIndex(fkColumnName);
408
409
410 ExportFkRemoteIdInterceptor remoteIdInterceptor = new ExportFkRemoteIdInterceptor(
411 getConfig(),
412 pkTableName,
413 fkColumnName,
414 fkColumnIndex,
415 columnId,
416 join.getFkColumn().isNullable());
417
418 if (!fkTable.containsInterceptor(remoteIdInterceptor)) {
419 fkTable.addInterceptor(remoteIdInterceptor);
420 }
421
422 } else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
423 String fkColumnName = join.getFkColumn().getName().toLowerCase();
424 int fkColumnIndex = fkTable.getSelectColumnIndex(fkColumnName);
425
426 ImportFromFileFkInterceptor localIdInterceptor = new ImportFromFileFkInterceptor(
427 pkTableName,
428 fkColumnIndex,
429 getTemporaryLocalIdMap());
430
431 if (!fkTable.containsInterceptor(localIdInterceptor)) {
432 fkTable.addInterceptor(localIdInterceptor);
433 }
434 }
435
436 }
437
438 }
439
440
441
442
443
444
445
446
447
448
449
450
451 protected boolean isNotInterceptedTable(SynchroTableMetadata table) {
452
453 return table.getPkNames().size() != 1
454 || !table.getColumnNames().contains(getConfig().getColumnRemoteId());
455 }
456
457 private Map<String, Map<Long, Long>> getTemporaryLocalIdMap() {
458 if (temporaryLocalIdMap == null) {
459 temporaryLocalIdMap = Maps.newHashMap();
460 }
461 return temporaryLocalIdMap;
462 }
463
464
465
466
467
468
469
470
471
472
473
474
475 protected Timestamp checkAndGetSystemTimestamp(SynchroDatabaseConfiguration configuration) {
476 Timestamp systimestamp = configuration.getSystemTimestamp();
477 Preconditions.checkNotNull(systimestamp,
478 String.format("Could not found system timestamp in database configuration. This is need for %s", getClass().getSimpleName()));
479 return systimestamp;
480 }
481 }