1 package fr.ifremer.quadrige3.synchro.intercept.data;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import com.google.common.collect.Lists;
27 import com.google.common.collect.Sets;
28 import fr.ifremer.common.synchro.dao.Daos;
29 import fr.ifremer.common.synchro.dao.SynchroTableDao;
30 import fr.ifremer.common.synchro.intercept.SynchroInterceptorBase;
31 import fr.ifremer.common.synchro.intercept.SynchroMissingForeignKeyException;
32 import fr.ifremer.common.synchro.intercept.SynchroOperationRepository;
33 import fr.ifremer.common.synchro.meta.SynchroDatabaseMetadata;
34 import fr.ifremer.common.synchro.service.RejectedRow;
35 import fr.ifremer.quadrige3.synchro.meta.DatabaseColumns;
36 import fr.ifremer.quadrige3.synchro.meta.data.DataSynchroTables;
37 import fr.ifremer.quadrige3.synchro.meta.referential.ReferentialSynchroTables;
38 import fr.ifremer.quadrige3.synchro.service.data.DataSynchroDatabaseConfiguration;
39 import fr.ifremer.quadrige3.synchro.vo.SynchroChangesVO;
40 import org.apache.commons.collections4.CollectionUtils;
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.hibernate.tool.hbm2ddl.TableMetadata;
44
45 import java.io.IOException;
46 import java.sql.PreparedStatement;
47 import java.sql.ResultSet;
48 import java.sql.SQLException;
49 import java.util.ArrayList;
50 import java.util.List;
51 import java.util.StringJoiner;
52
53
54
55
56
57
58
59 public class ChangeLogInterceptor extends AbstractDataInterceptor {
60
61 private static final Log log = LogFactory.getLog(ChangeLogInterceptor.class);
62
63 private static final boolean debug = log.isDebugEnabled();
64
65 private boolean isEnable = false;
66
67 private SynchroChangesVO changeLog = null;
68
69 private PreparedStatement selectSourceSurveyProgCdsStatement;
70 private PreparedStatement countSourceProgCdStatement;
71 private PreparedStatement countTargetProgCdStatement;
72
73
74
75
76
77
78 public ChangeLogInterceptor() {
79 super(Sets.newHashSet(DataSynchroTables.SURVEY.name()));
80 }
81
82
83 @Override
84 protected void init(DataSynchroDatabaseConfiguration config) {
85 super.init(config);
86 isEnable = isChangeLogEnable(config);
87 setEnableOnWrite(isEnable);
88 }
89
90
91 @Override
92 public SynchroInterceptorBase clone() {
93 ChangeLogInterceptor result = (ChangeLogInterceptor) super.clone();
94 result.isEnable = isEnable;
95 return result;
96 }
97
98
99 @Override
100 public boolean doApply(SynchroDatabaseMetadata meta, TableMetadata table) {
101 return isEnable;
102 }
103
104
105 @Override
106 protected void doOnWrite(Object[] data,
107 List<Object> pk,
108 SynchroTableDao sourceDao,
109 SynchroTableDao targetDao,
110 SynchroOperationRepository buffer,
111 boolean insert) throws SQLException {
112 String tableName = sourceDao.getTable().getName();
113
114
115 if (pk == null || insert) {
116
117 pk = sourceDao.getPk(data);
118 }
119
120 if (changeLog == null) {
121 changeLog = new SynchroChangesVO();
122 }
123
124
125 if (insert) {
126 if (debug) {
127 log.debug(String.format("[%s] detect insert - pk %s", tableName, pk));
128 }
129
130
131 boolean rejected = false;
132 List<String> missingTargetProgCds = new ArrayList<>();
133 if (tableName.equalsIgnoreCase(DataSynchroTables.SURVEY.name())) {
134
135 Number surveyId = (Number)CollectionUtils.extractSingleton(pk);
136
137 List<String> surveyProgCds = getSourceSurveyProgCds(sourceDao, surveyId);
138
139 missingTargetProgCds = getTargetNotExistingProgCds(targetDao, surveyProgCds);
140
141
142 if (CollectionUtils.isNotEmpty(missingTargetProgCds)) {
143
144
145 missingTargetProgCds = getSourceNotExistingProgCds(sourceDao, missingTargetProgCds);
146
147 if (CollectionUtils.isNotEmpty(missingTargetProgCds)) {
148 for (String missingProgCd: missingTargetProgCds) {
149 changeLog.addReject(tableName, RejectedRow.newBuilder(RejectedRow.Cause.MISSING_FOREIGN_KEY)
150 .setSourcePkStr(pk)
151 .setTargetPkStr(pk)
152 .setMissingFk(DatabaseColumns.PROG_CD.name(), missingProgCd)
153 .build());
154 rejected = true;
155 }
156
157 }
158 }
159 }
160
161 if (!rejected) {
162 changeLog.addInsert(tableName, pk);
163 } else {
164 StringJoiner joiner = new StringJoiner(",");
165 missingTargetProgCds.forEach(joiner::add);
166 throw new SynchroMissingForeignKeyException(tableName, pk, pk, DatabaseColumns.PROG_CD.name(), joiner.toString());
167 }
168 }
169 else {
170 if (debug) {
171 log.debug(String.format("[%s] detect update - pk %s", tableName, pk));
172 }
173
174 changeLog.addUpdate(tableName, pk);
175 }
176 }
177
178
179 @Override
180 protected void doOnDelete(List<Object> pk, SynchroTableDao sourceDao, SynchroTableDao targetDao, SynchroOperationRepository buffer) {
181 String tableName = sourceDao.getTable().getName();
182
183 if (changeLog == null) {
184 changeLog = new SynchroChangesVO();
185 }
186
187 if (debug) {
188 log.debug(String.format("[%s] detect delete - pk %s", tableName, pk));
189 }
190
191
192 changeLog.addDelete(tableName, pk);
193 }
194
195
196 @Override
197 protected void doClose() throws IOException {
198 super.doClose();
199
200
201 if (changeLog != null) {
202 changeLog.writeToFile(getConfig().getChangeLogFile(), true );
203 }
204
205 Daos.closeSilently(selectSourceSurveyProgCdsStatement);
206 selectSourceSurveyProgCdsStatement = null;
207
208 Daos.closeSilently(countSourceProgCdStatement);
209 countSourceProgCdStatement = null;
210
211 Daos.closeSilently(countTargetProgCdStatement);
212 countTargetProgCdStatement = null;
213 }
214
215
216
217
218
219
220
221
222
223
224
225
226 private boolean isChangeLogEnable(DataSynchroDatabaseConfiguration config) {
227 return config.getChangeLogFile() != null;
228 }
229
230 private List<String> getSourceSurveyProgCds(SynchroTableDao sourceDao, Number surveyId) throws SQLException {
231 if (selectSourceSurveyProgCdsStatement == null || selectSourceSurveyProgCdsStatement.isClosed()) {
232 selectSourceSurveyProgCdsStatement = sourceDao.getPreparedStatement(createSelectSurveyProgCdsSql());
233 }
234 selectSourceSurveyProgCdsStatement.setObject(1, surveyId);
235 ResultSet rs = selectSourceSurveyProgCdsStatement.executeQuery();
236 List<String> result = Lists.newArrayList();
237 while (rs.next()) {
238 result.add(rs.getString(1));
239 }
240 rs.close();
241 return result;
242 }
243
244 private List<String> getTargetNotExistingProgCds(SynchroTableDao sourceDao, List<String> progCds) throws SQLException {
245 if (countTargetProgCdStatement == null || countTargetProgCdStatement.isClosed()) {
246 countTargetProgCdStatement = sourceDao.getPreparedStatement(createCountProgCdSql());
247 }
248
249 return getNotExistingProgCds(countTargetProgCdStatement, progCds);
250 }
251
252 private List<String> getSourceNotExistingProgCds(final SynchroTableDao sourceDao, final List<String> progCds) throws SQLException {
253 if (countSourceProgCdStatement == null || countSourceProgCdStatement.isClosed()) {
254 countSourceProgCdStatement = sourceDao.getPreparedStatement(createCountProgCdSql());
255 }
256 return getNotExistingProgCds(countSourceProgCdStatement, progCds);
257 }
258
259
260 private List<String> getNotExistingProgCds(final PreparedStatement countProgCdStatement, final List<String> progCds) throws SQLException {
261 List<String> notFoundPrograms = Lists.newArrayList();
262 for (String progCd: progCds) {
263 countProgCdStatement.setString(1, progCd);
264 ResultSet rs = countProgCdStatement.executeQuery();
265 rs.next();
266 long count = rs.getLong(1);
267 if (count != 1) {
268 notFoundPrograms.add(progCd);
269 }
270 rs.close();
271 }
272 return notFoundPrograms;
273 }
274
275 private String createSelectSurveyProgCdsSql() {
276 return String.format("SELECT %s FROM %s WHERE %s = ?",
277 DatabaseColumns.PROG_CD,
278 DataSynchroTables.SURVEY_PROG,
279 DatabaseColumns.SURVEY_ID);
280 }
281
282 private String createCountProgCdSql() {
283 return String.format("SELECT count(*) from %s WHERE %s = ?",
284 ReferentialSynchroTables.PROGRAMME,
285 DatabaseColumns.PROG_CD);
286 }
287
288 }