1 package fr.ifremer.quadrige3.synchro.vo;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 import com.google.common.base.Joiner;
27 import com.google.common.base.Splitter;
28 import com.google.common.collect.ImmutableSet;
29 import com.google.common.collect.LinkedHashMultimap;
30 import com.google.common.collect.Multimap;
31 import com.google.common.collect.Sets;
32 import fr.ifremer.common.synchro.meta.SynchroTableMetadata;
33 import fr.ifremer.common.synchro.service.RejectedRow;
34 import fr.ifremer.quadrige3.core.dao.technical.Assert;
35 import fr.ifremer.quadrige3.core.exception.QuadrigeTechnicalException;
36 import org.apache.commons.collections4.CollectionUtils;
37 import org.apache.commons.io.IOUtils;
38 import org.apache.commons.lang3.StringUtils;
39
40 import java.io.*;
41 import java.util.*;
42 import java.util.regex.Matcher;
43 import java.util.regex.Pattern;
44
45
46
47
48 public class SynchroChangesVO {
49
50 private final static char PK_VALUE_SEPARATOR = '|';
51
52 private final static String INSERTS_SUFFIX = "inserts";
53 private final static String UPDATES_SUFFIX = "updates";
54 private final static String DELETES_SUFFIX = "deletes";
55 private final static String REJECTS_SUFFIX = "rejects";
56
57 private final static String READ_PROPERTY_REGEX = "^([^.]+)[.]([a-zA-Z]+)=(.+)$";
58
59 private final Multimap<String, String> inserts = LinkedHashMultimap.create();
60 private final Multimap<String, String> updates = LinkedHashMultimap.create();
61 private final Multimap<String, String> deletes = LinkedHashMultimap.create();
62
63 private Properties connectionProperties;
64
65
66
67
68
69
70
71
72
73
74
75 private final Multimap<String, String> rejectedRows = LinkedHashMultimap.create();
76
77
78
79
80
81
82 private final Set<String> tableNames = Sets.newHashSet();
83
84
85
86
87
88
89
90
91
92 public void addTableName(String tableName) {
93 tableNames.add(tableName);
94 }
95
96
97
98
99
100
101
102
103
104
105
106 public void addReject(String tableName, String... rowInfo) {
107 rejectedRows.put(tableName, Joiner.on(';').join(rowInfo));
108 }
109
110
111
112
113
114
115
116
117
118
119
120 public void addReject(String tableName, RejectedRow rejectedRow) {
121 rejectedRows.put(tableName, rejectedRow.toString());
122 }
123
124
125
126
127
128
129
130
131
132
133
134 public void addInsert(final String tableName, List<Object> pk) {
135 inserts.put(tableName, SynchroTableMetadata.toPkStr(pk));
136 }
137
138
139
140
141
142
143
144
145
146
147
148 public void addUpdate(final String tableName, List<Object> pk) {
149 updates.put(tableName, SynchroTableMetadata.toPkStr(pk));
150 }
151
152
153
154
155
156
157
158
159
160
161
162 public void addDelete(final String tableName, List<Object> pk) {
163 deletes.put(tableName, SynchroTableMetadata.toPkStr(pk));
164 }
165
166
167
168
169
170
171
172
173 public Set<String> getTableNames() {
174 return ImmutableSet.copyOf(tableNames);
175 }
176
177
178
179
180
181
182
183
184 public int getTotalTreated() {
185 return getTotalInserts() + getTotalUpdates() + getTotalDeletes() + getTotalRejects();
186 }
187
188
189
190
191
192
193
194
195 public int getTotalInserts() {
196 return inserts.size();
197 }
198
199
200
201
202
203
204
205
206 public int getTotalUpdates() {
207 return updates.size();
208 }
209
210
211
212
213
214
215
216
217 public int getTotalDeletes() {
218 return deletes.size();
219 }
220
221
222
223
224
225
226
227
228 public int getTotalRejects() {
229 return rejectedRows.size();
230 }
231
232
233
234
235
236
237
238
239
240
241 public int getNbRows(String tableName) {
242 return getNbInserts(tableName) + getNbUpdates(tableName) + getNbDeletes(tableName);
243 }
244
245
246
247
248
249
250
251
252
253
254 public int getNbInserts(String tableName) {
255 return inserts.get(tableName).size();
256 }
257
258
259
260
261
262
263
264
265
266
267 public int getNbUpdates(String tableName) {
268 return updates.get(tableName).size();
269 }
270
271
272
273
274
275
276
277
278
279
280 public int getNbDeletes(String tableName) {
281 return deletes.get(tableName).size();
282 }
283
284
285
286
287
288
289
290
291
292
293 public String getRejectedRows(String tableName) {
294 Collection<String> result = rejectedRows.get(tableName);
295 if (CollectionUtils.isEmpty(result)) {
296 return "";
297 }
298 StringBuilder sb = new StringBuilder();
299 for(String row: result) {
300 sb.append('\n').append(row);
301 }
302 return sb.substring(1);
303 }
304
305
306
307
308
309
310
311
312
313 public void addRejects(Map<String, String> rejectedRows) {
314 Assert.notNull(rejectedRows);
315
316 for(String tableName: rejectedRows.keySet()) {
317 String[] rows = rejectedRows.get(tableName).split("\n");
318 for (String row: rows) {
319 this.rejectedRows.put(tableName, row);
320 }
321 }
322 tableNames.addAll(rejectedRows.keySet());
323 }
324
325
326
327
328
329
330
331
332
333
334 public Collection<String> getInserts(String tableName) {
335 return inserts.get(tableName);
336 }
337
338
339
340
341
342
343
344
345
346
347 public Collection<String> getUpdates(String tableName) {
348 return updates.get(tableName);
349 }
350
351
352
353
354
355
356
357
358
359
360 public Collection<String> getDeletes(String tableName) {
361 return deletes.get(tableName);
362 }
363
364
365
366
367
368
369
370
371 public Properties getConnectionProperties() {
372 return connectionProperties;
373 }
374
375
376
377
378
379
380
381
382
383 public void setConnectionProperties(Properties connectionProperties) {
384 this.connectionProperties = connectionProperties;
385 }
386
387
388
389
390
391
392
393
394
395
396
397 public void writeToFile(File changeLogFile, boolean append) {
398
399
400 if (isEmpty()) {
401 return;
402 }
403
404 Writer writer = null;
405 try {
406
407
408 writer = new BufferedWriter(new FileWriter(changeLogFile, append && changeLogFile.exists()));
409
410
411 if (!inserts.isEmpty()) {
412 writeToFile(writer, inserts, INSERTS_SUFFIX);
413 }
414
415
416 if (!updates.isEmpty()) {
417 writeToFile(writer, updates, UPDATES_SUFFIX);
418 }
419
420
421 if (!deletes.isEmpty()) {
422 writeToFile(writer, deletes, DELETES_SUFFIX);
423 }
424
425
426 if (!rejectedRows.isEmpty()) {
427 writeToFile(writer, rejectedRows, REJECTS_SUFFIX);
428 }
429 } catch (Exception e) {
430 throw new QuadrigeTechnicalException("Unable to write synchronization diff into file: " + changeLogFile.getPath(), e);
431 } finally {
432 IOUtils.closeQuietly(writer);
433 }
434 }
435
436
437
438
439
440
441
442
443
444 public void readFromFile(File changeLogFile) {
445 readFromFile(changeLogFile, true);
446 }
447
448
449
450
451
452
453
454
455
456 public void addFromFile(File changeLogFile) {
457 readFromFile(changeLogFile, false);
458 }
459
460
461
462
463
464
465
466
467 public boolean isEmpty() {
468 return inserts.isEmpty() && updates.isEmpty() && deletes.isEmpty() && rejectedRows.isEmpty();
469 }
470
471
472
473
474 public void clear() {
475 inserts.clear();
476 updates.clear();
477 deletes.clear();
478 rejectedRows.clear();
479 tableNames.clear();
480 }
481
482
483
484
485
486
487
488
489
490
491
492
493
494 protected void readFromFile(File changeLogFile, boolean clearBeforeRead) {
495
496
497 if (clearBeforeRead) {
498 clear();
499 }
500
501 BufferedReader reader = null;
502 try {
503
504
505 reader = new BufferedReader(new FileReader(changeLogFile));
506
507 Pattern propertyPattern = Pattern.compile(READ_PROPERTY_REGEX);
508 Splitter propertyValueSplitter = Splitter.on(PK_VALUE_SEPARATOR).omitEmptyStrings().trimResults();
509
510 String line = reader.readLine();
511 while (line != null) {
512
513
514 if (StringUtils.isNotBlank(line)) {
515 Matcher propertyMatcher = propertyPattern.matcher(line.trim());
516 if (propertyMatcher.matches()) {
517 String tableName = propertyMatcher.group(1);
518 String propertySuffix = propertyMatcher.group(2);
519 String propertyValue = propertyMatcher.group(3);
520
521 if (StringUtils.isNotBlank(propertyValue)) {
522 tableNames.add(tableName);
523
524 if (INSERTS_SUFFIX.equals(propertySuffix)) {
525 inserts.putAll(tableName, propertyValueSplitter.split(propertyValue));
526 }
527 else if (UPDATES_SUFFIX.equals(propertySuffix)) {
528 updates.putAll(tableName, propertyValueSplitter.split(propertyValue));
529 }
530 else if (DELETES_SUFFIX.equals(propertySuffix)) {
531 deletes.putAll(tableName, propertyValueSplitter.split(propertyValue));
532 }
533 else if (REJECTS_SUFFIX.equals(propertySuffix)) {
534 rejectedRows.putAll(tableName, propertyValueSplitter.split(propertyValue));
535 }
536 }
537
538 }
539 }
540
541
542 line = reader.readLine();
543
544 }
545 } catch (Exception e) {
546 throw new QuadrigeTechnicalException("Unable to write synchronization change log into file: " + changeLogFile.getPath(), e);
547 } finally {
548 IOUtils.closeQuietly(reader);
549 }
550 }
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566 protected void writeToFile(Writer writer, Multimap<String, String> pksByTableName, String propertySuffix) throws IOException {
567
568 StringBuilder sb = new StringBuilder();
569 for (String tableName : pksByTableName.keySet()) {
570
571
572 sb.setLength(0);
573 for (String pkStr : pksByTableName.get(tableName)) {
574 sb.append(PK_VALUE_SEPARATOR).append(pkStr);
575 }
576
577 writer.write(String.format("%s.%s=%s\n",
578 tableName,
579 propertySuffix,
580 sb.substring(1)
581 ));
582 }
583 }
584
585 }