watch changes to specific fields in MongoDB change stream with Spring Data Mongodb [DATAMONGO-2587] #3439

Closed
spring-projects-issues opened this issue Jul 14, 2020 · 28 comments
Labels
in: aggregation-framework Aggregation framework support type: bug A general bug

Comments

@spring-projects-issues

Tita opened DATAMONGO-2587 and commented

Hello,

I'm working on an application that implements change streams for MongoDB (version 4.2.8), using Spring Boot reactive MongoDB and Reactor Kafka to send all update events to Kafka. I used an aggregation pipeline to catch updates to certain fields of my collection, but unfortunately it does not work. Here is an example of my aggregation filter:

  

public static TypedAggregation<ProductStoreStock> getAggregatForUpdatedSpecificFields1() {

    Criteria stkAvailableForSaleCriteria = where("updateDescription.updatedFields.stockAvailableForSale.lastUpdateDate").exists(true)
            .and("updateDescription.updatedFields.stockAvailableForSale.value").exists(true);

    Criteria updateOperationTypeCrit = where(AggregationPipelineConstant.OPERATION_TYPE).is(OperationType.UPDATE.getValue());
    Criteria criteria = new Criteria().andOperator(stkAvailableForSaleCriteria, updateOperationTypeCrit);
    return Aggregation.newAggregation(ProductStoreStock.class, match(criteria));
}
 

My filter is applied by this watcher:

public ChangeStreamWithFilterAndProjection<ProductStoreStock> getWatcher() {

    return mongoConfig.getReactiveMongoTemplate().changeStream(ProductStoreStock.class)
            .watchCollection(CollectionName.PRODUCT_STORE_STOCK.getCollectionName())
            .filter(AggregationUtils.getAggregatForUpdatedSpecificFields1());
}

I get no change events. So here is the question: how can I define the Criteria to match updates on updatedFields.x.y.z?

So I removed my stock filter, keeping only the operation type 'update'. The log of one of these change events looks like this:

 

   "updateDescription=UpdateDescription"{
      "removedFields="[

      ],
      "updatedFields="{
         "lastUpdateDate":{
            "$date":"2020-07-13T14:17:04.655Z"
         },
         "stockAvailableForSale.lastUpdateDate":{
            "$date":"2020-07-13T14:17:04.655Z"
         },
         "stockAvailableForSale.value":-5.05,
         "stockAvailableImmediate.lastUpdateDate":{
            "$date":"2020-07-13T14:17:04.655Z"
         },
         "stockAvailableImmediate.value":-5.05
      }
   },

 
I changed my filter to use elemMatch, so that it looks like this:
 

public static TypedAggregation<ProductStoreStock> getAggregatForUpdatedSpecificFields1() {

    Criteria stkAvailableForSaleCriteria = where("updateDescription")
            .elemMatch(where("updatedFields").exists(true))
            .elemMatch(where("stockAvailableForSale.lastUpdateDate").exists(true));

    Criteria updateOperationTypeCrit = where(AggregationPipelineConstant.OPERATION_TYPE).is(OperationType.UPDATE.getValue());
    Criteria criteria = new Criteria().andOperator(stkAvailableForSaleCriteria, updateOperationTypeCrit);
    return Aggregation.newAggregation(ProductStoreStock.class, match(criteria));
}
 

Result: no update event is caught.
Can anyone help me?

Thanks in advance.

 


Attachments:

@spring-projects-issues

Tita commented

I use the latest version of Spring Boot as well as reactive MongoDB.

The pom.xml file is attached.

[^pom.xml]

@spring-projects-issues

Tita commented

When I'm in debug mode, at the org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext.doPrefix(Document) method, I get the following result:

"Document"{
{
      "$and="[
         "Document"         {
                     {           "updateDescription.updatedFields.stockAvailableForSale.lastUpdateDate=Document" {
                       {
                     "$exists=true"
                  }
               }
            }
         },
         "Document"         {
{
               "operationType=update"
            }
         }
      ]
   }
}
 

Is there a problem with my criteria filter?

Thanks in advance.

@spring-projects-issues

Christoph Strobl commented

Do you have code (maybe a failing test reproducing the issue) that we can run? This would help us delimit the cause.
We'd really appreciate if you could use code block markup, instead of images, for source snippets. Thank you!

@spring-projects-issues

Tita commented

I wrote a quick test that gives an overall idea of an update on a collection. It is not the real collection I have in my project.

  • the test collection example is : stockAvailableForSale.java
 package com.xxxx.xxxxxxx;

import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document
public class stockAvailableForSale
{

    @Id
    private ObjectId id;
    private final String x;
    private final String y;

    public stockAvailableForSale(String x, String y)
    {
        this.x = x;
        this.y = y;
    }

    public ObjectId getId()
    {
        return id;
    }

    public void setId(ObjectId id)
    {
        this.id = id;
    }

    public String getX()
    {
        return x;
    }

    public String getY()
    {
        return y;
    }

}
 
  • and my test service is :
 package com.xxxx.xxxxx;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

@Component
public class StockAvailableForSaleTestService
{

    @Autowired
    private final StockAvailableForSaleRepository repo;

    @Autowired
    private final ReactiveMongoTemplate mongoTemplate;

    public StockAvailableForSaleTestService(StockAvailableForSaleRepository repo, ReactiveMongoTemplate mongoTemplate) {
        this.repo = repo;
        this.mongoTemplate = mongoTemplate;
    }

    public void init()
    {
        stockAvailableForSale stkAvailable = new stockAvailableForSale("test1", "test2");
        repo.insert(stkAvailable).subscribe();
        stkAvailable = new stockAvailableForSale("test3", "test4");
        repo.insert(stkAvailable).subscribe();
        stkAvailable = new stockAvailableForSale("test5", "test6");
        repo.insert(stkAvailable).subscribe();
        repo.findAll().flatMap(stockAvailableForSale -> this.updateObject(stockAvailableForSale)).subscribe();
    }

    private Mono<stockAvailableForSale> updateObject(stockAvailableForSale stockAvailableForSale)
    {
        stockAvailableForSale newStkAvailable = new stockAvailableForSale(stockAvailableForSale.getX(), "new-Test");
        newStkAvailable.setId(stockAvailableForSale.getId());
        mongoTemplate.updateFirst(
            new Query().addCriteria(Criteria.where("_id").is(stockAvailableForSale.getId())),
            new Update().set("toto", "toto"),
            stockAvailableForSale.class
        ).subscribe();
        mongoTemplate.updateFirst(
            new Query().addCriteria(Criteria.where("_id").is(stockAvailableForSale.getId())),
            new Update().set("titi", "titi"),
            stockAvailableForSale.class
        ).subscribe();
        return repo.save(newStkAvailable);

    }

}
  • This should produce an update event in the change stream, but unfortunately it does not.

@spring-projects-issues

Tita commented

Is the test I sent you enough?

@spring-projects-issues

Christoph Strobl commented

Thanks for the details!
I think using the dot notation to filter Documents in the ChangeStream (like in the snippet below) can solve the problem.

@Test
void changeStreamFilterOnUpdateDescriptionUpdatedFields() throws InterruptedException {
	BlockingQueue<ChangeStreamEvent<StockAvailableForSale>> documents = new LinkedBlockingQueue<>(100);
	ChangeStreamOptions options = ChangeStreamOptions.builder()
			.fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
			.filter(
				newAggregation(match(where("operationType").is("update").and("updateDescription.updatedFields.data").exists(true)))
			)
			.build();
	Disposable disposable = template.changeStream(options, StockAvailableForSale.class) //
			.doOnNext(documents::add).subscribe();
	Thread.sleep(500); // just give it some time to link to the collection.
	StockAvailableForSale stkAvailable1 = new StockAvailableForSale("test1", "test2");
	StockAvailableForSale stkAvailable2 = new StockAvailableForSale("test3", "test4");
	StockAvailableForSale stkAvailable3 = new StockAvailableForSale("test5", "test6");
	// none of those show up due to the operation type filter
	Flux.merge(template.save(stkAvailable1), template.save(stkAvailable2).delayElement(Duration.ofMillis(50)),
			template.save(stkAvailable3).delayElement(Duration.ofMillis(100))) //
			.as(StepVerifier::create) //
			.expectNextCount(3) //
			.verifyComplete();
	// won't show up cause of 'updateDescription.updatedFields.data : $exists'
	template.update(StockAvailableForSale.class)
			.matching(where("_id").is(stkAvailable1.getId()))
			.apply(new Update().set("spring", "rocks!"))
			.first()
			.then().as(StepVerifier::create).verifyComplete();
	// that one will match
	template.update(StockAvailableForSale.class)
			.matching(where("_id").is(stkAvailable2.getId()))
			.apply(new Update().set("data", "MongoDB"))
			.first()
			.then().as(StepVerifier::create).verifyComplete();
	Thread.sleep(500);
	disposable.dispose();
	assertThat(documents).hasSize(1);
	assertThat(documents.take().getBody().getId()).isEqualTo(stkAvailable2.getId());
}

@spring-projects-issues

Tita commented

Thanks for your reply, but I think the problem in my code is that I don't use "UPDATE_LOOKUP" for fullDocumentLookup.
Technically, I can't set this on my watcher, because I use a different method than you.

public ChangeStreamWithFilterAndProjection<ProductStoreStock> getWatcher() {
	    
		return mongoConfig.getReactiveMongoTemplate().changeStream(ProductStoreStock.class)
                    .watchCollection(CollectionName.PRODUCT_STORE_STOCK.getCollectionName())
                    .filter(AggregationUtils.getAggregatForUpdatedSpecificFields1()); 
	}

Could you please tell me how to add the lookup to my watcher?
N.B.: I tried what you just suggested, but still no event is returned.

@spring-projects-issues

Christoph Strobl commented

that would be

reactiveMongoTemplate.changeStream(DomainType.class)
		.withOptions(ChangeStreamOptionsBuilder::returnFullDocumentOnUpdate)
		...

@spring-projects-issues

Tita commented

Unfortunately, even if I add the option below

reactiveMongoTemplate.changeStream(DomainType.class).withOptions(ChangeStreamOptionsBuilder::returnFullDocumentOnUpdate)

it doesn't work.

@spring-projects-issues

Tita commented

Hello Christoph Strobl,

When I use only the updateDescription.updatedFields prefix, I get all my change events:

public static TypedAggregation<ProductStoreStock> getAggregatForUpdatedSpecificFields3() {
    	    	
		Criteria criteria = where(AggregationPipelineConstant.OPERATION_TYPE).in(OperationType.UPDATE.getValue()).and("updateDescription.updatedFields").exists(true);	
		return Aggregation.newAggregation(ProductStoreStock.class, match(criteria));
		
	}

 

The resulting change stream document looks like this:

"ChangeStreamDocument"{
   "operationType=OperationType"{
      "value="      "update"
   },
   "resumeToken="{
      "_data":"825F1058A7000000022B022C0100x54424324354354343234SJDF85035B2F46409E439DD1A3E0C87246465F02E016FBE64000004"
   },
   "namespace=give.productStoreStock",
   "destinationNamespace=null",
   "fullDocument=Document"{
{
         "_id=Document"{
{
               storeID=899,
               productID=12050226
            }
         },
         lastUpdateDate=Thu Jul 16 15:39:51 CEST 2020,
         "mvtCashOut=Document"{
{
               lastUpdateDate=Thu Jul 16 15:39:51 CEST 2020,
               "movements="[
                  "Document"                  {
{
                        value=-1.01,
                        lastAccessDateSource=Wed Jul 15 11:00:00 CEST 2020
                     }
                  }
               ]
            }
         },
         "stockAvailableForSale=Document"{
{
               lastUpdateDate=Thu Jul 16 15:39:51 CEST 2020,
               value=-1.01
            }
         },
         "stockAvailableImmediate=Document"{
{
               lastUpdateDate=Thu Jul 16 15:39:51 CEST 2020,
               value=-1.01
            }
         },
         "stockPhysicalInSaleArea=Document"{
{
               lastUpdateDate=Thu Jul 16 15:39:51 CEST 2020,
               value=-1.01
            }
         }
      }
   },
   "documentKey="{
      "_id":{
         "storeID":899,
         "productID":12050226
      }
   },
   "clusterTime=Timestamp"{
      value=6850072507513307138,
      seconds=1594906791,
      inc=2
   },
   "updateDescription=UpdateDescription"{
      "removedFields="[

      ],
      "updatedFields="{
         "lastUpdateDate":{
            "$date":"2020-07-16T13:39:51.407Z"
         },
         "stockAvailableForSale.lastUpdateDate":{
            "$date":"2020-07-16T13:39:51.407Z"
         },
         "stockAvailableForSale.value":-1.01,
         "stockAvailableImmediate.lastUpdateDate":{
            "$date":"2020-07-16T13:39:51.407Z"
         },
         "stockAvailableImmediate.value":-1.01,
         "stockPhysicalInSaleArea.lastUpdateDate":{
            "$date":"2020-07-16T13:39:51.407Z"
         },
         "stockPhysicalInSaleArea.value":-1.01
      }
   },
   "txnNumber=null",
   "lsid=null"
}

 

I think using dot notation to filter documents in the change stream doesn't work with updateDescription.updatedFields.data?

Thanks,

 

@spring-projects-issues

Tita commented

With the following criteria:

public static TypedAggregation<ProductStoreStock> getAggregatForUpdatedSpecificFields3() {

	Criteria criteria = where(AggregationPipelineConstant.OPERATION_TYPE).is(OperationType.UPDATE.getValue())
								.and("updateDescription.updatedFields").exists(true)
								.and("stockAvailableForSale.lastUpdateDate").exists(true);	
	return Aggregation.newAggregation(ProductStoreStock.class, match(criteria));
	
}

It does not work at all: the criteria returns all updated documents, not only the documents in which the field I modified was changed.

@spring-projects-issues

Tita commented

Hello Christoph Strobl

With these criteria:

public static TypedAggregation<ProductStoreStock> getAggregatForUpdatedSpecificFields3() {

	Criteria criteria = where(AggregationPipelineConstant.OPERATION_TYPE).is(OperationType.UPDATE.getValue())
								.and("updateDescription.updatedFields").exists(true)
								.and("stockAvailableForSale.lastUpdateDate").exists(true);	
	return Aggregation.newAggregation(ProductStoreStock.class, match(criteria));
	
}

While debugging, I found that when we pass any key other than the 5 keys mentioned above, the code prepends fullDocument. to the key. So when I passed updateDescription.updatedFields.stockAvailableForSale.lastUpdateDate, it was converted to fullDocument.stockAvailableForSale.lastUpdateDate, which is not present, and hence it fails.
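
For readers following along, the prefixing behaviour described above can be modelled in plain Java. This is only a minimal sketch, not Spring Data MongoDB's actual implementation: the class name ChangeStreamKeyPrefixer is hypothetical, and the set of five event-level keys is an assumption, since the thread does not list them. It illustrates why a key that starts with updateDescription is left alone, while a bare stockAvailableForSale.lastUpdateDate ends up under fullDocument.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Simplified, hypothetical model of change-stream key prefixing. */
final class ChangeStreamKeyPrefixer {

    // Assumption: the event-level keys that are never prefixed.
    // The thread only says there are five of them.
    private static final Set<String> EVENT_KEYS = new HashSet<>(Arrays.asList(
            "operationType", "fullDocument", "documentKey", "updateDescription", "ns"));

    /** Returns the key as the filter would see it after prefixing. */
    static String prefixKey(String key) {
        if (key.startsWith("$")) {
            return key; // operators such as $and are left untouched
        }
        int dot = key.indexOf('.');
        String firstSegment = dot < 0 ? key : key.substring(0, dot);
        // Keys rooted at an event-level field stay as they are;
        // everything else is assumed to address the full document.
        return EVENT_KEYS.contains(firstSegment) ? key : "fullDocument." + key;
    }

    public static void main(String[] args) {
        // Stays unchanged because it is rooted at updateDescription.
        System.out.println(prefixKey(
                "updateDescription.updatedFields.stockAvailableForSale.lastUpdateDate"));
        // Becomes fullDocument.stockAvailableForSale.lastUpdateDate.
        System.out.println(prefixKey("stockAvailableForSale.lastUpdateDate"));
    }
}
```

Under this model each key passed to and(...) is handled on its own, so only a key that spells out the updateDescription.updatedFields prefix targets the update description.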

I believe the problem is this function in the code:

PrefixingDelegatingAggregationOperationContext::prefixKey

(screenshot attached: error.png)
As the attached pom.xml shows, I use the latest Spring Boot version, 2.3.1.RELEASE, with mongodb-driver-core 4.0.4.

Thanks in advance for your help.

@spring-projects-issues

Christoph Strobl commented

In case you want to match stockAvailableForSale.lastUpdateDate against the update document instead of the fullDocument, use the updateDescription.updatedFields prefix, i.e. updateDescription.updatedFields.stockAvailableForSale.lastUpdateDate.
and(...) does not nest paths!

In case the trouble continues, please provide a minimal sample that reproduces the issue (a repo on GitHub if possible). I do not see this going anywhere without running the code.

@spring-projects-issues

Tita commented

Hello Christoph Strobl

Please find attached publisher repository.

https://github.com/youter59/publisher

Thank you in advance for your help

@spring-projects-issues

Tita commented

Hello Christoph Strobl,
Any news about this problem? Are you still working on it?

@spring-projects-issues

Christoph Strobl commented

Please be patient, this is not the only issue/project we're working on.
Thanks for understanding

@spring-projects-issues

Tita commented

Okay!
I am really sorry.

@spring-projects-issues

Christoph Strobl commented

Based on the sample you provided I created a simple reproducer. Removing extends Document from Stock allowed the test to pass.

@spring-projects-issues

Tita commented

Hello Christoph Strobl,
Thanks for your reply, but it doesn't work for me.

I don't understand why it works very well in the unit test but not in my code.
It is the same watcher with the same aggregation pipeline.

The problem is still there.

@spring-projects-issues

Christoph Strobl commented

Sorry to hear. Maybe I'm missing something - please use the reproducer test as a starting point and modify it to match your case. We might be able to figure it out when comparing the two then

@spring-projects-issues

Tita commented

Did you find a problem in my implementation?
I think your test matches my logic well (same watcher and aggregation).

@spring-projects-issues

Christoph Strobl commented

As I said, I removed extends Document from the Stock class, because the mapping layer will store an empty document for this type: org.bson.Document is a MongoDB native type.

@spring-projects-issues

Tita commented

Yes, I saw your remark about that and I removed it. Even so, the problem persists.

spring-projects-issues added the labels status: waiting-for-feedback (We need additional information before we can continue), type: bug (A general bug) and in: aggregation-framework (Aggregation framework support) on Dec 30, 2020
@spring-projects-issues

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

@spring-projects-issues spring-projects-issues added the status: feedback-reminder We've sent a reminder that we need additional information before we can continue label Jan 6, 2021
@spring-projects-issues

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.

spring-projects-issues removed the labels status: waiting-for-feedback (We need additional information before we can continue) and status: feedback-reminder (We've sent a reminder that we need additional information before we can continue) on Jan 13, 2021
@robertolosanno-e2x

robertolosanno-e2x commented Oct 23, 2024

It worked for me using this option

AggregationOperation matchOperation = new MatchOperation(
                Criteria.where("updateDescription.updatedFields").exists(true)
                        .andOperator(
                                Criteria.where("updateDescription.updatedFields.lastUpdateDate").exists(true)
                        )
        );

@bboze

bboze commented Oct 29, 2024

Hi, it looks like the filter cannot recognize cases where the update is made on a subelement, e.g. an update of stockAvailableForSale.value directly instead of an update of the whole stockAvailableForSale field.

Using the provided reproducer, if we change line 45 to this code:

.apply(new Update().set("stockAvailableForSale.value", 100D))

the test will not pass, because the filter will not match
UpdateDescription{updatedFields={"stockAvailableForSale.value": 100.0}}

The filter can match only if the whole stockAvailableForSale object/field is updated. In that case it does not matter whether we filter on the whole stockAvailableForSale or on stockAvailableForSale.value; both will match.
UpdateDescription{updatedFields={"stockAvailableForSale": {"value": 100.0, "lastUpdateDate": {"$date": "2024-10-29T11:34:52.013Z"}}}}

This behavior does not help in my case because I need to monitor changes to specific subelements only.
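
The difference between the two update shapes above can be illustrated without a database. The sketch below assumes that a dotted path in a match filter is resolved one segment at a time through nested documents, while the change event stores a sub-field update under a single key that itself contains a dot. The class and method names are hypothetical, plain maps stand in for BSON documents, and array handling is ignored.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of dotted-path lookup versus literal-key lookup. */
final class UpdatedFieldsLookup {

    /** Walks nested maps one path segment at a time. */
    static Object resolvePath(Map<String, ?> doc, String dottedPath) {
        Object current = doc;
        for (String segment : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // nothing left to descend into
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    /** Looks the name up as one literal key, dots included. */
    static Object literalKey(Map<String, ?> doc, String key) {
        return doc.get(key);
    }

    public static void main(String[] args) {
        // Shape produced by new Update().set("stockAvailableForSale.value", 100D)
        Map<String, Object> subFieldUpdate = new HashMap<>();
        subFieldUpdate.put("stockAvailableForSale.value", 100.0);

        // Shape produced when the whole stockAvailableForSale object is replaced
        Map<String, Object> inner = new HashMap<>();
        inner.put("value", 100.0);
        Map<String, Object> wholeObjectUpdate = new HashMap<>();
        wholeObjectUpdate.put("stockAvailableForSale", inner);

        String path = "stockAvailableForSale.value";
        System.out.println(resolvePath(subFieldUpdate, path));    // prints null
        System.out.println(literalKey(subFieldUpdate, path));     // prints 100.0
        System.out.println(resolvePath(wholeObjectUpdate, path)); // prints 100.0
        System.out.println(literalKey(wholeObjectUpdate, path));  // prints null
    }
}
```

If this model holds, a filter on a sub-field of updatedFields needs a literal-key lookup rather than path traversal whenever the update targets the sub-field directly.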

@bboze

bboze commented Nov 19, 2024

Worked for me with $getField

new Criteria().orOperator(
	where("updateDescription.updatedFields.fieldA").exists(true),
	expr(MongoExpression.create("{ $getField: {field: \"fieldB.subfield1\", input: \"$updateDescription.updatedFields\"}}"))
)
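
A small helper can build this kind of expression for any literal key. The sketch below is a variant resting on assumptions, not something posted in this thread: it wraps $getField in a $type check against "missing" so that, in principle, stored values such as 0, false or null still count as present, and it has not been verified against a live server. The class name UpdatedFieldExpression is hypothetical. Note that $getField was introduced in MongoDB 5.0, so it is not available on the 4.2.8 server mentioned at the top of this issue.

```java
/** Hypothetical helper that builds a presence check for a literal updatedFields key. */
final class UpdatedFieldExpression {

    /**
     * Builds an aggregation expression string that is meant to be true when
     * updateDescription.updatedFields contains the given key verbatim
     * (dots included).
     */
    static String presenceOf(String literalKey) {
        if (literalKey.isEmpty() || literalKey.startsWith("$")) {
            // Keys starting with '$' would be read as expressions, so reject them here.
            throw new IllegalArgumentException("unsupported key: " + literalKey);
        }
        // Escape backslashes and quotes so the key stays a valid string literal.
        String escaped = literalKey.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{ $ne: [ { $type: { $getField: { field: \"" + escaped
                + "\", input: \"$updateDescription.updatedFields\" } } }, \"missing\" ] }";
    }

    public static void main(String[] args) {
        System.out.println(presenceOf("fieldB.subfield1"));
    }
}
```

The resulting string could be passed to MongoExpression.create(...) in the same way as the expression above.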
