1 package fr.ifremer.quadrige2.synchro.vo;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import com.google.common.base.Joiner;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Splitter;
29 import com.google.common.collect.*;
30 import fr.ifremer.common.synchro.meta.SynchroTableMetadata;
31 import fr.ifremer.common.synchro.service.RejectedRow;
32 import fr.ifremer.quadrige2.core.exception.Quadrige2TechnicalException;
33 import org.apache.commons.collections4.CollectionUtils;
34 import org.apache.commons.io.IOUtils;
35 import org.apache.commons.lang3.StringUtils;
36
37 import java.io.*;
38 import java.util.*;
39 import java.util.regex.Matcher;
40 import java.util.regex.Pattern;
41
42
43
44
45 public class SynchroChangesVO {
46
47 private final static char PK_VALUE_SEPARATOR = '|';
48
49 private final static String INSERTS_SUFFIX = "inserts";
50 private final static String UPDATES_SUFFIX = "updates";
51 private final static String DELETES_SUFFIX = "deletes";
52 private final static String REJECTS_SUFFIX = "rejects";
53
54 private final static String READ_PROPERTY_REGEX = "^([^.]+)[.]([a-zA-Z]+)=(.+)$";
55
56 private final Multimap<String, String> inserts = LinkedHashMultimap.create();
57 private final Multimap<String, String> updates = LinkedHashMultimap.create();
58 private final Multimap<String, String> deletes = LinkedHashMultimap.create();
59
60 private Properties connectionProperties;
61
62
63
64
65
66
67
68
69
70
71
72 protected final Multimap<String, String> rejectedRows = LinkedHashMultimap.create();
73
74
75
76
77
78
79 protected final Set<String> tableNames = Sets.newHashSet();
80
81
82
83
84
85
86
87
88
89 public void addTableName(String tableName) {
90 tableNames.add(tableName);
91 }
92
93
94
95
96
97
98
99
100
101
102
103 public void addReject(String tableName, String... rowInfo) {
104 rejectedRows.put(tableName, Joiner.on(';').join(rowInfo));
105 }
106
107
108
109
110
111
112
113
114
115
116
117 public void addReject(String tableName, RejectedRow rejectedRow) {
118 rejectedRows.put(tableName, rejectedRow.toString());
119 }
120
121
122
123
124
125
126
127
128
129
130
131 public void addInsert(final String tableName, List<Object> pk) {
132 inserts.put(tableName, SynchroTableMetadata.toPkStr(pk));
133 }
134
135
136
137
138
139
140
141
142
143
144
145 public void addUpdate(final String tableName, List<Object> pk) {
146 updates.put(tableName, SynchroTableMetadata.toPkStr(pk));
147 }
148
149
150
151
152
153
154
155
156
157
158
159 public void addDelete(final String tableName, List<Object> pk) {
160 deletes.put(tableName, SynchroTableMetadata.toPkStr(pk));
161 }
162
163
164
165
166
167
168
169
170 public Set<String> getTableNames() {
171 return ImmutableSet.copyOf(tableNames);
172 }
173
174
175
176
177
178
179
180
181 public int getTotalTreated() {
182 return getTotalInserts() + getTotalUpdates() + getTotalDeletes() + getTotalRejects();
183 }
184
185
186
187
188
189
190
191
192 public int getTotalInserts() {
193 return inserts == null ? 0 : inserts.size();
194 }
195
196
197
198
199
200
201
202
203 public int getTotalUpdates() {
204 return updates == null ? 0 : updates.size();
205 }
206
207
208
209
210
211
212
213
214 public int getTotalDeletes() {
215 return deletes == null ? 0 : deletes.size();
216 }
217
218
219
220
221
222
223
224
225 public int getTotalRejects() {
226 return rejectedRows == null ? 0 : rejectedRows.size();
227 }
228
229
230
231
232
233
234
235
236
237
238 public int getNbRows(String tableName) {
239 return getNbInserts(tableName) + getNbUpdates(tableName) + getNbDeletes(tableName);
240 }
241
242
243
244
245
246
247
248
249
250
251 public int getNbInserts(String tableName) {
252 return inserts.get(tableName).size();
253 }
254
255
256
257
258
259
260
261
262
263
264 public int getNbUpdates(String tableName) {
265 return updates.get(tableName).size();
266 }
267
268
269
270
271
272
273
274
275
276
277 public int getNbDeletes(String tableName) {
278 return deletes.get(tableName).size();
279 }
280
281
282
283
284
285
286
287
288
289
290 public String getRejectedRows(String tableName) {
291 Collection<String> result = rejectedRows.get(tableName);
292 if (CollectionUtils.isEmpty(result)) {
293 return "";
294 }
295 StringBuilder sb = new StringBuilder();
296 for(String row: result) {
297 sb.append('\n').append(row);
298 }
299 return sb.substring(1);
300 }
301
302
303
304
305
306
307
308
309
310 public void addRejects(Map<String, String> rejectedRows) {
311 Preconditions.checkNotNull(rejectedRows);
312
313 for(String tableName: rejectedRows.keySet()) {
314 String[] rows = rejectedRows.get(tableName).split("\n");
315 for (String row: rows) {
316 this.rejectedRows.put(tableName, row);
317 }
318 }
319 tableNames.addAll(rejectedRows.keySet());
320 }
321
322
323
324
325
326
327
328
329
330
331 public Collection<String> getInserts(String tableName) {
332 return inserts.get(tableName);
333 }
334
335
336
337
338
339
340
341
342
343
344 public Collection<String> getUpdates(String tableName) {
345 return updates.get(tableName);
346 }
347
348
349
350
351
352
353
354
355
356
357 public Collection<String> getDeletes(String tableName) {
358 return deletes.get(tableName);
359 }
360
361
362
363
364
365
366
367
368 public Properties getConnectionProperties() {
369 return connectionProperties;
370 }
371
372
373
374
375
376
377
378
379
380 public void setConnectionProperties(Properties connectionProperties) {
381 this.connectionProperties = connectionProperties;
382 }
383
384
385
386
387
388
389
390
391
392
393
394 public void writeToFile(File changeLogFile, boolean append) {
395
396
397 if (isEmpty()) {
398 return;
399 }
400
401 Writer writer = null;
402 try {
403
404
405 writer = new BufferedWriter(new FileWriter(changeLogFile, append && changeLogFile.exists()));
406
407
408 if (!inserts.isEmpty()) {
409 writeToFile(writer, inserts, INSERTS_SUFFIX);
410 }
411
412
413 if (!updates.isEmpty()) {
414 writeToFile(writer, updates, UPDATES_SUFFIX);
415 }
416
417
418 if (!deletes.isEmpty()) {
419 writeToFile(writer, deletes, DELETES_SUFFIX);
420 }
421
422
423 if (!rejectedRows.isEmpty()) {
424 writeToFile(writer, rejectedRows, REJECTS_SUFFIX);
425 }
426 } catch (Exception e) {
427 throw new Quadrige2TechnicalException("Unable to write synchronization diff into file: " + changeLogFile.getPath(), e);
428 } finally {
429 IOUtils.closeQuietly(writer);
430 }
431 }
432
433
434
435
436
437
438
439
440
441 public void readFromFile(File changeLogFile) {
442 readFromFile(changeLogFile, true);
443 }
444
445
446
447
448
449
450
451
452
453 public void addFromFile(File changeLogFile) {
454 readFromFile(changeLogFile, false);
455 }
456
457
458
459
460
461
462
463
464 public boolean isEmpty() {
465 return inserts.isEmpty() && updates.isEmpty() && deletes.isEmpty() && rejectedRows.isEmpty();
466 }
467
468
469
470
471 public void clear() {
472 inserts.clear();
473 updates.clear();
474 deletes.clear();
475 rejectedRows.clear();
476 tableNames.clear();
477 }
478
479
480
481
482
483
484
485
486
487
488
489
490
491 protected void readFromFile(File changeLogFile, boolean clearBeforeRead) {
492
493
494 if (clearBeforeRead) {
495 clear();
496 }
497
498 BufferedReader reader = null;
499 try {
500
501
502 reader = new BufferedReader(new FileReader(changeLogFile));
503
504 Pattern propertyPattern = Pattern.compile(READ_PROPERTY_REGEX);
505 Splitter propertyValueSplitter = Splitter.on(PK_VALUE_SEPARATOR).omitEmptyStrings().trimResults();
506
507 String line = reader.readLine();
508 while (line != null) {
509
510
511 if (StringUtils.isNotBlank(line)) {
512 Matcher propertyMatcher = propertyPattern.matcher(line.trim());
513 if (propertyMatcher.matches()) {
514 String tableName = propertyMatcher.group(1);
515 String propertySuffix = propertyMatcher.group(2);
516 String propertyValue = propertyMatcher.group(3);
517
518 if (StringUtils.isNotBlank(propertyValue)) {
519 tableNames.add(tableName);
520
521 if (INSERTS_SUFFIX.equals(propertySuffix)) {
522 inserts.putAll(tableName, propertyValueSplitter.split(propertyValue));
523 }
524 else if (UPDATES_SUFFIX.equals(propertySuffix)) {
525 updates.putAll(tableName, propertyValueSplitter.split(propertyValue));
526 }
527 else if (DELETES_SUFFIX.equals(propertySuffix)) {
528 deletes.putAll(tableName, propertyValueSplitter.split(propertyValue));
529 }
530 else if (REJECTS_SUFFIX.equals(propertySuffix)) {
531 rejectedRows.putAll(tableName, propertyValueSplitter.split(propertyValue));
532 }
533 }
534
535 }
536 }
537
538
539 line = reader.readLine();
540
541 }
542 } catch (Exception e) {
543 throw new Quadrige2TechnicalException("Unable to write synchronization change log into file: " + changeLogFile.getPath(), e);
544 } finally {
545 IOUtils.closeQuietly(reader);
546 }
547 }
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563 protected void writeToFile(Writer writer, Multimap<String, String> pksByTableName, String propertySuffix) throws IOException {
564
565 StringBuilder sb = new StringBuilder();
566 for (String tableName : pksByTableName.keySet()) {
567
568
569 sb.setLength(0);
570 for (String pkStr : pksByTableName.get(tableName)) {
571 sb.append(PK_VALUE_SEPARATOR).append(pkStr);
572 }
573
574 writer.write(String.format("%s.%s=%s\n",
575 tableName,
576 propertySuffix,
577 sb.substring(1)
578 ));
579 }
580 }
581
582 }