From 956632656de69efc55b69b181529f067bc7ca53c Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Tue, 23 Sep 2025 14:27:43 +1000 Subject: [PATCH 01/13] nearly works --- .../execution/DataLoaderDispatchStrategy.java | 4 + .../graphql/execution/ExecutionStrategy.java | 1 + .../PerLevelDataLoaderDispatchStrategy.java | 376 ++++++++++-------- 3 files changed, 221 insertions(+), 160 deletions(-) diff --git a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java index b3f837cd5c..a169b1cb93 100644 --- a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java @@ -40,6 +40,10 @@ default void executeObjectOnFieldValuesInfo(List fieldValueInfoL } + default void fieldCompleted(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters executionStrategyParameters) { + + } + default void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable throwable, ExecutionStrategyParameters parameters) { } diff --git a/src/main/java/graphql/execution/ExecutionStrategy.java b/src/main/java/graphql/execution/ExecutionStrategy.java index c1272df44b..e15d27616b 100644 --- a/src/main/java/graphql/execution/ExecutionStrategy.java +++ b/src/main/java/graphql/execution/ExecutionStrategy.java @@ -638,6 +638,7 @@ private FieldValueInfo completeField(GraphQLFieldDefinition fieldDef, ExecutionC ); FieldValueInfo fieldValueInfo = completeValue(executionContext, newParameters); + executionContext.getDataLoaderDispatcherStrategy().fieldCompleted(fieldValueInfo, parameters); ctxCompleteField.onDispatched(); if (fieldValueInfo.isFutureValue()) { CompletableFuture executionResultFuture = fieldValueInfo.getFieldValueFuture(); diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index bd971f656d..0ca4a50c32 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -36,7 +36,7 @@ public class PerLevelDataLoaderDispatchStrategy implements DataLoaderDispatchStr private final Profiler profiler; - private final Map deferredCallStackMap = new ConcurrentHashMap<>(); + private final Map alternativeCallContextMap = new ConcurrentHashMap<>(); private static class ChainedDLStack { @@ -138,30 +138,41 @@ private static class CallStack { /** * A general overview of teh tracked data: * There are three aspects tracked per level: - * - number of execute object calls (executeObject) - * - number of fetches - * - number of sub selections finished fetching + * - number of expected and happened execute object calls (executeObject) + * - number of expected and happened fetches + * - number of happened sub selections finished fetching *

- * The level for an execute object call is the level of the field in the query: for - * { a {b {c}}} the level of a is 1, b is 2 and c is not an object + * The level for an execute object call is the level of sub selection of the object: for + * { a {b {c}}} the level of "execute object a" is 2 *

* For fetches the level is the level of the field fetched *

* For sub selections finished it is the level of the fields inside the sub selection: * {a1 { b c} a2 } the level of {a1 a2} is 1, the level of {b c} is 2 *

+ * The main aspect for when a level is ready is when all expected fetch call happened, meaning + * we can dispatch this level as all data loaders in this level have been called + * (if the number of expected fetches is correct). *

- * A finished subselection means we can predict the number of execute object calls in the same level as the subselection: + * The number of expected fetches is increased with every executeObject (based on the number of subselection + * fields for the execute object). + * Execute Object a (on level 2) with { a {f1 f2 f3} } means we expect 3 fetches on level 2. + *

+ * A finished subselection means we can predict the number of execute object calls in the next level as the subselection: * { a {x} b {y} } - * If a is a list of 3 objects and b is a list of 2 objects we expect 3 + 2 = 5 execute object calls on the level 1 to be happening + * If a is a list of 3 objects and b is a list of 2 objects we expect 3 + 2 = 5 execute object calls on the level 2 to be happening + *

+ * The finished sub selection is the only "cross level" event: a finished sub selections impacts the expected execute + * object calls on the next level. *

- * An executed object call again means we can predict the number of fetches in the next level: - * Execute Object a with { a {f1 f2 f3} } means we expect 3 fetches on level 2. *

* This means we know a level is ready to be dispatched if: - * - all subselections done in the parent level - * - all execute objects calls in the parent level are done * - all expected fetched happened in the current level + * - all expected execute objects calls happened in the current level (because they inform the expected fetches) + * - all expected sub selections happened in the parent level (because they inform the expected execute object in the current level). + * The expected sub selections are equal to the expected object calls (in the parent level) + * - All expected sub selections happened in the parent parent level (again: meaning #happenedSubSelections == #expectedExecuteObjectCalls) + * - And so until the first level */ private final LevelMap expectedFetchCountPerLevel = new LevelMap(); @@ -169,17 +180,17 @@ private static class CallStack { // an object call means a sub selection of a field of type object/interface/union // the number of fields for sub selections increases the expected fetch count for this level - private final LevelMap expectedExecuteObjectCallsPerLevel = new LevelMap(); - private final LevelMap happenedExecuteObjectCallsPerLevel = new LevelMap(); +// private final LevelMap expectedExecuteObjectCallsPerLevel = new LevelMap(); +// private final LevelMap happenedExecuteObjectCallsPerLevel = new LevelMap(); + + private final LevelMap happenedCompleteFieldPerLevel = new LevelMap(); // this means one sub selection has been fully fetched // and the expected execute objects calls for the next level have been calculated - private final LevelMap happenedOnFieldValueCallsPerLevel = new LevelMap(); +// private final LevelMap happenedOnFieldValueCallsPerLevel = new LevelMap(); private final Set dispatchedLevels = ConcurrentHashMap.newKeySet(); - // all levels that are ready to be dispatched - private int highestReadyLevel; public ChainedDLStack chainedDLStack = new ChainedDLStack(); @@ -188,7 +199,7 @@ private static class CallStack { public CallStack() { // in the first level there is only one sub selection, // so we only expect one execute object call (which is actually an executionStrategy call) - expectedExecuteObjectCallsPerLevel.set(0, 1); +// expectedExecuteObjectCallsPerLevel.set(1, 1); } @@ -200,7 +211,7 @@ void clearExpectedFetchCount() { expectedFetchCountPerLevel.clear(); } - void increaseFetchCount(int level) { + void increaseHappenedFetchCount(int level) { fetchCountPerLevel.increment(level, 1); } @@ -210,35 +221,40 @@ void clearFetchCount() { } void increaseExpectedExecuteObjectCalls(int level, int count) { - expectedExecuteObjectCallsPerLevel.increment(level, count); - } - - void clearExpectedObjectCalls() { - expectedExecuteObjectCallsPerLevel.clear(); +// expectedExecuteObjectCallsPerLevel.increment(level, count); } - void increaseHappenedExecuteObjectCalls(int level) { - happenedExecuteObjectCallsPerLevel.increment(level, 1); - } - - void clearHappenedExecuteObjectCalls() { - happenedExecuteObjectCallsPerLevel.clear(); - } +// void clearExpectedObjectCalls() { +// expectedExecuteObjectCallsPerLevel.clear(); +// } +// +// void increaseHappenedExecuteObjectCalls(int level) { +// happenedExecuteObjectCallsPerLevel.increment(level, 1); +// } +// +// void clearHappenedExecuteObjectCalls() { +// happenedExecuteObjectCallsPerLevel.clear(); +// } - void increaseHappenedOnFieldValueCalls(int level) { - happenedOnFieldValueCallsPerLevel.increment(level, 1); - } +// void increaseHappenedOnFieldValueCalls(int level) { +// happenedOnFieldValueCallsPerLevel.increment(level, 1); +// } +// +// void clearHappenedOnFieldValueCalls() { +// happenedOnFieldValueCallsPerLevel.clear(); +// } - void clearHappenedOnFieldValueCalls() { - happenedOnFieldValueCallsPerLevel.clear(); - } +// boolean allExecuteObjectCallsHappened(int level) { +// return happenedExecuteObjectCallsPerLevel.get(level) == expectedExecuteObjectCallsPerLevel.get(level); +// } - boolean allExecuteObjectCallsHappened(int level) { - return happenedExecuteObjectCallsPerLevel.get(level) == expectedExecuteObjectCallsPerLevel.get(level); - } +// boolean allSubSelectionsFetchingHappened(int level) { +// return happenedOnFieldValueCallsPerLevel.get(level) == expectedExecuteObjectCallsPerLevel.get(level); +// } +// - boolean allSubSelectionsFetchingHappened(int subSelectionLevel) { - return happenedOnFieldValueCallsPerLevel.get(subSelectionLevel) == expectedExecuteObjectCallsPerLevel.get(subSelectionLevel - 1); + boolean allFieldsCompleted(int level) { + return fetchCountPerLevel.get(level) == happenedCompleteFieldPerLevel.get(level); } boolean allFetchesHappened(int level) { @@ -254,9 +270,9 @@ public String toString() { return "CallStack{" + "expectedFetchCountPerLevel=" + expectedFetchCountPerLevel + ", fetchCountPerLevel=" + fetchCountPerLevel + - ", expectedExecuteObjectCallsPerLevel=" + expectedExecuteObjectCallsPerLevel + - ", happenedExecuteObjectCallsPerLevel=" + happenedExecuteObjectCallsPerLevel + - ", happenedOnFieldValueCallsPerLevel=" + happenedOnFieldValueCallsPerLevel + +// ", expectedExecuteObjectCallsPerLevel=" + expectedExecuteObjectCallsPerLevel + +// ", happenedExecuteObjectCallsPerLevel=" + happenedExecuteObjectCallsPerLevel + +// ", happenedOnFieldValueCallsPerLevel=" + happenedOnFieldValueCallsPerLevel + ", dispatchedLevels" + dispatchedLevels + '}'; } @@ -267,6 +283,11 @@ public void setDispatchedLevel(int level) { Assert.assertShouldNeverHappen("level " + level + " already dispatched"); } } + + public void clearHappenedCompleteFields() { + this.happenedCompleteFieldPerLevel.clear(); + + } } public PerLevelDataLoaderDispatchStrategy(ExecutionContext executionContext) { @@ -283,30 +304,30 @@ public PerLevelDataLoaderDispatchStrategy(ExecutionContext executionContext) { @Override public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { Assert.assertTrue(parameters.getExecutionStepInfo().getPath().isRootPath()); - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(0, fieldCount, initialCallStack); + increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, fieldCount, initialCallStack); } @Override public void executionSerialStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); resetCallStack(callStack); - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(0, 1, callStack); + // field count is always 1 for serial execution + increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, 1, callStack); } - @Override - public void executionStrategyOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { - CallStack callStack = getCallStack(parameters); - // the root fields are the root sub selection on level 1 - onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1, callStack); - } +// @Override +// public void executionStrategyOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { +// CallStack callStack = getCallStack(parameters); +// onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1, callStack); +// } - @Override - public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { - CallStack callStack = getCallStack(parameters); - synchronized (callStack) { - callStack.increaseHappenedOnFieldValueCalls(1); - } - } +// @Override +// public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { +// CallStack callStack = getCallStack(parameters); +// synchronized (callStack) { +// callStack.increaseHappenedOnFieldValueCalls(1); +// } +// } private CallStack getCallStack(ExecutionStrategyParameters parameters) { return getCallStack(parameters.getDeferredCallContext()); @@ -316,16 +337,19 @@ private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallC if (alternativeCallContext == null) { return this.initialCallStack; } else { - return deferredCallStackMap.computeIfAbsent(alternativeCallContext, k -> { + return alternativeCallContextMap.computeIfAbsent(alternativeCallContext, k -> { CallStack callStack = new CallStack(); int startLevel = alternativeCallContext.getStartLevel(); int fields = alternativeCallContext.getFields(); - // we make sure that startLevel-1 is considered done - callStack.expectedExecuteObjectCallsPerLevel.set(0, 0); // set to 1 in the constructor of CallStack - callStack.expectedExecuteObjectCallsPerLevel.set(startLevel - 1, 1); - callStack.happenedExecuteObjectCallsPerLevel.set(startLevel - 1, 1); - callStack.highestReadyLevel = startLevel - 1; - callStack.increaseExpectedFetchCount(startLevel, fields); + System.out.println("startLevel for new callstack " + startLevel); + // we make sure that startLevel is considered done + for (int i = 1; i <= startLevel; i++) { + callStack.increaseExpectedFetchCount(startLevel, 1); + callStack.increaseHappenedFetchCount(1); + if (i < startLevel) { + callStack.happenedCompleteFieldPerLevel.set(startLevel, 1); + } + } return callStack; }); } @@ -335,25 +359,25 @@ private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallC public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { CallStack callStack = getCallStack(parameters); int curLevel = parameters.getPath().getLevel(); - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, fieldCount, callStack); + increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel + 1, fieldCount, callStack); } - @Override - public void executeObjectOnFieldValuesInfo - (List fieldValueInfoList, ExecutionStrategyParameters parameters) { - // the level of the sub selection that is fully fetched is one level more than parameters level - int curLevel = parameters.getPath().getLevel() + 1; - CallStack callStack = getCallStack(parameters); - onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, callStack); - } +// @Override +// public void executeObjectOnFieldValuesInfo +// (List fieldValueInfoList, ExecutionStrategyParameters parameters) { +// // the level of the sub selection that is fully fetched is one level more than parameters level +// int curLevel = parameters.getPath().getLevel() + 1; +// CallStack callStack = getCallStack(parameters); +// onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, callStack); +// } @Override public void newSubscriptionExecution(FieldValueInfo fieldValueInfo, AlternativeCallContext alternativeCallContext) { CallStack callStack = getCallStack(alternativeCallContext); - callStack.increaseFetchCount(1); + callStack.increaseHappenedFetchCount(1); callStack.deferredFragmentRootFieldsFetched.add(fieldValueInfo); - onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, 1, callStack); +// onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, 1, callStack); } @Override @@ -366,79 +390,106 @@ public void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo Assert.assertNotNull(parameters.getDeferredCallContext()); ready = callStack.deferredFragmentRootFieldsFetched.size() == parameters.getDeferredCallContext().getFields(); } - if (ready) { - int curLevel = parameters.getPath().getLevel(); - onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, curLevel, callStack); - } - } - - @Override - public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { - CallStack callStack = getCallStack(parameters); - // the level of the sub selection that is errored is one level more than parameters level - int curLevel = parameters.getPath().getLevel() + 1; - synchronized (callStack) { - callStack.increaseHappenedOnFieldValueCalls(curLevel); - } +// if (ready) { +// int curLevel = parameters.getPath().getLevel(); +// onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, curLevel, callStack); +// } } +// @Override +// public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { +// CallStack callStack = getCallStack(parameters); +// // the level of the sub selection that is errored is one level more than parameters level +// int curLevel = parameters.getPath().getLevel() + 1; +// synchronized (callStack) { +// callStack.increaseHappenedOnFieldValueCalls(curLevel); +// } +// } +// private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, int fieldCount, CallStack callStack) { synchronized (callStack) { - callStack.increaseHappenedExecuteObjectCalls(curLevel); - callStack.increaseExpectedFetchCount(curLevel + 1, fieldCount); + callStack.increaseExpectedFetchCount(curLevel, fieldCount); } } private void resetCallStack(CallStack callStack) { synchronized (callStack) { callStack.clearDispatchLevels(); - callStack.clearExpectedObjectCalls(); +// callStack.clearExpectedObjectCalls(); + callStack.clearHappenedCompleteFields(); callStack.clearExpectedFetchCount(); callStack.clearFetchCount(); - callStack.clearHappenedExecuteObjectCalls(); - callStack.clearHappenedOnFieldValueCalls(); - callStack.expectedExecuteObjectCallsPerLevel.set(0, 1); - callStack.highestReadyLevel = 0; +// callStack.clearHappenedExecuteObjectCalls(); +// callStack.clearHappenedOnFieldValueCalls(); +// callStack.expectedExecuteObjectCallsPerLevel.set(1, 1); callStack.chainedDLStack.clear(); } } - private void onFieldValuesInfoDispatchIfNeeded(List fieldValueInfoList, - int subSelectionLevel, - CallStack callStack) { - Integer dispatchLevel; +// private void onFieldValuesInfoDispatchIfNeeded(List fieldValueInfoList, +// int subSelectionLevel, +// CallStack callStack) { +// Integer dispatchLevel; +// synchronized (callStack) { +// dispatchLevel = handleSubSelectionFetched(fieldValueInfoList, subSelectionLevel, callStack); +// } +// // the handle on field values check for the next level if it is ready +// if (dispatchLevel != null) { +// dispatch(dispatchLevel, callStack); +// } +// } + + @Override + public void fieldCompleted(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters parameters) { + int level = parameters.getPath().getLevel(); +// System.out.println("field completed at level: " + level + " at: " + parameters.getPath()); + CallStack callStack = getCallStack(parameters); + int currentLevel = parameters.getPath().getLevel() + 1; synchronized (callStack) { - dispatchLevel = handleSubSelectionFetched(fieldValueInfoList, subSelectionLevel, callStack); + callStack.happenedCompleteFieldPerLevel.increment(level, 1); } - // the handle on field values check for the next level if it is ready - if (dispatchLevel != null) { - dispatch(dispatchLevel, callStack); + while (true) { + boolean levelReady; + synchronized (callStack) { + if (callStack.dispatchedLevels.contains(currentLevel)) { + break; + } + levelReady = dispatchIfNeeded(currentLevel, callStack); + } + if (levelReady) { + dispatch(currentLevel, callStack); + } else { + break; + } + currentLevel++; } } // // thread safety: called with callStack.lock // - private @Nullable Integer handleSubSelectionFetched(List fieldValueInfos, int subSelectionLevel, CallStack - callStack) { - callStack.increaseHappenedOnFieldValueCalls(subSelectionLevel); - int expectedOnObjectCalls = getObjectCountForList(fieldValueInfos); - // we expect on the level of the current sub selection #expectedOnObjectCalls execute object calls - callStack.increaseExpectedExecuteObjectCalls(subSelectionLevel, expectedOnObjectCalls); - // maybe the object calls happened already (because the DataFetcher return directly values synchronously) - // therefore we check the next levels if they are ready - // this means we could skip some level because the higher level is also already ready, - // which means there is nothing to dispatch on these levels: if x and x+1 is ready, it means there are no - // data loaders used on x - // - // if data loader chaining is disabled (the old algo) the level we dispatch is not really relevant as - // we dispatch the whole registry anyway - - return getHighestReadyLevel(subSelectionLevel + 1, callStack); - } +// private @Nullable Integer handleSubSelectionFetched(List fieldValueInfos, int subSelectionLevel, CallStack +// callStack) { +// System.out.println("sub selection fetched at level :" + subSelectionLevel); +// callStack.increaseHappenedOnFieldValueCalls(subSelectionLevel); +// int expectedOnObjectCalls = getObjectCountForList(fieldValueInfos); +// // we expect on the next level of the current sub selection #expectedOnObjectCalls execute object calls +// callStack.increaseExpectedExecuteObjectCalls(subSelectionLevel + 1, expectedOnObjectCalls); +// +// // maybe the object calls happened already (because the DataFetcher return directly values synchronously) +// // therefore we check the next levels if they are ready +// // if data loader chaining is disabled (the old algo) the level we dispatch is not really relevant as +// // we dispatch the whole registry anyway +// +// if (checkLevelImpl(subSelectionLevel + 1, callStack)) { +// return subSelectionLevel + 1; +// } else { +// return null; +// } +// } /** * the amount of (non nullable) objects that will require an execute object call @@ -466,7 +517,8 @@ public void fieldFetched(ExecutionContext executionContext, int level = executionStrategyParameters.getPath().getLevel(); boolean dispatchNeeded; synchronized (callStack) { - callStack.increaseFetchCount(level); + System.out.println("field fetched at level " + level); + callStack.increaseHappenedFetchCount(level); dispatchNeeded = dispatchIfNeeded(level, callStack); } if (dispatchNeeded) { @@ -480,7 +532,7 @@ public void fieldFetched(ExecutionContext executionContext, // thread safety : called with callStack.lock // private boolean dispatchIfNeeded(int level, CallStack callStack) { - boolean ready = checkLevelBeingReady(level, callStack); + boolean ready = checkLevelImpl(level, callStack); if (ready) { callStack.setDispatchedLevel(level); return true; @@ -491,50 +543,53 @@ private boolean dispatchIfNeeded(int level, CallStack callStack) { // // thread safety: called with callStack.lock // - private @Nullable Integer getHighestReadyLevel(int startFrom, CallStack callStack) { - int curLevel = callStack.highestReadyLevel; - while (true) { - if (!checkLevelImpl(curLevel + 1, callStack)) { - callStack.highestReadyLevel = curLevel; - return curLevel >= startFrom ? curLevel : null; - } - curLevel++; - } - } - - private boolean checkLevelBeingReady(int level, CallStack callStack) { - Assert.assertTrue(level > 0); - if (level <= callStack.highestReadyLevel) { - return true; - } - - for (int i = callStack.highestReadyLevel + 1; i <= level; i++) { - if (!checkLevelImpl(i, callStack)) { - return false; - } - } - callStack.highestReadyLevel = level; - return true; - } +// private @Nullable Integer getHighestReadyLevel(int startFrom, CallStack callStack) { +// while (true) { +// if (!checkLevelImpl(curLevel + 1, callStack)) { +// callStack.highestReadyLevel = curLevel; +// return curLevel >= startFrom ? curLevel : null; +// } +// curLevel++; +// } +// } + +// private boolean checkLevelBeingReady(int level, CallStack callStack) { +// Assert.assertTrue(level > 0); +// +// for (int i = callStack.highestReadyLevel + 1; i <= level; i++) { +// if (!checkLevelImpl(i, callStack)) { +// return false; +// } +// } +// callStack.highestReadyLevel = level; +// return true; +// } private boolean checkLevelImpl(int level, CallStack callStack) { + System.out.println("checkLevelImpl " + level); // a level with zero expectations can't be ready if (callStack.expectedFetchCountPerLevel.get(level) == 0) { return false; } - // first we make sure that the expected fetch count is correct - // by verifying that the parent level all execute object + sub selection were fetched - if (!callStack.allExecuteObjectCallsHappened(level - 1)) { - return false; - } - if (level > 1 && !callStack.allSubSelectionsFetchingHappened(level - 1)) { - return false; - } - // the main check: all fetches must have happened + // all fetches happened if (!callStack.allFetchesHappened(level)) { return false; } +// // the fetch count is actually correct because all execute object happened +// if (!callStack.allFieldsCompleted(level-1)) { +// return false; +// } + // the expected execute object call is correct because all sub selections got fetched on the parent + // and their parent and their parent etc + int levelTmp = level - 1; + while (levelTmp >= 1) { + if (!callStack.allFieldsCompleted(levelTmp)) { + return false; + } + levelTmp--; + } + System.out.println("check ready " + level); return true; } @@ -545,6 +600,7 @@ void dispatch(int level, CallStack callStack) { dispatchAll(dataLoaderRegistry, level); return; } +// System.out.println("dispatching " + level); dispatchDLCFImpl(level, callStack, true, false); } From 07b160083f32bcd2191cbeda654b69108be73e4f Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Tue, 23 Sep 2025 21:22:57 +1000 Subject: [PATCH 02/13] works except dataloader in defer --- .../instrumentation/dataloader/LevelMap.java | 2 +- .../PerLevelDataLoaderDispatchStrategy.java | 30 +++++++------------ ...eferExecutionSupportIntegrationTest.groovy | 2 ++ .../dataloader/DeferWithDataLoaderTest.groovy | 7 +++++ .../dataloader/LevelMapTest.groovy | 5 ++-- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/LevelMap.java b/src/main/java/graphql/execution/instrumentation/dataloader/LevelMap.java index ddf46f643b..45fdca37b9 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/LevelMap.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/LevelMap.java @@ -57,7 +57,7 @@ public String toString() { StringBuilder result = new StringBuilder(); result.append("IntMap["); for (int i = 0; i < countsByLevel.length; i++) { - result.append("level=").append(i).append(",count=").append(countsByLevel[i]).append(" "); + result.append("[level=").append(i).append(",count=").append(countsByLevel[i]).append("] "); } result.append("]"); return result.toString(); diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index 0ca4a50c32..311843eaf3 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -175,15 +175,18 @@ private static class CallStack { * - And so until the first level */ + private final LevelMap expectedFetchCountPerLevel = new LevelMap(); private final LevelMap fetchCountPerLevel = new LevelMap(); + // happened complete field implies that the expected fetch field is correct + // because completion triggers all needed executeObject, which determines the expected fetch field count + private final LevelMap happenedCompleteFieldPerLevel = new LevelMap(); // an object call means a sub selection of a field of type object/interface/union // the number of fields for sub selections increases the expected fetch count for this level // private final LevelMap expectedExecuteObjectCallsPerLevel = new LevelMap(); // private final LevelMap happenedExecuteObjectCallsPerLevel = new LevelMap(); - private final LevelMap happenedCompleteFieldPerLevel = new LevelMap(); // this means one sub selection has been fully fetched // and the expected execute objects calls for the next level have been calculated @@ -339,17 +342,11 @@ private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallC } else { return alternativeCallContextMap.computeIfAbsent(alternativeCallContext, k -> { CallStack callStack = new CallStack(); + System.out.println("new callstack: " + callStack); int startLevel = alternativeCallContext.getStartLevel(); int fields = alternativeCallContext.getFields(); - System.out.println("startLevel for new callstack " + startLevel); - // we make sure that startLevel is considered done - for (int i = 1; i <= startLevel; i++) { - callStack.increaseExpectedFetchCount(startLevel, 1); - callStack.increaseHappenedFetchCount(1); - if (i < startLevel) { - callStack.happenedCompleteFieldPerLevel.set(startLevel, 1); - } - } + callStack.expectedFetchCountPerLevel.set(1, 1); + callStack.fetchCountPerLevel.set(1, 1); return callStack; }); } @@ -445,12 +442,12 @@ private void resetCallStack(CallStack callStack) { @Override public void fieldCompleted(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters parameters) { int level = parameters.getPath().getLevel(); -// System.out.println("field completed at level: " + level + " at: " + parameters.getPath()); + System.out.println("field completed at level: " + level + " at: " + parameters.getPath()); CallStack callStack = getCallStack(parameters); - int currentLevel = parameters.getPath().getLevel() + 1; synchronized (callStack) { callStack.happenedCompleteFieldPerLevel.increment(level, 1); } + int currentLevel = parameters.getPath().getLevel() + 1; while (true) { boolean levelReady; synchronized (callStack) { @@ -517,7 +514,7 @@ public void fieldFetched(ExecutionContext executionContext, int level = executionStrategyParameters.getPath().getLevel(); boolean dispatchNeeded; synchronized (callStack) { - System.out.println("field fetched at level " + level); + System.out.println("field fetched at level " + level + " - " + executionStrategyParameters.getPath()); callStack.increaseHappenedFetchCount(level); dispatchNeeded = dispatchIfNeeded(level, callStack); } @@ -576,12 +573,7 @@ private boolean checkLevelImpl(int level, CallStack callStack) { if (!callStack.allFetchesHappened(level)) { return false; } -// // the fetch count is actually correct because all execute object happened -// if (!callStack.allFieldsCompleted(level-1)) { -// return false; -// } - // the expected execute object call is correct because all sub selections got fetched on the parent - // and their parent and their parent etc + int levelTmp = level - 1; while (levelTmp >= 1) { if (!callStack.allFieldsCompleted(levelTmp)) { diff --git a/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy b/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy index b3b522d90b..8afa04300f 100644 --- a/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy +++ b/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy @@ -23,6 +23,7 @@ import org.dataloader.DataLoader import org.dataloader.DataLoaderFactory import org.dataloader.DataLoaderRegistry import org.reactivestreams.Publisher +import spock.lang.Ignore import spock.lang.Specification import spock.lang.Unroll @@ -1690,6 +1691,7 @@ class DeferExecutionSupportIntegrationTest extends Specification { } + @Ignore("tmp not working") def "dataloader used inside defer"() { given: def query = ''' diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy index 362206f64b..5df5837bf0 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy @@ -11,6 +11,7 @@ import org.awaitility.Awaitility import org.dataloader.BatchLoader import org.dataloader.DataLoaderFactory import org.dataloader.DataLoaderRegistry +import spock.lang.Ignore import spock.lang.RepeatUntilFailure import spock.lang.Specification @@ -60,6 +61,7 @@ class DeferWithDataLoaderTest extends Specification { } } + @Ignore def "query with single deferred field"() { given: def query = getQuery(true, false) @@ -104,6 +106,7 @@ class DeferWithDataLoaderTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 3 } + @Ignore def "multiple fields on same defer block"() { given: def query = """ @@ -177,6 +180,7 @@ class DeferWithDataLoaderTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 0 } + @Ignore def "query with nested deferred fields"() { given: def query = getQuery(true, true) @@ -228,6 +232,7 @@ class DeferWithDataLoaderTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 9 } + @Ignore def "query with top-level deferred field"() { given: def query = """ @@ -291,6 +296,7 @@ class DeferWithDataLoaderTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 0 } + @Ignore def "query with multiple deferred fields"() { given: def query = getExpensiveQuery(true) @@ -348,6 +354,7 @@ class DeferWithDataLoaderTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 1 } + @Ignore @RepeatUntilFailure(maxAttempts = 50, ignoreRest = false) def "dataloader in initial result and chained dataloader inside nested defer block"() { given: diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/LevelMapTest.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/LevelMapTest.groovy index 8ff6bece5b..1f0069fb19 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/LevelMapTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/LevelMapTest.groovy @@ -1,7 +1,6 @@ package graphql.execution.instrumentation.dataloader import spock.lang.Specification -import graphql.AssertException class LevelMapTest extends Specification { @@ -92,13 +91,13 @@ class LevelMapTest extends Specification { sut.increment(0, 42) then: - sut.toString() == "IntMap[level=0,count=42 ]" + sut.toString() == "IntMap[[level=0,count=42] ]" when: sut.increment(1, 1) then: - sut.toString() == "IntMap[level=0,count=42 level=1,count=1 ]" + sut.toString() == "IntMap[[level=0,count=42] [level=1,count=1] ]" } def "can get outside of its size"() { From 56a39dfbe8a1ec08a5dfaa1e87d0c3fdd1e87623 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 24 Sep 2025 11:32:22 +1000 Subject: [PATCH 03/13] working with simpler mode except batching, defer, subscription --- .../graphql/execution/ExecutionStrategy.java | 4 +- .../PerLevelDataLoaderDispatchStrategy.java | 398 ++++++------------ 2 files changed, 126 insertions(+), 276 deletions(-) diff --git a/src/main/java/graphql/execution/ExecutionStrategy.java b/src/main/java/graphql/execution/ExecutionStrategy.java index e15d27616b..8d29152b4a 100644 --- a/src/main/java/graphql/execution/ExecutionStrategy.java +++ b/src/main/java/graphql/execution/ExecutionStrategy.java @@ -207,11 +207,11 @@ protected Object executeObject(ExecutionContext executionContext, ExecutionStrat List fieldNames = parameters.getFields().getKeys(); DeferredExecutionSupport deferredExecutionSupport = createDeferredExecutionSupport(executionContext, parameters); + List fieldsExecutedOnInitialResult = deferredExecutionSupport.getNonDeferredFieldNames(fieldNames); + dataLoaderDispatcherStrategy.executeObject(executionContext, parameters, fieldsExecutedOnInitialResult.size()); Async.CombinedBuilder resolvedFieldFutures = getAsyncFieldValueInfo(executionContext, parameters, deferredExecutionSupport); CompletableFuture> overallResult = new CompletableFuture<>(); - List fieldsExecutedOnInitialResult = deferredExecutionSupport.getNonDeferredFieldNames(fieldNames); - dataLoaderDispatcherStrategy.executeObject(executionContext, parameters, fieldsExecutedOnInitialResult.size()); BiConsumer, Throwable> handleResultsConsumer = buildFieldValueMap(fieldsExecutedOnInitialResult, overallResult, executionContext); resolveObjectCtx.onDispatched(); diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index 311843eaf3..4a7dde97f0 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -177,20 +177,9 @@ private static class CallStack { private final LevelMap expectedFetchCountPerLevel = new LevelMap(); - private final LevelMap fetchCountPerLevel = new LevelMap(); - // happened complete field implies that the expected fetch field is correct - // because completion triggers all needed executeObject, which determines the expected fetch field count - private final LevelMap happenedCompleteFieldPerLevel = new LevelMap(); - - // an object call means a sub selection of a field of type object/interface/union - // the number of fields for sub selections increases the expected fetch count for this level -// private final LevelMap expectedExecuteObjectCallsPerLevel = new LevelMap(); -// private final LevelMap happenedExecuteObjectCallsPerLevel = new LevelMap(); - - - // this means one sub selection has been fully fetched - // and the expected execute objects calls for the next level have been calculated -// private final LevelMap happenedOnFieldValueCallsPerLevel = new LevelMap(); + private final LevelMap happenedFetchCountPerLevel = new LevelMap(); + private final LevelMap happenedCompletionFinishedCountPerLevel = new LevelMap(); + private final LevelMap happenedExecuteObjectCallsPerLevel = new LevelMap(); private final Set dispatchedLevels = ConcurrentHashMap.newKeySet(); @@ -200,69 +189,8 @@ private static class CallStack { private final List deferredFragmentRootFieldsFetched = new ArrayList<>(); public CallStack() { - // in the first level there is only one sub selection, - // so we only expect one execute object call (which is actually an executionStrategy call) -// expectedExecuteObjectCallsPerLevel.set(1, 1); - } - - - void increaseExpectedFetchCount(int level, int count) { - expectedFetchCountPerLevel.increment(level, count); - } - - void clearExpectedFetchCount() { - expectedFetchCountPerLevel.clear(); - } - - void increaseHappenedFetchCount(int level) { - fetchCountPerLevel.increment(level, 1); - } - - - void clearFetchCount() { - fetchCountPerLevel.clear(); - } - - void increaseExpectedExecuteObjectCalls(int level, int count) { -// expectedExecuteObjectCallsPerLevel.increment(level, count); - } - -// void clearExpectedObjectCalls() { -// expectedExecuteObjectCallsPerLevel.clear(); -// } -// -// void increaseHappenedExecuteObjectCalls(int level) { -// happenedExecuteObjectCallsPerLevel.increment(level, 1); -// } -// -// void clearHappenedExecuteObjectCalls() { -// happenedExecuteObjectCallsPerLevel.clear(); -// } - -// void increaseHappenedOnFieldValueCalls(int level) { -// happenedOnFieldValueCallsPerLevel.increment(level, 1); -// } -// -// void clearHappenedOnFieldValueCalls() { -// happenedOnFieldValueCallsPerLevel.clear(); -// } - -// boolean allExecuteObjectCallsHappened(int level) { -// return happenedExecuteObjectCallsPerLevel.get(level) == expectedExecuteObjectCallsPerLevel.get(level); -// } - -// boolean allSubSelectionsFetchingHappened(int level) { -// return happenedOnFieldValueCallsPerLevel.get(level) == expectedExecuteObjectCallsPerLevel.get(level); -// } -// - - boolean allFieldsCompleted(int level) { - return fetchCountPerLevel.get(level) == happenedCompleteFieldPerLevel.get(level); } - boolean allFetchesHappened(int level) { - return fetchCountPerLevel.get(level) == expectedFetchCountPerLevel.get(level); - } void clearDispatchLevels() { dispatchedLevels.clear(); @@ -272,7 +200,7 @@ void clearDispatchLevels() { public String toString() { return "CallStack{" + "expectedFetchCountPerLevel=" + expectedFetchCountPerLevel + - ", fetchCountPerLevel=" + fetchCountPerLevel + + ", fetchCountPerLevel=" + happenedFetchCountPerLevel + // ", expectedExecuteObjectCallsPerLevel=" + expectedExecuteObjectCallsPerLevel + // ", happenedExecuteObjectCallsPerLevel=" + happenedExecuteObjectCallsPerLevel + // ", happenedOnFieldValueCallsPerLevel=" + happenedOnFieldValueCallsPerLevel + @@ -287,8 +215,13 @@ public void setDispatchedLevel(int level) { } } - public void clearHappenedCompleteFields() { - this.happenedCompleteFieldPerLevel.clear(); + public void clear() { + dispatchedLevels.clear(); + happenedExecuteObjectCallsPerLevel.clear(); + expectedFetchCountPerLevel.clear(); + happenedFetchCountPerLevel.clear(); + happenedCompletionFinishedCountPerLevel.clear(); + } } @@ -307,7 +240,11 @@ public PerLevelDataLoaderDispatchStrategy(ExecutionContext executionContext) { @Override public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { Assert.assertTrue(parameters.getExecutionStepInfo().getPath().isRootPath()); - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, fieldCount, initialCallStack); +// System.out.println("execution strategy started"); + synchronized (initialCallStack) { + initialCallStack.happenedExecuteObjectCallsPerLevel.set(0, 1); + initialCallStack.expectedFetchCountPerLevel.set(1, fieldCount); + } } @Override @@ -315,146 +252,67 @@ public void executionSerialStrategy(ExecutionContext executionContext, Execution CallStack callStack = getCallStack(parameters); resetCallStack(callStack); // field count is always 1 for serial execution - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, 1, callStack); - } - -// @Override -// public void executionStrategyOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { -// CallStack callStack = getCallStack(parameters); -// onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1, callStack); -// } - -// @Override -// public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { -// CallStack callStack = getCallStack(parameters); -// synchronized (callStack) { -// callStack.increaseHappenedOnFieldValueCalls(1); -// } -// } - - private CallStack getCallStack(ExecutionStrategyParameters parameters) { - return getCallStack(parameters.getDeferredCallContext()); - } - - private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallContext) { - if (alternativeCallContext == null) { - return this.initialCallStack; - } else { - return alternativeCallContextMap.computeIfAbsent(alternativeCallContext, k -> { - CallStack callStack = new CallStack(); - System.out.println("new callstack: " + callStack); - int startLevel = alternativeCallContext.getStartLevel(); - int fields = alternativeCallContext.getFields(); - callStack.expectedFetchCountPerLevel.set(1, 1); - callStack.fetchCountPerLevel.set(1, 1); - return callStack; - }); + synchronized (callStack) { + callStack.happenedExecuteObjectCallsPerLevel.set(0, 1); + callStack.expectedFetchCountPerLevel.set(1, 1); } } @Override - public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { + public void executionStrategyOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); - int curLevel = parameters.getPath().getLevel(); - increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel + 1, fieldCount, callStack); - } - -// @Override -// public void executeObjectOnFieldValuesInfo -// (List fieldValueInfoList, ExecutionStrategyParameters parameters) { -// // the level of the sub selection that is fully fetched is one level more than parameters level -// int curLevel = parameters.getPath().getLevel() + 1; -// CallStack callStack = getCallStack(parameters); -// onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, callStack); -// } - +// System.out.println("1st level fields completed"); + onCompletionFinished(0, callStack); - @Override - public void newSubscriptionExecution(FieldValueInfo fieldValueInfo, AlternativeCallContext alternativeCallContext) { - CallStack callStack = getCallStack(alternativeCallContext); - callStack.increaseHappenedFetchCount(1); - callStack.deferredFragmentRootFieldsFetched.add(fieldValueInfo); -// onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, 1, callStack); } @Override - public void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable - throwable, ExecutionStrategyParameters parameters) { + public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); - boolean ready; - synchronized (callStack) { - callStack.deferredFragmentRootFieldsFetched.add(fieldValueInfo); - Assert.assertNotNull(parameters.getDeferredCallContext()); - ready = callStack.deferredFragmentRootFieldsFetched.size() == parameters.getDeferredCallContext().getFields(); - } -// if (ready) { -// int curLevel = parameters.getPath().getLevel(); -// onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, curLevel, callStack); -// } + onCompletionFinished(0, callStack); } -// @Override -// public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { -// CallStack callStack = getCallStack(parameters); -// // the level of the sub selection that is errored is one level more than parameters level -// int curLevel = parameters.getPath().getLevel() + 1; -// synchronized (callStack) { -// callStack.increaseHappenedOnFieldValueCalls(curLevel); -// } -// } -// - private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, - int fieldCount, - CallStack callStack) { + @Override + public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { + CallStack callStack = getCallStack(parameters); + int curLevel = parameters.getPath().getLevel(); +// System.out.println("execute object " + curLevel + " at " + parameters.getPath() ); synchronized (callStack) { - callStack.increaseExpectedFetchCount(curLevel, fieldCount); + callStack.happenedExecuteObjectCallsPerLevel.increment(curLevel, 1); + callStack.expectedFetchCountPerLevel.increment(curLevel + 1, fieldCount); } } - private void resetCallStack(CallStack callStack) { - synchronized (callStack) { - callStack.clearDispatchLevels(); -// callStack.clearExpectedObjectCalls(); - callStack.clearHappenedCompleteFields(); - callStack.clearExpectedFetchCount(); - callStack.clearFetchCount(); -// callStack.clearHappenedExecuteObjectCalls(); -// callStack.clearHappenedOnFieldValueCalls(); -// callStack.expectedExecuteObjectCallsPerLevel.set(1, 1); - callStack.chainedDLStack.clear(); - } + @Override + public void executeObjectOnFieldValuesInfo + (List fieldValueInfoList, ExecutionStrategyParameters parameters) { + int curLevel = parameters.getPath().getLevel(); + CallStack callStack = getCallStack(parameters); +// System.out.println("completion finished at " + curLevel + " at " + parameters.getPath() ); + onCompletionFinished(curLevel, callStack); } -// private void onFieldValuesInfoDispatchIfNeeded(List fieldValueInfoList, -// int subSelectionLevel, -// CallStack callStack) { -// Integer dispatchLevel; -// synchronized (callStack) { -// dispatchLevel = handleSubSelectionFetched(fieldValueInfoList, subSelectionLevel, callStack); -// } -// // the handle on field values check for the next level if it is ready -// if (dispatchLevel != null) { -// dispatch(dispatchLevel, callStack); -// } -// } - @Override - public void fieldCompleted(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters parameters) { - int level = parameters.getPath().getLevel(); - System.out.println("field completed at level: " + level + " at: " + parameters.getPath()); + public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); + int curLevel = parameters.getPath().getLevel(); + onCompletionFinished(curLevel, callStack); + } + + private void onCompletionFinished(int level, CallStack callStack) { synchronized (callStack) { - callStack.happenedCompleteFieldPerLevel.increment(level, 1); + callStack.happenedCompletionFinishedCountPerLevel.increment(level, 1); } - int currentLevel = parameters.getPath().getLevel() + 1; + // on completion might mark multiple higher levels as ready + int currentLevel = level + 2; while (true) { boolean levelReady; synchronized (callStack) { if (callStack.dispatchedLevels.contains(currentLevel)) { break; } - levelReady = dispatchIfNeeded(currentLevel, callStack); + levelReady = markLevelAsDispatchedIfReady(currentLevel, callStack); } if (levelReady) { dispatch(currentLevel, callStack); @@ -463,47 +321,9 @@ public void fieldCompleted(FieldValueInfo fieldValueInfo, ExecutionStrategyParam } currentLevel++; } - } - // -// thread safety: called with callStack.lock -// -// private @Nullable Integer handleSubSelectionFetched(List fieldValueInfos, int subSelectionLevel, CallStack -// callStack) { -// System.out.println("sub selection fetched at level :" + subSelectionLevel); -// callStack.increaseHappenedOnFieldValueCalls(subSelectionLevel); -// int expectedOnObjectCalls = getObjectCountForList(fieldValueInfos); -// // we expect on the next level of the current sub selection #expectedOnObjectCalls execute object calls -// callStack.increaseExpectedExecuteObjectCalls(subSelectionLevel + 1, expectedOnObjectCalls); -// -// // maybe the object calls happened already (because the DataFetcher return directly values synchronously) -// // therefore we check the next levels if they are ready -// // if data loader chaining is disabled (the old algo) the level we dispatch is not really relevant as -// // we dispatch the whole registry anyway -// -// if (checkLevelImpl(subSelectionLevel + 1, callStack)) { -// return subSelectionLevel + 1; -// } else { -// return null; -// } -// } - - /** - * the amount of (non nullable) objects that will require an execute object call - */ - private int getObjectCountForList(List fieldValueInfos) { - int result = 0; - for (FieldValueInfo fieldValueInfo : fieldValueInfos) { - if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.OBJECT) { - result += 1; - } else if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.LIST) { - result += getObjectCountForList(fieldValueInfo.getFieldValueInfos()); - } - } - return result; } - @Override public void fieldFetched(ExecutionContext executionContext, ExecutionStrategyParameters executionStrategyParameters, @@ -514,9 +334,8 @@ public void fieldFetched(ExecutionContext executionContext, int level = executionStrategyParameters.getPath().getLevel(); boolean dispatchNeeded; synchronized (callStack) { - System.out.println("field fetched at level " + level + " - " + executionStrategyParameters.getPath()); - callStack.increaseHappenedFetchCount(level); - dispatchNeeded = dispatchIfNeeded(level, callStack); + callStack.happenedFetchCountPerLevel.increment(level, 1); + dispatchNeeded = markLevelAsDispatchedIfReady(level, callStack); } if (dispatchNeeded) { dispatch(level, callStack); @@ -525,11 +344,67 @@ public void fieldFetched(ExecutionContext executionContext, } - // -// thread safety : called with callStack.lock + @Override + public void newSubscriptionExecution(FieldValueInfo fieldValueInfo, AlternativeCallContext alternativeCallContext) { + CallStack callStack = getCallStack(alternativeCallContext); + synchronized (callStack) { + callStack.happenedFetchCountPerLevel.increment(1, 1); + } + callStack.deferredFragmentRootFieldsFetched.add(fieldValueInfo); +// onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, 1, callStack); + } + + @Override + public void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable + throwable, ExecutionStrategyParameters parameters) { + CallStack callStack = getCallStack(parameters); + boolean ready; + synchronized (callStack) { + callStack.deferredFragmentRootFieldsFetched.add(fieldValueInfo); + Assert.assertNotNull(parameters.getDeferredCallContext()); + ready = callStack.deferredFragmentRootFieldsFetched.size() == parameters.getDeferredCallContext().getFields(); + } +// if (ready) { +// int curLevel = parameters.getPath().getLevel(); +// onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, curLevel, callStack); +// } + } + // - private boolean dispatchIfNeeded(int level, CallStack callStack) { - boolean ready = checkLevelImpl(level, callStack); + + private CallStack getCallStack(ExecutionStrategyParameters parameters) { + return getCallStack(parameters.getDeferredCallContext()); + } + + private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallContext) { + if (alternativeCallContext == null) { + return this.initialCallStack; + } else { + return alternativeCallContextMap.computeIfAbsent(alternativeCallContext, k -> { + CallStack callStack = new CallStack(); +// System.out.println("new callstack: " + callStack); + int startLevel = alternativeCallContext.getStartLevel(); + int fields = alternativeCallContext.getFields(); + callStack.expectedFetchCountPerLevel.set(1, 1); + callStack.happenedFetchCountPerLevel.set(1, 1); + return callStack; + }); + } + } + + + private void resetCallStack(CallStack callStack) { + synchronized (callStack) { + callStack.clear(); + callStack.chainedDLStack.clear(); + } + } + +// + + private boolean markLevelAsDispatchedIfReady(int level, CallStack callStack) { + boolean ready = isLevelReady(level, callStack); +// System.out.println("markLevelAsDispatchedIfReady level: " + level + " ready: " + ready); if (ready) { callStack.setDispatchedLevel(level); return true; @@ -537,62 +412,37 @@ private boolean dispatchIfNeeded(int level, CallStack callStack) { return false; } - // -// thread safety: called with callStack.lock -// -// private @Nullable Integer getHighestReadyLevel(int startFrom, CallStack callStack) { -// while (true) { -// if (!checkLevelImpl(curLevel + 1, callStack)) { -// callStack.highestReadyLevel = curLevel; -// return curLevel >= startFrom ? curLevel : null; -// } -// curLevel++; -// } -// } - -// private boolean checkLevelBeingReady(int level, CallStack callStack) { -// Assert.assertTrue(level > 0); -// -// for (int i = callStack.highestReadyLevel + 1; i <= level; i++) { -// if (!checkLevelImpl(i, callStack)) { -// return false; -// } -// } -// callStack.highestReadyLevel = level; -// return true; -// } - private boolean checkLevelImpl(int level, CallStack callStack) { - System.out.println("checkLevelImpl " + level); + private boolean isLevelReady(int level, CallStack callStack) { // a level with zero expectations can't be ready - if (callStack.expectedFetchCountPerLevel.get(level) == 0) { + int expectedFetchCount = callStack.expectedFetchCountPerLevel.get(level); + if (expectedFetchCount == 0) { return false; } - // all fetches happened - if (!callStack.allFetchesHappened(level)) { + if (expectedFetchCount != callStack.happenedFetchCountPerLevel.get(level)) { return false; } - - int levelTmp = level - 1; - while (levelTmp >= 1) { - if (!callStack.allFieldsCompleted(levelTmp)) { - return false; - } - levelTmp--; + if (level == 1) { + // for the root fields we just expect that they were all fetched + return true; } - System.out.println("check ready " + level); - return true; + + // we expect that parent has been dispatched and that all parents fields are completed + // all parent fields completed means all parent parent on completions finished calls must have happened + return callStack.dispatchedLevels.contains(level - 1) && + callStack.happenedExecuteObjectCallsPerLevel.get(level - 2) == callStack.happenedCompletionFinishedCountPerLevel.get(level - 2); + } void dispatch(int level, CallStack callStack) { +// System.out.println("dispatching at " + level); if (!enableDataLoaderChaining) { profiler.oldStrategyDispatchingAll(level); DataLoaderRegistry dataLoaderRegistry = executionContext.getDataLoaderRegistry(); dispatchAll(dataLoaderRegistry, level); return; } -// System.out.println("dispatching " + level); dispatchDLCFImpl(level, callStack, true, false); } From 0436e5d9155acc5548089e7b9f99939b287c0f93 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 24 Sep 2025 12:15:55 +1000 Subject: [PATCH 04/13] subscription works --- .../execution/DataLoaderDispatchStrategy.java | 5 ++- .../SubscriptionExecutionStrategy.java | 4 +- .../PerLevelDataLoaderDispatchStrategy.java | 37 +++++++++++++------ 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java index a169b1cb93..88ec0270ec 100644 --- a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java @@ -61,8 +61,11 @@ default void fieldFetched(ExecutionContext executionContext, } + default void newSubscriptionExecution(AlternativeCallContext alternativeCallContext) { - default void newSubscriptionExecution(FieldValueInfo fieldValueInfo, AlternativeCallContext alternativeCallContext) { + } + + default void subscriptionEventCompletionDone(AlternativeCallContext alternativeCallContext) { } } diff --git a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java index d2da978471..50cfcb4bca 100644 --- a/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java +++ b/src/main/java/graphql/execution/SubscriptionExecutionStrategy.java @@ -168,9 +168,11 @@ private CompletableFuture executeSubscriptionEvent(ExecutionCon i13nFieldParameters, executionContext.getInstrumentationState() )); + + executionContext.getDataLoaderDispatcherStrategy().newSubscriptionExecution(newParameters.getDeferredCallContext()); Object fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, newParameters, eventPayload); FieldValueInfo fieldValueInfo = completeField(newExecutionContext, newParameters, fetchedValue); - executionContext.getDataLoaderDispatcherStrategy().newSubscriptionExecution(fieldValueInfo, newParameters.getDeferredCallContext()); + executionContext.getDataLoaderDispatcherStrategy().subscriptionEventCompletionDone(newParameters.getDeferredCallContext()); CompletableFuture overallResult = fieldValueInfo .getFieldValueFuture() .thenApply(val -> new ExecutionResultImpl(val, newParameters.getDeferredCallContext().getErrors())) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index 4a7dde97f0..cefd0fa0da 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -277,7 +277,7 @@ public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrate public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { CallStack callStack = getCallStack(parameters); int curLevel = parameters.getPath().getLevel(); -// System.out.println("execute object " + curLevel + " at " + parameters.getPath() ); +// System.out.println("execute object " + curLevel + " at " + parameters.getPath() + " with callstack " + callStack.hashCode()); synchronized (callStack) { callStack.happenedExecuteObjectCallsPerLevel.increment(curLevel, 1); callStack.expectedFetchCountPerLevel.increment(curLevel + 1, fieldCount); @@ -332,6 +332,7 @@ public void fieldFetched(ExecutionContext executionContext, Supplier dataFetchingEnvironment) { CallStack callStack = getCallStack(executionStrategyParameters); int level = executionStrategyParameters.getPath().getLevel(); +// System.out.println("field fetched at: " + level + " path: " + executionStrategyParameters.getPath() + " callStack: " + callStack.hashCode()); boolean dispatchNeeded; synchronized (callStack) { callStack.happenedFetchCountPerLevel.increment(level, 1); @@ -345,13 +346,22 @@ public void fieldFetched(ExecutionContext executionContext, @Override - public void newSubscriptionExecution(FieldValueInfo fieldValueInfo, AlternativeCallContext alternativeCallContext) { + public void newSubscriptionExecution(AlternativeCallContext alternativeCallContext) { + CallStack callStack = new CallStack(); + alternativeCallContextMap.put(alternativeCallContext, callStack); + + } + + @Override + public void subscriptionEventCompletionDone(AlternativeCallContext alternativeCallContext) { CallStack callStack = getCallStack(alternativeCallContext); + // this means the single root field is completed (it was never "fetched" because it is + // the event payload) and we can mark level 1 (root fields) as dispatched and level 0 as completed synchronized (callStack) { - callStack.happenedFetchCountPerLevel.increment(1, 1); + callStack.dispatchedLevels.add(1); + callStack.happenedExecuteObjectCallsPerLevel.set(0, 1); } - callStack.deferredFragmentRootFieldsFetched.add(fieldValueInfo); -// onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, 1, callStack); + onCompletionFinished(0, callStack); } @Override @@ -382,11 +392,17 @@ private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallC } else { return alternativeCallContextMap.computeIfAbsent(alternativeCallContext, k -> { CallStack callStack = new CallStack(); -// System.out.println("new callstack: " + callStack); - int startLevel = alternativeCallContext.getStartLevel(); - int fields = alternativeCallContext.getFields(); - callStack.expectedFetchCountPerLevel.set(1, 1); - callStack.happenedFetchCountPerLevel.set(1, 1); +// System.out.println("new callstack : " + callStack.hashCode()); + // for subscriptions there is only root field which is already fetched +// callStack.expectedFetchCountPerLevel.set(1, 1); +// callStack.happenedFetchCountPerLevel.set(1, 1); +// // the level 0 is done +// callStack.happenedExecuteObjectCallsPerLevel.set(0, 1); +// callStack.happenedCompletionFinishedCountPerLevel.set(0, 1); +// // level is 1 already dispatched +// callStack.setDispatchedLevel(1); +// int startLevel = alternativeCallContext.getStartLevel(); +// int fields = alternativeCallContext.getFields(); return callStack; }); } @@ -400,7 +416,6 @@ private void resetCallStack(CallStack callStack) { } } -// private boolean markLevelAsDispatchedIfReady(int level, CallStack callStack) { boolean ready = isLevelReady(level, callStack); From b25724b28582c893d612c8ee6f00b06a22abc037 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 24 Sep 2025 13:37:58 +1000 Subject: [PATCH 05/13] fix defer support --- .../PerLevelDataLoaderDispatchStrategy.java | 41 +++++++++++-------- ...eferExecutionSupportIntegrationTest.groovy | 2 - .../dataloader/DeferWithDataLoaderTest.groovy | 7 ---- 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index cefd0fa0da..b1ef445de2 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -186,7 +186,7 @@ private static class CallStack { public ChainedDLStack chainedDLStack = new ChainedDLStack(); - private final List deferredFragmentRootFieldsFetched = new ArrayList<>(); + private int deferredFragmentRootFieldsCompleted; public CallStack() { } @@ -370,14 +370,14 @@ public void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo CallStack callStack = getCallStack(parameters); boolean ready; synchronized (callStack) { - callStack.deferredFragmentRootFieldsFetched.add(fieldValueInfo); + callStack.deferredFragmentRootFieldsCompleted++; Assert.assertNotNull(parameters.getDeferredCallContext()); - ready = callStack.deferredFragmentRootFieldsFetched.size() == parameters.getDeferredCallContext().getFields(); + ready = callStack.deferredFragmentRootFieldsCompleted == parameters.getDeferredCallContext().getFields(); } -// if (ready) { -// int curLevel = parameters.getPath().getLevel(); -// onFieldValuesInfoDispatchIfNeeded(callStack.deferredFragmentRootFieldsFetched, curLevel, callStack); -// } + if (ready) { + onCompletionFinished(parameters.getDeferredCallContext().getStartLevel() - 1, callStack); + } + } // @@ -393,16 +393,21 @@ private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallC return alternativeCallContextMap.computeIfAbsent(alternativeCallContext, k -> { CallStack callStack = new CallStack(); // System.out.println("new callstack : " + callStack.hashCode()); - // for subscriptions there is only root field which is already fetched -// callStack.expectedFetchCountPerLevel.set(1, 1); -// callStack.happenedFetchCountPerLevel.set(1, 1); -// // the level 0 is done -// callStack.happenedExecuteObjectCallsPerLevel.set(0, 1); -// callStack.happenedCompletionFinishedCountPerLevel.set(0, 1); -// // level is 1 already dispatched -// callStack.setDispatchedLevel(1); -// int startLevel = alternativeCallContext.getStartLevel(); -// int fields = alternativeCallContext.getFields(); + // on which level the fields are + int startLevel = k.getStartLevel(); + // how many fields are deferred on this level + int fields = k.getFields(); + if (startLevel > 1) { + // parent level is considered dispatched all fields completed + callStack.dispatchedLevels.add(startLevel - 1); + callStack.happenedExecuteObjectCallsPerLevel.set(startLevel - 2, 1); + callStack.happenedCompletionFinishedCountPerLevel.set(startLevel - 2, 1); + } + // the parent will have one completion therefore we set the expectation to 1 + callStack.happenedExecuteObjectCallsPerLevel.set(startLevel - 1, 1); + + // for the current level we set the fetch expectations + callStack.expectedFetchCountPerLevel.set(startLevel, fields); return callStack; }); } @@ -419,7 +424,7 @@ private void resetCallStack(CallStack callStack) { private boolean markLevelAsDispatchedIfReady(int level, CallStack callStack) { boolean ready = isLevelReady(level, callStack); -// System.out.println("markLevelAsDispatchedIfReady level: " + level + " ready: " + ready); +// System.out.println("markLevelAsDispatchedIfReady level: " + level + " ready: " + ready + " callstack: " + callStack.hashCode()); if (ready) { callStack.setDispatchedLevel(level); return true; diff --git a/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy b/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy index 8afa04300f..b3b522d90b 100644 --- a/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy +++ b/src/test/groovy/graphql/execution/incremental/DeferExecutionSupportIntegrationTest.groovy @@ -23,7 +23,6 @@ import org.dataloader.DataLoader import org.dataloader.DataLoaderFactory import org.dataloader.DataLoaderRegistry import org.reactivestreams.Publisher -import spock.lang.Ignore import spock.lang.Specification import spock.lang.Unroll @@ -1691,7 +1690,6 @@ class DeferExecutionSupportIntegrationTest extends Specification { } - @Ignore("tmp not working") def "dataloader used inside defer"() { given: def query = ''' diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy index 5df5837bf0..362206f64b 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy @@ -11,7 +11,6 @@ import org.awaitility.Awaitility import org.dataloader.BatchLoader import org.dataloader.DataLoaderFactory import org.dataloader.DataLoaderRegistry -import spock.lang.Ignore import spock.lang.RepeatUntilFailure import spock.lang.Specification @@ -61,7 +60,6 @@ class DeferWithDataLoaderTest extends Specification { } } - @Ignore def "query with single deferred field"() { given: def query = getQuery(true, false) @@ -106,7 +104,6 @@ class DeferWithDataLoaderTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 3 } - @Ignore def "multiple fields on same defer block"() { given: def query = """ @@ -180,7 +177,6 @@ class DeferWithDataLoaderTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 0 } - @Ignore def "query with nested deferred fields"() { given: def query = getQuery(true, true) @@ -232,7 +228,6 @@ class DeferWithDataLoaderTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 9 } - @Ignore def "query with top-level deferred field"() { given: def query = """ @@ -296,7 +291,6 @@ class DeferWithDataLoaderTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 0 } - @Ignore def "query with multiple deferred fields"() { given: def query = getExpensiveQuery(true) @@ -354,7 +348,6 @@ class DeferWithDataLoaderTest extends Specification { batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 1 } - @Ignore @RepeatUntilFailure(maxAttempts = 50, ignoreRest = false) def "dataloader in initial result and chained dataloader inside nested defer block"() { given: From 117ebe180ac1a5e10c1f03bf8f80b3703c45e10b Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 24 Sep 2025 13:58:14 +1000 Subject: [PATCH 06/13] cleanup --- .../PerLevelDataLoaderDispatchStrategy.java | 101 +++--------------- 1 file changed, 13 insertions(+), 88 deletions(-) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index b1ef445de2..de253cc9e6 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -134,48 +134,6 @@ public void clear() { private static class CallStack { - - /** - * A general overview of teh tracked data: - * There are three aspects tracked per level: - * - number of expected and happened execute object calls (executeObject) - * - number of expected and happened fetches - * - number of happened sub selections finished fetching - *

- * The level for an execute object call is the level of sub selection of the object: for - * { a {b {c}}} the level of "execute object a" is 2 - *

- * For fetches the level is the level of the field fetched - *

- * For sub selections finished it is the level of the fields inside the sub selection: - * {a1 { b c} a2 } the level of {a1 a2} is 1, the level of {b c} is 2 - *

- * The main aspect for when a level is ready is when all expected fetch call happened, meaning - * we can dispatch this level as all data loaders in this level have been called - * (if the number of expected fetches is correct). - *

- * The number of expected fetches is increased with every executeObject (based on the number of subselection - * fields for the execute object). - * Execute Object a (on level 2) with { a {f1 f2 f3} } means we expect 3 fetches on level 2. - *

- * A finished subselection means we can predict the number of execute object calls in the next level as the subselection: - * { a {x} b {y} } - * If a is a list of 3 objects and b is a list of 2 objects we expect 3 + 2 = 5 execute object calls on the level 2 to be happening - *

- * The finished sub selection is the only "cross level" event: a finished sub selections impacts the expected execute - * object calls on the next level. - *

- *

- * This means we know a level is ready to be dispatched if: - * - all expected fetched happened in the current level - * - all expected execute objects calls happened in the current level (because they inform the expected fetches) - * - all expected sub selections happened in the parent level (because they inform the expected execute object in the current level). - * The expected sub selections are equal to the expected object calls (in the parent level) - * - All expected sub selections happened in the parent parent level (again: meaning #happenedSubSelections == #expectedExecuteObjectCalls) - * - And so until the first level - */ - - private final LevelMap expectedFetchCountPerLevel = new LevelMap(); private final LevelMap happenedFetchCountPerLevel = new LevelMap(); private final LevelMap happenedCompletionFinishedCountPerLevel = new LevelMap(); @@ -192,37 +150,14 @@ public CallStack() { } - void clearDispatchLevels() { - dispatchedLevels.clear(); - } - - @Override - public String toString() { - return "CallStack{" + - "expectedFetchCountPerLevel=" + expectedFetchCountPerLevel + - ", fetchCountPerLevel=" + happenedFetchCountPerLevel + -// ", expectedExecuteObjectCallsPerLevel=" + expectedExecuteObjectCallsPerLevel + -// ", happenedExecuteObjectCallsPerLevel=" + happenedExecuteObjectCallsPerLevel + -// ", happenedOnFieldValueCallsPerLevel=" + happenedOnFieldValueCallsPerLevel + - ", dispatchedLevels" + dispatchedLevels + - '}'; - } - - - public void setDispatchedLevel(int level) { - if (!dispatchedLevels.add(level)) { - Assert.assertShouldNeverHappen("level " + level + " already dispatched"); - } - } - public void clear() { dispatchedLevels.clear(); happenedExecuteObjectCallsPerLevel.clear(); expectedFetchCountPerLevel.clear(); happenedFetchCountPerLevel.clear(); happenedCompletionFinishedCountPerLevel.clear(); - - + deferredFragmentRootFieldsCompleted = 0; + chainedDLStack.clear(); } } @@ -240,7 +175,6 @@ public PerLevelDataLoaderDispatchStrategy(ExecutionContext executionContext) { @Override public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { Assert.assertTrue(parameters.getExecutionStepInfo().getPath().isRootPath()); -// System.out.println("execution strategy started"); synchronized (initialCallStack) { initialCallStack.happenedExecuteObjectCallsPerLevel.set(0, 1); initialCallStack.expectedFetchCountPerLevel.set(1, fieldCount); @@ -250,10 +184,10 @@ public void executionStrategy(ExecutionContext executionContext, ExecutionStrate @Override public void executionSerialStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); - resetCallStack(callStack); - // field count is always 1 for serial execution + callStack.clear(); synchronized (callStack) { callStack.happenedExecuteObjectCallsPerLevel.set(0, 1); + // field count is always 1 for serial execution callStack.expectedFetchCountPerLevel.set(1, 1); } } @@ -261,7 +195,6 @@ public void executionSerialStrategy(ExecutionContext executionContext, Execution @Override public void executionStrategyOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); -// System.out.println("1st level fields completed"); onCompletionFinished(0, callStack); } @@ -277,7 +210,6 @@ public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrate public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { CallStack callStack = getCallStack(parameters); int curLevel = parameters.getPath().getLevel(); -// System.out.println("execute object " + curLevel + " at " + parameters.getPath() + " with callstack " + callStack.hashCode()); synchronized (callStack) { callStack.happenedExecuteObjectCallsPerLevel.increment(curLevel, 1); callStack.expectedFetchCountPerLevel.increment(curLevel + 1, fieldCount); @@ -289,7 +221,6 @@ public void executeObject(ExecutionContext executionContext, ExecutionStrategyPa (List fieldValueInfoList, ExecutionStrategyParameters parameters) { int curLevel = parameters.getPath().getLevel(); CallStack callStack = getCallStack(parameters); -// System.out.println("completion finished at " + curLevel + " at " + parameters.getPath() ); onCompletionFinished(curLevel, callStack); } @@ -332,7 +263,6 @@ public void fieldFetched(ExecutionContext executionContext, Supplier dataFetchingEnvironment) { CallStack callStack = getCallStack(executionStrategyParameters); int level = executionStrategyParameters.getPath().getLevel(); -// System.out.println("field fetched at: " + level + " path: " + executionStrategyParameters.getPath() + " callStack: " + callStack.hashCode()); boolean dispatchNeeded; synchronized (callStack) { callStack.happenedFetchCountPerLevel.increment(level, 1); @@ -380,7 +310,6 @@ public void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo } -// private CallStack getCallStack(ExecutionStrategyParameters parameters) { return getCallStack(parameters.getDeferredCallContext()); @@ -391,14 +320,18 @@ private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallC return this.initialCallStack; } else { return alternativeCallContextMap.computeIfAbsent(alternativeCallContext, k -> { + /* + This is only for handling deferred cases. Subscription cases will also get a new callStack, but + it is explicitly created in `newSubscriptionExecution`. + The reason we are doing this lazily is, because we don't have explicit startDeferred callback. + */ CallStack callStack = new CallStack(); -// System.out.println("new callstack : " + callStack.hashCode()); // on which level the fields are int startLevel = k.getStartLevel(); // how many fields are deferred on this level int fields = k.getFields(); if (startLevel > 1) { - // parent level is considered dispatched all fields completed + // parent level is considered dispatched and all fields completed callStack.dispatchedLevels.add(startLevel - 1); callStack.happenedExecuteObjectCallsPerLevel.set(startLevel - 2, 1); callStack.happenedCompletionFinishedCountPerLevel.set(startLevel - 2, 1); @@ -414,19 +347,12 @@ private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallC } - private void resetCallStack(CallStack callStack) { - synchronized (callStack) { - callStack.clear(); - callStack.chainedDLStack.clear(); - } - } - - private boolean markLevelAsDispatchedIfReady(int level, CallStack callStack) { boolean ready = isLevelReady(level, callStack); -// System.out.println("markLevelAsDispatchedIfReady level: " + level + " ready: " + ready + " callstack: " + callStack.hashCode()); if (ready) { - callStack.setDispatchedLevel(level); + if (!callStack.dispatchedLevels.add(level)) { + Assert.assertShouldNeverHappen("level " + level + " already dispatched"); + } return true; } return false; @@ -456,7 +382,6 @@ private boolean isLevelReady(int level, CallStack callStack) { } void dispatch(int level, CallStack callStack) { -// System.out.println("dispatching at " + level); if (!enableDataLoaderChaining) { profiler.oldStrategyDispatchingAll(level); DataLoaderRegistry dataLoaderRegistry = executionContext.getDataLoaderRegistry(); From a5d29a6a4f5d68fb2ab43f1d9fecf45b293ce336 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 24 Sep 2025 14:01:12 +1000 Subject: [PATCH 07/13] cleanup --- .../java/graphql/execution/DataLoaderDispatchStrategy.java | 4 ---- src/main/java/graphql/execution/ExecutionStrategy.java | 1 - 2 files changed, 5 deletions(-) diff --git a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java index 88ec0270ec..8763d650f2 100644 --- a/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/DataLoaderDispatchStrategy.java @@ -40,10 +40,6 @@ default void executeObjectOnFieldValuesInfo(List fieldValueInfoL } - default void fieldCompleted(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters executionStrategyParameters) { - - } - default void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable throwable, ExecutionStrategyParameters parameters) { } diff --git a/src/main/java/graphql/execution/ExecutionStrategy.java b/src/main/java/graphql/execution/ExecutionStrategy.java index 8d29152b4a..a6061a15ce 100644 --- a/src/main/java/graphql/execution/ExecutionStrategy.java +++ b/src/main/java/graphql/execution/ExecutionStrategy.java @@ -638,7 +638,6 @@ private FieldValueInfo completeField(GraphQLFieldDefinition fieldDef, ExecutionC ); FieldValueInfo fieldValueInfo = completeValue(executionContext, newParameters); - executionContext.getDataLoaderDispatcherStrategy().fieldCompleted(fieldValueInfo, parameters); ctxCompleteField.onDispatched(); if (fieldValueInfo.isFutureValue()) { CompletableFuture executionResultFuture = fieldValueInfo.getFieldValueFuture(); From 2686c32e64cbd240b63ef7e7e345e19daec9e654 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 24 Sep 2025 19:32:43 +1000 Subject: [PATCH 08/13] no per level tracking anymore --- .../performance/DataLoaderPerformance.java | 36 +++++++------- .../PerLevelDataLoaderDispatchStrategy.java | 48 ++++++++++++------- 2 files changed, 49 insertions(+), 35 deletions(-) diff --git a/src/jmh/java/performance/DataLoaderPerformance.java b/src/jmh/java/performance/DataLoaderPerformance.java index d816367716..20e144abd8 100644 --- a/src/jmh/java/performance/DataLoaderPerformance.java +++ b/src/jmh/java/performance/DataLoaderPerformance.java @@ -4,7 +4,6 @@ import graphql.ExecutionInput; import graphql.ExecutionResult; import graphql.GraphQL; -import graphql.execution.instrumentation.dataloader.DataLoaderDispatchingContextKeys; import graphql.schema.DataFetcher; import graphql.schema.GraphQLSchema; import graphql.schema.idl.RuntimeWiring; @@ -15,28 +14,16 @@ import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderRegistry; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -@State(Scope.Benchmark) -@Warmup(iterations = 2, time = 5) -@Measurement(iterations = 3) -@Fork(2) public class DataLoaderPerformance { static Owner o1 = new Owner("O-1", "Andi", List.of("P-1", "P-2", "P-3")); @@ -573,9 +560,6 @@ public void setup() { } - @Benchmark - @BenchmarkMode(Mode.AverageTime) - @OutputTimeUnit(TimeUnit.MILLISECONDS) public void executeRequestWithDataLoaders(MyState myState, Blackhole blackhole) { DataLoader ownerDL = DataLoaderFactory.newDataLoader(ownerBatchLoader); DataLoader petDL = DataLoaderFactory.newDataLoader(petBatchLoader); @@ -587,14 +571,30 @@ public void executeRequestWithDataLoaders(MyState myState, Blackhole blackhole) .dataLoaderRegistry(registry) // .profileExecution(true) .build(); - executionInput.getGraphQLContext().put(DataLoaderDispatchingContextKeys.ENABLE_DATA_LOADER_CHAINING, true); +// executionInput.getGraphQLContext().put(DataLoaderDispatchingContextKeys.ENABLE_DATA_LOADER_CHAINING, true); ExecutionResult execute = myState.graphQL.execute(executionInput); // ProfilerResult profilerResult = executionInput.getGraphQLContext().get(ProfilerResult.PROFILER_CONTEXT_KEY); -// System.out.println(profilerResult.shortSummaryMap()); +// System.out.println("execute: " + execute); Assert.assertTrue(execute.isDataPresent()); Assert.assertTrue(execute.getErrors().isEmpty()); blackhole.consume(execute); } + public static void main(String[] args) { + DataLoaderPerformance dataLoaderPerformance = new DataLoaderPerformance(); + MyState myState = new MyState(); + myState.setup(); + Blackhole blackhole = new Blackhole("Today's password is swordfish. I understand instantiating Blackholes directly is dangerous."); + for (int i = 0; i < 1; i++) { + dataLoaderPerformance.executeRequestWithDataLoaders(myState, blackhole); + } +// System.out.println(PerLevelDataLoaderDispatchStrategy.fieldFetchedCount); +// System.out.println(PerLevelDataLoaderDispatchStrategy.onCompletionFinishedCount); +// System.out.println(PerLevelDataLoaderDispatchStrategy.isReadyCounter); +// System.out.println(Duration.ofNanos(PerLevelDataLoaderDispatchStrategy.isReadyCounterNS.get()).toMillis()); + + + } + } diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index de253cc9e6..89af3014fa 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -17,6 +17,7 @@ import org.jspecify.annotations.Nullable; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -139,7 +140,7 @@ private static class CallStack { private final LevelMap happenedCompletionFinishedCountPerLevel = new LevelMap(); private final LevelMap happenedExecuteObjectCallsPerLevel = new LevelMap(); - private final Set dispatchedLevels = ConcurrentHashMap.newKeySet(); + private final Set dispatchedLevels = new LinkedHashSet<>(); public ChainedDLStack chainedDLStack = new ChainedDLStack(); @@ -228,9 +229,11 @@ public void executeObject(ExecutionContext executionContext, ExecutionStrategyPa public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); int curLevel = parameters.getPath().getLevel(); +// System.out.println("completion finished for level " + curLevel); onCompletionFinished(curLevel, callStack); } + private void onCompletionFinished(int level, CallStack callStack) { synchronized (callStack) { callStack.happenedCompletionFinishedCountPerLevel.increment(level, 1); @@ -241,6 +244,7 @@ private void onCompletionFinished(int level, CallStack callStack) { boolean levelReady; synchronized (callStack) { if (callStack.dispatchedLevels.contains(currentLevel)) { +// System.out.println("failed because already dispatched"); break; } levelReady = markLevelAsDispatchedIfReady(currentLevel, callStack); @@ -248,6 +252,7 @@ private void onCompletionFinished(int level, CallStack callStack) { if (levelReady) { dispatch(currentLevel, callStack); } else { +// System.out.println("failed because level not ready"); break; } currentLevel++; @@ -255,6 +260,7 @@ private void onCompletionFinished(int level, CallStack callStack) { } + @Override public void fieldFetched(ExecutionContext executionContext, ExecutionStrategyParameters executionStrategyParameters, @@ -263,12 +269,20 @@ public void fieldFetched(ExecutionContext executionContext, Supplier dataFetchingEnvironment) { CallStack callStack = getCallStack(executionStrategyParameters); int level = executionStrategyParameters.getPath().getLevel(); - boolean dispatchNeeded; - synchronized (callStack) { - callStack.happenedFetchCountPerLevel.increment(level, 1); - dispatchNeeded = markLevelAsDispatchedIfReady(level, callStack); + boolean dispatchNeeded = false; +// System.out.println("field fetched for level " + level + " path: " + executionStrategyParameters.getPath()); + AlternativeCallContext deferredCallContext = executionStrategyParameters.getDeferredCallContext(); + if (level == 1 || (deferredCallContext != null && level == deferredCallContext.getStartLevel())) { + synchronized (callStack) { + callStack.happenedFetchCountPerLevel.increment(level, 1); + dispatchNeeded = callStack.expectedFetchCountPerLevel.get(level) == callStack.happenedFetchCountPerLevel.get(level); + if (dispatchNeeded) { + callStack.dispatchedLevels.add(level); + } + } } if (dispatchNeeded) { +// System.out.println("Success field fetch"); dispatch(level, callStack); } @@ -361,27 +375,25 @@ private boolean markLevelAsDispatchedIfReady(int level, CallStack callStack) { private boolean isLevelReady(int level, CallStack callStack) { // a level with zero expectations can't be ready - int expectedFetchCount = callStack.expectedFetchCountPerLevel.get(level); - if (expectedFetchCount == 0) { - return false; - } +// int expectedFetchCount = callStack.expectedFetchCountPerLevel.get(level); +// if (expectedFetchCount == 0) { +// return false; +// } - if (expectedFetchCount != callStack.happenedFetchCountPerLevel.get(level)) { - return false; - } - if (level == 1) { - // for the root fields we just expect that they were all fetched - return true; - } +// if (expectedFetchCount != callStack.happenedFetchCountPerLevel.get(level)) { +// return false; +// } // we expect that parent has been dispatched and that all parents fields are completed // all parent fields completed means all parent parent on completions finished calls must have happened + int happenedExecuteObjectCalls = callStack.happenedExecuteObjectCallsPerLevel.get(level - 2); return callStack.dispatchedLevels.contains(level - 1) && - callStack.happenedExecuteObjectCallsPerLevel.get(level - 2) == callStack.happenedCompletionFinishedCountPerLevel.get(level - 2); + happenedExecuteObjectCalls > 0 && happenedExecuteObjectCalls == callStack.happenedCompletionFinishedCountPerLevel.get(level - 2); } void dispatch(int level, CallStack callStack) { +// System.out.println("dispatching " + level); if (!enableDataLoaderChaining) { profiler.oldStrategyDispatchingAll(level); DataLoaderRegistry dataLoaderRegistry = executionContext.getDataLoaderRegistry(); @@ -392,9 +404,11 @@ void dispatch(int level, CallStack callStack) { } private void dispatchAll(DataLoaderRegistry dataLoaderRegistry, int level) { +// System.out.println("dispatch level " + level); for (DataLoader dataLoader : dataLoaderRegistry.getDataLoaders()) { dataLoader.dispatch().whenComplete((objects, throwable) -> { if (objects != null && objects.size() > 0) { +// System.out.println("dispatching " + objects.size() + " objects for level " + level); Assert.assertNotNull(dataLoader.getName()); profiler.batchLoadedOldStrategy(dataLoader.getName(), level, objects.size()); } From a67e56bf9560538f39a45b3ce9bff728f37622fb Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 24 Sep 2025 19:43:10 +1000 Subject: [PATCH 09/13] cleanup --- .../PerLevelDataLoaderDispatchStrategy.java | 41 +++++-------------- 1 file changed, 10 insertions(+), 31 deletions(-) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index 89af3014fa..4f0b8c0813 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -135,8 +135,8 @@ public void clear() { private static class CallStack { - private final LevelMap expectedFetchCountPerLevel = new LevelMap(); - private final LevelMap happenedFetchCountPerLevel = new LevelMap(); + private int expectedFirstLevelFetchCount; + private int happenedFirstLevelFetchCount; private final LevelMap happenedCompletionFinishedCountPerLevel = new LevelMap(); private final LevelMap happenedExecuteObjectCallsPerLevel = new LevelMap(); @@ -154,8 +154,8 @@ public CallStack() { public void clear() { dispatchedLevels.clear(); happenedExecuteObjectCallsPerLevel.clear(); - expectedFetchCountPerLevel.clear(); - happenedFetchCountPerLevel.clear(); + expectedFirstLevelFetchCount = 0; + happenedFirstLevelFetchCount = 0; happenedCompletionFinishedCountPerLevel.clear(); deferredFragmentRootFieldsCompleted = 0; chainedDLStack.clear(); @@ -178,7 +178,7 @@ public void executionStrategy(ExecutionContext executionContext, ExecutionStrate Assert.assertTrue(parameters.getExecutionStepInfo().getPath().isRootPath()); synchronized (initialCallStack) { initialCallStack.happenedExecuteObjectCallsPerLevel.set(0, 1); - initialCallStack.expectedFetchCountPerLevel.set(1, fieldCount); + initialCallStack.expectedFirstLevelFetchCount = fieldCount; } } @@ -189,7 +189,7 @@ public void executionSerialStrategy(ExecutionContext executionContext, Execution synchronized (callStack) { callStack.happenedExecuteObjectCallsPerLevel.set(0, 1); // field count is always 1 for serial execution - callStack.expectedFetchCountPerLevel.set(1, 1); + initialCallStack.expectedFirstLevelFetchCount = 1; } } @@ -213,13 +213,11 @@ public void executeObject(ExecutionContext executionContext, ExecutionStrategyPa int curLevel = parameters.getPath().getLevel(); synchronized (callStack) { callStack.happenedExecuteObjectCallsPerLevel.increment(curLevel, 1); - callStack.expectedFetchCountPerLevel.increment(curLevel + 1, fieldCount); } } @Override - public void executeObjectOnFieldValuesInfo - (List fieldValueInfoList, ExecutionStrategyParameters parameters) { + public void executeObjectOnFieldValuesInfo(List fieldValueInfoList, ExecutionStrategyParameters parameters) { int curLevel = parameters.getPath().getLevel(); CallStack callStack = getCallStack(parameters); onCompletionFinished(curLevel, callStack); @@ -229,7 +227,6 @@ public void executeObject(ExecutionContext executionContext, ExecutionStrategyPa public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); int curLevel = parameters.getPath().getLevel(); -// System.out.println("completion finished for level " + curLevel); onCompletionFinished(curLevel, callStack); } @@ -238,13 +235,11 @@ private void onCompletionFinished(int level, CallStack callStack) { synchronized (callStack) { callStack.happenedCompletionFinishedCountPerLevel.increment(level, 1); } - // on completion might mark multiple higher levels as ready int currentLevel = level + 2; while (true) { boolean levelReady; synchronized (callStack) { if (callStack.dispatchedLevels.contains(currentLevel)) { -// System.out.println("failed because already dispatched"); break; } levelReady = markLevelAsDispatchedIfReady(currentLevel, callStack); @@ -252,7 +247,6 @@ private void onCompletionFinished(int level, CallStack callStack) { if (levelReady) { dispatch(currentLevel, callStack); } else { -// System.out.println("failed because level not ready"); break; } currentLevel++; @@ -270,19 +264,17 @@ public void fieldFetched(ExecutionContext executionContext, CallStack callStack = getCallStack(executionStrategyParameters); int level = executionStrategyParameters.getPath().getLevel(); boolean dispatchNeeded = false; -// System.out.println("field fetched for level " + level + " path: " + executionStrategyParameters.getPath()); AlternativeCallContext deferredCallContext = executionStrategyParameters.getDeferredCallContext(); if (level == 1 || (deferredCallContext != null && level == deferredCallContext.getStartLevel())) { synchronized (callStack) { - callStack.happenedFetchCountPerLevel.increment(level, 1); - dispatchNeeded = callStack.expectedFetchCountPerLevel.get(level) == callStack.happenedFetchCountPerLevel.get(level); + callStack.happenedFirstLevelFetchCount++; + dispatchNeeded = callStack.expectedFirstLevelFetchCount == callStack.happenedFirstLevelFetchCount; if (dispatchNeeded) { callStack.dispatchedLevels.add(level); } } } if (dispatchNeeded) { -// System.out.println("Success field fetch"); dispatch(level, callStack); } @@ -354,7 +346,7 @@ private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallC callStack.happenedExecuteObjectCallsPerLevel.set(startLevel - 1, 1); // for the current level we set the fetch expectations - callStack.expectedFetchCountPerLevel.set(startLevel, fields); + callStack.expectedFirstLevelFetchCount = fields; return callStack; }); } @@ -374,16 +366,6 @@ private boolean markLevelAsDispatchedIfReady(int level, CallStack callStack) { private boolean isLevelReady(int level, CallStack callStack) { - // a level with zero expectations can't be ready -// int expectedFetchCount = callStack.expectedFetchCountPerLevel.get(level); -// if (expectedFetchCount == 0) { -// return false; -// } - -// if (expectedFetchCount != callStack.happenedFetchCountPerLevel.get(level)) { -// return false; -// } - // we expect that parent has been dispatched and that all parents fields are completed // all parent fields completed means all parent parent on completions finished calls must have happened int happenedExecuteObjectCalls = callStack.happenedExecuteObjectCallsPerLevel.get(level - 2); @@ -393,7 +375,6 @@ private boolean isLevelReady(int level, CallStack callStack) { } void dispatch(int level, CallStack callStack) { -// System.out.println("dispatching " + level); if (!enableDataLoaderChaining) { profiler.oldStrategyDispatchingAll(level); DataLoaderRegistry dataLoaderRegistry = executionContext.getDataLoaderRegistry(); @@ -404,11 +385,9 @@ void dispatch(int level, CallStack callStack) { } private void dispatchAll(DataLoaderRegistry dataLoaderRegistry, int level) { -// System.out.println("dispatch level " + level); for (DataLoader dataLoader : dataLoaderRegistry.getDataLoaders()) { dataLoader.dispatch().whenComplete((objects, throwable) -> { if (objects != null && objects.size() > 0) { -// System.out.println("dispatching " + objects.size() + " objects for level " + level); Assert.assertNotNull(dataLoader.getName()); profiler.batchLoadedOldStrategy(dataLoader.getName(), level, objects.size()); } From 2da8fef1aac9b313b09cb6b3d86932592bfbb1ee Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 24 Sep 2025 20:56:51 +1000 Subject: [PATCH 10/13] no synchronized/locking anymore --- .../PerLevelDataLoaderDispatchStrategy.java | 158 +++++++++++------- 1 file changed, 102 insertions(+), 56 deletions(-) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index 4f0b8c0813..2cafbaa09e 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -17,12 +17,12 @@ import org.jspecify.annotations.Nullable; import java.util.ArrayList; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -135,29 +135,75 @@ public void clear() { private static class CallStack { - private int expectedFirstLevelFetchCount; - private int happenedFirstLevelFetchCount; - private final LevelMap happenedCompletionFinishedCountPerLevel = new LevelMap(); - private final LevelMap happenedExecuteObjectCallsPerLevel = new LevelMap(); + static class StateForLevel { + private final int happenedCompletionFinishedCount; + private final int happenedExecuteObjectCalls; - private final Set dispatchedLevels = new LinkedHashSet<>(); + public StateForLevel() { + this.happenedCompletionFinishedCount = 0; + this.happenedExecuteObjectCalls = 0; + } + + public StateForLevel(int happenedCompletionFinishedCount, int happenedExecuteObjectCalls) { + this.happenedCompletionFinishedCount = happenedCompletionFinishedCount; + this.happenedExecuteObjectCalls = happenedExecuteObjectCalls; + } + + public StateForLevel(StateForLevel other) { + this.happenedCompletionFinishedCount = other.happenedCompletionFinishedCount; + this.happenedExecuteObjectCalls = other.happenedExecuteObjectCalls; + } + + public StateForLevel copy() { + return new StateForLevel(this); + } + + public StateForLevel increaseHappenedCompletionFinishedCount() { + return new StateForLevel(happenedCompletionFinishedCount + 1, happenedExecuteObjectCalls); + } + + public StateForLevel increaseHappenedExecuteObjectCalls() { + return new StateForLevel(happenedCompletionFinishedCount, happenedExecuteObjectCalls + 1); + } + + } + + private final Object firstLevelDataLock = new Object() { + }; + private volatile int expectedFirstLevelFetchCount; + private final AtomicInteger happenedFirstLevelFetchCount = new AtomicInteger(); + + + private final Map> stateForLevelMap = new ConcurrentHashMap<>(); + + private final Set dispatchedLevels = ConcurrentHashMap.newKeySet(); public ChainedDLStack chainedDLStack = new ChainedDLStack(); - private int deferredFragmentRootFieldsCompleted; + private final AtomicInteger deferredFragmentRootFieldsCompleted = new AtomicInteger(); public CallStack() { } + public StateForLevel get(int level) { + AtomicReference dataPerLevelAtomicReference = stateForLevelMap.computeIfAbsent(level, __ -> new AtomicReference<>(new StateForLevel())); + return Assert.assertNotNull(dataPerLevelAtomicReference.get()); + } + + public boolean tryUpdateLevel(int level, StateForLevel oldData, StateForLevel newData) { + AtomicReference dataPerLevelAtomicReference = Assert.assertNotNull(stateForLevelMap.get(level)); + return dataPerLevelAtomicReference.compareAndSet(oldData, newData); + } + + public void clear() { dispatchedLevels.clear(); - happenedExecuteObjectCallsPerLevel.clear(); + stateForLevelMap.clear(); expectedFirstLevelFetchCount = 0; - happenedFirstLevelFetchCount = 0; - happenedCompletionFinishedCountPerLevel.clear(); - deferredFragmentRootFieldsCompleted = 0; + happenedFirstLevelFetchCount.set(0); + deferredFragmentRootFieldsCompleted.set(0); chainedDLStack.clear(); } } @@ -176,21 +222,20 @@ public PerLevelDataLoaderDispatchStrategy(ExecutionContext executionContext) { @Override public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { Assert.assertTrue(parameters.getExecutionStepInfo().getPath().isRootPath()); - synchronized (initialCallStack) { - initialCallStack.happenedExecuteObjectCallsPerLevel.set(0, 1); - initialCallStack.expectedFirstLevelFetchCount = fieldCount; - } + // no concurrency access happening + CallStack.StateForLevel currentState = initialCallStack.get(0); + initialCallStack.tryUpdateLevel(0, currentState, new CallStack.StateForLevel(0, 1)); + initialCallStack.expectedFirstLevelFetchCount = fieldCount; } @Override public void executionSerialStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); callStack.clear(); - synchronized (callStack) { - callStack.happenedExecuteObjectCallsPerLevel.set(0, 1); - // field count is always 1 for serial execution - initialCallStack.expectedFirstLevelFetchCount = 1; - } + CallStack.StateForLevel currentState = initialCallStack.get(0); + initialCallStack.tryUpdateLevel(0, currentState, new CallStack.StateForLevel(0, 1)); + // field count is always 1 for serial execution + initialCallStack.expectedFirstLevelFetchCount = 1; } @Override @@ -211,8 +256,12 @@ public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrate public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters, int fieldCount) { CallStack callStack = getCallStack(parameters); int curLevel = parameters.getPath().getLevel(); - synchronized (callStack) { - callStack.happenedExecuteObjectCallsPerLevel.increment(curLevel, 1); + while (true) { + CallStack.StateForLevel currentState = callStack.get(curLevel); + if (callStack.tryUpdateLevel(curLevel, currentState, currentState.increaseHappenedExecuteObjectCalls())) { + return; + } + } } @@ -232,18 +281,20 @@ public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyPa private void onCompletionFinished(int level, CallStack callStack) { - synchronized (callStack) { - callStack.happenedCompletionFinishedCountPerLevel.increment(level, 1); + while (true) { + CallStack.StateForLevel currentState = callStack.get(level); + if (callStack.tryUpdateLevel(level, currentState, currentState.increaseHappenedCompletionFinishedCount())) { + break; + } } + int currentLevel = level + 2; while (true) { boolean levelReady; - synchronized (callStack) { - if (callStack.dispatchedLevels.contains(currentLevel)) { - break; - } - levelReady = markLevelAsDispatchedIfReady(currentLevel, callStack); + if (callStack.dispatchedLevels.contains(currentLevel)) { + break; } + levelReady = markLevelAsDispatchedIfReady(currentLevel, callStack); if (levelReady) { dispatch(currentLevel, callStack); } else { @@ -263,21 +314,14 @@ public void fieldFetched(ExecutionContext executionContext, Supplier dataFetchingEnvironment) { CallStack callStack = getCallStack(executionStrategyParameters); int level = executionStrategyParameters.getPath().getLevel(); - boolean dispatchNeeded = false; AlternativeCallContext deferredCallContext = executionStrategyParameters.getDeferredCallContext(); if (level == 1 || (deferredCallContext != null && level == deferredCallContext.getStartLevel())) { - synchronized (callStack) { - callStack.happenedFirstLevelFetchCount++; - dispatchNeeded = callStack.expectedFirstLevelFetchCount == callStack.happenedFirstLevelFetchCount; - if (dispatchNeeded) { - callStack.dispatchedLevels.add(level); - } + int happenedFirstLevelFetchCount = callStack.happenedFirstLevelFetchCount.incrementAndGet(); + if (happenedFirstLevelFetchCount == callStack.expectedFirstLevelFetchCount) { + callStack.dispatchedLevels.add(level); + dispatch(level, callStack); } } - if (dispatchNeeded) { - dispatch(level, callStack); - } - } @@ -293,9 +337,12 @@ public void subscriptionEventCompletionDone(AlternativeCallContext alternativeCa CallStack callStack = getCallStack(alternativeCallContext); // this means the single root field is completed (it was never "fetched" because it is // the event payload) and we can mark level 1 (root fields) as dispatched and level 0 as completed - synchronized (callStack) { - callStack.dispatchedLevels.add(1); - callStack.happenedExecuteObjectCallsPerLevel.set(0, 1); + callStack.dispatchedLevels.add(1); + while (true) { + CallStack.StateForLevel currentState = callStack.get(0); + if (callStack.tryUpdateLevel(0, currentState, currentState.increaseHappenedExecuteObjectCalls())) { + break; + } } onCompletionFinished(0, callStack); } @@ -304,13 +351,9 @@ public void subscriptionEventCompletionDone(AlternativeCallContext alternativeCa public void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable throwable, ExecutionStrategyParameters parameters) { CallStack callStack = getCallStack(parameters); - boolean ready; - synchronized (callStack) { - callStack.deferredFragmentRootFieldsCompleted++; - Assert.assertNotNull(parameters.getDeferredCallContext()); - ready = callStack.deferredFragmentRootFieldsCompleted == parameters.getDeferredCallContext().getFields(); - } - if (ready) { + int deferredFragmentRootFieldsCompleted = callStack.deferredFragmentRootFieldsCompleted.incrementAndGet(); + Assert.assertNotNull(parameters.getDeferredCallContext()); + if (deferredFragmentRootFieldsCompleted == parameters.getDeferredCallContext().getFields()) { onCompletionFinished(parameters.getDeferredCallContext().getStartLevel() - 1, callStack); } @@ -339,11 +382,13 @@ private CallStack getCallStack(@Nullable AlternativeCallContext alternativeCallC if (startLevel > 1) { // parent level is considered dispatched and all fields completed callStack.dispatchedLevels.add(startLevel - 1); - callStack.happenedExecuteObjectCallsPerLevel.set(startLevel - 2, 1); - callStack.happenedCompletionFinishedCountPerLevel.set(startLevel - 2, 1); + CallStack.StateForLevel stateForLevel = callStack.get(startLevel - 2); + CallStack.StateForLevel newStateForLevel = stateForLevel.increaseHappenedExecuteObjectCalls().increaseHappenedCompletionFinishedCount(); + callStack.tryUpdateLevel(startLevel - 2, stateForLevel, newStateForLevel); } // the parent will have one completion therefore we set the expectation to 1 - callStack.happenedExecuteObjectCallsPerLevel.set(startLevel - 1, 1); + CallStack.StateForLevel stateForLevel = callStack.get(startLevel - 1); + callStack.tryUpdateLevel(startLevel - 1, stateForLevel, stateForLevel.increaseHappenedExecuteObjectCalls()); // for the current level we set the fetch expectations callStack.expectedFirstLevelFetchCount = fields; @@ -357,7 +402,8 @@ private boolean markLevelAsDispatchedIfReady(int level, CallStack callStack) { boolean ready = isLevelReady(level, callStack); if (ready) { if (!callStack.dispatchedLevels.add(level)) { - Assert.assertShouldNeverHappen("level " + level + " already dispatched"); + // meaning another thread came before here + return false; } return true; } @@ -368,9 +414,9 @@ private boolean markLevelAsDispatchedIfReady(int level, CallStack callStack) { private boolean isLevelReady(int level, CallStack callStack) { // we expect that parent has been dispatched and that all parents fields are completed // all parent fields completed means all parent parent on completions finished calls must have happened - int happenedExecuteObjectCalls = callStack.happenedExecuteObjectCallsPerLevel.get(level - 2); + int happenedExecuteObjectCalls = callStack.get(level - 2).happenedExecuteObjectCalls; return callStack.dispatchedLevels.contains(level - 1) && - happenedExecuteObjectCalls > 0 && happenedExecuteObjectCalls == callStack.happenedCompletionFinishedCountPerLevel.get(level - 2); + happenedExecuteObjectCalls > 0 && happenedExecuteObjectCalls == callStack.get(level - 2).happenedCompletionFinishedCount; } From 009fcb8434b9f6ed4d3912fb87cb861566c9429e Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Wed, 24 Sep 2025 22:14:11 +1000 Subject: [PATCH 11/13] cleanup and comment --- .../dataloader/PerLevelDataLoaderDispatchStrategy.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index 2cafbaa09e..a1b32bcd65 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -169,8 +169,6 @@ public StateForLevel increaseHappenedExecuteObjectCalls() { } - private final Object firstLevelDataLock = new Object() { - }; private volatile int expectedFirstLevelFetchCount; private final AtomicInteger happenedFirstLevelFetchCount = new AtomicInteger(); @@ -288,6 +286,10 @@ private void onCompletionFinished(int level, CallStack callStack) { } } + // due to synchronous DataFetcher the completion calls on higher levels + // can happen before the completion calls on lower level + // this means sometimes a lower level completion means multiple levels are ready + // hence this loop here until a level is not ready or already dispatched int currentLevel = level + 2; while (true) { boolean levelReady; From ab06e3032b18bf92f9829f9e0ece870b45196de0 Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Thu, 25 Sep 2025 06:54:19 +1000 Subject: [PATCH 12/13] add Thread.onSpinWait to indicate active waiting Add assertion --- .../dataloader/PerLevelDataLoaderDispatchStrategy.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index a1b32bcd65..adedf3c11b 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -97,6 +97,7 @@ public StateForLevel(@Nullable DataLoaderInvocation dataLoaderInvocation, if (currentStateRef.compareAndSet(currentState, newState)) { return currentState; } + Thread.onSpinWait(); } } @@ -124,6 +125,7 @@ public boolean newDataLoaderInvocation(DataLoaderInvocation dataLoaderInvocation if (currentStateRef.compareAndSet(currentState, newState)) { return newDelayedInvocation; } + Thread.onSpinWait(); } } @@ -259,7 +261,7 @@ public void executeObject(ExecutionContext executionContext, ExecutionStrategyPa if (callStack.tryUpdateLevel(curLevel, currentState, currentState.increaseHappenedExecuteObjectCalls())) { return; } - + Thread.onSpinWait(); } } @@ -284,6 +286,7 @@ private void onCompletionFinished(int level, CallStack callStack) { if (callStack.tryUpdateLevel(level, currentState, currentState.increaseHappenedCompletionFinishedCount())) { break; } + Thread.onSpinWait(); } // due to synchronous DataFetcher the completion calls on higher levels @@ -345,6 +348,7 @@ public void subscriptionEventCompletionDone(AlternativeCallContext alternativeCa if (callStack.tryUpdateLevel(0, currentState, currentState.increaseHappenedExecuteObjectCalls())) { break; } + Thread.onSpinWait(); } onCompletionFinished(0, callStack); } @@ -404,7 +408,7 @@ private boolean markLevelAsDispatchedIfReady(int level, CallStack callStack) { boolean ready = isLevelReady(level, callStack); if (ready) { if (!callStack.dispatchedLevels.add(level)) { - // meaning another thread came before here + // meaning another thread came before us, so they will take care of dispatching return false; } return true; @@ -414,6 +418,7 @@ private boolean markLevelAsDispatchedIfReady(int level, CallStack callStack) { private boolean isLevelReady(int level, CallStack callStack) { + Assert.assertTrue(level > 1); // we expect that parent has been dispatched and that all parents fields are completed // all parent fields completed means all parent parent on completions finished calls must have happened int happenedExecuteObjectCalls = callStack.get(level - 2).happenedExecuteObjectCalls; From 2eecad7ba6305d0d7ad02214de5154d9ac55f4ce Mon Sep 17 00:00:00 2001 From: Andreas Marek Date: Thu, 25 Sep 2025 06:59:14 +1000 Subject: [PATCH 13/13] remove Thread.onSpinWait again as not clear this is needed or has maybe negative effects --- .../dataloader/PerLevelDataLoaderDispatchStrategy.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java index adedf3c11b..943bae4ffd 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java @@ -97,7 +97,6 @@ public StateForLevel(@Nullable DataLoaderInvocation dataLoaderInvocation, if (currentStateRef.compareAndSet(currentState, newState)) { return currentState; } - Thread.onSpinWait(); } } @@ -125,7 +124,6 @@ public boolean newDataLoaderInvocation(DataLoaderInvocation dataLoaderInvocation if (currentStateRef.compareAndSet(currentState, newState)) { return newDelayedInvocation; } - Thread.onSpinWait(); } } @@ -261,7 +259,6 @@ public void executeObject(ExecutionContext executionContext, ExecutionStrategyPa if (callStack.tryUpdateLevel(curLevel, currentState, currentState.increaseHappenedExecuteObjectCalls())) { return; } - Thread.onSpinWait(); } } @@ -286,7 +283,6 @@ private void onCompletionFinished(int level, CallStack callStack) { if (callStack.tryUpdateLevel(level, currentState, currentState.increaseHappenedCompletionFinishedCount())) { break; } - Thread.onSpinWait(); } // due to synchronous DataFetcher the completion calls on higher levels @@ -348,7 +344,6 @@ public void subscriptionEventCompletionDone(AlternativeCallContext alternativeCa if (callStack.tryUpdateLevel(0, currentState, currentState.increaseHappenedExecuteObjectCalls())) { break; } - Thread.onSpinWait(); } onCompletionFinished(0, callStack); }