diff --git a/src/scala/com/twitter/interaction_graph/scio/agg_negative/InteractionGraphNegativeJob.scala b/src/scala/com/twitter/interaction_graph/scio/agg_negative/InteractionGraphNegativeJob.scala index 479b67524..2e06204ba 100644 --- a/src/scala/com/twitter/interaction_graph/scio/agg_negative/InteractionGraphNegativeJob.scala +++ b/src/scala/com/twitter/interaction_graph/scio/agg_negative/InteractionGraphNegativeJob.scala @@ -49,17 +49,30 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO val endTs = opts.interval.getEndMillis // read input datasets + + // We only count blocks in the past 180 days to make sure we don't fall + // into any long-term cancellation effects in a tweeter. + // As an example, some accounts may have higher count of blocks because + // what they said is controversial in one period of time but it might be + // not in other (for example, @RWMaloneMD or others) val blocks: SCollection[InteractionGraphRawInput] = GraphUtil.getFlockFeatures( readSnapshot(FlockBlocksEdgesScalaDataset, sc), FeatureName.NumBlocks, endTs) + .filter(_.age < 180) + // We only count mutes in the past 180 days to make sure we don't fall + // into any long-term cancellation effects in a tweeter. + // As an example, some accounts may have higher count of blocks because + // what they said is controversial in one period of time but it might be + // not in other (for example, @RWMaloneMD or others) val mutes: SCollection[InteractionGraphRawInput] = GraphUtil.getFlockFeatures( readSnapshot(FlockMutesEdgesScalaDataset, sc), FeatureName.NumMutes, endTs) + .filter(_.age < 180) val abuseReports: SCollection[InteractionGraphRawInput] = GraphUtil.getFlockFeatures(