View Javadoc
1   package fr.ifremer.quadrige2.synchro.intercept.data;
2   
3   /*-
4    * #%L
5    * Quadrige2 Core :: Quadrige2 Synchro Core
6    * $Id:$
7    * $HeadURL:$
8    * %%
9    * Copyright (C) 2017 Ifremer
10   * %%
11   * This program is free software: you can redistribute it and/or modify
12   * it under the terms of the GNU Affero General Public License as published by
13   * the Free Software Foundation, either version 3 of the License, or
14   * (at your option) any later version.
15   * 
16   * This program is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU General Public License for more details.
20   * 
21   * You should have received a copy of the GNU Affero General Public License
22   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23   * #L%
24   */
25  
26  import com.google.common.base.Preconditions;
27  import com.google.common.collect.ImmutableList;
28  import com.google.common.collect.Maps;
29  import com.google.common.eventbus.Subscribe;
30  import fr.ifremer.common.synchro.intercept.SynchroInterceptorBase;
31  import fr.ifremer.common.synchro.meta.SynchroDatabaseMetadata;
32  import fr.ifremer.common.synchro.meta.SynchroJoinMetadata;
33  import fr.ifremer.common.synchro.meta.SynchroTableMetadata;
34  import fr.ifremer.common.synchro.meta.SynchroTableMetadata.DuplicateKeyStrategy;
35  import fr.ifremer.common.synchro.meta.event.CreateQueryEvent;
36  import fr.ifremer.common.synchro.meta.event.LoadJoinEvent;
37  import fr.ifremer.common.synchro.meta.event.LoadTableEvent;
38  import fr.ifremer.common.synchro.query.SynchroQueryBuilder;
39  import fr.ifremer.common.synchro.query.SynchroQueryOperator;
40  import fr.ifremer.common.synchro.service.SynchroDatabaseConfiguration;
41  import fr.ifremer.quadrige2.core.dao.system.synchronization.SynchronizationStatus;
42  import fr.ifremer.quadrige2.synchro.intercept.data.internal.*;
43  import fr.ifremer.quadrige2.synchro.meta.data.DataSynchroTables;
44  import fr.ifremer.quadrige2.synchro.service.SynchroDirection;
45  import org.hibernate.tool.hbm2ddl.TableMetadata;
46  
47  import java.io.IOException;
48  import java.sql.Timestamp;
49  import java.util.Map;
50  
51  /**
52   * Manage only data table with columns 'id' AND 'remote_id' :
53   * <ul>
54   * <li>Set as root only if the table as an object_type (OBJECT_TYPE is need for join with PERSON_SESION_VESSEL)
55   * <li>when exporting to server, override 'update_date' column with a systimestamp
56   * <p/>
57   * 
58   * @author Benoit Lavenier <benoit.lavenier@e-is.pro>
59   * @since 1.0
60   */
61  public class DataTableInterceptor extends AbstractDataInterceptor {
62  
63  	private Timestamp systimestamp = null;
64  
65  	private Map<String, Map<Long, Long>> temporaryLocalIdMap = null;
66  
67  	/**
68  	 * <p>
69  	 * Constructor for DataTableInterceptor.
70  	 * </p>
71  	 */
72  	public DataTableInterceptor() {
73  		super(DataSynchroTables.getImportTablesIncludes());
74  	}
75  
76  	/** {@inheritDoc} */
77  	@Override
78  	public boolean doApply(SynchroDatabaseMetadata meta, TableMetadata table) {
79  		return hasColumns(table, getConfig().getColumnRemoteId());
80  	}
81  
82  	/** {@inheritDoc} */
83  	@Override
84  	public SynchroInterceptorBase clone() {
85  		DataTableInterceptor result = (DataTableInterceptor) super.clone();
86  		result.systimestamp = this.systimestamp;
87  		return result;
88  	}
89  
90  	/** {@inheritDoc} */
91  	@Override
92  	protected void doClose() throws IOException {
93  		super.doClose();
94  		// nullify map
95  		temporaryLocalIdMap = null;
96  	}
97  
98  	/**
99  	 * <p>
100 	 * handleQuery.
101 	 * </p>
102 	 * 
103 	 * @param e
104 	 *            a {@link fr.ifremer.common.synchro.meta.event.CreateQueryEvent} object.
105 	 */
106 	@Subscribe
107 	public void handleQuery(CreateQueryEvent e) {
108 		SynchroDirection direction = getConfig().getDirection();
109 		SynchroTableMetadata table = e.source;
110 
111 		// Set table as root if synchronization status column exists
112 		String remoteIdColumn = getConfig().getColumnRemoteId();
113 		boolean hasSynchronizationStatusColumn = table.getColumnIndex(getConfig().getColumnSynchronizationStatus()) != -1;
114 		boolean hasRemoteIdColumn = table.getColumnIndex(remoteIdColumn) != -1;
115 
116 		String columnId = "NOT_EXISTING_COLUMN";
117 		if (table.getPkNames() != null && table.getPkNames().size() == 1) {
118 			columnId = table.getPkNames().iterator().next();
119 		}
120 
121 		// IMPORT: Server DB -> Temp DB
122 		if (hasRemoteIdColumn && direction == SynchroDirection.IMPORT_SERVER2TEMP) {
123 
124 			switch (e.queryName) {
125 			// Select queries : remove unsed columns
126 			case select:
127 			case selectFromUpdateDate:
128 				e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
129 						.deleteColumnIfExists(remoteIdColumn)
130 						.deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
131 						.build();
132 				break;
133 			case insert:
134 			case update: {
135 				SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
136 						.deleteColumnIfExists(remoteIdColumn);
137 				if (hasSynchronizationStatusColumn) {
138 					qb.setColumnValue(getConfig().getColumnSynchronizationStatus(),
139 							String.format("'%s'", SynchronizationStatus.SYNCHRONIZED.value()));
140 				}
141 				e.sql = qb.build();
142 				break;
143 			}
144 			default:
145 				break;
146 			}
147 		}
148 
149 		// IMPORT: Temp DB -> Local DB
150 		else if (hasRemoteIdColumn && direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
151 
152 			switch (e.queryName) {
153 			// Select queries : remove unsed columns
154 			case select:
155 			case selectFromUpdateDate:
156 				e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
157 						.deleteColumnIfExists(remoteIdColumn)
158 						.deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
159 						.build();
160 				break;
161 			case insert: {
162 				SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
163 						.deleteColumn(remoteIdColumn)
164 						.replaceColumn(columnId, remoteIdColumn);
165 				if (hasSynchronizationStatusColumn) {
166 					qb.setColumnValue(getConfig().getColumnSynchronizationStatus(), "'SYNC'");
167 				}
168 				qb.addColumn(columnId, e.source.getSelectSequenceNextValString());
169 				e.sql = qb.build();
170 				break;
171 			}
172 			case update: {
173 				SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
174 						.deleteColumn(remoteIdColumn)
175 						.replaceColumn(columnId, remoteIdColumn);
176 				if (hasSynchronizationStatusColumn) {
177 					qb.setColumnValue(getConfig().getColumnSynchronizationStatus(), "'SYNC'");
178 				}
179 				e.sql = qb.build();
180 				break;
181 			}
182 			default:
183 				break;
184 			}
185 		}
186 
187 		// EXPORT: Local DB -> Temp DB
188 		else if (direction == SynchroDirection.EXPORT_LOCAL2TEMP) {
189 
190 			// Nothing to do ?
191 		}
192 
193 		// EXPORT: Temp DB -> Server DB
194 		else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
195 
196 			switch (e.queryName) {
197 			case selectMaxUpdateDate:
198 				// Do not try to retrieve max update_date on server
199 				e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
200 						.addWhere(SynchroQueryOperator.AND, "1=2")
201 						.build();
202 				break;
203 			// Select queries : remove unsed columns
204 			case select:
205 			case selectFromUpdateDate:
206 				e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
207 						.deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
208 						.deleteColumnIfExists(remoteIdColumn)
209 						.build();
210 				break;
211 			case insert: {
212 				SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
213 						.deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
214 						.deleteColumnIfExists(remoteIdColumn);
215 				e.sql = qb.build();
216 				break;
217 			}
218 			case update: {
219 				SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
220 						.deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
221 						.deleteColumnIfExists(remoteIdColumn);
222 				e.sql = qb.build();
223 				break;
224 			}
225 			default:
226 				break;
227 			}
228 		}
229 
230 		// IMPORT: File -> Local DB
231 		else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
232 
233 			// Nothing to do ?
234 		}
235 
236 		// EXPORT: Local DB -> File
237 		else if (direction == SynchroDirection.EXPORT_LOCAL2FILE) {
238 
239 			// Nothing to do ?
240 		}
241 	}
242 
243 	/**
244 	 * <p>
245 	 * handleTableLoad.
246 	 * </p>
247 	 * 
248 	 * @param e
249 	 *            a {@link fr.ifremer.common.synchro.meta.event.LoadTableEvent} object.
250 	 */
251 	@Subscribe
252 	public void handleTableLoad(LoadTableEvent e) {
253 
254 		SynchroTableMetadata table = e.table;
255 		String remoteIdColumn = getConfig().getColumnRemoteId();
256 		String updateDateColumn = getConfig().getColumnUpdateDate();
257 
258 		// Set table as root if synchronization status column exists
259 		boolean hasSynchronizationStatusColumn = table.getColumnIndex(getConfig().getColumnSynchronizationStatus()) != -1;
260 		boolean hasUpdateDateColumn = table.getColumnIndex(updateDateColumn) != -1;
261 
262 		String pkName = table.getPkNames().iterator().next();
263 
264 		SynchroDirection direction = getConfig().getDirection();
265 
266 		// Import: Temp DB -> Local DB
267 		if (direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
268 			// Define a natural id on REMOTE_ID, if not already define
269 			// (could have been override - e.g. ProduceSortingMeasurementInterceptor)
270 			if (!table.hasUniqueConstraint(remoteIdColumn)) {
271 				table.addUniqueConstraint(remoteIdColumn, ImmutableList.of(remoteIdColumn), DuplicateKeyStrategy.REPLACE);
272 			}
273 
274 			// Make sure status = SYNC before override a row
275 			if (!getConfig().isForceEditedRowOverride()
276 					&& hasSynchronizationStatusColumn
277 					&& hasUpdateDateColumn) {
278 
279 				ImportEditedRowInterceptor synchronizationStatusInterceptor = new ImportEditedRowInterceptor(
280 						getConfig(),
281 						table.getName().toLowerCase(),
282 						pkName,
283 						table.getSelectColumnIndex(pkName),
284 						table.getSelectColumnIndex(updateDateColumn)
285 						);
286 				table.addInterceptor(synchronizationStatusInterceptor);
287 			}
288 		}
289 
290 		// Export: Temp DB -> Server DB
291 		else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
292 			if (systimestamp == null) {
293 				systimestamp = checkAndGetSystemTimestamp(getConfig());
294 			}
295 
296 			// Define a natural id on ID column, if not already define
297 			// (could have been override - e.g. ProduceSortingMeasurementInterceptor)
298 			if (!table.hasUniqueConstraint(remoteIdColumn)) {
299 				table.addUniqueConstraint(remoteIdColumn, ImmutableList.of(pkName), DuplicateKeyStrategy.REPLACE);
300 			}
301 
302 			int updateDateColumnIndex = table.getSelectColumnIndex(getConfig().getColumnUpdateDate());
303 			boolean hasUpdateDate = updateDateColumnIndex != -1;
304 
305 			ExportPkRemoteIdInterceptor remoteIdInterceptor = new ExportPkRemoteIdInterceptor(
306 					getConfig(),
307 					table.getName(),
308 					pkName,
309 					table.getSelectColumnIndex(pkName),
310 					hasSynchronizationStatusColumn,
311 					hasUpdateDate,
312 					updateDateColumnIndex,
313 					systimestamp,
314 					table.hasChildJoins());
315 
316 			table.addInterceptor(remoteIdInterceptor);
317 		}
318 
319 		// Export: Local DB -> File
320 		else if (direction == SynchroDirection.EXPORT_LOCAL2FILE) {
321 
322 			// Define a natural id on ID column, if not already define
323 			// (could have been override)
324 			if (!table.hasUniqueConstraint(remoteIdColumn)) {
325 				table.addUniqueConstraint(remoteIdColumn, ImmutableList.of(pkName), DuplicateKeyStrategy.REPLACE);
326 			}
327 
328 			int synchronizationStatusColumnIndex = table.getSelectColumnIndex(getConfig().getColumnSynchronizationStatus());
329 			boolean hasSynchronizationStatus = synchronizationStatusColumnIndex != -1;
330 
331 			// Update sync status to 'FILE_SYNC', in the local DB
332 			if (hasSynchronizationStatus) {
333 				int[] selectPkIndexes = table.getSelectPkIndexs();
334 
335 				ExportToFileSynchronizationStatusInterceptor sourceSynchronizationStatusUpdateInterceptor = new ExportToFileSynchronizationStatusInterceptor(
336 						table.getName(),
337 						selectPkIndexes,
338 						synchronizationStatusColumnIndex);
339 				table.addInterceptor(sourceSynchronizationStatusUpdateInterceptor);
340 			}
341 		}
342 
343 		// Import: File ->Local DB
344 		else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
345 
346 			// Add the PK interceptor
347 			ImportFromFilePkInterceptor pkInterceptor = new ImportFromFilePkInterceptor(
348 					table.getName().toLowerCase(),
349 					table.getSelectColumnIndex(pkName),
350 					getTemporaryLocalIdMap()
351 					);
352 			table.addInterceptor(pkInterceptor);
353 
354 		}
355 	}
356 
357 	/**
358 	 * <p>
359 	 * handleJoinLoad.
360 	 * </p>
361 	 * 
362 	 * @param e
363 	 *            a {@link fr.ifremer.common.synchro.meta.event.LoadJoinEvent} object.
364 	 */
365 	@Subscribe
366 	public void handleJoinLoad(LoadJoinEvent e) {
367 		SynchroJoinMetadata join = e.join;
368 		// Do not apply if mirror database
369 		if (!join.isValid()) {
370 			return;
371 		}
372 
373 		SynchroTableMetadata fkTable = join.getFkTable();
374 		SynchroDirection direction = getConfig().getDirection();
375 
376 		// If the FK table is the current table, or not processed by this interceptor
377 		if (fkTable == e.source || isNotInterceptedTable(fkTable)) {
378 
379 			// Get the PK column name
380 			SynchroTableMetadata pkTable = join.getPkTable();
381 			String pkTableName = pkTable.getName().toLowerCase();
382 			String columnId = "";
383 			if (pkTable.getPkNames().size() == 1) {
384 				columnId = pkTable.getPkNames().iterator().next();
385 			}
386 
387 			if (direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
388 
389 				String fkColumnName = join.getFkColumn().getName().toLowerCase();
390 				int fkColumnIndex = fkTable.getSelectColumnIndex(fkColumnName);
391 
392 				// Create and configure a interceptor, to rewrite remote id
393 				ImportRemoteIdInterceptor remoteIdInterceptor = new ImportRemoteIdInterceptor(
394 						getConfig(),
395 						pkTableName,
396 						fkColumnName,
397 						fkColumnIndex,
398 						columnId,
399 						join.getFkColumn().isNullable());
400 
401 				if (!fkTable.containsInterceptor(remoteIdInterceptor)) {
402 					fkTable.addInterceptor(remoteIdInterceptor);
403 				}
404 
405 			} else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
406 				String fkColumnName = join.getFkColumn().getName().toLowerCase();
407 				int fkColumnIndex = fkTable.getSelectColumnIndex(fkColumnName);
408 
409 				// Create and configure a interceptor, to rewrite remote id
410 				ExportFkRemoteIdInterceptor remoteIdInterceptor = new ExportFkRemoteIdInterceptor(
411 						getConfig(),
412 						pkTableName,
413 						fkColumnName,
414 						fkColumnIndex,
415 						columnId,
416 						join.getFkColumn().isNullable());
417 
418 				if (!fkTable.containsInterceptor(remoteIdInterceptor)) {
419 					fkTable.addInterceptor(remoteIdInterceptor);
420 				}
421 
422 			} else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
423 				String fkColumnName = join.getFkColumn().getName().toLowerCase();
424 				int fkColumnIndex = fkTable.getSelectColumnIndex(fkColumnName);
425 
426 				ImportFromFileFkInterceptor localIdInterceptor = new ImportFromFileFkInterceptor(
427 						pkTableName,
428 						fkColumnIndex,
429 						getTemporaryLocalIdMap());
430 
431 				if (!fkTable.containsInterceptor(localIdInterceptor)) {
432 					fkTable.addInterceptor(localIdInterceptor);
433 				}
434 			}
435 
436 		}
437 
438 	}
439 
440 	/* -- Internal methods -- */
441 
442 	/**
443 	 * <p>
444 	 * isNotInterceptedTable.
445 	 * </p>
446 	 * 
447 	 * @param table
448 	 *            a {@link fr.ifremer.common.synchro.meta.SynchroTableMetadata} object.
449 	 * @return a boolean.
450 	 */
451 	protected boolean isNotInterceptedTable(SynchroTableMetadata table) {
452 
453 		return table.getPkNames().size() != 1
454 				|| !table.getColumnNames().contains(getConfig().getColumnRemoteId());
455 	}
456 
457 	private Map<String, Map<Long, Long>> getTemporaryLocalIdMap() {
458 		if (temporaryLocalIdMap == null) {
459 			temporaryLocalIdMap = Maps.newHashMap();
460 		}
461 		return temporaryLocalIdMap;
462 	}
463 
464 	/* -- Protected methods -- */
465 
466 	/**
467 	 * <p>
468 	 * checkAndGetSystemTimestamp.
469 	 * </p>
470 	 * 
471 	 * @param configuration
472 	 *            a {@link fr.ifremer.common.synchro.service.SynchroDatabaseConfiguration} object.
473 	 * @return a {@link java.sql.Timestamp} object.
474 	 */
475 	protected Timestamp checkAndGetSystemTimestamp(SynchroDatabaseConfiguration configuration) {
476 		Timestamp systimestamp = configuration.getSystemTimestamp();
477 		Preconditions.checkNotNull(systimestamp,
478 				String.format("Could not found system timestamp in database configuration. This is need for %s", getClass().getSimpleName()));
479 		return systimestamp;
480 	}
481 }