View Javadoc
1   package fr.ifremer.quadrige2.synchro.intercept.data;
2   
3   /*-
4    * #%L
5    * Quadrige2 Core :: Quadrige2 Synchro Core
6    * $Id:$
7    * $HeadURL:$
8    * %%
9    * Copyright (C) 2017 Ifremer
10   * %%
11   * This program is free software: you can redistribute it and/or modify
12   * it under the terms of the GNU Affero General Public License as published by
13   * the Free Software Foundation, either version 3 of the License, or
14   * (at your option) any later version.
15   * 
16   * This program is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU General Public License for more details.
20   * 
21   * You should have received a copy of the GNU Affero General Public License
22   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23   * #L%
24   */
25  
26  import com.google.common.base.Joiner;
27  import com.google.common.collect.ImmutableList;
28  import com.google.common.collect.ImmutableSet;
29  import com.google.common.collect.Lists;
30  import com.google.common.collect.Maps;
31  import com.google.common.eventbus.Subscribe;
32  import fr.ifremer.common.synchro.dao.Daos;
33  import fr.ifremer.common.synchro.dao.SynchroTableDao;
34  import fr.ifremer.common.synchro.intercept.SynchroInterceptorBase;
35  import fr.ifremer.common.synchro.intercept.SynchroOperationRepository;
36  import fr.ifremer.common.synchro.meta.SynchroTableMetadata;
37  import fr.ifremer.common.synchro.meta.event.CreateQueryEvent;
38  import fr.ifremer.common.synchro.meta.event.LoadTableEvent;
39  import fr.ifremer.common.synchro.query.SynchroQueryBuilder;
40  import fr.ifremer.common.synchro.query.SynchroQueryName;
41  import fr.ifremer.common.synchro.query.SynchroQueryOperator;
42  import fr.ifremer.quadrige2.core.config.Quadrige2Configuration;
43  import fr.ifremer.quadrige2.core.dao.referential.ObjectTypeCode;
44  import fr.ifremer.quadrige2.core.dao.system.synchronization.SynchronizationStatus;
45  import fr.ifremer.quadrige2.synchro.meta.DatabaseColumns;
46  import fr.ifremer.quadrige2.synchro.meta.data.DataSynchroTables;
47  import fr.ifremer.quadrige2.synchro.service.SynchroDirection;
48  import fr.ifremer.quadrige2.synchro.service.data.DataSynchroDatabaseConfiguration;
49  import fr.ifremer.quadrige2.synchro.vo.SynchroDateOperatorVO;
50  import org.apache.commons.collections.CollectionUtils;
51  import org.apache.commons.collections.MapUtils;
52  import org.apache.commons.lang3.StringUtils;
53  import org.hibernate.LockMode;
54  
55  import java.io.IOException;
56  import java.sql.PreparedStatement;
57  import java.sql.ResultSet;
58  import java.sql.SQLException;
59  import java.util.*;
60  
61  /**
62   * <p>
63   * SurveyInterceptor class.
64   * </p>
65   *
66   */
67  public class SurveyInterceptor extends AbstractDataInterceptor {
68  
69  	/** Constant <code>NATURAL_ID_UC_NAME</code> */
70  	public static final String UNIQUE_CONSTRAINT_NAME = "NATURAL_IDC";
71  
72  	public static final List<String> UNIQUE_CONSTRAINT_COLUMNS = ImmutableList.of(
73  			DatabaseColumns.SURVEY_DT.name(),
74  			DatabaseColumns.SURVEY_TIME.name(),
75  			DatabaseColumns.MON_LOC_ID.name());
76  
77  	private int columnSurveyIdIndex = -1;
78  	private boolean enableIntegrityConstraints = true;
79  	private boolean forceDuplication = false;
80  	private PreparedStatement selectTargetProgCdsStatement;
81  	private PreparedStatement selectSourceProgCdsStatement;
82  
83  	/**
84  	 * <p>
85  	 * Constructor for SurveyInterceptor.
86  	 * </p>
87  	 */
88  	public SurveyInterceptor() {
89  		super(DataSynchroTables.SURVEY.name());
90  		setEnableOnWrite(true);
91  	}
92  
93  	/** {@inheritDoc} */
94  	@Override
95  	public void init(DataSynchroDatabaseConfiguration config) {
96  		super.init(config);
97  		enableIntegrityConstraints = config.getDirection() == SynchroDirection.EXPORT_TEMP2SERVER
98  				|| config.getDirection() == SynchroDirection.IMPORT_TEMP2LOCAL;
99  		forceDuplication = config.isForceDuplication();
100 
101 		setEnableCheckUniqueConstraint(config.getDirection() == SynchroDirection.EXPORT_TEMP2SERVER
102 				|| config.getDirection() == SynchroDirection.IMPORT_FILE2LOCAL);
103 	}
104 
105 	/** {@inheritDoc} */
106 	@Override
107 	public SynchroInterceptorBase clone() {
108 		SurveyInterceptor newBean = (SurveyInterceptor) super.clone();
109 		newBean.columnSurveyIdIndex = this.columnSurveyIdIndex;
110 		newBean.enableIntegrityConstraints = this.enableIntegrityConstraints;
111 		newBean.forceDuplication = this.forceDuplication;
112 		newBean.selectSourceProgCdsStatement = selectSourceProgCdsStatement;
113 		return newBean;
114 	}
115 
116 	/** {@inheritDoc} */
117 	@Override
118 	protected void doClose() throws IOException {
119 		super.doClose();
120 
121 		// Close statement
122 		Daos.closeSilently(selectTargetProgCdsStatement);
123 		selectTargetProgCdsStatement = null;
124 
125 		Daos.closeSilently(selectSourceProgCdsStatement);
126 		selectSourceProgCdsStatement = null;
127 
128 	}
129 
130 	/**
131 	 * <p>
132 	 * handleLoadTable.
133 	 * </p>
134 	 *
135 	 * @param e
136 	 *            a {@link fr.ifremer.common.synchro.meta.event.LoadTableEvent} object.
137 	 */
138 	@Subscribe
139 	public void handleLoadTable(LoadTableEvent e) {
140 		SynchroTableMetadata table = e.table;
141 		SynchroDirection direction = getConfig().getDirection();
142 
143 		columnSurveyIdIndex = e.table.getSelectColumnIndex(DatabaseColumns.SURVEY_ID.name());
144 
145 		table.setRoot(true);
146 
147 		// Export: Temp DB -> Server DB
148 		if (direction == SynchroDirection.EXPORT_TEMP2SERVER) {
149 			// Please note that the columns list is missing a check on PROG_CD (see mantis #35293).
150 			// But the final check and result is done inside the function doOnCheckUniqueConstraint()
151 			table.addUniqueConstraint(UNIQUE_CONSTRAINT_NAME, UNIQUE_CONSTRAINT_COLUMNS, SynchroTableMetadata.DuplicateKeyStrategy.REJECT);
152 
153 			// Enable lock on update
154 			table.setLockOnUpdate(LockMode.UPGRADE_NOWAIT);
155 		}
156 
157 		// Import: File -> Local DB
158 		if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
159 			// Please note that the columns list is missing a check on PROG_CD (see mantis #35293).
160 			// But the final check and result is done inside the function doOnCheckUniqueConstraint()
161 			table.addUniqueConstraint(UNIQUE_CONSTRAINT_NAME, UNIQUE_CONSTRAINT_COLUMNS, SynchroTableMetadata.DuplicateKeyStrategy.REJECT);
162 		}
163 	}
164 
165 	/**
166 	 * <p>
167 	 * handleQuery.
168 	 * </p>
169 	 *
170 	 * @param e
171 	 *            a {@link fr.ifremer.common.synchro.meta.event.CreateQueryEvent} object.
172 	 */
173 	@Subscribe
174 	public void handleQuery(CreateQueryEvent e) {
175 		SynchroDirection direction = getConfig().getDirection();
176 
177 		switch (e.queryName) {
178 			// Select queries : remove unused columns
179 			case count:
180 			case countFromUpdateDate:
181 			case select:
182 			case selectFromUpdateDate:
183 			case selectMaxUpdateDate:
184 				if (direction == SynchroDirection.IMPORT_SERVER2TEMP
185 						|| direction == SynchroDirection.IMPORT_NO_TEMP) {
186 					// Add restriction on user access rights
187 					e.sql = addRestrictionOnImport(e.source, e.queryName, e.sql);
188 
189 					// Add a filter for debug (filter on test survey)
190 					e.sql = addDebugRestriction(e.sql, DatabaseColumns.SURVEY_ID.name());
191 				}
192 
193 				else if (direction == SynchroDirection.IMPORT_TEMP2LOCAL) {
194 					// Add restriction on PK filter
195 					e.sql = addRestrictionOnImportFromTempDB(e.source, e.queryName, e.sql);
196 
197 					// Add a filter for debug (filter on test survey)
198 					e.sql = addDebugRestriction(e.sql, DatabaseColumns.SURVEY_ID.name());
199 				}
200 
201 				else if (direction == SynchroDirection.EXPORT_LOCAL2TEMP) {
202 					// Add restriction on user access rights
203 					e.sql = addRestrictionOnExport(e.source, e.queryName, e.sql);
204 
205 					// Add a filter for debug (filter on test survey)
206 					e.sql = addDebugRestriction(e.sql, getConfig().getColumnRemoteId());
207 				}
208 
209 				else if (direction == SynchroDirection.IMPORT_FILE2LOCAL) {
210 					// Add restriction on user access rights
211 					e.sql = addRestrictionOnImportFromFile(e.source, e.queryName, e.sql);
212 
213 					// Add a filter for debug (filter on test survey)
214 					e.sql = addDebugRestriction(e.sql, getConfig().getColumnRemoteId());
215 				}
216 
217 				else if (direction == SynchroDirection.EXPORT_LOCAL2FILE) {
218 					// Add restriction on user access rights
219 					e.sql = addRestrictionOnExportToFile(e.source, e.queryName, e.sql);
220 
221 					// Add a filter for debug (filter on test survey)
222 					e.sql = addDebugRestriction(e.sql, getConfig().getColumnRemoteId());
223 				}
224 				break;
225 
226 			default:
227 				break;
228 		}
229 	}
230 
231 	/** {@inheritDoc} */
232 	@Override
233 	protected void doOnWrite(Object[] data,
234 							 List<Object> pk,
235 							 SynchroTableDao sourceDao,
236 							 SynchroTableDao targetDao,
237 							 SynchroOperationRepository buffer,
238 							 boolean insert) throws SQLException {
239 
240 		// If root entity: no transformation
241 		if (buffer == null) {
242 			return;
243 		}
244 
245 		Object surveyId = data[columnSurveyIdIndex];
246 
247 		if (surveyId != null) {
248 			buffer.addChildToUpdateFromManyColumns(DataSynchroTables.QUALIFICATION_HISTORY.name(),
249 					ImmutableSet.of(DatabaseColumns.OBJECT_TYPE_CD.name(), DatabaseColumns.QUAL_HIST_ELEMENT_ID.name()),
250 					Lists.newArrayList(ObjectTypeCode.SURVEY.value(), Long.parseLong(surveyId.toString())));
251 			buffer.addChildToUpdateFromManyColumns(DataSynchroTables.VALIDATION_HISTORY.name(),
252 					ImmutableSet.of(DatabaseColumns.OBJECT_TYPE_CD.name(), DatabaseColumns.VALID_HIST_ELEMENT_ID.name()),
253 					Lists.newArrayList(ObjectTypeCode.SURVEY.value(), Long.parseLong(surveyId.toString())));
254 		}
255 	}
256 
257 	@Override
258 	public void doOnCheckUniqueConstraint(Object[] data,
259 										  List<Object> pk,
260 										  SynchroTableDao sourceDao,
261 										  SynchroTableDao targetDao,
262 										  Map<String, List<Object>> duplicatedKeys) throws SQLException {
263 		// Explications: une contrainte d'uncité "classique" a été posée, mais elle ne regarde pas la correspondance
264 		// avec PROG_CD.
265 		// Si cette premier contrainte d'unicité n'a pas de doublon, alors inutile d'aller plus loin.
266 		// En revanche, si elle retourne un doublon, il faut aller plus loin, en comparant les PROG_CG des 2 SURVEY
267 		// (target et source).
268 		// Si des programmes se croisent, alors il y a un doublon.
269 
270 		// SKip, if not duplicated key found on UNIQUE_CONSTRAINT_COLUMS
271 		if (MapUtils.isEmpty(duplicatedKeys) || !duplicatedKeys.containsKey(UNIQUE_CONSTRAINT_NAME)) {
272 			return;
273 		}
274 
275 		// Remove existing duplication form list (will in re-add later if this is a real duplicated key)
276 		List<Object> potentialDuplicateKey = duplicatedKeys.remove(UNIQUE_CONSTRAINT_NAME);
277 
278 		// Get programs from target DB
279 		Object remoteIdToCkeck = potentialDuplicateKey.iterator().next();
280 		if (selectTargetProgCdsStatement == null || selectTargetProgCdsStatement.isClosed()) {
281 			selectTargetProgCdsStatement = targetDao.getPreparedStatement(createSelectProgCdsBySimilarSurveySql());
282 		}
283 		selectTargetProgCdsStatement.setObject(1, remoteIdToCkeck);
284 		ResultSet rs = selectTargetProgCdsStatement.executeQuery();
285 		Map<String, Integer> targetSurveyIdsByProgCd = Maps.newHashMap();
286 		while (rs.next()) {
287 			targetSurveyIdsByProgCd.put(rs.getString(1), Integer.parseInt(rs.getObject(2).toString()));
288 		}
289 		rs.close();
290 
291 		// Get programs from the source DB
292 		if (selectSourceProgCdsStatement == null || selectSourceProgCdsStatement.isClosed()) {
293 			selectSourceProgCdsStatement = sourceDao.getPreparedStatement(createSelectProgCdsBySurveyIdSql());
294 		}
295 		Object localId = Integer.parseInt(pk.iterator().next().toString());
296 		selectSourceProgCdsStatement.setObject(1, localId);
297 		rs = selectSourceProgCdsStatement.executeQuery();
298 		List<String> sourceProgCds = Lists.newArrayList();
299 		while (rs.next()) {
300 			sourceProgCds.add(rs.getString(1));
301 		}
302 		rs.close();
303 
304 		Collection sameProgCds = CollectionUtils.intersection(targetSurveyIdsByProgCd.keySet(), sourceProgCds);
305 		if (CollectionUtils.isNotEmpty(sameProgCds)) {
306 			// WARNING: should be a Integer = same type as remote_id duplicate key result
307 			Integer duplicateKey = targetSurveyIdsByProgCd.get((String) sameProgCds.iterator().next());
308 			duplicatedKeys.put(UNIQUE_CONSTRAINT_NAME, ImmutableList.<Object> of(duplicateKey));
309 		}
310 	}
311 
312 	/* -- Internal methods -- */
313 
314 	/**
315 	 * <p>
316 	 * addRestrictionOnImport.
317 	 * </p>
318 	 *
319 	 * @param table
320 	 *            a {@link fr.ifremer.common.synchro.meta.SynchroTableMetadata} object.
321 	 * @param queryName
322 	 *            a {@link fr.ifremer.common.synchro.query.SynchroQueryName} object.
323 	 * @param sql
324 	 *            a {@link java.lang.String} object.
325 	 * @return a {@link java.lang.String} object.
326 	 */
327 	protected String addRestrictionOnImport(SynchroTableMetadata table, SynchroQueryName queryName, String sql) {
328 
329 		DataSynchroDatabaseConfiguration databaseConfiguration = getConfig();
330 		boolean enableUpdateDateFilter = SynchroQueryName.withUpdateDate(queryName);
331 
332 		SynchroQueryBuilder queryBuilder = SynchroQueryBuilder.newBuilder(queryName, sql);
333 
334 		// select
335 		// -> need to add a 'distinct' because of inner joins
336 		if (queryName == SynchroQueryName.count
337 				|| queryName == SynchroQueryName.countFromUpdateDate) {
338 			queryBuilder.replaceColumn("count(*)", "count(distinct t.survey_id)");
339 		} else {
340 			queryBuilder.setColumnDistinct(true);
341 
342 			String updateDtColumn = DatabaseColumns.UPDATE_DT.name();
343 			if (!queryBuilder.constainsColumn(updateDtColumn)
344 					&& queryBuilder.constainsColumn("t." + updateDtColumn)) {
345 				updateDtColumn = "t." + updateDtColumn;
346 			}
347 
348 			if (queryBuilder.constainsColumn(updateDtColumn)) {
349 				// select: update_dt = max(update_dt, sampling_oper.update_dt)
350 				queryBuilder.replaceColumn(updateDtColumn,
351 						String.format("GREATEST(t.%s, (select max(%s) from sampling_operation where survey_id = t.survey_id)) AS %s",
352 								DatabaseColumns.UPDATE_DT.name(),
353 								DatabaseColumns.UPDATE_DT.name(),
354 								DatabaseColumns.UPDATE_DT.name()));
355 			}
356 		}
357 
358 		// from: programs
359 		String programJoinFilter = createProgramCodesFilter("INNER JOIN SURVEY_PROG sp ON sp.survey_id=t.survey_id AND sp.prog_cd IN (%s)");
360 		if (StringUtils.isNotBlank(programJoinFilter)) {
361 			queryBuilder.addJoin(programJoinFilter);
362 		} else {
363 			queryBuilder.addJoin("INNER JOIN SURVEY_PROG sp ON sp.survey_id=t.survey_id");
364 		}
365 
366 		// from: program privilege on user
367 		// Mantis #34991 add program privilege on department
368 		queryBuilder.addJoin("INNER JOIN QUSER u on u.QUSER_ID=:userId");
369 		queryBuilder.addJoin("LEFT OUTER JOIN PROG_QUSER_PROG_PRIV up ON up.prog_cd=sp.prog_cd AND up.quser_id=u.QUSER_ID");
370 		queryBuilder.addJoin("LEFT OUTER JOIN PROG_DEP_PROG_PRIV dp ON dp.PROG_CD=sp.prog_cd AND dp.DEP_ID=u.DEP_ID");
371 
372 		// from: program privilege on user
373 		queryBuilder.addJoin("LEFT OUTER JOIN SAMPLING_OPERATION so ON so.survey_id=t.survey_id");
374 
375 		// where: if rights updated
376 		if (enableUpdateDateFilter) {
377 			// Select if survey updated
378 			queryBuilder.addWhere(SynchroQueryOperator.OR, "t." + DatabaseColumns.UPDATE_DT.name(), ">", ":updateDate");
379 			// Select id sampling_operation updated
380 			queryBuilder.addWhere(SynchroQueryOperator.OR, "so." + DatabaseColumns.UPDATE_DT.name(), ">", ":updateDate");
381 		}
382 
383 		// where: limit to pks (for import by Pk)
384 		String pkFilter = createPkFilter(table);
385 		if (StringUtils.isNotBlank(pkFilter)) {
386 			// Apply Pk filter, but do not apply date restriction
387 			queryBuilder.addWhere(SynchroQueryOperator.AND, pkFilter);
388 		}
389 
390 		// where: filter on period, if a date column exists AND no Pk filter
391 		else if (databaseConfiguration.getDataStartDate() != null
392 				&& databaseConfiguration.getDataEndDate() != null) {
393 			queryBuilder.addWhere(SynchroQueryOperator.AND, String.format(
394 					"t.%s  >= :startDate AND t.%s <= :endDate", DatabaseColumns.SURVEY_DT.name(), DatabaseColumns.SURVEY_DT.name()));
395 		}
396 
397 		queryBuilder.addWhere(SynchroQueryOperator.AND, "(up.QUSER_ID is not null or dp.DEP_ID is not null)");
398 
399 		return queryBuilder.build();
400 	}
401 
402 	/**
403 	 * <p>
404 	 * addRestrictionOnImportFromTempDB.
405 	 * </p>
406 	 *
407 	 * @param table
408 	 *            a {@link fr.ifremer.common.synchro.meta.SynchroTableMetadata} object.
409 	 * @param queryName
410 	 *            a {@link fr.ifremer.common.synchro.query.SynchroQueryName} object.
411 	 * @param sql
412 	 *            a {@link java.lang.String} object.
413 	 * @return a {@link java.lang.String} object.
414 	 */
415 	protected String addRestrictionOnImportFromTempDB(SynchroTableMetadata table, SynchroQueryName queryName, String sql) {
416 
417 		String pkFilter = createPkFilter(table);
418 		if (StringUtils.isBlank(pkFilter)) {
419 			return sql;
420 		}
421 
422 		// where: limit to pks (for import by Pk) - need for batch import (see mantis #30765)
423 		SynchroQueryBuilder queryBuilder = SynchroQueryBuilder.newBuilder(queryName, sql);
424 		queryBuilder.addWhere(SynchroQueryOperator.AND, pkFilter);
425 		return queryBuilder.build();
426 
427 	}
428 
429 	/**
430 	 * <p>
431 	 * addRestrictionOnImportFromFile.
432 	 * </p>
433 	 *
434 	 * @param table
435 	 *            a {@link fr.ifremer.common.synchro.meta.SynchroTableMetadata} object.
436 	 * @param queryName
437 	 *            a {@link fr.ifremer.common.synchro.query.SynchroQueryName} object.
438 	 * @param sql
439 	 *            a {@link java.lang.String} object.
440 	 * @return a {@link java.lang.String} object.
441 	 */
442 	protected String addRestrictionOnImportFromFile(SynchroTableMetadata table, SynchroQueryName queryName, String sql) {
443 
444 		SynchroQueryBuilder queryBuilder = SynchroQueryBuilder.newBuilder(queryName, sql);
445 
446 		// select
447 		// -> need to add a 'distinct' because of inner joins
448 		if (queryName == SynchroQueryName.count
449 				|| queryName == SynchroQueryName.countFromUpdateDate) {
450 			queryBuilder.replaceColumn("count(*)", "count(distinct t.survey_id)");
451 		} else {
452 			queryBuilder.setColumnDistinct(true);
453 		}
454 
455 		// where: limit to pks (for import by Pk)
456 		String pkFilter = createPkFilter(table);
457 		if (StringUtils.isNotBlank(pkFilter)) {
458 			// Apply Pk filter, but do not apply date restriction
459 			queryBuilder.addWhere(SynchroQueryOperator.AND, pkFilter);
460 		}
461 
462 		return queryBuilder.build();
463 	}
464 
465 	/**
466 	 * <p>
467 	 * addRestrictionOnExport.
468 	 * </p>
469 	 *
470 	 * @param table
471 	 *            a {@link fr.ifremer.common.synchro.meta.SynchroTableMetadata} object.
472 	 * @param queryName
473 	 *            a {@link fr.ifremer.common.synchro.query.SynchroQueryName} object.
474 	 * @param sql
475 	 *            a {@link java.lang.String} object.
476 	 * @return a {@link java.lang.String} object.
477 	 */
478 	protected String addRestrictionOnExport(SynchroTableMetadata table, SynchroQueryName queryName, String sql) {
479 
480 		SynchroQueryBuilder queryBuilder = SynchroQueryBuilder.newBuilder(queryName, sql);
481 
482 		// select
483 		// -> need to add a 'distinct' because of inner join
484 		if (queryName == SynchroQueryName.count
485 				|| queryName == SynchroQueryName.countFromUpdateDate) {
486 			queryBuilder.replaceColumn("count(*)", "count(distinct t.survey_id)");
487 		} else {
488 			queryBuilder.setColumnDistinct(true);
489 		}
490 
491 		// from: programs
492 		String programJoinFilter = createProgramCodesFilter("INNER JOIN SURVEY_PROG sp ON sp.survey_id=t.survey_id AND sp.prog_cd IN (%s)");
493 		if (StringUtils.isNotBlank(programJoinFilter)) {
494 			queryBuilder.addJoin(programJoinFilter);
495 		} else {
496 			queryBuilder.addJoin("INNER JOIN SURVEY_PROG sp ON sp.survey_id=t.survey_id");
497 		}
498 
499 		// from: program privilege on user
500 		queryBuilder.addJoin("INNER JOIN PROG_QUSER_PROG_PRIV up ON up.prog_cd=sp.prog_cd AND up.quser_id=:userId");
501 
502 		// where: 'ready_to_sync' data AND 'file_sync' validated data (see Mantis #28806)
503 		// and valid_dt is not null (see Mantis #30372)
504 		String whereClause = String.format("(%s='%s') OR (%s='%s' AND %s IS NOT NULL)",
505 				DatabaseColumns.SYNCHRONIZATION_STATUS.name(), SynchronizationStatus.READY_TO_SYNCHRONIZE.getValue(),
506 				DatabaseColumns.SYNCHRONIZATION_STATUS.name(), SynchronizationStatus.SYNCHRONIZED_WITH_FILE.getValue(),
507 				DatabaseColumns.SURVEY_VALID_DT);
508 		queryBuilder.addWhere(SynchroQueryOperator.AND, whereClause);
509 
510 		return queryBuilder.build();
511 	}
512 
513 	/**
514 	 * <p>
515 	 * addRestrictionOnExportToFile.
516 	 * </p>
517 	 *
518 	 * @param table
519 	 *            a {@link fr.ifremer.common.synchro.meta.SynchroTableMetadata} object.
520 	 * @param queryName
521 	 *            a {@link fr.ifremer.common.synchro.query.SynchroQueryName} object.
522 	 * @param sql
523 	 *            a {@link java.lang.String} object.
524 	 * @return a {@link java.lang.String} object.
525 	 */
526 	protected String addRestrictionOnExportToFile(SynchroTableMetadata table, SynchroQueryName queryName, String sql) {
527 
528 		SynchroQueryBuilder queryBuilder = SynchroQueryBuilder.newBuilder(queryName, sql);
529 
530 		// select
531 		// -> need to add a 'distinct' because of inner join
532 		if (queryName == SynchroQueryName.count
533 				|| queryName == SynchroQueryName.countFromUpdateDate) {
534 			queryBuilder.replaceColumn("count(*)", "count(distinct t.survey_id)");
535 		} else {
536 			queryBuilder.setColumnDistinct(true);
537 		}
538 
539 		// from: programs
540 		String programJoinFilter = createProgramCodesFilter("INNER JOIN SURVEY_PROG sp ON sp.survey_id=t.survey_id AND sp.prog_cd IN (%s)");
541 		if (StringUtils.isNotBlank(programJoinFilter)) {
542 			queryBuilder.addJoin(programJoinFilter);
543 		}
544 
545 		// where: limit to controlled data
546 		queryBuilder.addWhere(SynchroQueryOperator.AND,
547 				String.format("%s is not null", DatabaseColumns.SURVEY_CONTROL_DT.name()));
548 
549 		// where: if only dirty data (see mantis #27242)
550 		boolean hasFilterDirtyOnly = getConfig().isDirtyOnly();
551 		if (hasFilterDirtyOnly) {
552 			queryBuilder.addWhere(SynchroQueryOperator.AND,
553 					String.format("%s IN ('%s', '%s')",
554 							DatabaseColumns.SYNCHRONIZATION_STATUS.name(),
555 							SynchronizationStatus.DIRTY.getValue(),
556 							SynchronizationStatus.READY_TO_SYNCHRONIZE.getValue()));
557 		}
558 
559 		// where: by default, always omit 'deleted' data
560 		else {
561 			queryBuilder.addWhere(SynchroQueryOperator.AND,
562 					String.format("%s <> '%s'", DatabaseColumns.SYNCHRONIZATION_STATUS.name(), SynchronizationStatus.DELETED.getValue()));
563 		}
564 
565 		// where: date filter (see mantis #27242)
566 		String dateFilter = createDateFilter();
567 		if (StringUtils.isNotBlank(dateFilter)) {
568 			// Apply date filter
569 			queryBuilder.addWhere(SynchroQueryOperator.AND, dateFilter);
570 		}
571 
572 		return queryBuilder.build();
573 	}
574 
575 	/**
576 	 * <p>
577 	 * createProgramCodesFilter.
578 	 * </p>
579 	 *
580 	 * @param stringToFormat
581 	 *            a {@link java.lang.String} object.
582 	 * @return a {@link java.lang.String} object.
583 	 */
584 	protected String createProgramCodesFilter(String stringToFormat) {
585 		Set<String> programCodes = getConfig().getProgramCodes();
586 		if (CollectionUtils.isEmpty(programCodes)) {
587 			return null;
588 		}
589 		return String.format(stringToFormat,
590 				"'"
591 						+ Joiner.on("','").join(programCodes)
592 						+ "'");
593 	}
594 
595 	/**
596 	 * <p>
597 	 * createPkFilter.
598 	 * </p>
599 	 *
600 	 * @param table
601 	 *            a {@link fr.ifremer.common.synchro.meta.SynchroTableMetadata} object.
602 	 * @return a {@link java.lang.String} object.
603 	 */
604 	protected String createPkFilter(SynchroTableMetadata table) {
605 		if (getConfig().getPkIncludes() == null
606 				|| getConfig().getPkIncludes().isEmpty()) {
607 			return null;
608 		}
609 
610 		if (table.getPkNames().size() != 1) {
611 			return null;
612 		}
613 
614 		String pkName = table.getPkNames().iterator().next();
615 
616 		Collection<String> pkStrs = getConfig()
617 				.getPkIncludes()
618 				.get(table.getName().toUpperCase());
619 
620 		if (CollectionUtils.isEmpty(pkStrs)) {
621 			// There is filter by PK, but not on this table
622 			// so exclude this table
623 			return "1=2";
624 		}
625 
626 		StringBuilder inBuilder = new StringBuilder();
627 		for (String pkStr : pkStrs) {
628 			inBuilder.append(',').append(pkStr);
629 		}
630 
631 		return String.format("t.%s IN (%s)",
632 				pkName,
633 				inBuilder.substring(1)
634 		);
635 	}
636 
637 	/**
638 	 * <p>
639 	 * createDateFilter.
640 	 * </p>
641 	 *
642 	 * @return a {@link java.lang.String} object.
643 	 */
644 	protected String createDateFilter() {
645 		DataSynchroDatabaseConfiguration config = getConfig();
646 		SynchroDateOperatorVO operator = config.getDateOperator();
647 		Date startDate = config.getDataStartDate();
648 
649 		if (operator == null || startDate == null) {
650 			return null;
651 		}
652 
653 		if (operator == SynchroDateOperatorVO.BETWEEN) {
654 			Date endDate = config.getDataEndDate();
655 			if (endDate == null) {
656 				return null;
657 			}
658 
659 			return String.format("t.%s  >= :startDate AND t.%s <= :endDate",
660 					DatabaseColumns.SURVEY_DT.name(),
661 					DatabaseColumns.SURVEY_DT.name());
662 		}
663 
664 		String operationStr;
665 		switch (operator) {
666 			case EQUALS: // =
667 				operationStr = "=";
668 				break;
669 			case BEFORE: // <
670 				operationStr = "<";
671 				break;
672 			case BEFORE_OR_EQUALS: // <=
673 				operationStr = "<=";
674 				break;
675 			case AFTER: // >
676 				operationStr = ">";
677 				break;
678 			case AFTER_OR_EQUALS: // >=
679 				operationStr = ">=";
680 				break;
681 			default:
682 				return null;
683 		}
684 
685 		return String.format("t.%s  %s :startDate",
686 				DatabaseColumns.SURVEY_DT.name(),
687 				operationStr);
688 	}
689 
690 	/**
691 	 * <p>
692 	 * addDebugRestriction.
693 	 * </p>
694 	 *
695 	 * @param sql
696 	 *            a {@link java.lang.String} object.
697 	 * @param surveyFilteredColumnName
698 	 *            a {@link java.lang.String} object.
699 	 * @return a {@link java.lang.String} object.
700 	 */
701 	protected String addDebugRestriction(String sql, String surveyFilteredColumnName) {
702 
703 		String idIncludes = Quadrige2Configuration.getInstance().getApplicationConfig().getOption("quadrige2.synchro.import.survey.includes");
704 		if (StringUtils.isBlank(idIncludes)) {
705 			return sql;
706 		}
707 
708 		SynchroQueryBuilder queryBuilder = SynchroQueryBuilder.newBuilder(sql);
709 
710 		// IDs to include
711 		queryBuilder.addWhere(SynchroQueryOperator.AND, String.format("t.%s in (%s)",
712 				surveyFilteredColumnName,
713 				idIncludes));
714 
715 		return queryBuilder.build();
716 	}
717 
718 	protected String createSelectProgCdsBySurveyIdSql() {
719 		return "select distinct sp.PROG_CD"
720 				+ " from SURVEY_PROG sp"
721 				+ " inner join SURVEY t on t.SURVEY_ID=sp.SURVEY_ID"
722 				+ " where t.SURVEY_ID=?";
723 	}
724 
725 	protected String createSelectProgCdsBySimilarSurveySql() {
726 		// Attention, il ne faut pas faire une simple requete sur SURVEY_PROG, car le paramètre de la requete (un
727 		// survey_id)
728 		// sera le premier doublon détecté dans SURVEY HORS programme (sur le reste de la clef thématique).
729 		// Il existe potentiellement plus de doublon que ce cas ci. Il faut donc considérer toutes les ligne de SURVEY
730 		// qui sont
731 		// avec le même [SURVEY_DT,SURVEY_TIME,MON_LOC_ID] que ce premier doublon potentiel.
732 		return "select distinct sp.PROG_CD, sp.SURVEY_ID"
733 				+ " from SURVEY t1,"
734 				+ " SURVEY t2"
735 				+ " inner join SURVEY_PROG sp on t2.SURVEY_ID=sp.SURVEY_ID"
736 				+ " where t1.SURVEY_ID=?"
737 				+ " AND t1.SURVEY_DT=t2.SURVEY_DT"
738 				+ " AND t1.MON_LOC_ID=t2.MON_LOC_ID"
739 				+ " AND ((t1.SURVEY_TIME IS NULL AND t2.SURVEY_TIME IS NULL) OR (t1.SURVEY_TIME=t1.SURVEY_TIME))";
740 	}
741 }