Skip to content

Commit

Permalink
Merge pull request #89 from statisticsnorway/DPSTAT-814-implementere-…
Browse files Browse the repository at this point in the history
…map-sid-i-depseudonymisering

DPSTAT-814-Implementere map sid i depseudonymisering
  • Loading branch information
RupinderKaurSSB authored Mar 6, 2024
2 parents 759cf0c + 4c1fc16 commit ae11184
Show file tree
Hide file tree
Showing 15 changed files with 348 additions and 64 deletions.
1 change: 1 addition & 0 deletions conf/application-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ app-roles:
- [email protected]
- [email protected]
- [email protected]
- [email protected]

admins-group: [email protected]

37 changes: 36 additions & 1 deletion doc/requests/examples-sid.http
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ Authorization: Bearer {{keycloak_token}}
"fnr": "11854898347"
}

###
POST {{base_url}}/local-sid/sid/map
Content-Type: application/json
Authorization: Bearer {{keycloak_token}}

{
"snr": "0001ha3"
}

### Get SID for fnr

GET {{base_url}}/sid/fnr/11854898347?snapshot=423243224
Expand All @@ -25,6 +34,32 @@ GET {{base_url}}/sid/snr/0001ha3
Content-Type: application/json
Authorization: Bearer {{keycloak_token}}

### Map batch of fnrs

POST {{base_url}}/sid/map/batch
Content-Type: application/json
Authorization: Bearer {{keycloak_token}}

{
"fnrList": [
"11854898347",
"01839899544"
]
}

### Map batch of snrs

POST {{base_url}}/sid/map/batch
Content-Type: application/json
Authorization: Bearer {{keycloak_token}}

{
"snrList": [
"0001ha3",
"0006kh2"
]
}


### Look up SID for a list of FNRs and return those that are not found

Expand All @@ -35,7 +70,7 @@ Authorization: Bearer {{keycloak_token}}
{
"fnrList": [
"20859374701",
"01234567890"
"01839899544"
]
}

Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.hakky54</groupId>
<artifactId>logcaptor</artifactId>
<version>2.3.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public RecordMapProcessor<PseudoMetadataProcessor> newDepseudonymizeRecordProces
for (PseudoConfig config : pseudoConfigs) {
final PseudoFuncs fieldDepseudonymizer = newPseudoFuncs(config.getRules(),
pseudoKeysetsOf(config.getKeysets()));
chain.preprocessor((f, v) -> init(fieldDepseudonymizer, f, v));
chain.register((f, v) -> process(DEPSEUDONYMIZE, fieldDepseudonymizer, f, v, metadataProcessor));
}

Expand Down Expand Up @@ -90,7 +91,7 @@ private String init(PseudoFuncs pseudoFuncs, FieldDescriptor field, String varVa
return varValue;
}

