View Javadoc
1   package fr.ifremer.quadrige3.synchro.intercept.data;
2   
3   /*-
4    * #%L
5    * Quadrige3 Core :: Quadrige3 Synchro Core
6    * $Id:$
7    * $HeadURL:$
8    * %%
9    * Copyright (C) 2017 Ifremer
10   * %%
11   * This program is free software: you can redistribute it and/or modify
12   * it under the terms of the GNU Affero General Public License as published by
13   * the Free Software Foundation, either version 3 of the License, or
14   * (at your option) any later version.
15   * 
16   * This program is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU General Public License for more details.
20   * 
21   * You should have received a copy of the GNU Affero General Public License
22   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23   * #L%
24   */
25  
26  import com.google.common.collect.Lists;
27  import com.google.common.collect.Sets;
28  import fr.ifremer.common.synchro.dao.Daos;
29  import fr.ifremer.common.synchro.dao.SynchroTableDao;
30  import fr.ifremer.common.synchro.intercept.SynchroInterceptorBase;
31  import fr.ifremer.common.synchro.intercept.SynchroMissingForeignKeyException;
32  import fr.ifremer.common.synchro.intercept.SynchroOperationRepository;
33  import fr.ifremer.common.synchro.meta.SynchroDatabaseMetadata;
34  import fr.ifremer.common.synchro.service.RejectedRow;
35  import fr.ifremer.quadrige3.synchro.meta.DatabaseColumns;
36  import fr.ifremer.quadrige3.synchro.meta.data.DataSynchroTables;
37  import fr.ifremer.quadrige3.synchro.meta.referential.ReferentialSynchroTables;
38  import fr.ifremer.quadrige3.synchro.service.data.DataSynchroDatabaseConfiguration;
39  import fr.ifremer.quadrige3.synchro.vo.SynchroChangesVO;
40  import org.apache.commons.collections4.CollectionUtils;
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.hibernate.tool.hbm2ddl.TableMetadata;
44  
45  import java.io.IOException;
46  import java.sql.PreparedStatement;
47  import java.sql.ResultSet;
48  import java.sql.SQLException;
49  import java.util.ArrayList;
50  import java.util.List;
51  import java.util.StringJoiner;
52  
53  /**
54   * Listen all changes on table SURVEY, and store changes onto a file (see DataSynchroContext.getChangeLogFile())
55   *
56   * @author Benoit Lavenier <benoit.lavenier@e-is.pro>
57   * @since 1.0
58   */
59  public class ChangeLogInterceptor extends AbstractDataInterceptor {
60  
61  	private static final Log log = LogFactory.getLog(ChangeLogInterceptor.class);
62  
63  	private static final boolean debug = log.isDebugEnabled();
64  
65  	private boolean isEnable = false;
66  
67  	private SynchroChangesVO changeLog = null;
68  
69  	private PreparedStatement selectSourceSurveyProgCdsStatement;
70  	private PreparedStatement countSourceProgCdStatement;
71  	private PreparedStatement countTargetProgCdStatement;
72  
73  	/**
74  	 * <p>
75  	 * Constructor for ChangeLogInterceptor.
76  	 * </p>
77  	 */
78  	public ChangeLogInterceptor() {
79  		super(Sets.newHashSet(DataSynchroTables.SURVEY.name()));
80  	}
81  
82  	/** {@inheritDoc} */
83  	@Override
84  	protected void init(DataSynchroDatabaseConfiguration config) {
85  		super.init(config);
86  		isEnable = isChangeLogEnable(config);
87  		setEnableOnWrite(isEnable);
88  	}
89  
90  	/** {@inheritDoc} */
91  	@Override
92  	public SynchroInterceptorBase clone() {
93  		ChangeLogInterceptor result = (ChangeLogInterceptor) super.clone();
94  		result.isEnable = isEnable;
95  		return result;
96  	}
97  
98  	/** {@inheritDoc} */
99  	@Override
100 	public boolean doApply(SynchroDatabaseMetadata meta, TableMetadata table) {
101 		return isEnable;
102 	}
103 
104 	/** {@inheritDoc} */
105 	@Override
106 	protected void doOnWrite(Object[] data,
107 							 List<Object> pk,
108 							 SynchroTableDao sourceDao,
109 							 SynchroTableDao targetDao,
110 							 SynchroOperationRepository buffer,
111 							 boolean insert) throws SQLException {
112 		String tableName = sourceDao.getTable().getName();
113 
114 		// log changes
115 		if (pk == null || insert) {
116 			// always get the source when insert (see Mantis #0029876)
117 			pk = sourceDao.getPk(data);
118 		}
119 
120 		if (changeLog == null) {
121 			changeLog = new SynchroChangesVO();
122 		}
123 
124 		// Add this changes to result
125 		if (insert) {
126 			if (debug) {
127 				log.debug(String.format("[%s] detect insert - pk %s", tableName, pk));
128 			}
129 
130 			// Detect programs that not exists on target DB - mantis #37518
131 			boolean rejected = false;
132             List<String> missingTargetProgCds = new ArrayList<>();
133 			if (tableName.equalsIgnoreCase(DataSynchroTables.SURVEY.name())) {
134 
135 				Number surveyId = (Number)CollectionUtils.extractSingleton(pk);
136 
137 				List<String> surveyProgCds = getSourceSurveyProgCds(sourceDao, surveyId);
138 
139 				missingTargetProgCds = getTargetNotExistingProgCds(targetDao, surveyProgCds);
140 
141 				// Missing prog in target
142 				if (CollectionUtils.isNotEmpty(missingTargetProgCds)) {
143 
144 					// Source DB (file) may have this missing programs...
145 					missingTargetProgCds = getSourceNotExistingProgCds(sourceDao, missingTargetProgCds);
146 
147 					if (CollectionUtils.isNotEmpty(missingTargetProgCds)) {
148 						for (String missingProgCd: missingTargetProgCds) {
149 							changeLog.addReject(tableName, RejectedRow.newBuilder(RejectedRow.Cause.MISSING_FOREIGN_KEY)
150 									.setSourcePkStr(pk)
151 									.setTargetPkStr(pk)
152 									.setMissingFk(DatabaseColumns.PROG_CD.name(), missingProgCd)
153 									.build());
154 							rejected = true;
155 						}
156 
157 					}
158 				}
159 			}
160 
161 			if (!rejected) {
162 				changeLog.addInsert(tableName, pk);
163 			} else {
164 				StringJoiner joiner = new StringJoiner(",");
165 				missingTargetProgCds.forEach(joiner::add);
166 				throw new SynchroMissingForeignKeyException(tableName, pk, pk, DatabaseColumns.PROG_CD.name(), joiner.toString());
167 			}
168 		}
169 		else {
170 			if (debug) {
171 				log.debug(String.format("[%s] detect update - pk %s", tableName, pk));
172 			}
173 
174 			changeLog.addUpdate(tableName, pk);
175 		}
176 	}
177 
178 	/** {@inheritDoc} */
179 	@Override
180 	protected void doOnDelete(List<Object> pk, SynchroTableDao sourceDao, SynchroTableDao targetDao, SynchroOperationRepository buffer) {
181 		String tableName = sourceDao.getTable().getName();
182 
183 		if (changeLog == null) {
184 			changeLog = new SynchroChangesVO();
185 		}
186 
187 		if (debug) {
188 			log.debug(String.format("[%s] detect delete - pk %s", tableName, pk));
189 		}
190 
191 		// Add this changes to result
192 		changeLog.addDelete(tableName, pk);
193 	}
194 
195 	/** {@inheritDoc} */
196 	@Override
197 	protected void doClose() throws IOException {
198 		super.doClose();
199 
200 		// Save result to file
201 		if (changeLog != null) {
202 			changeLog.writeToFile(getConfig().getChangeLogFile(), true /* append */);
203 		}
204 
205 		Daos.closeSilently(selectSourceSurveyProgCdsStatement);
206 		selectSourceSurveyProgCdsStatement = null;
207 
208 		Daos.closeSilently(countSourceProgCdStatement);
209 		countSourceProgCdStatement = null;
210 
211 		Daos.closeSilently(countTargetProgCdStatement);
212 		countTargetProgCdStatement = null;
213 	}
214 
215 	/* -- Internal methods -- */
216 
217 	/**
218 	 * <p>
219 	 * isChangeLogEnable.
220 	 * </p>
221 	 *
222 	 * @param config
223 	 *            a {@link fr.ifremer.quadrige3.synchro.service.data.DataSynchroDatabaseConfiguration} object.
224 	 * @return a boolean.
225 	 */
226 	private boolean isChangeLogEnable(DataSynchroDatabaseConfiguration config) {
227 		return config.getChangeLogFile() != null;
228 	}
229 
230 	private List<String> getSourceSurveyProgCds(SynchroTableDao sourceDao, Number surveyId) throws SQLException {
231 		if (selectSourceSurveyProgCdsStatement == null || selectSourceSurveyProgCdsStatement.isClosed()) {
232 			selectSourceSurveyProgCdsStatement = sourceDao.getPreparedStatement(createSelectSurveyProgCdsSql());
233 		}
234 		selectSourceSurveyProgCdsStatement.setObject(1, surveyId);
235 		ResultSet rs = selectSourceSurveyProgCdsStatement.executeQuery();
236 		List<String> result = Lists.newArrayList();
237 		while (rs.next()) {
238 			result.add(rs.getString(1));
239 		}
240 		rs.close();
241 		return result;
242 	}
243 
244 	private List<String> getTargetNotExistingProgCds(SynchroTableDao sourceDao, List<String> progCds) throws SQLException {
245 		if (countTargetProgCdStatement == null || countTargetProgCdStatement.isClosed()) {
246 			countTargetProgCdStatement = sourceDao.getPreparedStatement(createCountProgCdSql());
247 		}
248 
249 		return getNotExistingProgCds(countTargetProgCdStatement, progCds);
250 	}
251 
252 	private List<String> getSourceNotExistingProgCds(final SynchroTableDao sourceDao, final List<String> progCds) throws SQLException {
253 		if (countSourceProgCdStatement == null || countSourceProgCdStatement.isClosed()) {
254 			countSourceProgCdStatement = sourceDao.getPreparedStatement(createCountProgCdSql());
255 		}
256 		return getNotExistingProgCds(countSourceProgCdStatement, progCds);
257 	}
258 
259 
260 	private List<String> getNotExistingProgCds(final PreparedStatement countProgCdStatement, final List<String> progCds) throws SQLException {
261 		List<String> notFoundPrograms = Lists.newArrayList();
262 		for (String progCd: progCds) {
263 			countProgCdStatement.setString(1, progCd);
264 			ResultSet rs = countProgCdStatement.executeQuery();
265 			rs.next();
266 			long count = rs.getLong(1);
267 			if (count != 1) {
268 				notFoundPrograms.add(progCd);
269 			}
270 			rs.close();
271 		}
272 		return notFoundPrograms;
273 	}
274 
275 	private String createSelectSurveyProgCdsSql() {
276 		return String.format("SELECT %s FROM %s WHERE %s = ?",
277 				DatabaseColumns.PROG_CD,
278 				DataSynchroTables.SURVEY_PROG,
279 				DatabaseColumns.SURVEY_ID);
280 	}
281 
282 	private String createCountProgCdSql() {
283 		return String.format("SELECT count(*) from %s WHERE %s = ?",
284 				ReferentialSynchroTables.PROGRAMME,
285 				DatabaseColumns.PROG_CD);
286 	}
287 
288 }