@@ -31,7 +31,8 @@ namespace {
3131void mergeInto (
3232 type::Schema& dst,
3333 type::Schema&& src,
34- std::unordered_set<type::ProgramId>& includedPrograms) {
34+ std::unordered_set<type::ProgramId>& includedPrograms,
35+ bool allowDuplicateDefinitionKeys) {
3536 for (auto & program : *src.programs ()) {
3637 auto id = *program.id ();
3738 if (!includedPrograms.insert (id).second ) {
@@ -51,7 +52,8 @@ void mergeInto(
5152 dst.definitionsMap ()->insert (
5253 std::make_move_iterator (src.definitionsMap ()->begin ()),
5354 std::make_move_iterator (src.definitionsMap ()->end ()));
54- if (dst.definitionsMap ()->size () != ndefs + src.definitionsMap ()->size ()) {
55+ if (!allowDuplicateDefinitionKeys &&
56+ dst.definitionsMap ()->size () != ndefs + src.definitionsMap ()->size ()) {
5557 throw std::runtime_error (" DefinitionKey collision" );
5658 }
5759}
@@ -85,7 +87,11 @@ SchemaRegistry::Ptr SchemaRegistry::getMergedSchema() {
8587 mergedSchema_ = std::make_shared<type::Schema>();
8688 for (auto & [name, data] : base_.rawSchemas_ ) {
8789 if (auto schema = readSchema (data.data )) {
88- mergeInto (*mergedSchema_, std::move (*schema), includedPrograms_);
90+ mergeInto (
91+ *mergedSchema_,
92+ std::move (*schema),
93+ includedPrograms_,
94+ /* allowDuplicateDefinitionKeys*/ false );
8995 }
9096 }
9197
@@ -98,7 +104,11 @@ SchemaRegistry::Ptr SchemaRegistry::getMergedSchema() {
98104 }
99105
100106 if (auto schema = readSchema (data)) {
101- mergeInto (*mergedSchema_, std::move (*schema), includedPrograms_);
107+ mergeInto (
108+ *mergedSchema_,
109+ std::move (*schema),
110+ includedPrograms_,
111+ /* allowDuplicateDefinitionKeys*/ false );
102112 }
103113 };
104114
@@ -113,7 +123,11 @@ type::Schema SchemaRegistry::mergeSchemas(
113123
114124 for (const auto & data : schemas) {
115125 if (auto schema = readSchema (data)) {
116- mergeInto (mergedSchema, std::move (*schema), includedPrograms);
126+ mergeInto (
127+ mergedSchema,
128+ std::move (*schema),
129+ includedPrograms,
130+ /* allowDuplicateDefinitionKeys*/ false );
117131 }
118132 }
119133
@@ -125,7 +139,18 @@ type::Schema SchemaRegistry::mergeSchemas(std::vector<type::Schema>&& schemas) {
125139 std::unordered_set<type::ProgramId> includedPrograms;
126140
127141 for (auto & schema : schemas) {
128- mergeInto (mergedSchema, std::move (schema), includedPrograms);
142+ /*
143+ * allowDuplicateDefinitionKeys is true here because this is called by
144+ * MultiplexAsyncProcessor, which may hold services with a shared base
145+ * service.
146+ * Additionally, since this function accepts deserialized schemas
147+ * those schemas were probably already checked for duplicates earlier.
148+ */
149+ mergeInto (
150+ mergedSchema,
151+ std::move (schema),
152+ includedPrograms,
153+ /* allowDuplicateDefinitionKeys*/ true );
129154 }
130155
131156 return mergedSchema;
0 commit comments