private String process(PseudoOperation operation,
private String process(PseudoOperation operation,
PseudoFuncs func,
FieldDescriptor field,
String varValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public Publisher<Map<String, SidInfo>> lookupFnr(List<String> fnrList, Optional<
);
}

/**
 * Resolves a batch of snrs through the SID client.
 *
 * @param snrList  the snrs to send in the request
 * @param snapshot optional snapshot value; sent as {@code null} when absent
 * @return a publisher of the client response converted with {@code MultiSidResponse::toMap}
 */
public Publisher<Map<String, SidInfo>> lookupSnr(List<String> snrList, Optional<String> snapshot) {
    final MultiSidRequest request = new MultiSidRequest.MultiSidRequestBuilder()
            .snrList(snrList)
            .datasetExtractionSnapshotTime(snapshot.orElse(null))
            .build();
    return Publishers.map(sidClient.lookup(request), MultiSidResponse::toMap);
}

@Override
public Publisher<MultiSidLookupResponse> lookupMissing(List<String> fnrList, Optional<String> snapshot) {
return Publishers.map(sidClient.lookup(
Expand Down
24 changes: 21 additions & 3 deletions src/main/java/no/ssb/dlp/pseudo/service/sid/LocalSidService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package no.ssb.dlp.pseudo.service.sid;

import com.google.common.base.Strings;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.async.publisher.Publishers;
import lombok.RequiredArgsConstructor;
Expand All @@ -25,7 +26,8 @@ class LocalSidService implements SidService {
@Override
public Publisher<SidInfo> lookupFnr(String fnr, Optional<String> snapshot) {
String currentSnr = sidCache.getCurrentSnrForFnr(fnr)
.orElseThrow(() -> new LocalSidService.NoSidMappingFoundException("No SID matching fnr=" + fnr));
.orElseThrow(() -> new LocalSidService.NoSidMappingFoundException("No SID matching fnr starting from="
+ Strings.padEnd(fnr, 6, ' ').substring(0, 6)));
return Publishers.just(sidCache.getCurrentFnrForSnr(currentSnr).map(currentFnr ->
new SidInfo.SidInfoBuilder().snr(currentSnr).fnr(currentFnr).build())
.orElse(null));
Expand All @@ -34,7 +36,8 @@ public Publisher<SidInfo> lookupFnr(String fnr, Optional<String> snapshot) {
@Override
public Publisher<SidInfo> lookupSnr(String snr, Optional<String> snapshot) {
String currentFnr = sidCache.getCurrentFnrForSnr(snr)
.orElseThrow(() -> new LocalSidService.NoSidMappingFoundException("No SID matching snr=" + snr));
.orElseThrow(() -> new LocalSidService.NoSidMappingFoundException("No SID matching snr starting from="
+ Strings.padEnd(snr, 4, ' ').substring(0, 4)));
return Publishers.just(sidCache.getCurrentSnrForFnr(currentFnr).map(currentSnr ->
new SidInfo.SidInfoBuilder().snr(currentSnr).fnr(currentFnr).build())
.orElse(null));
Expand All @@ -44,7 +47,8 @@ public Publisher<SidInfo> lookupSnr(String snr, Optional<String> snapshot) {
public Publisher<Map<String, SidInfo>> lookupFnr(List<String> fnrList, Optional<String> snapshot) {
return Publishers.just(fnrList.stream().map(fnr -> {
String currentSnr = sidCache.getCurrentSnrForFnr(fnr).orElseThrow(() ->
new LocalSidService.NoSidMappingFoundException("No SID matching fnr=" + fnr));
new LocalSidService.NoSidMappingFoundException("No SID matching fnr starting from="
+ Strings.padEnd(fnr, 6, ' ').substring(0, 6)));
return sidCache.getCurrentFnrForSnr(currentSnr).map(currentFnr ->
new SidInfo.SidInfoBuilder().snr(currentSnr).fnr(currentFnr)
.datasetExtractionSnapshotTime(snapshot.orElse(null)).build())
Expand All @@ -53,6 +57,20 @@ public Publisher<Map<String, SidInfo>> lookupFnr(List<String> fnrList, Optional<
);
}

/**
 * Looks up the current fnr/snr pair for every snr in {@code snrList} using the local SID cache.
 * <p>
 * The lookup is performed eagerly, before the publisher is created, so a missing mapping
 * surfaces as an exception from this call rather than as a publisher error.
 *
 * @param snrList  the snrs to resolve; {@code null} is treated as an empty list
 * @param snapshot optional snapshot value, copied as-is onto every returned {@link SidInfo}
 * @return a publisher emitting one map of the resolved entries, keyed by {@code SidInfo::snr}
 * @throws NoSidMappingFoundException if the cache holds no fnr for one of the snrs
 */
@Override
public Publisher<Map<String, SidInfo>> lookupSnr(List<String> snrList, Optional<String> snapshot) {
    if (snrList == null) {
        // The batch endpoint forwards the request's snrList unchecked; a request without it must not NPE here.
        return Publishers.just(Map.of());
    }
    // NOTE(review): entries are keyed by the snr the cache reports for the resolved fnr, not by the
    // requested snr - confirm these are always identical for callers that look up by the requested value.
    return Publishers.just(snrList.stream()
            .map(snr -> {
                String currentFnr = sidCache.getCurrentFnrForSnr(snr).orElseThrow(() ->
                        new LocalSidService.NoSidMappingFoundException("No SID matching snr starting from="
                                + Strings.padEnd(snr, 4, ' ').substring(0, 4)));
                return sidCache.getCurrentSnrForFnr(currentFnr).map(currentSnr ->
                        new SidInfo.SidInfoBuilder().fnr(currentFnr).snr(currentSnr)
                                .datasetExtractionSnapshotTime(snapshot.orElse(null)).build());
            })
            // Drop snrs whose fnr has no reverse mapping instead of passing null into toMap (NPE).
            .flatMap(Optional::stream)
            // Keep the first entry when the same snr occurs more than once (toMap otherwise throws).
            .collect(Collectors.toMap(SidInfo::snr, sidInfo -> sidInfo, (first, duplicate) -> first))
    );
}

@Override
public Publisher<MultiSidLookupResponse> lookupMissing(List<String> fnrList, Optional<String> snapshot) {
return Publishers.just(MultiSidLookupResponse.builder().missing(fnrList.stream().filter(fnr ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
@Jacksonized
@Introspected
@Serdeable
public record MultiSidRequest(List<String> fnrList, String datasetExtractionSnapshotTime) { }
public record MultiSidRequest(List<String> fnrList, List<String> snrList, String datasetExtractionSnapshotTime) {}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package no.ssb.dlp.pseudo.service.sid;

import io.micronaut.core.util.CollectionUtils;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
Expand Down Expand Up @@ -45,7 +46,9 @@ public Publisher<MultiSidLookupResponse> lookupMissing(@QueryValue Optional<Stri
/**
 * Maps a batch of identifiers to their SID info.
 * <p>
 * The request's {@code fnrList} is used when it is non-empty; otherwise the request's
 * {@code snrList} is looked up.
 *
 * @param snapshot optional snapshot query value, passed through to the SID service
 * @param req      request body carrying either {@code fnrList} or {@code snrList}
 * @return the publisher returned by the SID service
 */
@ExecuteOn(TaskExecutors.IO)
@Post("/map/batch")
public Publisher<Map<String, SidInfo>> lookupFnrs(@QueryValue Optional<String> snapshot, @Body MultiSidRequest req) {
    // NOTE(review): when neither list is supplied, req.snrList() is forwarded as-is (possibly null) -
    // confirm every SidService implementation tolerates that.
    return CollectionUtils.isNotEmpty(req.fnrList())
            ? sidService.lookupFnr(req.fnrList(), snapshot)
            : sidService.lookupSnr(req.snrList(), snapshot);
}

@ExecuteOn(TaskExecutors.IO)
Expand Down
113 changes: 71 additions & 42 deletions src/main/java/no/ssb/dlp/pseudo/service/sid/SidMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,24 @@ public class SidMapper implements Mapper {
private final int partitionSize;
private Map<String, Object> config = Collections.emptyMap();

static final String NO_MATCHING_FNR = "No SID-mapping found for fnr starting with {}";
static final String NO_MATCHING_SNR = "No SID-mapping found for snr starting with {}";
static final String INCORRECT_MATCHING_FNR = "Incorrect SID-mapping for fnr starting with {}";
static final String INCORRECT_MATCHING_SNR = "Incorrect SID-mapping for snr starting with {}";

/**
 * Creates a mapper wired from the application context: the {@link SidService} bean is fetched
 * from {@code Application.getContext()}, and the partition size is read from the
 * {@code sid.mapper.partition.size} property, falling back to {@code DEFAULT_PARTITION_SIZE}.
 */
public SidMapper() {
sidService = Application.getContext().getBean(SidService.class);
partitionSize = Application.getContext().getProperty("sid.mapper.partition.size", Integer.class,
DEFAULT_PARTITION_SIZE);
}

private final Set<String> fnrs = ConcurrentHashMap.newKeySet();
private final Set<String> fnrsOrSnrs = ConcurrentHashMap.newKeySet();
private final ConcurrentHashMap<String, ObservableSubscriber<Map<String, SidInfo>>> bulkRequest = new ConcurrentHashMap<>();

/**
 * Collects every value of {@code input} (as a string) into {@code fnrsOrSnrs}, the set that
 * {@code mapTo} later partitions into bulk SID lookups.
 * <p>
 * Values are converted with {@link String#valueOf(Object)}, so a {@code null} value is stored
 * as the string {@code "null"}.
 *
 * @param input the field values that will subsequently be mapped or restored
 */
@Override
public void init(PseudoFuncInput input) {
    for (Object inputValue : input.getValues()) {
        fnrsOrSnrs.add(String.valueOf(inputValue));
    }
}

Expand All @@ -63,54 +68,83 @@ public PseudoFuncOutput map(PseudoFuncInput input) {
PseudoFuncOutput output = new PseudoFuncOutput();
for (Object inputValue : input.getValues()) {
String plain = String.valueOf(inputValue);
mapTo(plain, output);
mapTo(plain, true, output);
}
return output;
}

/**
 * Restores each value of {@code input} by running it through {@code mapTo} in the
 * snr-to-fnr direction and gathering the results in a fresh output.
 *
 * @param input the values to restore
 * @return the output populated by {@code mapTo} for every input value
 */
@Override
public PseudoFuncOutput restore(PseudoFuncInput input) {
    final PseudoFuncOutput restored = new PseudoFuncOutput();
    for (Object value : input.getValues()) {
        // isFnr = false: the value is treated as an snr
        mapTo(String.valueOf(value), false, restored);
    }
    return restored;
}

private void mapTo(String fnr, PseudoFuncOutput output) {
if (fnr == null) {
private void mapTo(String fnrOrSnr, boolean isFnr, PseudoFuncOutput output) {
if (fnrOrSnr == null) {
return;
}
try {
// Execute the bulk request if necessary
if (bulkRequest.isEmpty()) {
// Split fnrs into chunks of BULK_SIZE
for (List<String> bulkFnr: Lists.partition(List.copyOf(fnrs), partitionSize)) {
// Split fnrs or snrs into chunks of BULK_SIZE
for (List<String> bulkFnrOrSnr : Lists.partition(List.copyOf(fnrsOrSnrs), partitionSize)) {
log.info("Execute SID-mapping bulk request");
final ObservableSubscriber<Map<String, SidInfo>> subscriber = ObservableSubscriber.subscribe(
sidService.lookupFnr(bulkFnr, getSnapshot()));
for (String f: bulkFnr) {
final ObservableSubscriber<Map<String, SidInfo>> subscriber;

if (isFnr) {
subscriber = ObservableSubscriber.subscribe(
sidService.lookupFnr(bulkFnrOrSnr, getSnapshot()));
} else {
subscriber = ObservableSubscriber.subscribe(
sidService.lookupSnr(bulkFnrOrSnr, getSnapshot()));
}

for (String f : bulkFnrOrSnr) {
bulkRequest.put(f, subscriber);
}
}
}
SidInfo result = bulkRequest.get(fnr).awaitResult()
.orElseThrow(() -> new RuntimeException("SID service did not respond")).get(fnr);
if (result == null) {
log.warn("No SID-mapping found for fnr starting with {}", Strings.padEnd(fnr, 6, ' ').substring(0, 6));
output.addWarning(String.format("No SID-mapping found for fnr %s", fnr));
output.add(fnr);
} else if (result.snr() == null) {
log.warn("No SID-mapping found for fnr starting with {}", Strings.padEnd(fnr, 6, ' ').substring(0, 6));
output.addMetadata(MapFuncConfig.Param.SNAPSHOT_DATE, result.datasetExtractionSnapshotTime());
output.addWarning(String.format("No SID-mapping found for fnr %s", fnr));
output.add(fnr);
} else {
if (fnr.equals(result.snr())) {
log.warn("Incorrect SID-mapping for fnr starting with {}. Mapping returned the original fnr!",
Strings.padEnd(fnr, 6, ' ').substring(0, 6));
output.addWarning(String.format("Incorrect SID-mapping for fnr %s. Mapping returned the original fnr!", fnr));
} else {
log.debug("Successfully mapped fnr starting with {}", Strings.padEnd(fnr, 6, ' ').substring(0, 6));
}
output.addMetadata(MapFuncConfig.Param.SNAPSHOT_DATE, result.datasetExtractionSnapshotTime());
output.add(result.snr());
}
SidInfo result = bulkRequest.get(fnrOrSnr).awaitResult()
.orElseThrow(() -> new RuntimeException("SID service did not respond")).get(fnrOrSnr);

createMappingLogsAndOutput(result, isFnr, fnrOrSnr, output);

output.addMetadata(MapFuncConfig.Param.SNAPSHOT_DATE, result.datasetExtractionSnapshotTime());
output.add(isFnr ? result.snr() : result.fnr());

} catch (LocalSidService.NoSidMappingFoundException e) {
log.warn("No SID-mapping found for fnr starting with {}", Strings.padEnd(fnr, 6, ' ').substring(0, 6));
output.addWarning(String.format("No SID-mapping found for fnr %s", fnr));
output.add(fnr);
log.warn(isFnr ? NO_MATCHING_FNR : NO_MATCHING_SNR, Strings.padEnd(fnrOrSnr, 6, ' ').substring(0, 6));
output.addWarning(isFnr ? String.format("No SID-mapping found for fnr %s", fnrOrSnr.substring(0, 6)) : String.format("No SID-mapping found for snr %s", fnrOrSnr.substring(0, 4)));
output.add(fnrOrSnr);
}
}


private void createMappingLogsAndOutput(SidInfo sidInfo, boolean isFnr, String fnrOrSnr, PseudoFuncOutput pseudoFuncOutput) {
//Mapping for fnr
if (isFnr) {
if (sidInfo == null || sidInfo.snr() == null) {
log.warn(NO_MATCHING_FNR, Strings.padEnd(fnrOrSnr, 6, ' ').substring(0, 6));
pseudoFuncOutput.addWarning(String.format("No SID-mapping found for fnr %s", fnrOrSnr.substring(0, 6)));
pseudoFuncOutput.add(fnrOrSnr);
} else if (fnrOrSnr.equals(sidInfo.snr())) {
log.warn(INCORRECT_MATCHING_FNR, Strings.padEnd(fnrOrSnr, 6, ' ').substring(0, 6));
pseudoFuncOutput.addWarning(String.format("Incorrect SID-mapping for fnr %s. Mapping returned the original fnr!", fnrOrSnr.substring(0, 6)));
}
}
//Mapping for snr
else {
if (sidInfo == null || sidInfo.fnr() == null) {
log.warn(NO_MATCHING_SNR, Strings.padEnd(fnrOrSnr, 4, ' ').substring(0, 4));
pseudoFuncOutput.addWarning(String.format("No SID-mapping found for snr %s", fnrOrSnr.substring(0, 4)));
} else if (fnrOrSnr.equals(sidInfo.fnr())) {
log.warn(INCORRECT_MATCHING_SNR, Strings.padEnd(fnrOrSnr, 4, ' ').substring(0, 4));
pseudoFuncOutput.addWarning(String.format("Incorrect SID-mapping for snr %s. Mapping returned the original snr!", fnrOrSnr.substring(0, 4)));
}
}
}

Expand All @@ -120,7 +154,7 @@ private Optional<String> getSnapshot() {
).map(String::valueOf);
}

@Override
@Override
public void setConfig(Map<String, Object> config) {
if (config.containsKey(MapFuncConfig.Param.SNAPSHOT_DATE)) {
SnapshotInfo availableSnapshots = ObservableSubscriber.subscribe(this.sidService.getSnapshots()).awaitResult()
Expand All @@ -142,19 +176,14 @@ public void setConfig(Map<String, Object> config) {
throw new RuntimeException("Invalid date format from SID service");
}
}).toList();
if(availableSnapshotDates.stream().allMatch(requestedSnapshotDate::isBefore)){
if (availableSnapshotDates.stream().allMatch(requestedSnapshotDate::isBefore)) {
throw new InvalidSidSnapshotDateException(String.format("Requested date is of an earlier date than all available SID dates. Valid dates are: %s",
String.join(", ", availableSnapshots.items())));
}
}
this.config = config;
}

@Override
public PseudoFuncOutput restore(PseudoFuncInput input) {
return PseudoFuncOutput.of(input.getValues());
}

/**
* A Subscriber that stores the publishers results and provides a latch so can block on completion.
*
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/no/ssb/dlp/pseudo/service/sid/SidService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@

/**
 * Lookup operations between fnr and snr identifiers.
 * <p>
 * Every lookup takes an optional {@code snapshot} value that is handed to the implementation
 * unchanged.
 */
public interface SidService {
    /** Looks up the {@link SidInfo} for a single fnr. */
    Publisher<SidInfo> lookupFnr(String fnr, Optional<String> snapshot);

    /** Looks up the {@link SidInfo} for a single snr. */
    Publisher<SidInfo> lookupSnr(String snr, Optional<String> snapshot);

    /** Looks up the {@link SidInfo} for each fnr in {@code fnrList}; the result is a single map. */
    Publisher<Map<String, SidInfo>> lookupFnr(List<String> fnrList, Optional<String> snapshot);

    /** Looks up the {@link SidInfo} for each snr in {@code snrList}; the result is a single map. */
    Publisher<Map<String, SidInfo>> lookupSnr(List<String> snrList, Optional<String> snapshot);

    /** Reports which of the given fnrs are missing (presumably those without a SID mapping - confirm). */
    Publisher<MultiSidLookupResponse> lookupMissing(List<String> fnrList, Optional<String> snapshot);

    /** Returns information about the snapshots known to the service. */
    Publisher<SnapshotInfo> getSnapshots();
}
Loading

0 comments on commit ae11184

Please sign in to comment.