View Javadoc
1   package fr.ifremer.quadrige3.synchro.intercept.data;
2   
3   /*-
4    * #%L
5    * Quadrige3 Core :: Quadrige3 Synchro Core
6    * $Id:$
7    * $HeadURL:$
8    * %%
9    * Copyright (C) 2017 Ifremer
10   * %%
11   * This program is free software: you can redistribute it and/or modify
12   * it under the terms of the GNU Affero General Public License as published by
13   * the Free Software Foundation, either version 3 of the License, or
14   * (at your option) any later version.
15   * 
16   * This program is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU General Public License for more details.
20   * 
21   * You should have received a copy of the GNU Affero General Public License
22   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23   * #L%
24   */
25  
26  import com.google.common.collect.ImmutableList;
27  import com.google.common.eventbus.Subscribe;
28  import fr.ifremer.common.synchro.intercept.SynchroInterceptorBase;
29  import fr.ifremer.common.synchro.meta.SynchroDatabaseMetadata;
30  import fr.ifremer.common.synchro.meta.SynchroJoinMetadata;
31  import fr.ifremer.common.synchro.meta.SynchroTableMetadata;
32  import fr.ifremer.common.synchro.meta.SynchroTableMetadata.DuplicateKeyStrategy;
33  import fr.ifremer.common.synchro.meta.event.CreateQueryEvent;
34  import fr.ifremer.common.synchro.meta.event.LoadJoinEvent;
35  import fr.ifremer.common.synchro.meta.event.LoadTableEvent;
36  import fr.ifremer.common.synchro.query.SynchroQueryBuilder;
37  import fr.ifremer.common.synchro.query.SynchroQueryOperator;
38  import fr.ifremer.common.synchro.service.SynchroDatabaseConfiguration;
39  import fr.ifremer.quadrige3.core.dao.system.synchronization.SynchronizationStatus;
40  import fr.ifremer.quadrige3.core.dao.technical.Assert;
41  import fr.ifremer.quadrige3.synchro.intercept.data.internal.*;
42  import fr.ifremer.quadrige3.synchro.meta.data.DataSynchroTables;
43  import fr.ifremer.quadrige3.synchro.service.SynchroDirection;
44  import org.hibernate.tool.hbm2ddl.TableMetadata;
45  
46  import java.sql.Timestamp;
47  
48  /**
49   * Manage only data table with columns 'id' AND 'remote_id' :
50   * <ul>
51   * <li>Set as root only if the table as an object_type (OBJECT_TYPE is need for join with PERSON_SESION_VESSEL)
52   * <li>when exporting to server, override 'update_date' column with a systimestamp
53   * <p/>
54   * 
55   * @author Benoit Lavenier <benoit.lavenier@e-is.pro>
56   * @since 1.0
57   */
58  public class DataTableInterceptor extends AbstractDataInterceptor {
59  
60  	private Timestamp systimestamp = null;
61  
62  	/**
63  	 * <p>
64  	 * Constructor for DataTableInterceptor.
65  	 * </p>
66  	 */
67  	public DataTableInterceptor() {
68  		super(DataSynchroTables.getImportTablesIncludes());
69  	}
70  
71  	/** {@inheritDoc} */
72  	@Override
73  	public boolean doApply(SynchroDatabaseMetadata meta, TableMetadata table) {
74  		return hasColumns(table, getConfig().getColumnRemoteId());
75  	}
76  
77  	/** {@inheritDoc} */
78  	@Override
79  	public SynchroInterceptorBase clone() {
80  		DataTableInterceptor result = (DataTableInterceptor) super.clone();
81  		result.systimestamp = this.systimestamp;
82  		return result;
83  	}
84  
85  	/**
86  	 * <p>
87  	 * handleQuery.
88  	 * </p>
89  	 * 
90  	 * @param e
91  	 *            a {@link fr.ifremer.common.synchro.meta.event.CreateQueryEvent} object.
92  	 */
93  	@Subscribe
94  	public void handleQuery(CreateQueryEvent e) {
95  		SynchroDirection direction = getConfig().getDirection();
96  		SynchroTableMetadata table = e.source;
97  
98  		// Set table as root if synchronization status column exists
99  		String remoteIdColumn = getConfig().getColumnRemoteId();
100 		boolean hasSynchronizationStatusColumn = table.getColumnIndex(getConfig().getColumnSynchronizationStatus()) != -1;
101 		boolean hasRemoteIdColumn = table.getColumnIndex(remoteIdColumn) != -1;
102 
103 		String columnId = "NOT_EXISTING_COLUMN";
104 		if (table.getPkNames() != null && table.getPkNames().size() == 1) {
105 			columnId = table.getPkNames().iterator().next();
106 		}
107 
108 		// IMPORT: Server DB -> Temp DB
109 		if (hasRemoteIdColumn && direction == SynchroDirection.IMPORT_SERVER2TEMP) {
110 
111 			switch (e.queryName) {
112 			// Select queries : remove unsed columns
113 			case select:
114 			case selectFromUpdateDate:
115 				e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
116 						.deleteColumnIfExists(remoteIdColumn)
117 						.deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
118 						.build();
119 				break;
120 			case insert:
121 			case update: {
122 				SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
123 						.deleteColumnIfExists(remoteIdColumn);
124 				if (hasSynchronizationStatusColumn) {
125 					qb.setColumnValue(getConfig().getColumnSynchronizationStatus(),
126 							String.format("'%s'", SynchronizationStatus.SYNCHRONIZED.value()));
127 				}
128 				e.sql = qb.build();
129 				break;
130 			}
131 			default:
132 				break;
133 			}
134 		}
135 
136 		// IMPORT: Temp DB -> Local DB
137 		else if (hasRemoteIdColumn && direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
138 
139 			switch (e.queryName) {
140 			// Select queries : remove unsed columns
141 			case select:
142 			case selectFromUpdateDate:
143 				e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
144 						.deleteColumnIfExists(remoteIdColumn)
145 						.deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
146 						.build();
147 				break;
148 			case insert: {
149 				SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
150 						.deleteColumn(remoteIdColumn)
151 						.replaceColumn(columnId, remoteIdColumn);
152 				if (hasSynchronizationStatusColumn) {
153 					qb.setColumnValue(getConfig().getColumnSynchronizationStatus(), "'SYNC'");
154 				}
155 				qb.addColumn(columnId, e.source.getSelectSequenceNextValString());
156 				e.sql = qb.build();
157 				break;
158 			}
159 			case update: {
160 				SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
161 						.deleteColumn(remoteIdColumn)
162 						.replaceColumn(columnId, remoteIdColumn);
163 				if (hasSynchronizationStatusColumn) {
164 					qb.setColumnValue(getConfig().getColumnSynchronizationStatus(), "'SYNC'");
165 				}
166 				e.sql = qb.build();
167 				break;
168 			}
169 			default:
170 				break;
171 			}
172 		}
173 
174 		// EXPORT: Local DB -> Temp DB
175 		else if (direction == SynchroDirection.EXPORT_LOCAL2TEMP) {
176 
177 			// Nothing to do ?
178 		}
179 
180 		// EXPORT: Temp DB -> Server DB
181 		else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
182 
183 			switch (e.queryName) {
184 			case selectMaxUpdateDate:
185 				// Do not try to retrieve max update_date on server
186 				e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
187 						.addWhere(SynchroQueryOperator.AND, "1=2")
188 						.build();
189 				break;
190 			// Select queries : remove unsed columns
191 			case select:
192 			case selectFromUpdateDate:
193 				e.sql = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
194 						.deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
195 						.deleteColumnIfExists(remoteIdColumn)
196 						.build();
197 				break;
198 			case insert: {
199 				SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
200 						.deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
201 						.deleteColumnIfExists(remoteIdColumn);
202 				e.sql = qb.build();
203 				break;
204 			}
205 			case update: {
206 				SynchroQueryBuilder qb = SynchroQueryBuilder.newBuilder(e.queryName, e.sql)
207 						.deleteColumnIfExists(getConfig().getColumnSynchronizationStatus())
208 						.deleteColumnIfExists(remoteIdColumn);
209 				e.sql = qb.build();
210 				break;
211 			}
212 			default:
213 				break;
214 			}
215 		}
216 
217 		// IMPORT: File -> Local DB
218 		else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
219 
220 			// Nothing to do ?
221 		}
222 
223 		// EXPORT: Local DB -> File
224 		else if (direction == SynchroDirection.EXPORT_LOCAL2FILE) {
225 
226 			// Nothing to do ?
227 		}
228 	}
229 
230 	/**
231 	 * <p>
232 	 * handleTableLoad.
233 	 * </p>
234 	 * 
235 	 * @param e
236 	 *            a {@link fr.ifremer.common.synchro.meta.event.LoadTableEvent} object.
237 	 */
238 	@Subscribe
239 	public void handleTableLoad(LoadTableEvent e) {
240 
241 		SynchroTableMetadata table = e.table;
242 		String remoteIdColumn = getConfig().getColumnRemoteId();
243 		String updateDateColumn = getConfig().getColumnUpdateDate();
244 
245 		// Set table as root if synchronization status column exists
246 		boolean hasSynchronizationStatusColumn = table.getColumnIndex(getConfig().getColumnSynchronizationStatus()) != -1;
247 		boolean hasUpdateDateColumn = table.getColumnIndex(updateDateColumn) != -1;
248 
249 		String pkName = table.getPkNames().iterator().next();
250 
251 		SynchroDirection direction = getConfig().getDirection();
252 
253 		// Import: Temp DB -> Local DB
254 		if (direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
255 			// Define a natural id on REMOTE_ID, if not already define
256 			// (could have been override - e.g. ProduceSortingMeasurementInterceptor)
257 			if (!table.hasUniqueConstraint(remoteIdColumn)) {
258 				table.addUniqueConstraint(remoteIdColumn, ImmutableList.of(remoteIdColumn), DuplicateKeyStrategy.REPLACE);
259 			}
260 
261 			// Make sure status = SYNC before override a row
262 			if (!getConfig().isForceEditedRowOverride()
263 					&& hasSynchronizationStatusColumn
264 					&& hasUpdateDateColumn) {
265 
266 				ImportEditedRowInterceptor synchronizationStatusInterceptor = new ImportEditedRowInterceptor(
267 						getConfig(),
268 						table.getName().toLowerCase(),
269 						pkName,
270 						table.getSelectColumnIndex(pkName),
271 						table.getSelectColumnIndex(updateDateColumn)
272 						);
273 				table.addInterceptor(synchronizationStatusInterceptor);
274 			}
275 		}
276 
277 		// Export: Temp DB -> Server DB
278 		else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
279 			if (systimestamp == null) {
280 				systimestamp = checkAndGetSystemTimestamp(getConfig());
281 			}
282 
283 			// Define a natural id on ID column, if not already define
284 			// (could have been override - e.g. ProduceSortingMeasurementInterceptor)
285 			if (!table.hasUniqueConstraint(remoteIdColumn)) {
286 				table.addUniqueConstraint(remoteIdColumn, ImmutableList.of(pkName), DuplicateKeyStrategy.REPLACE);
287 			}
288 
289 			int updateDateColumnIndex = table.getSelectColumnIndex(getConfig().getColumnUpdateDate());
290 			boolean hasUpdateDate = updateDateColumnIndex != -1;
291 
292 			ExportPkRemoteIdInterceptor remoteIdInterceptor = new ExportPkRemoteIdInterceptor(
293 					getConfig(),
294 					table.getName(),
295 					pkName,
296 					table.getSelectColumnIndex(pkName),
297 					hasSynchronizationStatusColumn,
298 					hasUpdateDate,
299 					updateDateColumnIndex,
300 					systimestamp,
301 					table.hasChildJoins());
302 
303 			table.addInterceptor(remoteIdInterceptor);
304 		}
305 
306 		// Export: Local DB -> File
307 		else if (direction == SynchroDirection.EXPORT_LOCAL2FILE) {
308 
309 			// Define a natural id on ID column, if not already define
310 			// (could have been override)
311 			if (!table.hasUniqueConstraint(remoteIdColumn)) {
312 				table.addUniqueConstraint(remoteIdColumn, ImmutableList.of(pkName), DuplicateKeyStrategy.REPLACE);
313 			}
314 
315 			int synchronizationStatusColumnIndex = table.getSelectColumnIndex(getConfig().getColumnSynchronizationStatus());
316 			boolean hasSynchronizationStatus = synchronizationStatusColumnIndex != -1;
317 
318 			// Update sync status to 'FILE_SYNC', in the local DB
319 			if (hasSynchronizationStatus) {
320 				int[] selectPkIndexes = table.getSelectPkIndexes();
321 
322 				ExportToFileSynchronizationStatusInterceptor sourceSynchronizationStatusUpdateInterceptor = new ExportToFileSynchronizationStatusInterceptor(
323 						table.getName(),
324 						selectPkIndexes,
325 						synchronizationStatusColumnIndex);
326 				table.addInterceptor(sourceSynchronizationStatusUpdateInterceptor);
327 			}
328 		}
329 
330 		// Import: File ->Local DB
331 		else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
332 
333 			// Add the PK interceptor
334 			ImportFromFilePkInterceptor pkInterceptor = new ImportFromFilePkInterceptor(
335 					table.getName().toLowerCase(),
336 					table.getSelectColumnIndex(pkName),
337 					getConfig()
338 					);
339 			table.addInterceptor(pkInterceptor);
340 
341 		}
342 	}
343 
344 	/**
345 	 * <p>
346 	 * handleJoinLoad.
347 	 * </p>
348 	 * 
349 	 * @param e
350 	 *            a {@link fr.ifremer.common.synchro.meta.event.LoadJoinEvent} object.
351 	 */
352 	@Subscribe
353 	public void handleJoinLoad(LoadJoinEvent e) {
354 		SynchroJoinMetadata join = e.join;
355 		// Do not apply if mirror database
356 		if (!join.isValid()) {
357 			return;
358 		}
359 
360 		SynchroTableMetadata fkTable = join.getFkTable();
361 		SynchroDirection direction = getConfig().getDirection();
362 
363 		// If the FK table is the current table, or not processed by this interceptor
364 		if (fkTable == e.source || isNotInterceptedTable(fkTable)) {
365 
366 			// Get the PK column name
367 			SynchroTableMetadata pkTable = join.getPkTable();
368 			String pkTableName = pkTable.getName().toLowerCase();
369 			String columnId = "";
370 			if (pkTable.getPkNames().size() == 1) {
371 				columnId = pkTable.getPkNames().iterator().next();
372 			}
373 
374 			if (direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
375 
376 				String fkColumnName = join.getFkColumn().getName().toLowerCase();
377 				int fkColumnIndex = fkTable.getSelectColumnIndex(fkColumnName);
378 
379 				// Create and configure a interceptor, to rewrite remote id
380 				ImportRemoteIdInterceptor remoteIdInterceptor = new ImportRemoteIdInterceptor(
381 						getConfig(),
382 						pkTableName,
383 						fkColumnName,
384 						fkColumnIndex,
385 						columnId,
386 						join.getFkColumn().isNullable());
387 
388 				if (!fkTable.containsInterceptor(remoteIdInterceptor)) {
389 					fkTable.addInterceptor(remoteIdInterceptor);
390 				}
391 
392 			} else if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
393 				String fkColumnName = join.getFkColumn().getName().toLowerCase();
394 				int fkColumnIndex = fkTable.getSelectColumnIndex(fkColumnName);
395 
396 				// Create and configure a interceptor, to rewrite remote id
397 				ExportFkRemoteIdInterceptor remoteIdInterceptor = new ExportFkRemoteIdInterceptor(
398 						getConfig(),
399 						pkTableName,
400 						fkColumnName,
401 						fkColumnIndex,
402 						columnId,
403 						join.getFkColumn().isNullable());
404 
405 				if (!fkTable.containsInterceptor(remoteIdInterceptor)) {
406 					fkTable.addInterceptor(remoteIdInterceptor);
407 				}
408 
409 			} else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
410 				String fkColumnName = join.getFkColumn().getName().toLowerCase();
411 				int fkColumnIndex = fkTable.getSelectColumnIndex(fkColumnName);
412 
413 				ImportFromFileFkInterceptor localIdInterceptor = new ImportFromFileFkInterceptor(
414 						pkTableName,
415 						fkColumnIndex,
416 						getConfig()
417 				);
418 
419 				if (!fkTable.containsInterceptor(localIdInterceptor)) {
420 					fkTable.addInterceptor(localIdInterceptor);
421 				}
422 			}
423 
424 		}
425 
426 	}
427 
428 	/* -- Internal methods -- */
429 
430 	/**
431 	 * <p>
432 	 * isNotInterceptedTable.
433 	 * </p>
434 	 * 
435 	 * @param table
436 	 *            a {@link fr.ifremer.common.synchro.meta.SynchroTableMetadata} object.
437 	 * @return a boolean.
438 	 */
439 	protected boolean isNotInterceptedTable(SynchroTableMetadata table) {
440 
441 		return table.getPkNames().size() != 1
442 				|| !table.getColumnNames().contains(getConfig().getColumnRemoteId());
443 	}
444 
445 	/* -- Protected methods -- */
446 
447 	/**
448 	 * <p>
449 	 * checkAndGetSystemTimestamp.
450 	 * </p>
451 	 * 
452 	 * @param configuration
453 	 *            a {@link fr.ifremer.common.synchro.service.SynchroDatabaseConfiguration} object.
454 	 * @return a {@link java.sql.Timestamp} object.
455 	 */
456 	protected Timestamp checkAndGetSystemTimestamp(SynchroDatabaseConfiguration configuration) {
457 		Timestamp systimestamp = configuration.getSystemTimestamp();
458 		Assert.notNull(systimestamp,
459 				String.format("Could not found system timestamp in database configuration. This is need for %s", getClass().getSimpleName()));
460 		return systimestamp;
461 	}
462 }