1 package fr.ifremer.quadrige2.synchro.intercept.data;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import com.google.common.collect.Lists;
27 import com.google.common.collect.Sets;
28 import fr.ifremer.common.synchro.dao.Daos;
29 import fr.ifremer.common.synchro.dao.SynchroTableDao;
30 import fr.ifremer.common.synchro.intercept.SynchroInterceptorBase;
31 import fr.ifremer.common.synchro.intercept.SynchroOperationRepository;
32 import fr.ifremer.common.synchro.meta.SynchroDatabaseMetadata;
33 import fr.ifremer.common.synchro.service.RejectedRow;
34 import fr.ifremer.quadrige2.synchro.meta.DatabaseColumns;
35 import fr.ifremer.quadrige2.synchro.meta.data.DataSynchroTables;
36 import fr.ifremer.quadrige2.synchro.meta.referential.ReferentialSynchroTables;
37 import fr.ifremer.quadrige2.synchro.service.data.DataSynchroDatabaseConfiguration;
38 import fr.ifremer.quadrige2.synchro.vo.SynchroChangesVO;
39 import org.apache.commons.collections4.CollectionUtils;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.hibernate.tool.hbm2ddl.TableMetadata;
43
44 import java.io.IOException;
45 import java.sql.PreparedStatement;
46 import java.sql.ResultSet;
47 import java.sql.SQLException;
48 import java.util.List;
49
50
51
52
53
54
55
56 public class ChangeLogInterceptor extends AbstractDataInterceptor {
57
58 private static final Log log = LogFactory.getLog(ChangeLogInterceptor.class);
59
60 private static boolean debug = log.isDebugEnabled();
61
62 private boolean isEnable = false;
63
64 private SynchroChangesVO changeLog = null;
65
66 private PreparedStatement selectSourceSurveyProgCdsStatement;
67 private PreparedStatement countSourceProgCdStatement;
68 private PreparedStatement countTargetProgCdStatement;
69
70
71
72
73
74
75 public ChangeLogInterceptor() {
76 super(Sets.newHashSet(DataSynchroTables.SURVEY.name()));
77 }
78
79
80 @Override
81 protected void init(DataSynchroDatabaseConfiguration config) {
82 super.init(config);
83 isEnable = isChangeLogEnable(config);
84 setEnableOnWrite(isEnable);
85 }
86
87
88 @Override
89 public SynchroInterceptorBase clone() {
90 ChangeLogInterceptor result = (ChangeLogInterceptor) super.clone();
91 result.isEnable = isEnable;
92 return result;
93 }
94
95
96 @Override
97 public boolean doApply(SynchroDatabaseMetadata meta, TableMetadata table) {
98 return isEnable;
99 }
100
101
102 @Override
103 protected void doOnWrite(Object[] data,
104 List<Object> pk,
105 SynchroTableDao sourceDao,
106 SynchroTableDao targetDao,
107 SynchroOperationRepository buffer,
108 boolean insert) throws SQLException {
109 String tableName = sourceDao.getTable().getName();
110
111
112 if (pk == null || insert) {
113
114 pk = sourceDao.getPk(data);
115 }
116
117 if (changeLog == null) {
118 changeLog = new SynchroChangesVO();
119 }
120
121
122 if (insert) {
123 if (debug) {
124 log.debug(String.format("[%s] detect insert - pk %s", tableName, pk));
125 }
126
127
128 boolean rejected = false;
129 if (insert && tableName.equalsIgnoreCase(DataSynchroTables.SURVEY.name())) {
130
131 Number surveyId = (Number)CollectionUtils.extractSingleton(pk);
132
133 List<String> surveyProgCds = getSourceSurveyProgCds(sourceDao, surveyId);
134
135 List<String> missingTargetProgCds = getTargetNotExistingProgCds(targetDao, surveyProgCds);
136
137
138 if (CollectionUtils.isNotEmpty(missingTargetProgCds)) {
139
140
141 missingTargetProgCds = getSourceNotExistingProgCds(sourceDao, missingTargetProgCds);
142
143 if (CollectionUtils.isNotEmpty(missingTargetProgCds)) {
144 for (String migginProgCd: missingTargetProgCds) {
145 changeLog.addReject(tableName, RejectedRow.newBuilder(RejectedRow.Cause.MISSING_FOREIGN_KEY)
146 .setSourcePkStr(pk)
147 .setTargetPkStr(pk)
148 .setMissingFk(DatabaseColumns.PROG_CD.name(), migginProgCd)
149 .build());
150 rejected = true;
151 }
152
153 }
154 }
155 }
156
157 if (!rejected) {
158 changeLog.addInsert(tableName, pk);
159 }
160 }
161 else {
162 if (debug) {
163 log.debug(String.format("[%s] detect update - pk %s", tableName, pk));
164 }
165
166 changeLog.addUpdate(tableName, pk);
167 }
168 }
169
170
171 @Override
172 protected void doOnDelete(List<Object> pk, SynchroTableDao sourceDao, SynchroTableDao targetDao, SynchroOperationRepository buffer)
173 throws SQLException {
174 String tableName = sourceDao.getTable().getName();
175
176 if (changeLog == null) {
177 changeLog = new SynchroChangesVO();
178 }
179
180 if (debug) {
181 log.debug(String.format("[%s] detect delete - pk %s", tableName, pk));
182 }
183
184
185 changeLog.addDelete(tableName, pk);
186 }
187
188
189 @Override
190 protected void doClose() throws IOException {
191 super.doClose();
192
193
194 if (changeLog != null) {
195 changeLog.writeToFile(getConfig().getChangeLogFile(), true );
196 }
197
198 Daos.closeSilently(selectSourceSurveyProgCdsStatement);
199 selectSourceSurveyProgCdsStatement = null;
200
201 Daos.closeSilently(countSourceProgCdStatement);
202 countSourceProgCdStatement = null;
203
204 Daos.closeSilently(countTargetProgCdStatement);
205 countTargetProgCdStatement = null;
206 }
207
208
209
210
211
212
213
214
215
216
217
218
219 private boolean isChangeLogEnable(DataSynchroDatabaseConfiguration config) {
220 return config.getChangeLogFile() != null;
221 }
222
223 private List<String> getSourceSurveyProgCds(SynchroTableDao sourceDao, Number surveyId) throws SQLException {
224 if (selectSourceSurveyProgCdsStatement == null || selectSourceSurveyProgCdsStatement.isClosed()) {
225 selectSourceSurveyProgCdsStatement = sourceDao.getPreparedStatement(createSelectSurveyProgCdsSql());
226 }
227 selectSourceSurveyProgCdsStatement.setObject(1, surveyId);
228 ResultSet rs = selectSourceSurveyProgCdsStatement.executeQuery();
229 List<String> result = Lists.newArrayList();
230 while (rs.next()) {
231 result.add(rs.getString(1));
232 }
233 rs.close();
234 return result;
235 }
236
237 private List<String> getTargetNotExistingProgCds(SynchroTableDao sourceDao, List<String> progCds) throws SQLException {
238 if (countTargetProgCdStatement == null || countTargetProgCdStatement.isClosed()) {
239 countTargetProgCdStatement = sourceDao.getPreparedStatement(createCountProgCdSql());
240 }
241
242 return getNotExistingProgCds(countTargetProgCdStatement, progCds);
243 }
244
245 private List<String> getSourceNotExistingProgCds(final SynchroTableDao sourceDao, final List<String> progCds) throws SQLException {
246 if (countSourceProgCdStatement == null || countSourceProgCdStatement.isClosed()) {
247 countSourceProgCdStatement = sourceDao.getPreparedStatement(createCountProgCdSql());
248 }
249 return getNotExistingProgCds(countSourceProgCdStatement, progCds);
250 }
251
252
253 private List<String> getNotExistingProgCds(final PreparedStatement countProgCdStatement, final List<String> progCds) throws SQLException {
254 List<String> notFoundPrograms = Lists.newArrayList();
255 for (String progCd: progCds) {
256 countProgCdStatement.setString(1, progCd);
257 ResultSet rs = countProgCdStatement.executeQuery();
258 rs.next();
259 long count = rs.getLong(1);
260 if (count != 1) {
261 notFoundPrograms.add(progCd);
262 }
263 rs.close();
264 }
265 return notFoundPrograms;
266 }
267
268 private String createSelectSurveyProgCdsSql() {
269 return String.format("SELECT %s FROM %s WHERE %s = ?",
270 DatabaseColumns.PROG_CD,
271 DataSynchroTables.SURVEY_PROG,
272 DatabaseColumns.SURVEY_ID);
273 }
274
275 private String createCountProgCdSql() {
276 return String.format("SELECT count(*) from %s WHERE %s = ?",
277 ReferentialSynchroTables.PROGRAMME,
278 DatabaseColumns.PROG_CD);
279 }
280
281 }