Skip to content

[realppl 7] realppl integration with remote/local and unit tests #14853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: wuandy/RealPpl_6
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions Firestore/Example/Firestore.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Firestore/Example/Tests/API/FIRQuerySnapshotTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#import "Firestore/Source/API/FIRQuerySnapshot+Internal.h"
#import "Firestore/Source/API/FIRSnapshotMetadata+Internal.h"

#include "Firestore/core/src/core/pipeline_util.h"
#include "Firestore/core/src/core/query.h"
#include "Firestore/core/src/core/view_snapshot.h"
#include "Firestore/core/src/model/document.h"
Expand Down Expand Up @@ -101,7 +102,8 @@ - (void)testIncludeMetadataChanges {

std::shared_ptr<Firestore> firestore = FSTTestFirestore().wrapped;
core::Query query = Query("foo");
ViewSnapshot viewSnapshot(query, newDocuments, oldDocuments, std::move(documentChanges),
ViewSnapshot viewSnapshot(core::QueryOrPipeline(query), newDocuments, oldDocuments,
std::move(documentChanges),
/*mutated_keys=*/DocumentKeySet(),
/*from_cache=*/false,
/*sync_state_changed=*/true,
Expand Down
3 changes: 2 additions & 1 deletion Firestore/Example/Tests/API/FSTAPIHelpers.mm
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#import "Firestore/Source/API/FIRSnapshotMetadata+Internal.h"
#import "Firestore/Source/API/FSTUserDataReader.h"

#include "Firestore/core/src/core/pipeline_util.h"
#include "Firestore/core/src/core/view_snapshot.h"
#include "Firestore/core/src/model/document.h"
#include "Firestore/core/src/model/document_set.h"
Expand Down Expand Up @@ -148,7 +149,7 @@
}
newDocuments = newDocuments.insert(doc);
}
ViewSnapshot viewSnapshot{Query(path),
ViewSnapshot viewSnapshot{core::QueryOrPipeline(Query(path)),
newDocuments,
oldDocuments,
std::move(documentChanges),
Expand Down
2 changes: 1 addition & 1 deletion Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ bool IsOpen() const override {
}

void WatchQuery(const TargetData& query) override {
LOG_DEBUG("WatchQuery: %s: %s, %s", query.target_id(), query.target().ToString(),
LOG_DEBUG("WatchQuery: %s: %s, %s", query.target_id(), query.target_or_pipeline().ToString(),
query.resume_token().ToString());

// Snapshot version is ignored on the wire
Expand Down
62 changes: 31 additions & 31 deletions Firestore/Example/Tests/SpecTests/FSTSpecTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -842,36 +842,36 @@ - (void)validateExpectedState:(nullable NSDictionary *)expectedState {
}
if (expectedState[@"activeTargets"]) {
__block ActiveTargetMap expectedActiveTargets;
[expectedState[@"activeTargets"]
enumerateKeysAndObjectsUsingBlock:^(NSString *targetIDString, NSDictionary *queryData,
BOOL *) {
TargetId targetID = [targetIDString intValue];
NSArray *queriesJson = queryData[@"queries"];
std::vector<TargetData> queries;
for (id queryJson in queriesJson) {
Query query = [self parseQuery:queryJson];

QueryPurpose purpose = QueryPurpose::Listen;
if ([queryData objectForKey:@"targetPurpose"] != nil) {
purpose = [self parseQueryPurpose:queryData[@"targetPurpose"]];
}

TargetData target_data(query.ToTarget(), targetID, 0, purpose);
if ([queryData objectForKey:@"resumeToken"] != nil) {
target_data = target_data.WithResumeToken(
MakeResumeToken(queryData[@"resumeToken"]), SnapshotVersion::None());
} else {
target_data = target_data.WithResumeToken(
ByteString(), [self parseVersion:queryData[@"readTime"]]);
}

if ([queryData objectForKey:@"expectedCount"] != nil) {
target_data = target_data.WithExpectedCount([queryData[@"expectedCount"] intValue]);
}
queries.push_back(std::move(target_data));
}
expectedActiveTargets[targetID] = std::move(queries);
}];
[expectedState[@"activeTargets"] enumerateKeysAndObjectsUsingBlock:^(NSString *targetIDString,
NSDictionary *queryData,
BOOL *) {
TargetId targetID = [targetIDString intValue];
NSArray *queriesJson = queryData[@"queries"];
std::vector<TargetData> queries;
for (id queryJson in queriesJson) {
Query query = [self parseQuery:queryJson];

QueryPurpose purpose = QueryPurpose::Listen;
if ([queryData objectForKey:@"targetPurpose"] != nil) {
purpose = [self parseQueryPurpose:queryData[@"targetPurpose"]];
}

TargetData target_data(core::TargetOrPipeline(query.ToTarget()), targetID, 0, purpose);
if ([queryData objectForKey:@"resumeToken"] != nil) {
target_data = target_data.WithResumeToken(MakeResumeToken(queryData[@"resumeToken"]),
SnapshotVersion::None());
} else {
target_data = target_data.WithResumeToken(ByteString(),
[self parseVersion:queryData[@"readTime"]]);
}

if ([queryData objectForKey:@"expectedCount"] != nil) {
target_data = target_data.WithExpectedCount([queryData[@"expectedCount"] intValue]);
}
queries.push_back(std::move(target_data));
}
expectedActiveTargets[targetID] = std::move(queries);
}];
[self.driver setExpectedActiveTargets:std::move(expectedActiveTargets)];
}
}
Expand Down Expand Up @@ -982,7 +982,7 @@ - (void)validateActiveTargets {
const TargetData &actual = found->second;

XCTAssertEqual(actual.purpose(), targetData.purpose());
XCTAssertEqual(actual.target(), targetData.target());
XCTAssertEqual(actual.target_or_pipeline(), targetData.target_or_pipeline());
XCTAssertEqual(actual.target_id(), targetData.target_id());
XCTAssertEqual(actual.snapshot_version(), targetData.snapshot_version());
XCTAssertEqual(actual.resume_token(), targetData.resume_token());
Expand Down
25 changes: 13 additions & 12 deletions Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm
Original file line number Diff line number Diff line change
Expand Up @@ -478,18 +478,19 @@ - (FSTOutstandingWrite *)receiveWriteError:(int)errorCode

- (TargetId)addUserListenerWithQuery:(Query)query options:(ListenOptions)options {
// TODO(dimond): Change spec tests to verify isFromCache on snapshots
auto listener = QueryListener::Create(
query, options, [self, query](const StatusOr<ViewSnapshot> &maybe_snapshot) {
FSTQueryEvent *event = [[FSTQueryEvent alloc] init];
event.query = query;
if (maybe_snapshot.ok()) {
[event setViewSnapshot:maybe_snapshot.ValueOrDie()];
} else {
event.error = MakeNSError(maybe_snapshot.status());
}

[self.events addObject:event];
});
auto listener =
QueryListener::Create(core::QueryOrPipeline(query), options,
[self, query](const StatusOr<ViewSnapshot> &maybe_snapshot) {
FSTQueryEvent *event = [[FSTQueryEvent alloc] init];
event.query = query;
if (maybe_snapshot.ok()) {
[event setViewSnapshot:maybe_snapshot.ValueOrDie()];
} else {
event.error = MakeNSError(maybe_snapshot.status());
}

[self.events addObject:event];
});
_queryListeners[query] = listener;
TargetId targetID;
_workerQueue->EnqueueBlocking([&] { targetID = _eventManager->AddQueryListener(listener); });
Expand Down
5 changes: 3 additions & 2 deletions Firestore/Source/API/FIRQuery.mm
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "Firestore/core/src/core/firestore_client.h"
#include "Firestore/core/src/core/listen_options.h"
#include "Firestore/core/src/core/order_by.h"
#include "Firestore/core/src/core/pipeline_util.h"
#include "Firestore/core/src/core/query.h"
#include "Firestore/core/src/model/document_key.h"
#include "Firestore/core/src/model/field_path.h"
Expand Down Expand Up @@ -228,8 +229,8 @@ - (void)getDocumentsWithSource:(FIRFirestoreSource)publicSource
auto async_listener = AsyncEventListener<ViewSnapshot>::Create(
firestore->client()->user_executor(), std::move(view_listener));

std::shared_ptr<QueryListener> query_listener =
firestore->client()->ListenToQuery(query, internalOptions, async_listener);
std::shared_ptr<QueryListener> query_listener = firestore->client()->ListenToQuery(
core::QueryOrPipeline(query), internalOptions, async_listener);

return [[FSTListenerRegistration alloc]
initWithRegistration:absl::make_unique<QueryListenerRegistration>(firestore->client(),
Expand Down
1 change: 1 addition & 0 deletions Firestore/core/src/api/api_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class Firestore;
class ListenerRegistration;
class Pipeline;
class PipelineSnapshot;
class RealtimePipeline;
class Query;
class QuerySnapshot;
class Settings;
Expand Down
4 changes: 2 additions & 2 deletions Firestore/core/src/api/document_reference.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ std::unique_ptr<ListenerRegistration> DocumentReference::AddSnapshotListener(

core::Query query(key_.path());
std::shared_ptr<QueryListener> query_listener =
firestore_->client()->ListenToQuery(std::move(query), options,
async_listener);
firestore_->client()->ListenToQuery(
core::QueryOrPipeline(std::move(query)), options, async_listener);

return absl::make_unique<QueryListenerRegistration>(
firestore_->client(), std::move(async_listener),
Expand Down
4 changes: 4 additions & 0 deletions Firestore/core/src/api/expressions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ google_firestore_v1_Value Constant::to_proto() const {
return *model::DeepClone(*value_).release();
}

const google_firestore_v1_Value& Constant::value() const {
return *value_;
}

std::unique_ptr<core::EvaluableExpr> Constant::ToEvaluable() const {
return std::make_unique<core::CoreConstant>(
std::make_unique<Constant>(*this));
Expand Down
2 changes: 2 additions & 0 deletions Firestore/core/src/api/expressions.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class Constant : public Expr {
}
google_firestore_v1_Value to_proto() const override;

const google_firestore_v1_Value& value() const;

std::unique_ptr<core::EvaluableExpr> ToEvaluable() const override;

private:
Expand Down
8 changes: 8 additions & 0 deletions Firestore/core/src/api/ordering.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,18 @@ class Ordering {
return expr_.get();
}

const std::shared_ptr<Expr> expr_shared() const {
return expr_;
}

Direction direction() const {
return direction_;
}

Ordering WithReversedDirection() const {
return Ordering(expr_, direction_ == ASCENDING ? DESCENDING : ASCENDING);
}

google_firestore_v1_Value to_proto() const;

private:
Expand Down
4 changes: 2 additions & 2 deletions Firestore/core/src/api/query_core.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ std::unique_ptr<ListenerRegistration> Query::AddSnapshotListener(
firestore_->client()->user_executor(), std::move(view_listener));

std::shared_ptr<QueryListener> query_listener =
firestore_->client()->ListenToQuery(this->query(), options,
async_listener);
firestore_->client()->ListenToQuery(core::QueryOrPipeline(this->query()),
options, async_listener);

return absl::make_unique<QueryListenerRegistration>(
firestore_->client(), std::move(async_listener),
Expand Down
3 changes: 2 additions & 1 deletion Firestore/core/src/api/query_snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ void QuerySnapshot::ForEachChange(
// Special case the first snapshot because index calculation is easy and
// fast. Also all changes on the first snapshot are adds so there are also
// no metadata-only changes to filter out.
DocumentComparator doc_comparator = snapshot_.query().Comparator();
DocumentComparator doc_comparator =
snapshot_.query_or_pipeline().Comparator();
absl::optional<Document> last_document;
size_t index = 0;
for (const DocumentViewChange& change : snapshot_.document_changes()) {
Expand Down
35 changes: 25 additions & 10 deletions Firestore/core/src/api/realtime_pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <utility>

#include "Firestore/core/src/core/pipeline_util.h"
#include "Firestore/core/src/remote/serializer.h"

namespace firebase {
Expand All @@ -27,16 +28,35 @@ namespace api {

RealtimePipeline::RealtimePipeline(
std::vector<std::shared_ptr<EvaluableStage>> stages,
remote::Serializer serializer)
: stages_(std::move(stages)), serializer_(serializer) {
std::unique_ptr<remote::Serializer> serializer)
: stages_(std::move(stages)), serializer_(std::move(serializer)) {
this->rewritten_stages_ = core::RewriteStages(this->stages());
}

RealtimePipeline::RealtimePipeline(const RealtimePipeline& other)
: stages_(other.stages_),
rewritten_stages_(other.rewritten_stages_),
serializer_(std::make_unique<remote::Serializer>(
other.serializer_->database_id())) {
}

RealtimePipeline& RealtimePipeline::operator=(const RealtimePipeline& other) {
if (this != &other) {
stages_ = other.stages_;
rewritten_stages_ = other.rewritten_stages_;
serializer_ =
std::make_unique<remote::Serializer>(other.serializer_->database_id());
}
return *this;
}

RealtimePipeline RealtimePipeline::AddingStage(
std::shared_ptr<EvaluableStage> stage) {
auto copy = std::vector<std::shared_ptr<EvaluableStage>>(this->stages_);
copy.push_back(stage);

return {copy, serializer_};
return {copy,
std::make_unique<remote::Serializer>(serializer_->database_id())};
}

const std::vector<std::shared_ptr<EvaluableStage>>& RealtimePipeline::stages()
Expand All @@ -49,13 +69,8 @@ RealtimePipeline::rewritten_stages() const {
return this->rewritten_stages_;
}

void RealtimePipeline::SetRewrittentStages(
std::vector<std::shared_ptr<EvaluableStage>> stages) {
this->rewritten_stages_ = std::move(stages);
}

EvaluateContext RealtimePipeline::evaluate_context() {
return EvaluateContext(&serializer_);
EvaluateContext RealtimePipeline::evaluate_context() const {
return EvaluateContext(serializer_.get());
}

} // namespace api
Expand Down
19 changes: 10 additions & 9 deletions Firestore/core/src/api/realtime_pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,37 @@
#define FIRESTORE_CORE_SRC_API_REALTIME_PIPELINE_H_

#include <memory>
#include <utility>
#include <vector>

#include "Firestore/core/src/api/firestore.h"
#include "Firestore/core/src/api/pipeline_snapshot.h"
#include "Firestore/core/src/api/stages.h"
#include "Firestore/core/src/remote/serializer.h"

namespace firebase {
namespace firestore {
namespace remote {
class Serializer;
} // namespace remote

namespace api {

class RealtimePipeline {
public:
RealtimePipeline(std::vector<std::shared_ptr<EvaluableStage>> stages,
remote::Serializer serializer);
std::unique_ptr<remote::Serializer> serializer);

RealtimePipeline(const RealtimePipeline& other);
RealtimePipeline& operator=(const RealtimePipeline& other);

RealtimePipeline AddingStage(std::shared_ptr<EvaluableStage> stage);

const std::vector<std::shared_ptr<EvaluableStage>>& stages() const;
const std::vector<std::shared_ptr<EvaluableStage>>& rewritten_stages() const;

void SetRewrittentStages(std::vector<std::shared_ptr<EvaluableStage>>);

EvaluateContext evaluate_context();
EvaluateContext evaluate_context() const;

private:
std::vector<std::shared_ptr<EvaluableStage>> stages_;
std::vector<std::shared_ptr<EvaluableStage>> rewritten_stages_;
remote::Serializer serializer_;
std::unique_ptr<remote::Serializer> serializer_;
};

} // namespace api
Expand Down
Loading
Loading