Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the License.
*/

package org.apache.wayang.giraph.operators;
Expand All @@ -31,35 +31,37 @@
import org.apache.wayang.giraph.execution.GiraphExecutor;
import org.apache.wayang.giraph.platform.GiraphPlatform;
import org.apache.wayang.java.channels.StreamChannel;
import org.apache.giraph.conf.GiraphConfiguration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.IOException;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

/**
* Test For GiraphPageRank
* Test for GiraphPageRankOperator
*/
class GiraphPagaRankOperatorTest {
class GiraphPageRankOperatorTest {

private static GiraphExecutor giraphExecutor;
private GiraphExecutor giraphExecutor;

@BeforeEach
void setUp() {
giraphExecutor = mock(GiraphExecutor.class);
GiraphConfiguration mockConfig = mock(GiraphConfiguration.class);
when(giraphExecutor.getConfiguration()).thenReturn(mockConfig);
doNothing().when(giraphExecutor).execute(any(), any());
}

//TODO Validate the mock of GiraphExecutor
@Disabled
@Test
void testExecution() throws IOException {
// Ensure that the GraphChiPlatform is initialized.
GiraphPlatform.getInstance();
final Configuration configuration = new Configuration();
Giraph.plugin().configure(configuration);

final GiraphPageRankOperator giraphPageRankOperator = new GiraphPageRankOperator(20);

final Job job = mock(Job.class);
Expand All @@ -68,27 +70,33 @@ void testExecution() throws IOException {

final ExecutionOperator outputOperator = mock(ExecutionOperator.class);
when(outputOperator.getNumOutputs()).thenReturn(1);

FileChannel.Instance inputChannelInstance =
(FileChannel.Instance) new FileChannel(FileChannel.HDFS_TSV_DESCRIPTOR)
.createInstance(giraphExecutor, null, -1);
inputChannelInstance.addPath(this.getClass().getResource("/test.edgelist.input").toString());
inputChannelInstance.getLineage().collectAndMark();

final ExecutionOperator inputOperator = mock(ExecutionOperator.class);
when(inputOperator.getNumOutputs()).thenReturn(1);
StreamChannel.Instance outputFileChannelInstance =
StreamChannel.Instance outputChannelInstance =
(StreamChannel.Instance) StreamChannel.DESCRIPTOR
.createChannel(giraphPageRankOperator.getOutput(), configuration)
.createInstance(giraphExecutor, null, -1);

final DefaultOptimizationContext optimizationContext = new DefaultOptimizationContext(job);
final OptimizationContext.OperatorContext operatorContext = optimizationContext.addOneTimeOperator(giraphPageRankOperator);
final OptimizationContext.OperatorContext operatorContext =
optimizationContext.addOneTimeOperator(giraphPageRankOperator);

giraphPageRankOperator.execute(
new ChannelInstance[]{inputChannelInstance},
new ChannelInstance[]{outputFileChannelInstance},
new ChannelInstance[]{outputChannelInstance},
giraphExecutor,
operatorContext
);

// Verify executor interactions
verify(giraphExecutor, times(1)).execute(any(), any());

// Assert output channel creation
assertNotNull(outputChannelInstance);
}
}
Loading