View Javadoc
1   package fr.ifremer.quadrige2.synchro.intercept.data;
2   
3   /*-
4    * #%L
5    * Quadrige2 Core :: Quadrige2 Synchro Core
6    * $Id:$
7    * $HeadURL:$
8    * %%
9    * Copyright (C) 2017 Ifremer
10   * %%
11   * This program is free software: you can redistribute it and/or modify
12   * it under the terms of the GNU Affero General Public License as published by
13   * the Free Software Foundation, either version 3 of the License, or
14   * (at your option) any later version.
15   * 
16   * This program is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU General Public License for more details.
20   * 
21   * You should have received a copy of the GNU Affero General Public License
22   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23   * #L%
24   */
25  
26  import com.google.common.collect.Lists;
27  import com.google.common.collect.Sets;
28  import fr.ifremer.common.synchro.dao.Daos;
29  import fr.ifremer.common.synchro.dao.SynchroTableDao;
30  import fr.ifremer.common.synchro.intercept.SynchroInterceptorBase;
31  import fr.ifremer.common.synchro.intercept.SynchroOperationRepository;
32  import fr.ifremer.common.synchro.meta.SynchroDatabaseMetadata;
33  import fr.ifremer.common.synchro.service.RejectedRow;
34  import fr.ifremer.quadrige2.synchro.meta.DatabaseColumns;
35  import fr.ifremer.quadrige2.synchro.meta.data.DataSynchroTables;
36  import fr.ifremer.quadrige2.synchro.meta.referential.ReferentialSynchroTables;
37  import fr.ifremer.quadrige2.synchro.service.data.DataSynchroDatabaseConfiguration;
38  import fr.ifremer.quadrige2.synchro.vo.SynchroChangesVO;
39  import org.apache.commons.collections4.CollectionUtils;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.hibernate.tool.hbm2ddl.TableMetadata;
43  
44  import java.io.IOException;
45  import java.sql.PreparedStatement;
46  import java.sql.ResultSet;
47  import java.sql.SQLException;
48  import java.util.List;
49  
50  /**
51   * Listens to all changes on table SURVEY, and stores them in a file (see DataSynchroContext.getChangeLogFile())
52   *
53   * @author Benoit Lavenier <benoit.lavenier@e-is.pro>
54   * @since 1.0
55   */
56  public class ChangeLogInterceptor extends AbstractDataInterceptor {
57  
58  	private static final Log log = LogFactory.getLog(ChangeLogInterceptor.class);
59  
60  	private static boolean debug = log.isDebugEnabled();
61  
62  	private boolean isEnable = false;
63  
64  	private SynchroChangesVO changeLog = null;
65  
66  	private PreparedStatement selectSourceSurveyProgCdsStatement;
67  	private PreparedStatement countSourceProgCdStatement;
68  	private PreparedStatement countTargetProgCdStatement;
69  
70  	/**
71  	 * <p>
72  	 * Constructor for ChangeLogInterceptor.
73  	 * </p>
74  	 */
75  	public ChangeLogInterceptor() {
76  		super(Sets.newHashSet(DataSynchroTables.SURVEY.name()));
77  	}
78  
79  	/** {@inheritDoc} */
80  	@Override
81  	protected void init(DataSynchroDatabaseConfiguration config) {
82  		super.init(config);
83  		isEnable = isChangeLogEnable(config);
84  		setEnableOnWrite(isEnable);
85  	}
86  
87  	/** {@inheritDoc} */
88  	@Override
89  	public SynchroInterceptorBase clone() {
90  		ChangeLogInterceptor result = (ChangeLogInterceptor) super.clone();
91  		result.isEnable = isEnable;
92  		return result;
93  	}
94  
95  	/** {@inheritDoc} */
96  	@Override
97  	public boolean doApply(SynchroDatabaseMetadata meta, TableMetadata table) {
98  		return isEnable;
99  	}
100 
101 	/** {@inheritDoc} */
102 	@Override
103 	protected void doOnWrite(Object[] data,
104 							 List<Object> pk,
105 							 SynchroTableDao sourceDao,
106 							 SynchroTableDao targetDao,
107 							 SynchroOperationRepository buffer,
108 							 boolean insert) throws SQLException {
109 		String tableName = sourceDao.getTable().getName();
110 
111 		// log changes
112 		if (pk == null || insert) {
113 			// always get the source when insert (see Mantis #0029876)
114 			pk = sourceDao.getPk(data);
115 		}
116 
117 		if (changeLog == null) {
118 			changeLog = new SynchroChangesVO();
119 		}
120 
121 		// Add this changes to result
122 		if (insert) {
123 			if (debug) {
124 				log.debug(String.format("[%s] detect insert - pk %s", tableName, pk));
125 			}
126 
127 			// Detect programs that not exists on target DB - mantis #37518
128 			boolean rejected = false;
129 			if (insert && tableName.equalsIgnoreCase(DataSynchroTables.SURVEY.name())) {
130 
131 				Number surveyId = (Number)CollectionUtils.extractSingleton(pk);
132 
133 				List<String> surveyProgCds = getSourceSurveyProgCds(sourceDao, surveyId);
134 
135 				List<String> missingTargetProgCds = getTargetNotExistingProgCds(targetDao, surveyProgCds);
136 
137 				// Missing prog in target
138 				if (CollectionUtils.isNotEmpty(missingTargetProgCds)) {
139 
140 					// Source DB (file) may have this missing programs...
141 					missingTargetProgCds = getSourceNotExistingProgCds(sourceDao, missingTargetProgCds);
142 
143 					if (CollectionUtils.isNotEmpty(missingTargetProgCds)) {
144 						for (String migginProgCd: missingTargetProgCds) {
145 							changeLog.addReject(tableName, RejectedRow.newBuilder(RejectedRow.Cause.MISSING_FOREIGN_KEY)
146 									.setSourcePkStr(pk)
147 									.setTargetPkStr(pk)
148 									.setMissingFk(DatabaseColumns.PROG_CD.name(), migginProgCd)
149 									.build());
150 							rejected = true;
151 						}
152 
153 					}
154 				}
155 			}
156 
157 			if (!rejected) {
158 				changeLog.addInsert(tableName, pk);
159 			}
160 		}
161 		else {
162 			if (debug) {
163 				log.debug(String.format("[%s] detect update - pk %s", tableName, pk));
164 			}
165 
166 			changeLog.addUpdate(tableName, pk);
167 		}
168 	}
169 
170 	/** {@inheritDoc} */
171 	@Override
172 	protected void doOnDelete(List<Object> pk, SynchroTableDao sourceDao, SynchroTableDao targetDao, SynchroOperationRepository buffer)
173 			throws SQLException {
174 		String tableName = sourceDao.getTable().getName();
175 
176 		if (changeLog == null) {
177 			changeLog = new SynchroChangesVO();
178 		}
179 
180 		if (debug) {
181 			log.debug(String.format("[%s] detect delete - pk %s", tableName, pk));
182 		}
183 
184 		// Add this changes to result
185 		changeLog.addDelete(tableName, pk);
186 	}
187 
188 	/** {@inheritDoc} */
189 	@Override
190 	protected void doClose() throws IOException {
191 		super.doClose();
192 
193 		// Save result to file
194 		if (changeLog != null) {
195 			changeLog.writeToFile(getConfig().getChangeLogFile(), true /* append */);
196 		}
197 
198 		Daos.closeSilently(selectSourceSurveyProgCdsStatement);
199 		selectSourceSurveyProgCdsStatement = null;
200 
201 		Daos.closeSilently(countSourceProgCdStatement);
202 		countSourceProgCdStatement = null;
203 
204 		Daos.closeSilently(countTargetProgCdStatement);
205 		countTargetProgCdStatement = null;
206 	}
207 
208 	/* -- Internal methods -- */
209 
210 	/**
211 	 * <p>
212 	 * isChangeLogEnable.
213 	 * </p>
214 	 *
215 	 * @param config
216 	 *            a {@link fr.ifremer.quadrige2.synchro.service.data.DataSynchroDatabaseConfiguration} object.
217 	 * @return a boolean.
218 	 */
219 	private boolean isChangeLogEnable(DataSynchroDatabaseConfiguration config) {
220 		return config.getChangeLogFile() != null;
221 	}
222 
223 	private List<String> getSourceSurveyProgCds(SynchroTableDao sourceDao, Number surveyId) throws SQLException {
224 		if (selectSourceSurveyProgCdsStatement == null || selectSourceSurveyProgCdsStatement.isClosed()) {
225 			selectSourceSurveyProgCdsStatement = sourceDao.getPreparedStatement(createSelectSurveyProgCdsSql());
226 		}
227 		selectSourceSurveyProgCdsStatement.setObject(1, surveyId);
228 		ResultSet rs = selectSourceSurveyProgCdsStatement.executeQuery();
229 		List<String> result = Lists.newArrayList();
230 		while (rs.next()) {
231 			result.add(rs.getString(1));
232 		}
233 		rs.close();
234 		return result;
235 	}
236 
237 	private List<String> getTargetNotExistingProgCds(SynchroTableDao sourceDao, List<String> progCds) throws SQLException {
238 		if (countTargetProgCdStatement == null || countTargetProgCdStatement.isClosed()) {
239 			countTargetProgCdStatement = sourceDao.getPreparedStatement(createCountProgCdSql());
240 		}
241 
242 		return getNotExistingProgCds(countTargetProgCdStatement, progCds);
243 	}
244 
245 	private List<String> getSourceNotExistingProgCds(final SynchroTableDao sourceDao, final List<String> progCds) throws SQLException {
246 		if (countSourceProgCdStatement == null || countSourceProgCdStatement.isClosed()) {
247 			countSourceProgCdStatement = sourceDao.getPreparedStatement(createCountProgCdSql());
248 		}
249 		return getNotExistingProgCds(countSourceProgCdStatement, progCds);
250 	}
251 
252 
253 	private List<String> getNotExistingProgCds(final PreparedStatement countProgCdStatement, final List<String> progCds) throws SQLException {
254 		List<String> notFoundPrograms = Lists.newArrayList();
255 		for (String progCd: progCds) {
256 			countProgCdStatement.setString(1, progCd);
257 			ResultSet rs = countProgCdStatement.executeQuery();
258 			rs.next();
259 			long count = rs.getLong(1);
260 			if (count != 1) {
261 				notFoundPrograms.add(progCd);
262 			}
263 			rs.close();
264 		}
265 		return notFoundPrograms;
266 	}
267 
268 	private String createSelectSurveyProgCdsSql() {
269 		return String.format("SELECT %s FROM %s WHERE %s = ?",
270 				DatabaseColumns.PROG_CD,
271 				DataSynchroTables.SURVEY_PROG,
272 				DatabaseColumns.SURVEY_ID);
273 	}
274 
275 	private String createCountProgCdSql() {
276 		return String.format("SELECT count(*) from %s WHERE %s = ?",
277 				ReferentialSynchroTables.PROGRAMME,
278 				DatabaseColumns.PROG_CD);
279 	}
280 
281 }