03 กรกฎาคม 2568

GCP Cloud Dataflow use-case patterns

GCP Cloud Dataflow use-case patterns

Part 1: https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-1

Part 2: https://cloud.google.com/blog/products/data-analytics/guide-to-common-cloud-dataflow-use-case-patterns-part-2


Generate dummy data and write it to two MongoDB collections

public static void main(String[] args) throws Exception {
MongoDBExportOptions options = PipelineOptionsFactory
.fromArgs(args).as(MongoDBExportOptions.class);

Pipeline pipeline = Pipeline.create(options);

// total documents to be written
int totalDocs = 1000000;

// write dummy doc1 to collection1
PCollection<Document> col1 = pipeline
.apply("Generate sequence", GenerateSequence.from(0).to(totalDocs))
.apply("Generate dummy doc1", ParDo.of(new GenerateDoc1Fn()));
.apply("Write data to collection2",
MongoDbIO.write()
.withUri("xxx")
.withDatabase("db")
.withCollection("collection1");

PCollection<Document> col2 = pipeline
.apply("Generate sequence", GenerateSequence.from(0).to(totalDocs))
.apply("Generate dummy doc2", ParDo.of(new GenerateDoc2Fn()));
.apply("Write data to collection2",
MongoDbIO.write()
.withUri("xxx")
.withDatabase("db")
.withCollection("collection2");
}


/**
 * Emits one fixed placeholder document for every incoming sequence number.
 *
 * <p>The input value is ignored; each output is {@code {"doc1_field": "doc1_value"}}.
 */
public static class GenerateDoc1Fn extends DoFn<Long, Document> {
  @ProcessElement
  public void processElement(@Element Long ignoredIndex, OutputReceiver<Document> receiver) {
    Document placeholder = new Document();
    placeholder.append("doc1_field", "doc1_value");
    receiver.output(placeholder);
  }
}

/**
 * Emits one fixed placeholder document for every incoming sequence number.
 *
 * <p>The input value is ignored; each output is {@code {"doc2_field": "doc2_value"}}.
 */
public static class GenerateDoc2Fn extends DoFn<Long, Document> {
  @ProcessElement
  public void processElement(@Element Long ignoredIndex, OutputReceiver<Document> receiver) {
    Document placeholder = new Document();
    placeholder.append("doc2_field", "doc2_value");
    receiver.output(placeholder);
  }
}

ไม่มีความคิดเห็น:

แสดงความคิดเห็น

บทความยอดนิยม (ล่าสุด)

บทความยอดนิยม (1 ปีย้อนหลัง)