1 package fr.ifremer.quadrige3.synchro.intercept.data;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.eventbus.Subscribe;
28 import fr.ifremer.common.synchro.dao.SynchroBaseDao;
29 import fr.ifremer.common.synchro.dao.SynchroTableDao;
30 import fr.ifremer.common.synchro.dao.SynchroTableDaoImpl;
31 import fr.ifremer.common.synchro.intercept.SynchroDeletePreventedException;
32 import fr.ifremer.common.synchro.intercept.SynchroInterceptorBase;
33 import fr.ifremer.common.synchro.intercept.SynchroOperationRepository;
34 import fr.ifremer.common.synchro.meta.SynchroTableMetadata;
35 import fr.ifremer.common.synchro.meta.event.CreateQueryEvent;
36 import fr.ifremer.common.synchro.meta.event.LoadTableEvent;
37 import fr.ifremer.common.synchro.query.SynchroQueryBuilder;
38 import fr.ifremer.common.synchro.query.SynchroQueryOperator;
39 import fr.ifremer.quadrige3.core.dao.ObjectTypes;
40 import fr.ifremer.quadrige3.core.dao.referential.ObjectTypeCode;
41 import fr.ifremer.quadrige3.core.dao.technical.Daos;
42 import fr.ifremer.quadrige3.synchro.intercept.data.internal.ExportFkRemoteIdByObjectTypeInterceptor;
43 import fr.ifremer.quadrige3.synchro.intercept.data.internal.ImportRemoteIdByObjectTypeInterceptor;
44 import fr.ifremer.quadrige3.synchro.meta.DatabaseColumns;
45 import fr.ifremer.quadrige3.synchro.meta.data.DataSynchroTables;
46 import fr.ifremer.quadrige3.synchro.service.SynchroDirection;
47 import org.apache.commons.collections4.CollectionUtils;
48
49 import java.io.IOException;
50 import java.sql.PreparedStatement;
51 import java.sql.ResultSet;
52 import java.sql.SQLException;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.stream.Collectors;
56
57
58
59
60
61
62
63 public class QualificationHistoryInterceptor extends AbstractDataInterceptor {
64
65 protected Map<String, String> tableMap;
66 private int pkColIndex = -1;
67 private int objectTypeCodeColIndex = -1;
68 private int elementIdColIndex = -1;
69 private PreparedStatement selectObjectTypeCodeAndElementIdStatement;
70
71
72
73
74
75
76 public QualificationHistoryInterceptor() {
77 super(
78 DataSynchroTables.QUALIFICATION_HISTORY.name(),
79
80 SynchroDirection.IMPORT_TEMP2LOCAL,
81 SynchroDirection.IMPORT_FILE2LOCAL,
82
83 SynchroDirection.EXPORT_TEMP2SERVER,
84 SynchroDirection.EXPORT_LOCAL2TEMP,
85 SynchroDirection.EXPORT_LOCAL2FILE
86 );
87 setEnableOnWrite(true);
88 setEnableOnRead(true);
89
90 tableMap = ImmutableMap.of(
91 DataSynchroTables.SURVEY.name(), DatabaseColumns.SURVEY_ID.name(),
92 DataSynchroTables.SAMPLING_OPERATION.name(), DatabaseColumns.SAMPLING_OPER_ID.name(),
93 DataSynchroTables.MEASUREMENT.name(), DatabaseColumns.MEAS_ID.name(),
94 DataSynchroTables.TAXON_MEASUREMENT.name(), DatabaseColumns.TAXON_MEAS_ID.name(),
95 DataSynchroTables.PHOTO.name(), DatabaseColumns.PHOTO_ID.name()
96 );
97 }
98
99 @Override
100 public SynchroInterceptorBase clone() {
101 QualificationHistoryInterceptor clone = (QualificationHistoryInterceptor) super.clone();
102 clone.tableMap = this.tableMap;
103 clone.pkColIndex = this.pkColIndex;
104 clone.objectTypeCodeColIndex = this.objectTypeCodeColIndex;
105 clone.elementIdColIndex = this.elementIdColIndex;
106 return clone;
107 }
108
109
110
111
112
113
114
115
116 @Subscribe
117 public void handleQuery(CreateQueryEvent e) {
118
119 switch (e.queryName) {
120
121 case count:
122 case countFromUpdateDate:
123 case select:
124 case selectFromUpdateDate:
125 case selectMaxUpdateDate:
126
127 if (isInDirections(SynchroDirection.IMPORT_TEMP2LOCAL, SynchroDirection.IMPORT_FILE2LOCAL)) {
128 e.sql = addRestrictionsOnImport(e.sql);
129
130 } else if (isInDirections(SynchroDirection.EXPORT_LOCAL2TEMP, SynchroDirection.EXPORT_LOCAL2FILE, SynchroDirection.EXPORT_TEMP2SERVER)) {
131 e.sql = addRestrictionsOnExport(e.sql);
132 }
133 break;
134 default:
135 break;
136 }
137 }
138
139
140
141
142
143
144
145
146 @Subscribe
147 public void handleTableLoad(LoadTableEvent e) {
148
149 SynchroTableMetadata table = e.table;
150 pkColIndex = table.getSelectColumnIndex(table.getPkNames().iterator().next());
151 objectTypeCodeColIndex = table.getSelectColumnIndex(DatabaseColumns.OBJECT_TYPE_CD.name());
152 elementIdColIndex = table.getSelectColumnIndex(DatabaseColumns.QUAL_HIST_ELEMENT_ID.name());
153
154
155 if (isInDirections(SynchroDirection.IMPORT_TEMP2LOCAL)) {
156
157
158 ImportRemoteIdByObjectTypeInterceptor remoteIdInterceptor = new ImportRemoteIdByObjectTypeInterceptor(
159 getConfig(),
160 tableMap,
161 DatabaseColumns.QUAL_HIST_ELEMENT_ID.name(),
162 elementIdColIndex,
163 objectTypeCodeColIndex,
164 false);
165 table.addInterceptor(remoteIdInterceptor);
166 }
167
168
169 else if (isInDirections(SynchroDirection.EXPORT_TEMP2SERVER)) {
170
171
172 ExportFkRemoteIdByObjectTypeInterceptor remoteIdInterceptor = new ExportFkRemoteIdByObjectTypeInterceptor(
173 getConfig(),
174 tableMap,
175 DatabaseColumns.QUAL_HIST_ELEMENT_ID.name(),
176 elementIdColIndex,
177 objectTypeCodeColIndex,
178 false);
179 table.addInterceptor(remoteIdInterceptor);
180 }
181
182 }
183
184 @Override
185 protected void doOnRead(Object[] data, SynchroTableDao sourceDao, SynchroTableDao targetDao) throws SQLException {
186 if (isInDirections(SynchroDirection.IMPORT_FILE2LOCAL)) {
187
188 data[pkColIndex] = null;
189 }
190 }
191
192 @Override
193 protected void doOnWrite(Object[] data, List<Object> pk, SynchroTableDao sourceDao, SynchroTableDao targetDao, SynchroOperationRepository buffer, boolean insert) throws SQLException {
194 if (isInDirections(SynchroDirection.IMPORT_FILE2LOCAL) && targetDao instanceof SynchroTableDaoImpl) {
195
196
197 List<Object> pks = ((SynchroTableDaoImpl) targetDao).generateNewPk();
198 data[pkColIndex] = pks.iterator().next();
199
200
201 if (data[elementIdColIndex] == null || data[objectTypeCodeColIndex] == null) {
202 return;
203 }
204
205 String objectTypeCode = (String) data[objectTypeCodeColIndex];
206 String tableName = ObjectTypes.getTableNameFromObjectType(objectTypeCode).toLowerCase();
207 if (!getConfig().getRemapPks().containsKey(tableName)) {
208 return;
209 }
210
211
212 String fkStr = data[elementIdColIndex].toString();
213
214
215 String localFkStr = getConfig().getRemapPks().get(tableName).get(fkStr);
216
217 if (localFkStr != null && !localFkStr.equals(fkStr)) {
218
219
220 data[elementIdColIndex] = Long.parseLong(localFkStr);
221 }
222
223 }
224 }
225
226 @Override
227 protected void doOnDelete(List<Object> pk, SynchroTableDao sourceDao, SynchroTableDao targetDao, SynchroOperationRepository buffer) throws SQLException {
228 if (CollectionUtils.size(pk) != 1)
229 return;
230
231 Object id = pk.get(0);
232
233
234 if (isInDirections(SynchroDirection.EXPORT_TEMP2SERVER)) {
235 String objectTypeCode = getObjectTypeCode(targetDao, id);
236 if (!ObjectTypeCode.SURVEY.value().equalsIgnoreCase(objectTypeCode)) {
237 throw new SynchroDeletePreventedException();
238 }
239 }
240
241 else if (isInDirections(SynchroDirection.IMPORT_FILE2LOCAL)) {
242
243 Map<String, String> objectTypeCodeAndElementId = getObjectTypeCodeAndElementId(targetDao, id);
244 if (objectTypeCodeAndElementId.isEmpty()) {
245 return;
246 }
247 String objectTypeCode = objectTypeCodeAndElementId.keySet().iterator().next();
248 String elementId = objectTypeCodeAndElementId.get(objectTypeCode);
249 String tableName = ObjectTypes.getTableNameFromObjectType(objectTypeCode).toLowerCase();
250
251 if (getConfig().getRemapPks().containsKey(tableName) && getConfig().getRemapPks().get(tableName).containsKey(elementId)) {
252 throw new SynchroDeletePreventedException();
253 }
254 }
255
256
257 }
258
259 @Override
260 protected void doClose() throws IOException {
261 super.doClose();
262
263 Daos.closeSilently(selectObjectTypeCodeAndElementIdStatement);
264 selectObjectTypeCodeAndElementIdStatement = null;
265 }
266
267
268
269 private String getObjectTypeCode(SynchroBaseDao dao, Object pk) throws SQLException {
270 Map<String, String> result = getObjectTypeCodeAndElementId(dao, pk);
271 if (!result.isEmpty()) return result.keySet().iterator().next();
272 return null;
273 }
274
275 private Map<String, String> getObjectTypeCodeAndElementId(SynchroBaseDao dao, Object pk) throws SQLException {
276 if (selectObjectTypeCodeAndElementIdStatement == null || selectObjectTypeCodeAndElementIdStatement.isClosed()) {
277 selectObjectTypeCodeAndElementIdStatement = dao.getPreparedStatement(
278 String.format(
279 "SELECT %s,%s FROM %s WHERE QUAL_HIST_ID=?",
280 DatabaseColumns.OBJECT_TYPE_CD.name(),
281 DatabaseColumns.QUAL_HIST_ELEMENT_ID.name(),
282 DataSynchroTables.QUALIFICATION_HISTORY.name()
283 )
284 );
285 }
286 selectObjectTypeCodeAndElementIdStatement.setObject(1, pk);
287 try (ResultSet rs = selectObjectTypeCodeAndElementIdStatement.executeQuery()) {
288 rs.next();
289 String objectTypeCode = rs.getString(1);
290 String elementId = rs.getString(2);
291 if (objectTypeCode != null && elementId != null) {
292 return ImmutableMap.of(objectTypeCode, elementId);
293 }
294 return ImmutableMap.of();
295 }
296 }
297
298 protected String addRestrictionsOnImport(String sql) {
299
300 SynchroQueryBuilder query = SynchroQueryBuilder.newBuilder(sql);
301
302
303 query.addWhere(SynchroQueryOperator.AND, String.format("t.%s = '%s'",
304 DatabaseColumns.OBJECT_TYPE_CD,
305 ObjectTypeCode.SURVEY.value()));
306
307 return query.build();
308 }
309
310 protected String addRestrictionsOnExport(String sql) {
311
312 SynchroQueryBuilder query = SynchroQueryBuilder.newBuilder(sql);
313
314
315 query.addWhere(SynchroQueryOperator.AND, String.format("t.%s in (%s)",
316 DatabaseColumns.OBJECT_TYPE_CD,
317 tableMap.keySet().stream()
318 .flatMap(tableName -> ObjectTypes.getObjectTypeFromTableName(tableName).stream())
319 .collect(Collectors.joining("','", "'", "'")))
320 );
321
322 return query.build();
323 }
324
325 }