View Javadoc
1   package fr.ifremer.quadrige3.synchro.intercept.data;
2   
3   /*-
4    * #%L
5    * Quadrige3 Core :: Quadrige3 Synchro Core
6    * $Id:$
7    * $HeadURL:$
8    * %%
9    * Copyright (C) 2017 Ifremer
10   * %%
11   * This program is free software: you can redistribute it and/or modify
12   * it under the terms of the GNU Affero General Public License as published by
13   * the Free Software Foundation, either version 3 of the License, or
14   * (at your option) any later version.
15   *
16   * This program is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU General Public License for more details.
20   *
21   * You should have received a copy of the GNU Affero General Public License
22   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23   * #L%
24   */
25  
26  import com.google.common.collect.ImmutableMap;
27  import com.google.common.eventbus.Subscribe;
28  import fr.ifremer.common.synchro.dao.SynchroBaseDao;
29  import fr.ifremer.common.synchro.dao.SynchroTableDao;
30  import fr.ifremer.common.synchro.dao.SynchroTableDaoImpl;
31  import fr.ifremer.common.synchro.intercept.SynchroDeletePreventedException;
32  import fr.ifremer.common.synchro.intercept.SynchroInterceptorBase;
33  import fr.ifremer.common.synchro.intercept.SynchroOperationRepository;
34  import fr.ifremer.common.synchro.meta.SynchroTableMetadata;
35  import fr.ifremer.common.synchro.meta.event.CreateQueryEvent;
36  import fr.ifremer.common.synchro.meta.event.LoadTableEvent;
37  import fr.ifremer.common.synchro.query.SynchroQueryBuilder;
38  import fr.ifremer.common.synchro.query.SynchroQueryOperator;
39  import fr.ifremer.quadrige3.core.dao.ObjectTypes;
40  import fr.ifremer.quadrige3.core.dao.referential.ObjectTypeCode;
41  import fr.ifremer.quadrige3.core.dao.technical.Daos;
42  import fr.ifremer.quadrige3.synchro.intercept.data.internal.ExportFkRemoteIdByObjectTypeInterceptor;
43  import fr.ifremer.quadrige3.synchro.intercept.data.internal.ImportRemoteIdByObjectTypeInterceptor;
44  import fr.ifremer.quadrige3.synchro.meta.DatabaseColumns;
45  import fr.ifremer.quadrige3.synchro.meta.data.DataSynchroTables;
46  import fr.ifremer.quadrige3.synchro.service.SynchroDirection;
47  import org.apache.commons.collections4.CollectionUtils;
48  
49  import java.io.IOException;
50  import java.sql.PreparedStatement;
51  import java.sql.ResultSet;
52  import java.sql.SQLException;
53  import java.util.List;
54  import java.util.Map;
55  import java.util.stream.Collectors;
56  
57  /**
58   * Manage columns rewriting ('id' AND 'remote_id') on table QUALIFICATION_HISTORY
59   *
60   * @author Benoit Lavenier <benoit.lavenier@e-is.pro>
61   * @since 1.0
62   */
63  public class QualificationHistoryInterceptor extends AbstractDataInterceptor {
64  
65      protected Map<String, String> tableMap;
66      private int pkColIndex = -1;
67      private int objectTypeCodeColIndex = -1;
68      private int elementIdColIndex = -1;
69      private PreparedStatement selectObjectTypeCodeAndElementIdStatement;
70  
71      /**
72       * <p>
73       * Constructor for QualificationHistoryInterceptor.
74       * </p>
75       */
76      public QualificationHistoryInterceptor() {
77          super(
78              DataSynchroTables.QUALIFICATION_HISTORY.name(),
79              // IMPORT:
80              SynchroDirection.IMPORT_TEMP2LOCAL,
81              SynchroDirection.IMPORT_FILE2LOCAL,
82              // EXPORT:
83              SynchroDirection.EXPORT_TEMP2SERVER,
84              SynchroDirection.EXPORT_LOCAL2TEMP,
85              SynchroDirection.EXPORT_LOCAL2FILE
86          );
87          setEnableOnWrite(true);
88          setEnableOnRead(true);
89  
90          tableMap = ImmutableMap.of(
91              DataSynchroTables.SURVEY.name(), DatabaseColumns.SURVEY_ID.name(),
92              DataSynchroTables.SAMPLING_OPERATION.name(), DatabaseColumns.SAMPLING_OPER_ID.name(),
93              DataSynchroTables.MEASUREMENT.name(), DatabaseColumns.MEAS_ID.name(),
94              DataSynchroTables.TAXON_MEASUREMENT.name(), DatabaseColumns.TAXON_MEAS_ID.name(),
95              DataSynchroTables.PHOTO.name(), DatabaseColumns.PHOTO_ID.name()
96          );
97      }
98  
99      @Override
100     public SynchroInterceptorBase clone() {
101         QualificationHistoryInterceptor clone = (QualificationHistoryInterceptor) super.clone();
102         clone.tableMap = this.tableMap;
103         clone.pkColIndex = this.pkColIndex;
104         clone.objectTypeCodeColIndex = this.objectTypeCodeColIndex;
105         clone.elementIdColIndex = this.elementIdColIndex;
106         return clone;
107     }
108 
109     /**
110      * <p>
111      * handleQuery.
112      * </p>
113      *
114      * @param e a {@link fr.ifremer.common.synchro.meta.event.CreateQueryEvent} object.
115      */
116     @Subscribe
117     public void handleQuery(CreateQueryEvent e) {
118 
119         switch (e.queryName) {
120             // Select queries :
121             case count:
122             case countFromUpdateDate:
123             case select:
124             case selectFromUpdateDate:
125             case selectMaxUpdateDate:
126 
127                 if (isInDirections(SynchroDirection.IMPORT_TEMP2LOCAL, SynchroDirection.IMPORT_FILE2LOCAL)) {
128                     e.sql = addRestrictionsOnImport(e.sql);
129 
130                 } else if (isInDirections(SynchroDirection.EXPORT_LOCAL2TEMP, SynchroDirection.EXPORT_LOCAL2FILE, SynchroDirection.EXPORT_TEMP2SERVER)) {
131                     e.sql = addRestrictionsOnExport(e.sql);
132                 }
133                 break;
134             default:
135                 break;
136         }
137     }
138 
139     /**
140      * <p>
141      * handleTableLoad.
142      * </p>
143      *
144      * @param e a {@link fr.ifremer.common.synchro.meta.event.LoadTableEvent} object.
145      */
146     @Subscribe
147     public void handleTableLoad(LoadTableEvent e) {
148 
149         SynchroTableMetadata table = e.table;
150         pkColIndex = table.getSelectColumnIndex(table.getPkNames().iterator().next());
151         objectTypeCodeColIndex = table.getSelectColumnIndex(DatabaseColumns.OBJECT_TYPE_CD.name());
152         elementIdColIndex = table.getSelectColumnIndex(DatabaseColumns.QUAL_HIST_ELEMENT_ID.name());
153 
154         // IMPORT: Temp DB -> Local DB
155         if (isInDirections(SynchroDirection.IMPORT_TEMP2LOCAL)) {
156 
157             // Create and configure a interceptor, to rewrite data remote ID into local ID
158             ImportRemoteIdByObjectTypeInterceptor remoteIdInterceptor = new ImportRemoteIdByObjectTypeInterceptor(
159                 getConfig(),
160                 tableMap,
161                 DatabaseColumns.QUAL_HIST_ELEMENT_ID.name(),
162                 elementIdColIndex,
163                 objectTypeCodeColIndex,
164                 false);
165             table.addInterceptor(remoteIdInterceptor);
166         }
167 
168         // EXPORT: Temp DB -> Server DB
169         else if (isInDirections(SynchroDirection.EXPORT_TEMP2SERVER)) {
170 
171             // Create and configure a interceptor, to rewrite data local ID into remote ID
172             ExportFkRemoteIdByObjectTypeInterceptor remoteIdInterceptor = new ExportFkRemoteIdByObjectTypeInterceptor(
173                 getConfig(),
174                 tableMap,
175                 DatabaseColumns.QUAL_HIST_ELEMENT_ID.name(),
176                 elementIdColIndex,
177                 objectTypeCodeColIndex,
178                 false);
179             table.addInterceptor(remoteIdInterceptor);
180         }
181 
182     }
183 
184     @Override
185     protected void doOnRead(Object[] data, SynchroTableDao sourceDao, SynchroTableDao targetDao) throws SQLException {
186         if (isInDirections(SynchroDirection.IMPORT_FILE2LOCAL)) {
187             // Reset primary key to allow insertion
188             data[pkColIndex] = null;
189         }
190     }
191 
192     @Override
193     protected void doOnWrite(Object[] data, List<Object> pk, SynchroTableDao sourceDao, SynchroTableDao targetDao, SynchroOperationRepository buffer, boolean insert) throws SQLException {
194         if (isInDirections(SynchroDirection.IMPORT_FILE2LOCAL) && targetDao instanceof SynchroTableDaoImpl) {
195 
196             // Affect new PK
197             List<Object> pks = ((SynchroTableDaoImpl) targetDao).generateNewPk();
198             data[pkColIndex] = pks.iterator().next();
199 
200             // Try to remap data id when importing from file
201             if (data[elementIdColIndex] == null || data[objectTypeCodeColIndex] == null) {
202                 return;
203             }
204 
205             String objectTypeCode = (String) data[objectTypeCodeColIndex];
206             String tableName = ObjectTypes.getTableNameFromObjectType(objectTypeCode).toLowerCase();
207             if (!getConfig().getRemapPks().containsKey(tableName)) {
208                 return;
209             }
210 
211             // Get the source FK
212             String fkStr = data[elementIdColIndex].toString();
213 
214             // Try to get the corresponding local FK
215             String localFkStr = getConfig().getRemapPks().get(tableName).get(fkStr);
216 
217             if (localFkStr != null && !localFkStr.equals(fkStr)) {
218 
219                 // Replace this FK
220                 data[elementIdColIndex] = Long.parseLong(localFkStr);
221             }
222 
223         }
224     }
225 
226     @Override
227     protected void doOnDelete(List<Object> pk, SynchroTableDao sourceDao, SynchroTableDao targetDao, SynchroOperationRepository buffer) throws SQLException {
228         if (CollectionUtils.size(pk) != 1)
229             return;
230 
231         Object id = pk.get(0);
232 
233         // Don't delete any history except SURVEY (only export)
234         if (isInDirections(SynchroDirection.EXPORT_TEMP2SERVER)) {
235             String objectTypeCode = getObjectTypeCode(targetDao, id);
236             if (!ObjectTypeCode.SURVEY.value().equalsIgnoreCase(objectTypeCode)) {
237                 throw new SynchroDeletePreventedException();
238             }
239         }
240 
241         else if (isInDirections(SynchroDirection.IMPORT_FILE2LOCAL)) {
242             // Don't delete if trying to remove remapped data
243             Map<String, String> objectTypeCodeAndElementId = getObjectTypeCodeAndElementId(targetDao, id);
244             if (objectTypeCodeAndElementId.isEmpty()) {
245                 return;
246             }
247             String objectTypeCode = objectTypeCodeAndElementId.keySet().iterator().next();
248             String elementId = objectTypeCodeAndElementId.get(objectTypeCode);
249             String tableName = ObjectTypes.getTableNameFromObjectType(objectTypeCode).toLowerCase();
250 
251             if (getConfig().getRemapPks().containsKey(tableName) && getConfig().getRemapPks().get(tableName).containsKey(elementId)) {
252                 throw new SynchroDeletePreventedException();
253             }
254         }
255 
256         // Default: do delete
257     }
258 
259     @Override
260     protected void doClose() throws IOException {
261         super.doClose();
262 
263         Daos.closeSilently(selectObjectTypeCodeAndElementIdStatement);
264         selectObjectTypeCodeAndElementIdStatement = null;
265     }
266 
267     /* -- Internal methods -- */
268 
269     private String getObjectTypeCode(SynchroBaseDao dao, Object pk) throws SQLException {
270         Map<String, String> result = getObjectTypeCodeAndElementId(dao, pk);
271         if (!result.isEmpty()) return result.keySet().iterator().next();
272         return null;
273     }
274 
275     private Map<String, String> getObjectTypeCodeAndElementId(SynchroBaseDao dao, Object pk) throws SQLException {
276         if (selectObjectTypeCodeAndElementIdStatement == null || selectObjectTypeCodeAndElementIdStatement.isClosed()) {
277             selectObjectTypeCodeAndElementIdStatement = dao.getPreparedStatement(
278                 String.format(
279                     "SELECT %s,%s FROM %s WHERE QUAL_HIST_ID=?",
280                     DatabaseColumns.OBJECT_TYPE_CD.name(),
281                     DatabaseColumns.QUAL_HIST_ELEMENT_ID.name(),
282                     DataSynchroTables.QUALIFICATION_HISTORY.name()
283                 )
284             );
285         }
286         selectObjectTypeCodeAndElementIdStatement.setObject(1, pk);
287         try (ResultSet rs = selectObjectTypeCodeAndElementIdStatement.executeQuery()) {
288             rs.next();
289             String objectTypeCode = rs.getString(1);
290             String elementId = rs.getString(2);
291             if (objectTypeCode != null && elementId != null) {
292                 return ImmutableMap.of(objectTypeCode, elementId);
293             }
294             return ImmutableMap.of();
295         }
296     }
297 
298     protected String addRestrictionsOnImport(String sql) {
299 
300         SynchroQueryBuilder query = SynchroQueryBuilder.newBuilder(sql);
301 
302         // Limit to line on survey (object type code = PASS)
303         query.addWhere(SynchroQueryOperator.AND, String.format("t.%s = '%s'",
304             DatabaseColumns.OBJECT_TYPE_CD,
305             ObjectTypeCode.SURVEY.value()));
306 
307         return query.build();
308     }
309 
310     protected String addRestrictionsOnExport(String sql) {
311 
312         SynchroQueryBuilder query = SynchroQueryBuilder.newBuilder(sql);
313 
314         // Limit to line on table map
315         query.addWhere(SynchroQueryOperator.AND, String.format("t.%s in (%s)",
316             DatabaseColumns.OBJECT_TYPE_CD,
317             tableMap.keySet().stream()
318                 .flatMap(tableName -> ObjectTypes.getObjectTypeFromTableName(tableName).stream())
319                 .collect(Collectors.joining("','", "'", "'")))
320         );
321 
322         return query.build();
323     }
324 
325 }